Skip to content

Commit c0b8dde

Browse files
authored
fix: worker thread getting stuck (#105)
* fix: worker thread getting stuck * fix: add type narrowing for error_details in test
1 parent 4a42ef3 commit c0b8dde

File tree

2 files changed

+60
-4
lines changed

2 files changed

+60
-4
lines changed

src/task_processor/processor.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,17 @@ def _run_task(
128128
)
129129
task_run: AnyTaskRun = task.task_runs.model(started_at=timezone.now(), task=task) # type: ignore[attr-defined]
130130
result: str
131+
executor = None
131132

132133
try:
133-
with ThreadPoolExecutor(max_workers=1) as executor:
134-
future = executor.submit(task.run)
135-
timeout = task.timeout.total_seconds() if task.timeout else None
136-
future.result(timeout=timeout) # Wait for completion or timeout
134+
# Use explicit executor management to avoid blocking on shutdown
135+
# when tasks timeout but continue running in worker threads.
136+
# The default context manager behavior (wait=True) would block
137+
# the TaskRunner thread indefinitely waiting for stuck workers.
138+
executor = ThreadPoolExecutor(max_workers=1)
139+
future = executor.submit(task.run)
140+
timeout = task.timeout.total_seconds() if task.timeout else None
141+
future.result(timeout=timeout) # Wait for completion or timeout
137142

138143
task_run.result = result = TaskResult.SUCCESS.value
139144
task_run.finished_at = timezone.now()
@@ -176,6 +181,13 @@ def _run_task(
176181
delay_until,
177182
)
178183

184+
finally:
185+
# Always shutdown the executor without waiting for worker threads.
186+
# This prevents the TaskRunner thread from blocking indefinitely
187+
# when a task times out but continues running in a worker thread.
188+
if executor is not None:
189+
executor.shutdown(wait=False)
190+
179191
labels = {
180192
"task_identifier": task_identifier,
181193
"task_type": registered_task.task_type.value.lower(),

tests/unit/task_processor/test_unit_task_processor_processor.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -972,3 +972,47 @@ def my_task() -> None:
972972
# Then
973973
recurring_task.refresh_from_db(using=current_database)
974974
assert recurring_task.is_locked is False
975+
976+
977+
@pytest.mark.multi_database
978+
@pytest.mark.task_processor_mode
979+
def test_run_task_does_not_block_on_timeout(
980+
current_database: str,
981+
sleep_task: TaskHandler[[int]],
982+
) -> None:
983+
"""
984+
Verify that when a task times out, the calling thread (TaskRunner)
985+
does not block indefinitely waiting for the worker thread to finish.
986+
987+
"""
988+
# Given - a task that will take longer than the timeout
989+
task = Task.create(
990+
sleep_task.task_identifier,
991+
scheduled_for=timezone.now(),
992+
args=(10,), # Task will sleep for 10 seconds
993+
timeout=timedelta(milliseconds=100), # But timeout after 100ms
994+
)
995+
task.save(using=current_database)
996+
997+
# When - we run the task
998+
start_time = time.time()
999+
task_runs = run_tasks(current_database)
1000+
elapsed_time = time.time() - start_time
1001+
1002+
# Then - the function should return quickly (within ~1 second)
1003+
# Not block for 10 seconds waiting for the worker thread
1004+
assert elapsed_time < 2.0, (
1005+
f"run_tasks blocked for {elapsed_time:.2f} seconds, "
1006+
"indicating it's waiting for the worker thread to finish"
1007+
)
1008+
1009+
# And the task should be marked as failed due to timeout
1010+
assert len(task_runs) == 1
1011+
task_run = task_runs[0]
1012+
assert task_run.result == TaskResult.FAILURE.value
1013+
assert task_run.error_details is not None
1014+
assert "TimeoutError" in task_run.error_details
1015+
1016+
task.refresh_from_db(using=current_database)
1017+
assert task.completed is False
1018+
assert task.num_failures == 1

0 commit comments

Comments
 (0)