Skip to content

Commit e92627e

Browse files
committed
Added watchdog for checking processes to end
1 parent 4a09c8c commit e92627e

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

app/processing_watchdog.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""
2+
Watchdog that monitors the processing of training attempts and aborts attempts that run for too long.
3+
Run it as a separate service. Two values are read from the config: the maximum processing time of an attempt and the interval between checks.
4+
[constants]
5+
processing_limit = ()
6+
interval_time = ()
7+
"""
8+
9+
import time
10+
from datetime import datetime, timezone
11+
12+
from app.config import Config
13+
from app.mongo_odm import TrainingsDBManager
14+
from app.root_logger import get_root_logger
15+
from app.status import TrainingStatus, PresentationStatus, AudioStatus
16+
17+
logger = get_root_logger(service_name='processing_watchdog')
18+
19+
# Fallback processing time limit, in seconds; returned by get_config_values()
# when the values cannot be read from the config.
DEFAULT_MAX_SECONDS = 600
20+
# Fallback pause between watchdog checks, in seconds; returned by
# get_config_values() when the values cannot be read from the config.
DEFAULT_INTERVAL_SECONDS = 60
21+
22+
def _read_positive_int(name, default):
    """Read ``[constants] <name>`` from the config as a positive int, else return *default*."""
    try:
        value = int(getattr(Config.c.constants, name))
    except Exception as exc:
        # Config may be uninitialised, the option may be missing, or it may not be a number.
        # The watchdog must still start, so fall back -- but leave a trace in the log.
        logger.warning("Cannot read constants.%s from config (%s); using default %s s",
                       name, exc, default)
        return default
    if value <= 0:
        # A non-positive limit would flag every training at once; a negative
        # interval makes time.sleep() raise ValueError.
        logger.warning("Config value constants.%s = %s is not positive; using default %s s",
                       name, value, default)
        return default
    return value


def get_config_values():
    """Return ``(processing_limit, interval_time)`` in seconds.

    Both values come from the ``[constants]`` section of the config. Each one
    is read independently: a missing, non-numeric or non-positive value is
    replaced by its own default (``DEFAULT_MAX_SECONDS`` /
    ``DEFAULT_INTERVAL_SECONDS``) and the fallback is logged, so one bad
    option does not discard the other, valid one.

    Returns:
        tuple[int, int]: maximum processing time and pause between checks.
    """
    return (
        _read_positive_int("processing_limit", DEFAULT_MAX_SECONDS),
        _read_positive_int("interval_time", DEFAULT_INTERVAL_SECONDS),
    )
27+
28+
def time_now():
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    utc = timezone.utc
    return datetime.now(tz=utc)
30+
31+
def _processing_elapsed_seconds(training, now):
    """Return seconds elapsed since *training* started processing, or None if unknown."""
    process_started = getattr(training, "processing_start_timestamp", None)
    if process_started is None:
        return None
    try:
        # presumably a BSON Timestamp whose ``.time`` is seconds since the epoch -- TODO confirm
        started = datetime.fromtimestamp(process_started.time, timezone.utc)
    except Exception:
        # Best effort: one malformed timestamp must not stop the scan of the
        # remaining trainings, but the skip should be discoverable in the log.
        logger.debug("Skipping training %s: unreadable processing_start_timestamp %r",
                     getattr(training, "_id", None), process_started, exc_info=True)
        return None
    return (now - started).total_seconds()


def _fail_timed_out_training(db, training, training_id, elapsed, max_seconds):
    """Record the timeout verdict and zero score, then fail unfinished sub-statuses."""
    # The user-facing text talks about the *limit*, so it reports the configured
    # limit; the actual elapsed time goes to the log line below.
    msg = ("Техническая ошибка: время обработки превысило лимит {:.0f} секунд. "
           "Оценка выставлена автоматически как 0. Пожалуйста, обратитесь к администраторам."
           ).format(max_seconds)
    logger.warning("Training %s exceeded processing timeout (%.0f s)", training_id, elapsed)
    db.append_verdict(training_id, msg)
    db.set_score(training_id, 0)
    # The two status updates are independent: a failure of one is logged and
    # must not prevent the other.
    try:
        pres_status = getattr(training, "presentation_status", None)
        if pres_status not in (PresentationStatus.PROCESSED, PresentationStatus.PROCESSING_FAILED):
            db.change_presentation_status(training_id, PresentationStatus.PROCESSING_FAILED)
    except Exception:
        logger.exception("Failed to mark presentation status as failed for training %s", training_id)
    try:
        audio_status = getattr(training, "audio_status", None)
        if audio_status not in (AudioStatus.PROCESSED, AudioStatus.PROCESSING_FAILED):
            db.change_audio_status(training_id, AudioStatus.PROCESSING_FAILED)
    except Exception:
        logger.exception("Failed to mark audio status as failed for training %s", training_id)


def run_once(max_seconds):
    """Perform one watchdog pass over all trainings.

    Finds every training that has ``processing_start_timestamp`` set, has been
    processing for more than *max_seconds*, and whose status is not already
    PROCESSED, PROCESSING_FAILED or PREPARATION_FAILED. For each such training
    a verdict message is appended, the score is set to 0, and the presentation
    and audio statuses are switched to PROCESSING_FAILED unless they are
    already PROCESSED or PROCESSING_FAILED.

    Args:
        max_seconds: maximum allowed processing time, in seconds.

    Errors while handling a single training are logged and do not interrupt
    the handling of the others.
    """
    now = time_now()
    db = TrainingsDBManager()
    finished_statuses = (
        TrainingStatus.PROCESSED,
        TrainingStatus.PROCESSING_FAILED,
        TrainingStatus.PREPARATION_FAILED,
    )

    # Collect first, write afterwards: no DB updates are issued while the
    # result of get_trainings() is still being iterated.
    candidates = []
    for training in db.get_trainings():
        elapsed = _processing_elapsed_seconds(training, now)
        if elapsed is None or elapsed <= max_seconds:
            continue
        if getattr(training, "status", None) in finished_statuses:
            continue
        candidates.append((training, elapsed))

    # NOTE(review): only the presentation/audio statuses are changed below. If
    # that does not also move training.status to a finished state, the same
    # training will be flagged again on every pass -- confirm.
    for training, elapsed in candidates:
        training_id = getattr(training, "_id", None)
        if training_id is None:
            continue
        try:
            _fail_timed_out_training(db, training, training_id, elapsed, max_seconds)
        except Exception:
            logger.exception("Error while handling timeout for training %s", training_id)
79+
80+
def run():
    """Run the watchdog forever.

    Reads the timeout and the check interval once via ``get_config_values()``,
    then repeatedly calls ``run_once()`` and sleeps for the interval. An
    exception escaping ``run_once()`` is logged and the loop goes on.
    """
    timeout_s, pause_s = get_config_values()
    logger.info("Processing watchdog started. Timeout = %s s, interval time = %s s", timeout_s, pause_s)
    while True:
        try:
            run_once(timeout_s)
        except Exception:
            # Keep the service alive: a failed pass is reported and retried
            # after the usual pause.
            logger.exception("Unhandled error in processing watchdog main loop")
        time.sleep(pause_s)
89+
90+
if __name__ == "__main__":
    # Script entry point. If a first command-line argument is given it is
    # handed to Config.init_config (presumably the config file path -- confirm);
    # then the watchdog loop is started.
    import sys

    cli_args = sys.argv[1:]
    if cli_args:
        Config.init_config(cli_args[0])
    run()

0 commit comments

Comments
 (0)