Skip to content

Commit

Permalink
fix(mssql): avoid calling .commit() unless a DDL operation is being…
Browse files Browse the repository at this point in the history
… performed (#9658)

This PR attempts to address flaky behavior of MS SQL observed in CI.

It seems like calling `commit()`/`rollback()` with a `SELECT` statement,
AKA autocommit behavior
can cause this issue.

The origin is not 100% clear to me, but there are a number of places
where the
same problem (function sequence error) with the same solution show up
(disabling autocommit or not calling commit with a `SELECT` statement):

-
https://stackoverflow.com/questions/25769043/function-sequence-error-in-pyodbc
- ansible-collections/community.general#1137
-
https://www.reddit.com/r/learnpython/comments/x8568m/function_sequence_error_in_sqlalchemy/
- explorerhq/sql-explorer#423


Here I avoid calling `cur.commit()` unless a DDL statement is being
executed.

The remaining case is `raw_sql()`, which I opted to avoid calling
`commit` in
since that would have this problem when calling it with a `SELECT`
statement.

Users of `raw_sql` are therefore responsible for handling
commit/rollback when
invoking it for DDL statements.

Closes #9654.
  • Loading branch information
cpcloud authored Jul 22, 2024
1 parent 0ad47de commit 69c5bf0
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ def current_database(self) -> str:

@contextlib.contextmanager
def begin(self):
with contextlib.closing(self.con.cursor()) as cur:
yield cur

@contextlib.contextmanager
def _ddl_begin(self):
con = self.con
cur = con.cursor()
try:
Expand All @@ -272,22 +277,24 @@ def _safe_raw_sql(self, query, *args, **kwargs):
cur.execute(query, *args, **kwargs)
yield cur

@contextlib.contextmanager
def _safe_ddl(self, query, *args, **kwargs):
with contextlib.suppress(AttributeError):
query = query.sql(self.dialect)

with self._ddl_begin() as cur:
cur.execute(query, *args, **kwargs)
yield cur

def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
with contextlib.suppress(AttributeError):
query = query.sql(self.dialect)

con = self.con
cursor = con.cursor()

try:
cursor.execute(query, **kwargs)
except Exception:
con.rollback()
cursor.close()
raise
else:
con.commit()
return cursor
cursor.execute(query, **kwargs)
return cursor

def create_catalog(self, name: str, force: bool = False) -> None:
expr = (
Expand All @@ -308,11 +315,11 @@ def create_catalog(self, name: str, force: bool = False) -> None:
if force
else stmt
)
with self._safe_raw_sql(create_stmt):
with self._safe_ddl(create_stmt):
pass

def drop_catalog(self, name: str, force: bool = False) -> None:
with self._safe_raw_sql(
with self._safe_ddl(
sge.Drop(
kind="DATABASE",
this=sg.to_identifier(name, quoted=self.compiler.quoted),
Expand Down Expand Up @@ -583,7 +590,7 @@ def create_table(

this = sg.table(name, catalog=catalog, db=db, quoted=self.compiler.quoted)
raw_this = sg.table(name, catalog=catalog, db=db, quoted=False)
with self._safe_raw_sql(create_stmt) as cur:
with self._safe_ddl(create_stmt) as cur:
if query is not None:
# You can specify that a table is temporary for the sqlglot `Create` but not
# for the subsequent `Insert`, so we need to shove a `#` in
Expand Down Expand Up @@ -665,7 +672,7 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
data = df.itertuples(index=False)

insert_stmt = self._build_insert_template(name, schema=schema, columns=True)
with self._safe_raw_sql(create_stmt) as cur:
with self._safe_ddl(create_stmt) as cur:
if not df.empty:
cur.executemany(insert_stmt, data)

Expand Down

0 comments on commit 69c5bf0

Please sign in to comment.