Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions bin/capture_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,17 @@ def __init__(self, loglevel: int | None=None) -> None:
self.captures: set[Task[None]] = set()
self.lacus = Lacus()

def _clear_ongoing_on_startup(self) -> None:
# Purpose: one-shot startup cleanup of Redis state left by a crashed
# predecessor process. Called before the event loop starts (see the
# main() hunk: Valkey persists across container restarts, so
# lacus:ongoing may still hold UUIDs from a process killed mid-capture).
# At process start, self.captures is empty — no task can be running.
# Any UUID left in lacus:ongoing is a zombie from a previous crash.
# Use clear_capture() per UUID so each gets a proper error result
# and capture_settings are cleaned up.
# NOTE(review): per the P1 comment on this PR, LacusCore.clear_capture()
# reportedly returns early for any UUID still in lacus:ongoing that
# started less than max_capture_time * 1.1 ago — meaning fresh crash
# leftovers may NOT actually be cleared here despite the log line below.
# Confirm against the lacuscore implementation; a direct removal from
# lacus:ongoing may be needed for a true startup flush.
ongoing = self.lacus.monitoring.get_ongoing_captures()
if ongoing:
# ongoing is assumed to be an iterable of (uuid, <timestamp-like>)
# pairs — the second element is discarded below. TODO confirm shape
# against lacus.monitoring.get_ongoing_captures().
self.logger.warning(f'Startup cleanup: clearing {len(ongoing)} zombie capture(s) from lacus:ongoing')
for uuid, _ in ongoing:
self.lacus.core.clear_capture(uuid, 'Cleared on startup: previous process died.')
Comment on lines +36 to +37

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Clear startup zombies without clear_capture's age gate

Calling self.lacus.core.clear_capture() here does not actually clear most crash leftovers. lacuscore.LacusCore.clear_capture() returns early for any UUID that is still in lacus:ongoing and started less than max_capture_time * 1.1 ago, so after a normal crash/restart these orphaned captures are usually still considered “probably still going.” The manager logs that it cleared them, but /capture_status, /lacus_status, and /is_busy will keep reporting them as ongoing until they age past the timeout window instead of immediately getting the failure result this startup cleanup is meant to write.

Useful? React with 👍 / 👎.


async def clear_dead_captures(self) -> None:
ongoing = {capture.get_name(): capture for capture in self.captures}
max_capture_time = get_config('generic', 'max_capture_time')
Expand All @@ -47,6 +58,12 @@ async def clear_dead_captures(self) -> None:
if not capture.done():
self.logger.error(f'{expected_uuid} is not done after canceling, trying {max_cancel} more times.')
await asyncio.sleep(1)
# All cancel attempts exhausted but the task is still stuck.
# Free the Redis slot so new captures aren't blocked.
if not capture.done():
self.logger.error(f'{expected_uuid} could not be canceled after 5 attempts, force-clearing from Redis.')
self.lacus.core.clear_capture(expected_uuid, 'Force-cleared: task could not be canceled.')
self.captures.discard(capture)
Comment on lines +63 to +66

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Decrement the running count when force-clearing a hung task

This path frees the in-memory slot, but it leaves the Redis running counter one too high. Each capture increments that counter when scheduled (set_running() at line 84), and the only per-task decrement is in clear_list_callback() (lines 70-72) when the task actually finishes. In the exact case handled here—a task that never responds to cancellation—that callback never fires, so a later scripts_controller.py stop|restart capture_manager can block forever waiting for zscore('running', 'capture_manager') to reach None even after the process has exited.

Useful? React with 👍 / 👎.


async def _to_run_forever_async(self) -> None:

Expand Down Expand Up @@ -82,6 +99,10 @@ def main() -> None:
loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(p.stop_async()))

try:
# Flush stale captures before the event loop starts.
# Valkey persists across container restarts, so lacus:ongoing
# may contain UUIDs from a process that was killed mid-capture.
p._clear_ongoing_on_startup()
loop.run_until_complete(p.run_async(sleep_in_sec=1))
finally:
loop.close()
Expand Down