Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions aiosqlite/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,15 @@ def __init__(
DeprecationWarning,
)

def _stop_running(self):
def _stop_running(self) -> asyncio.Future:
self._running = False
# PEP 661 is not accepted yet, so we cannot type a sentinel
self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type]

function = partial(lambda: _STOP_RUNNING_SENTINEL)
future = asyncio.get_event_loop().create_future()

self._tx.put_nowait((future, function))

return future

@property
def _conn(self) -> sqlite3.Connection:
Expand Down Expand Up @@ -95,16 +100,17 @@ def run(self) -> None:
# futures)

tx_item = self._tx.get()
if tx_item is _STOP_RUNNING_SENTINEL:
break

future, function = tx_item

try:
LOG.debug("executing %s", function)
result = function()
LOG.debug("operation %s completed", function)
future.get_loop().call_soon_threadsafe(set_result, future, result)

if result is _STOP_RUNNING_SENTINEL:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This likely caused #369
I guess the problem is that in some situation _STOP_RUNNING SENTINEL is not received and the while loop runs forever, stopping the Pytest from exiting.

It might of course be some bad usages of aiosqlite in airflow or in airflow tests, but I think at the very least some diagnostics of the situation and detecting bad use would be good, because we are now totally in the dark what could be the reason - simply pytest session hangs forever with all tests passing.

break

except BaseException as e: # noqa B036
LOG.debug("returning exception %s", e)
future.get_loop().call_soon_threadsafe(set_exception, future, e)
Expand All @@ -129,7 +135,7 @@ async def _connect(self) -> "Connection":
self._tx.put_nowait((future, self._connector))
self._connection = await future
except BaseException:
self._stop_running()
await self._stop_running()
self._connection = None
raise

Expand Down Expand Up @@ -170,7 +176,7 @@ async def close(self) -> None:
LOG.info("exception occurred while closing connection")
raise
finally:
self._stop_running()
await self._stop_running()
self._connection = None

@contextmanager
Expand Down
11 changes: 11 additions & 0 deletions aiosqlite/tests/smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,17 @@ async def test_cursor_on_closed_connection_loop(self):
except sqlite3.ProgrammingError:
pass

async def test_close_blocking_until_transaction_queue_empty(self):
db = await aiosqlite.connect(self.db)
# Insert transactions into the
# transaction queue '_tx'
for i in range(1000):
await db.execute(f"select 1, {i}")
# Wait for all transactions to complete
await db.close()
# Check no more transaction pending
self.assertEqual(db._tx.empty(), True)

async def test_close_twice(self):
db = await aiosqlite.connect(self.db)

Expand Down