Skip to content

Commit d9f28c9

Browse files
Wait for transaction queue to complete when closing connection (#305)
* Test await connection.close() always returns an empty transaction queue * 'await connection.close()' returns once all transaction queue items's results have been forwarded, including the _STOP_RUNNING_SENTINEL result * nits --------- Co-authored-by: Amethyst Reese <amethyst@n7.gg>
1 parent 895fd91 commit d9f28c9

File tree

2 files changed

+25
-8
lines changed

2 files changed

+25
-8
lines changed

aiosqlite/core.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,15 @@ def __init__(
6262
DeprecationWarning,
6363
)
6464

65-
def _stop_running(self):
65+
def _stop_running(self) -> asyncio.Future:
6666
self._running = False
67-
# PEP 661 is not accepted yet, so we cannot type a sentinel
68-
self._tx.put_nowait(_STOP_RUNNING_SENTINEL) # type: ignore[arg-type]
67+
68+
function = partial(lambda: _STOP_RUNNING_SENTINEL)
69+
future = asyncio.get_event_loop().create_future()
70+
71+
self._tx.put_nowait((future, function))
72+
73+
return future
6974

7075
@property
7176
def _conn(self) -> sqlite3.Connection:
@@ -95,16 +100,17 @@ def run(self) -> None:
95100
# futures)
96101

97102
tx_item = self._tx.get()
98-
if tx_item is _STOP_RUNNING_SENTINEL:
99-
break
100-
101103
future, function = tx_item
102104

103105
try:
104106
LOG.debug("executing %s", function)
105107
result = function()
106108
LOG.debug("operation %s completed", function)
107109
future.get_loop().call_soon_threadsafe(set_result, future, result)
110+
111+
if result is _STOP_RUNNING_SENTINEL:
112+
break
113+
108114
except BaseException as e: # noqa B036
109115
LOG.debug("returning exception %s", e)
110116
future.get_loop().call_soon_threadsafe(set_exception, future, e)
@@ -129,7 +135,7 @@ async def _connect(self) -> "Connection":
129135
self._tx.put_nowait((future, self._connector))
130136
self._connection = await future
131137
except BaseException:
132-
self._stop_running()
138+
await self._stop_running()
133139
self._connection = None
134140
raise
135141

@@ -170,7 +176,7 @@ async def close(self) -> None:
170176
LOG.info("exception occurred while closing connection")
171177
raise
172178
finally:
173-
self._stop_running()
179+
await self._stop_running()
174180
self._connection = None
175181

176182
@contextmanager

aiosqlite/tests/smoke.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,17 @@ async def test_cursor_on_closed_connection_loop(self):
414414
except sqlite3.ProgrammingError:
415415
pass
416416

417+
async def test_close_blocking_until_transaction_queue_empty(self):
418+
db = await aiosqlite.connect(self.db)
419+
# Insert transactions into the
420+
# transaction queue '_tx'
421+
for i in range(1000):
422+
await db.execute(f"select 1, {i}")
423+
# Wait for all transactions to complete
424+
await db.close()
425+
# Check no more transaction pending
426+
self.assertEqual(db._tx.empty(), True)
427+
417428
async def test_close_twice(self):
418429
db = await aiosqlite.connect(self.db)
419430

0 commit comments

Comments
 (0)