Skip to content

Commit 4a5845b

Browse files
authored
Merge pull request #801 from LincolnPuzey/fix_gh_791
Fix #791: Fix infinite loop when async actor raises `TimeoutError` exception
2 parents 6f84cdb + 311ec1e commit 4a5845b

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

dramatiq/asyncio.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,27 @@ async def wrapped_coro() -> R:
146146
# Use a timeout to be able to catch asynchronously
147147
# raised dramatiq exceptions (Interrupt).
148148
return future.result(timeout=self.interrupt_check_ival)
149-
except concurrent.futures.TimeoutError:
150-
continue
149+
except (
150+
# TODO replace with built-in TimeoutError once 3.10 support dropped.
151+
concurrent.futures.TimeoutError
152+
):
153+
# NOTE: TimeoutError caught here could be from future.result() timing out (i.e. future not done yet),
154+
# or a TimeoutError raised inside the future itself (future is done).
155+
if not future.done():
156+
# future not done, so .result() must've timed out. continue to wait again.
157+
continue
158+
159+
# If execution reaches here, it means a TimeoutError was caught above, and the future is done.
160+
# There are 3 possibilities here:
161+
# 1. TimeoutError was raised inside the future. This will re-raise it.
162+
# 2. First .result() call timed out, but the future completed by the time .done() was called.
163+
# a) This will return the future's result, or
164+
# b) raise the Exception that happened in the future.
165+
return future.result(timeout=0)
166+
# This is outside the 'except' block to avoid any
167+
# "During handling of the above exception, another exception occurred" messages.
168+
# zero timeout used because future is now done.
169+
151170
except Interrupt as e:
152171
# Asynchronously raised from another thread: cancel the
153172
# future.

tests/middleware/test_asyncio.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,24 @@ async def raise_error():
8282
assert e.traceback[-1].name == "raise_actual_error"
8383

8484

85+
def test_event_loop_thread_run_coroutine_timeout_exception(started_thread: EventLoopThread):
86+
"""Test that TimeoutError in coroutine doesn't lead to infinite loop.
87+
88+
Regression test for https://github.com/Bogdanp/dramatiq/issues/791
89+
"""
90+
91+
async def raise_actual_error():
92+
raise TimeoutError("something took too long")
93+
94+
async def raise_error():
95+
await raise_actual_error()
96+
97+
coro = raise_error()
98+
99+
with pytest.raises(TimeoutError, match="something took too long"):
100+
started_thread.run_coroutine(coro)
101+
102+
85103
@pytest.mark.skipif(
86104
threading.current_platform not in threading.supported_platforms,
87105
reason="Threading not supported on this platform.",

0 commit comments

Comments
 (0)