What OS are you using?
macOS, Debian
What version of Python are you using?
3.12.8
What version of Dramatiq are you using?
1.17.1
What Broker are you using?
Redis
What did you do?
One of our tasks can raise a `TimeoutError` for a specific reason. The task code does not catch or handle this error, so it propagates to Dramatiq. When it was raised, the worker thread stopped processing tasks, and within about a minute the entire container died with 100% CPU usage and OOM. I reproduced this on my Mac laptop with a minimal example:
```python
import dramatiq

@dramatiq.actor
async def test():
    raise TimeoutError  # not caught by the task, so it propagates to Dramatiq
```
What happened?
The issue can be traced to how Dramatiq polls the future for completion while the coroutine runs on the event loop:
```python
while True:
    try:
        # Use a timeout to be able to catch asynchronously
        # raised dramatiq exceptions (Interrupt).
        return future.result(timeout=self.interrupt_check_ival)
    except concurrent.futures.TimeoutError:
        continue
```
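The `future.result` call can raise `TimeoutError` in two scenarios:
- the task is still running, and we have waited `interrupt_check_ival` seconds;
- the task is no longer running, because it raised `TimeoutError` itself.

Both cases land in the same `except` clause because, since Python 3.11, `concurrent.futures.TimeoutError` is an alias of the builtin `TimeoutError`.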
In the second case, this block becomes an infinite loop with no sleep: once the future has completed, every `future.result` call immediately re-raises the task's `TimeoutError`, which is caught, and the loop continues. The thread spins at 100% CPU and the process becomes unusable.
This could be fixed by checking whether the future is done or not before continuing:
```python
while True:
    try:
        # Use a timeout to be able to catch asynchronously
        # raised dramatiq exceptions (Interrupt).
        return future.result(timeout=self.interrupt_check_ival)
    except concurrent.futures.TimeoutError:
        if future.done():
            # The task itself raised TimeoutError: re-raise the original exception.
            return future.result()
        continue
```
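A self-contained sketch of the proposed fix, runnable without Dramatiq. The `wait_for_result` helper, the two sample coroutines, and the background event-loop thread are hypothetical stand-ins for Dramatiq's event loop thread and `interrupt_check_ival`; the sketch does not model `Interrupt` handling or cancellation.

```python
import asyncio
import concurrent.futures
import threading


def wait_for_result(future, interval):
    """Poll `future` every `interval` seconds, applying the future.done() check."""
    while True:
        try:
            return future.result(timeout=interval)
        except concurrent.futures.TimeoutError:
            if future.done():
                # The coroutine finished by raising TimeoutError itself:
                # re-raise it instead of polling forever.
                return future.result()
            # Genuine poll timeout: the coroutine is still running.
            continue


async def slow_success():
    await asyncio.sleep(0.3)
    return "ok"


async def own_timeout():
    raise TimeoutError("raised by the task")


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

# Still-running case: several poll timeouts, then the real result.
result = wait_for_result(asyncio.run_coroutine_threadsafe(slow_success(), loop), 0.05)

# Task's own TimeoutError: it propagates instead of spinning.
try:
    wait_for_result(asyncio.run_coroutine_threadsafe(own_timeout(), loop), 0.05)
    outcome = "no exception"
except TimeoutError as exc:
    outcome = str(exc)

loop.call_soon_threadsafe(loop.stop)
thread.join()
loop.close()
print(result, outcome)  # ok raised by the task
```

Checking `future.done()` after the timed-out wait is safe even if the task completes in between: the second `future.result()` then simply returns the value or raises the task's real exception.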