Skip to content

Commit 329fcc7

Browse files
committed
Improve stop semantics for connections
- Promote `_stop_running()` to `stop()` and ensure it works without an active event loop. - If the connection is still active when stopping the background thread, call close on the connection before stopping. - Add test cases - Clean up some types
1 parent a869d73 commit 329fcc7

File tree

2 files changed

+61
-33
lines changed

2 files changed

+61
-33
lines changed

aiosqlite/core.py

Lines changed: 43 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,10 @@ def set_exception(fut: asyncio.Future, e: BaseException) -> None:
4242

4343

4444
_STOP_RUNNING_SENTINEL = object()
45+
_TxQueue = SimpleQueue[tuple[Optional[asyncio.Future], Callable[[], Any]]]
4546

4647

47-
def _connection_worker_thread(
48-
tx: SimpleQueue[tuple[asyncio.Future, Callable[[], Any]]],
49-
):
48+
def _connection_worker_thread(tx: _TxQueue):
5049
"""
5150
Execute function calls on a separate thread.
5251
@@ -57,21 +56,23 @@ def _connection_worker_thread(
5756
# even after connection is closed (so we can finalize all
5857
# futures)
5958

60-
tx_item = tx.get()
61-
future, function = tx_item
59+
future, function = tx.get()
6260

6361
try:
6462
LOG.debug("executing %s", function)
6563
result = function()
64+
65+
if future:
66+
future.get_loop().call_soon_threadsafe(set_result, future, result)
6667
LOG.debug("operation %s completed", function)
67-
future.get_loop().call_soon_threadsafe(set_result, future, result)
6868

6969
if result is _STOP_RUNNING_SENTINEL:
7070
break
7171

7272
except BaseException as e: # noqa B036
7373
LOG.debug("returning exception %s", e)
74-
future.get_loop().call_soon_threadsafe(set_exception, future, e)
74+
if future:
75+
future.get_loop().call_soon_threadsafe(set_exception, future, e)
7576

7677

7778
class Connection:
@@ -84,7 +85,7 @@ def __init__(
8485
self._running = True
8586
self._connection: Optional[sqlite3.Connection] = None
8687
self._connector = connector
87-
self._tx: SimpleQueue[tuple[asyncio.Future, Callable[[], Any]]] = SimpleQueue()
88+
self._tx: _TxQueue = SimpleQueue()
8889
self._iter_chunk_size = iter_chunk_size
8990
self._thread = Thread(target=_connection_worker_thread, args=(self._tx,))
9091

@@ -94,14 +95,40 @@ def __init__(
9495
DeprecationWarning,
9596
)
9697

97-
def _stop_running(self) -> asyncio.Future:
98+
def __del__(self):
99+
if self._connection is None:
100+
return
101+
102+
warn(
103+
(
104+
f"{self!r} was deleted before being closed. "
105+
"Please use 'async with' or '.close()' to close the connection properly."
106+
),
107+
ResourceWarning,
108+
stacklevel=1,
109+
)
110+
111+
# Don't try to be creative here, the event loop may have already been closed.
112+
# Simply stop the worker thread, and let the underlying sqlite3 connection
113+
# be finalized by its own __del__.
114+
self.stop()
115+
116+
def stop(self) -> Optional[asyncio.Future]:
117+
"""Stop the background thread. Prefer `async with` or `await close()`"""
98118
self._running = False
99119

100-
function = partial(lambda: _STOP_RUNNING_SENTINEL)
101-
future = asyncio.get_event_loop().create_future()
120+
def close_and_stop():
121+
if self._connection is not None:
122+
self._connection.close()
123+
self._connection = None
124+
return _STOP_RUNNING_SENTINEL
102125

103-
self._tx.put_nowait((future, function))
126+
try:
127+
future = asyncio.get_event_loop().create_future()
128+
except Exception:
129+
future = None
104130

131+
self._tx.put_nowait((future, close_and_stop))
105132
return future
106133

107134
@property
@@ -140,7 +167,7 @@ async def _connect(self) -> "Connection":
140167
self._tx.put_nowait((future, self._connector))
141168
self._connection = await future
142169
except BaseException:
143-
await self._stop_running()
170+
self.stop()
144171
self._connection = None
145172
raise
146173

@@ -181,8 +208,10 @@ async def close(self) -> None:
181208
LOG.info("exception occurred while closing connection")
182209
raise
183210
finally:
184-
await self._stop_running()
185211
self._connection = None
212+
future = self.stop()
213+
if future:
214+
await future
186215

187216
@contextmanager
188217
async def execute(
@@ -410,24 +439,6 @@ async def backup(
410439
sleep=sleep,
411440
)
412441

413-
def __del__(self):
414-
if self._connection is None:
415-
return
416-
417-
warn(
418-
(
419-
f"{self!r} was deleted before being closed. "
420-
"Please use 'async with' or '.close()' to close the connection properly."
421-
),
422-
ResourceWarning,
423-
stacklevel=1,
424-
)
425-
426-
# Don't try to be creative here, the event loop may have already been closed.
427-
# Simply stop the worker thread, and let the underlying sqlite3 connection
428-
# be finalized by its own __del__.
429-
self._stop_running()
430-
431442

432443
def connect(
433444
database: Union[str, Path],

aiosqlite/tests/smoke.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ def _raise_cancelled_error(*_, **__):
404404
...
405405
# Terminate the thread here if the test fails to have a clear error.
406406
if connection._running:
407-
connection._stop_running()
407+
connection.stop()
408408
raise AssertionError("connection thread was not stopped")
409409

410410
async def test_iterdump(self):
@@ -518,3 +518,20 @@ async def test_emits_warning_when_left_open(self):
518518
ResourceWarning, r".*was deleted before being closed.*"
519519
):
520520
del db
521+
522+
async def test_stop_without_close(self):
523+
db = await aiosqlite.connect(":memory:")
524+
await db.stop()
525+
526+
def test_stop_after_event_loop_closed(self):
527+
db = None
528+
529+
async def inner():
530+
nonlocal db
531+
db = await aiosqlite.connect(":memory:")
532+
533+
loop = asyncio.new_event_loop()
534+
loop.run_until_complete(inner())
535+
loop.close()
536+
537+
db.stop()

0 commit comments

Comments
 (0)