Skip to content

Fix: cleanup normally completed job #7580

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/backend/base/langflow/services/job_queue/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,36 @@ async def _periodic_cleanup(self) -> None:
except Exception as exc: # noqa: BLE001
logger.error(f"Exception encountered during periodic cleanup: {exc}")

def _is_task_cleanable(self, task: asyncio.Task, main_queue: asyncio.Queue) -> bool:
"""Check if a task is cleanable based on its status and exception.

A task is cleanable if:
1. It is cancelled.
2. It is done and has an exception.
3. It is done and has no exception, and the associated queue is fully consumed.

Args:
task (asyncio.Task): The task to check.
main_queue (asyncio.Queue): The queue associated with the task.

Returns:
bool: True if the task is cleanable, False otherwise.
"""
if task.cancelled():
return True
if task.done():
if task.exception() is not None:
return True
if main_queue.empty():
return True
return False

async def _cleanup_old_queues(self) -> None:
"""Scan all registered job queues and clean up those with completed or failed tasks."""
current_time = asyncio.get_running_loop().time()

for job_id in list(self._queues.keys()):
_, _, task, cleanup_time = self._queues[job_id]
main_queue, _, task, cleanup_time = self._queues[job_id]
if task:
logger.debug(
f"Queue {job_id} status - Done: {task.done()}, "
Expand All @@ -286,7 +310,7 @@ async def _cleanup_old_queues(self) -> None:
)

# Check if task should be marked for cleanup
if task and (task.cancelled() or (task.done() and task.exception() is not None)):
if task and self._is_task_cleanable(task, main_queue):
if cleanup_time is None:
# Mark for cleanup by setting the timestamp
self._queues[job_id] = (
Expand Down
Loading