Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions app/processing_watchdog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
Watchdog для мониторинга длительных обработок попыток и их прерывания
Запускать как отдельный сервис

Два значения читаются из конфига - макс. время попытки и интервал проверок
[constants]
processing_limit = <макс. время обработки одной попытки, секунд>
interval_time = <интервал между проверками, секунд>

Взаимодействие с базой данных:
- Использует TrainingsDBManager для доступа к тренировкам
- Находит все тренировки, у которых задано поле `processing_start_timestamp`
- Если прошло больше processing_limit секунд:
* Добавляет в базу вердикт с комментарием "обратиться к администраторам" через append_verdict()
* Устанавливает оценку 0 через set_score()
* Меняет статусы presentation и audio на PROCESSING_FAILED, если они ещё не финальные
- После каждой итерации ждёт interval_time секунд и повторяет проверку
"""

import time
from datetime import datetime, timezone

from app.config import Config
from app.mongo_odm import TrainingsDBManager
from app.root_logger import get_root_logger
from app.status import TrainingStatus, PresentationStatus, AudioStatus

# Dedicated logger for this service so watchdog messages are attributable.
logger = get_root_logger(service_name='processing_watchdog')

# Fallbacks used when the [constants] config section is missing or invalid.
DEFAULT_MAX_SECONDS = 300  # maximum allowed processing time, seconds
DEFAULT_INTERVAL_SECONDS = 30  # delay between watchdog checks, seconds

def get_config_values():
    """Read the watchdog settings from the application config.

    Returns a ``(processing_limit, interval_time)`` tuple of ints, both in
    seconds. Any failure (missing section/option, non-integer value) is
    logged and the module-level defaults are returned instead, so the
    watchdog can always start.
    """
    try:
        limit = int(Config.c.constants.processing_limit)
        interval = int(Config.c.constants.interval_time)
    except Exception as exc:
        # Broad catch is deliberate: a misconfigured file must not stop the service.
        logger.warning("Failed to read config values, using defaults: %s", exc)
        return DEFAULT_MAX_SECONDS, DEFAULT_INTERVAL_SECONDS
    return limit, interval

def time_now():
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)

def run_once(max_seconds):
    """
    Scan all trainings and fail those whose processing exceeded the timeout.

    Finds every training with a `processing_start_timestamp` set; for each one
    that has been processing longer than `max_seconds` and is not already in a
    terminal state, appends an explanatory verdict, sets the score to 0 and
    marks the presentation/audio statuses as PROCESSING_FAILED unless they are
    already final.

    :param max_seconds: processing time limit in seconds
    """
    now = time_now()
    # Reuse one manager instead of constructing a new one for every DB call.
    db = TrainingsDBManager()
    candidates = []
    for training in db.get_trainings():
        process_started = getattr(training, "processing_start_timestamp", None)
        if process_started is None:
            continue
        try:
            # NOTE(review): assumes the stored value exposes a POSIX timestamp
            # via `.time` — confirm against TrainingsDBManager's schema.
            started = datetime.fromtimestamp(process_started.time, timezone.utc)
        except Exception as e:
            logger.warning("Invalid timestamp in training %s: %s", getattr(training, "_id", "?"), e)
            # Bug fix: without this `continue`, `started` was undefined (first
            # failure) or stale from a previous iteration on the next line.
            continue
        elapsed = (now - started).total_seconds()
        if elapsed <= max_seconds:
            continue
        status = getattr(training, "status", None)
        # Skip trainings that already reached a terminal state.
        if status in (TrainingStatus.PROCESSED, TrainingStatus.PROCESSING_FAILED, TrainingStatus.PREPARATION_FAILED):
            continue
        candidates.append((training, elapsed))
    for training, elapsed in candidates:
        training_id = getattr(training, "_id", None)
        if training_id is None:
            continue
        try:
            msg = ("Техническая ошибка: время обработки превысило лимит {:.0f} секунд. "
                   "Оценка выставлена автоматически как 0. Пожалуйста, обратитесь к администраторам."
                   ).format(elapsed)
            logger.warning("Training %s exceeded processing timeout (%.0f s)", training_id, elapsed)
            db.append_verdict(training_id, msg)
            db.set_score(training_id, 0)
            try:
                pres_status = getattr(training, "presentation_status", None)
                if pres_status not in (PresentationStatus.PROCESSED, PresentationStatus.PROCESSING_FAILED):
                    db.change_presentation_status(training_id, PresentationStatus.PROCESSING_FAILED)
            except Exception as e:
                logger.exception("Failed to mark presentation status as failed for training %s: %s", training_id, e)
            try:
                audio_status = getattr(training, "audio_status", None)
                if audio_status not in (AudioStatus.PROCESSED, AudioStatus.PROCESSING_FAILED):
                    db.change_audio_status(training_id, AudioStatus.PROCESSING_FAILED)
            except Exception as e:
                logger.exception("Failed to mark audio status as failed for training %s: %s", training_id, e)
        except Exception as e:
            # Keep processing the remaining candidates even if one fails.
            logger.exception("Error while handling timeout for training %s: %s", training_id, e)

def run():
    """Run the watchdog forever: check for stuck trainings, then sleep.

    The loop never exits; any unexpected error from a single check is logged
    and the next iteration proceeds after the configured pause.
    """
    timeout_s, poll_s = get_config_values()
    logger.info("Processing watchdog started. Timeout = %s s, interval time = %s s", timeout_s, poll_s)
    while True:
        try:
            run_once(timeout_s)
        except Exception as exc:
            logger.exception("Unhandled error in processing watchdog main loop (%s)", exc)
        time.sleep(poll_s)

if __name__ == "__main__":
import sys
if len(sys.argv) >= 2:
Config.init_config(sys.argv[1])
run()
2 changes: 2 additions & 0 deletions app_conf/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ lti_consumer_secret=supersecretconsumersecret
version_file=VERSION.json
backup_path=../dump/database-dump/
storage_max_size_mbytes=20000
processing_limit = 600
interval_time = 30

[mongodb]
url=mongodb://db:27017/
Expand Down
2 changes: 2 additions & 0 deletions app_conf/testing.ini
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ lti_consumer_secret=testing_lti_consumer_secret
version_file=VERSION.json
backup_path=../dump/database-dump/
storage_max_size_mbytes=20000
processing_limit = 600
interval_time = 30

[mongodb]
url=mongodb://db:27017/
Expand Down
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ services:
volumes:
- whisper_models:/root/.cache/whisper

processing_watchdog:
image: wst-image:v0.2
command: python3 -m processing_watchdog $APP_CONF
restart: always
depends_on:
- db

volumes:
whisper_models:
nltk_data: