-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathcapture_manager.py
More file actions
executable file
·112 lines (91 loc) · 4.79 KB
/
capture_manager.py
File metadata and controls
executable file
·112 lines (91 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python3
from __future__ import annotations
import asyncio
import logging
import logging.config
import signal
from asyncio import Task
from datetime import datetime, timedelta
from lacus.default import AbstractManager, get_config
from lacus.lacus import Lacus
# Module-level side effect: configure process-wide logging from the
# project's 'logging' config section before any logger is used.
logging.config.dictConfig(get_config('logging'))
class CaptureManager(AbstractManager):
    """Drive the capture lifecycle.

    Starts queued captures (up to the configured concurrency), reconciles
    the Redis `lacus:ongoing` set with the local asyncio task set, cancels
    captures that run past their time budget, and force-clears entries for
    tasks that refuse to die so they never block new captures.
    """

    def __init__(self, loglevel: int | None=None) -> None:
        super().__init__(loglevel)
        self.script_name = 'capture_manager'
        # Tasks for captures currently in flight. Finished tasks remove
        # themselves via the done-callback set in _to_run_forever_async.
        self.captures: set[Task[None]] = set()
        self.lacus = Lacus()

    def _clear_ongoing_on_startup(self) -> None:
        """Flush zombie entries from lacus:ongoing left by a dead process.

        At process start, self.captures is empty — no task can be running.
        Any UUID left in lacus:ongoing is a zombie from a previous crash.
        Use clear_capture() per UUID so each gets a proper error result
        and capture_settings are cleaned up.
        """
        ongoing = self.lacus.monitoring.get_ongoing_captures()
        if ongoing:
            self.logger.warning(f'Startup cleanup: clearing {len(ongoing)} zombie capture(s) from lacus:ongoing')
            for uuid, _ in ongoing:
                self.lacus.core.clear_capture(uuid, 'Cleared on startup: previous process died.')

    async def clear_dead_captures(self) -> None:
        """Reconcile lacus:ongoing with the local task set.

        Clears Redis entries that have no matching asyncio task, and
        cancels (with retries) any task that has exceeded the configured
        max_capture_time plus a 10% grace period. If a task still won't
        finish after all cancel attempts, its Redis entry is force-cleared
        and it is dropped from self.captures so the slot is freed.
        """
        ongoing = {capture.get_name(): capture for capture in self.captures}
        max_capture_time = get_config('generic', 'max_capture_time')
        # 10% grace on top of the configured maximum before a capture is
        # considered overdue.
        # NOTE(review): assumes the start_time returned by monitoring is a
        # naive datetime comparable with datetime.now() — confirm.
        oldest_start_time = datetime.now() - timedelta(seconds=max_capture_time + (max_capture_time / 10))
        for expected_uuid, start_time in self.lacus.monitoring.get_ongoing_captures():
            if expected_uuid not in ongoing:
                # Marked ongoing in Redis but we own no task for it.
                self.lacus.core.clear_capture(expected_uuid, 'Capture not in the list of tasks, it has been canceled.')
            elif start_time < oldest_start_time:
                self.logger.warning(f'{expected_uuid} has been running for too long. Started at {start_time}.')
                capture = ongoing[expected_uuid]
                # Bind the attempt budget once so the retry loop and the
                # final error message can never disagree.
                max_cancel = 5
                total_attempts = max_cancel
                while not capture.done() and max_cancel > 0:
                    capture.cancel(f'Capture has been running for more than {max_capture_time}s.')
                    try:
                        await capture
                    except asyncio.CancelledError:
                        self.logger.warning(f'{expected_uuid} is canceled now.')
                    finally:
                        max_cancel -= 1
                        if not capture.done():
                            self.logger.error(f'{expected_uuid} is not done after canceling, trying {max_cancel} more times.')
                            # Give the task a moment to unwind before retrying.
                            await asyncio.sleep(1)
                # All cancel attempts exhausted but the task is still stuck.
                # Free the Redis slot so new captures aren't blocked.
                if not capture.done():
                    self.logger.error(f'{expected_uuid} could not be canceled after {total_attempts} attempts, force-clearing from Redis.')
                    self.lacus.core.clear_capture(expected_uuid, 'Force-cleared: task could not be canceled.')
                    # Drop the stuck task so it stops counting against the
                    # concurrency limit (done tasks discard themselves).
                    self.captures.discard(capture)

    async def _to_run_forever_async(self) -> None:
        """One manager tick: clean up dead captures, then start new ones
        from the queue up to the configured concurrency limit."""
        def clear_list_callback(task: Task[None]) -> None:
            # Runs when a capture task finishes, for any reason.
            self.captures.discard(task)
            self.unset_running()

        await self.clear_dead_captures()
        if self.force_stop:
            return
        max_new_captures = get_config('generic', 'concurrent_captures') - len(self.captures)
        if max_new_captures <= 0:
            # Only worth logging when work is actually waiting.
            if self.lacus.monitoring.get_enqueued_captures():
                self.logger.debug(f'Max amount of captures in parallel reached ({len(self.captures)})')
            return
        async for capture_task in self.lacus.core.consume_queue(max_new_captures):
            self.captures.add(capture_task)
            self.set_running()
            capture_task.add_done_callback(clear_list_callback)

    async def _wait_to_finish_async(self) -> None:
        """Block shutdown until every in-flight capture has finished,
        logging progress every 5 seconds."""
        while self.captures:
            self.logger.info(f'Waiting for {len(self.captures)} capture(s) to finish...')
            self.logger.info(f'Ongoing captures: {", ".join(capture.get_name() for capture in self.captures)}')
            await asyncio.sleep(5)
        self.logger.info('No more captures')
def main() -> None:
    """Entry point: build the manager, wire SIGTERM to a graceful async
    stop, and run the manager loop until it completes."""
    manager = CaptureManager()
    loop = asyncio.new_event_loop()
    # On SIGTERM, schedule the graceful shutdown coroutine on the loop
    # instead of killing the process mid-capture.
    loop.add_signal_handler(signal.SIGTERM, lambda: loop.create_task(manager.stop_async()))
    try:
        # Flush stale captures before the event loop starts.
        # Valkey persists across container restarts, so lacus:ongoing
        # may contain UUIDs from a process that was killed mid-capture.
        manager._clear_ongoing_on_startup()
        loop.run_until_complete(manager.run_async(sleep_in_sec=1))
    finally:
        loop.close()
# Script entry guard: only start the manager when executed directly.
if __name__ == '__main__':
    main()