Skip to content

Commit 7fdea0b

Browse files
committed
add heartbeat during monitor execution
1 parent 8b85237 commit 7fdea0b

File tree

3 files changed

+70
-6
lines changed

3 files changed

+70
-6
lines changed

docs/configuration_file.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ application_queue:
6262
- `controller_procedures`: Object. Procedures to be executed by the Controller and their settings.
6363
- `controller_procedures.monitors_stuck`: Object. Settings for the procedure to fix monitors stuck in "queued" or "running" status.
6464
- `controller_procedures.monitors_stuck.schedule`: String using Cron format. Schedule to execute the `monitors_stuck` procedure.
65-
- `controller_procedures.monitors_stuck.params.time_tolerance`: Integer. Time tolerance in seconds for a monitor to be considered as stuck.
65+
- `controller_procedures.monitors_stuck.params.time_tolerance`: Integer. Time tolerance in seconds for a monitor to be considered as stuck. This value should be chosen relative to the `executor_monitor_heartbeat_time` setting; the recommended value is twice the heartbeat time.
6666

6767
## Executor Settings
6868
- `executor_concurrency`: Integer. Number of tasks that can be executed at the same time by each Executor.
6969
- `executor_sleep`: Integer. Time, in seconds, the Executor will sleep when there are no tasks in the queue before trying again.
7070
- `executor_monitor_timeout`: Integer. Timeout, in seconds, for monitor execution.
7171
- `executor_reaction_timeout`: Integer. Timeout, in seconds, for reactions execution.
7272
- `executor_request_timeout`: Integer. Timeout, in seconds, for requests execution.
73-
- `executor_monitor_heartbeat_time`: Integer. Time, in seconds, between each monitor heartbeat.
73+
- `executor_monitor_heartbeat_time`: Integer. Time, in seconds, between each monitor heartbeat. Changing this setting affects the appropriate value for the `controller_procedures.monitors_stuck.params.time_tolerance` parameter.
7474

7575
## Issues Creation
7676
- `max_issues_creation`: Integer. Maximum number of issues that can be created by each monitor in a single search. Can be overridden by the monitors' configuration.

src/components/executor/monitor_handler.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import prometheus_client
99
from pydantic import ValidationError
1010

11+
import components.task_manager as task_manager
1112
import registry as registry
1213
from base_exception import BaseSentinelaException
14+
from configs import configs
1315
from data_models.process_monitor_payload import ProcessMonitorPayload
1416
from internal_database import get_session
1517
from models import Alert, Issue, Monitor
@@ -321,6 +323,13 @@ async def _run_routines(monitor: Monitor, tasks: list[Literal["search", "update"
321323
await _alerts_routine(monitor)
322324

323325

326+
async def _heartbeat_routine(monitor: Monitor) -> None:
    """Periodically record a heartbeat for the monitor.

    Calls 'monitor.set_last_heartbeat()' right away and then again after each pause of
    'configs.executor_monitor_heartbeat_time' seconds. The loop has no exit condition of its own,
    so the coroutine only stops when the task running it is cancelled (or an exception is
    raised)."""
    while True:
        await monitor.set_last_heartbeat()

        # The interval is looked up on every iteration instead of being cached before the loop
        heartbeat_interval = configs.executor_monitor_heartbeat_time
        await asyncio.sleep(heartbeat_interval)
331+
332+
324333
async def run(message: dict[Any, Any]) -> None:
325334
"""Process a message with type 'process_monitor', loading the monitor and executing it's
326335
routines, while also detecting errors and reporting them accordingly"""
@@ -339,12 +348,12 @@ async def run(message: dict[Any, Any]) -> None:
339348
_logger.error(f"Monitor {monitor_id} not found. Skipping message")
340349
return
341350

342-
await registry.wait_monitor_loaded(monitor_id)
343-
344351
# Skip executing the monitor if it's already running
345352
if monitor.running:
346353
return
347354

355+
await registry.wait_monitor_loaded(monitor_id)
356+
348357
prometheus_labels = {
349358
"monitor_id": monitor.id,
350359
"monitor_name": monitor.name,
@@ -353,6 +362,10 @@ async def run(message: dict[Any, Any]) -> None:
353362
monitor_running = prometheus_monitor_running.labels(**prometheus_labels)
354363
monitor_running.inc()
355364

365+
heartbeat_task = task_manager.create_task(
366+
_heartbeat_routine(monitor), parent_task=asyncio.current_task()
367+
)
368+
356369
try:
357370
await monitor.set_running(True)
358371

@@ -376,6 +389,9 @@ async def run(message: dict[Any, Any]) -> None:
376389
_logger.error(traceback.format_exc().strip())
377390
_logger.info("Exception caught successfully, going on")
378391
finally:
392+
# Cancel the heartbeat task
393+
heartbeat_task.cancel()
394+
379395
# Refresh the monitor before updating to prevent overwriting information that might have
380396
# changed while the routines were executing
381397
await monitor.refresh()

tests/components/executor/test_monitor_handler.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import asyncio
22
import time
3-
from datetime import datetime, timezone
3+
from datetime import datetime, timedelta, timezone
44
from unittest.mock import AsyncMock, MagicMock
55

66
import pytest
@@ -11,7 +11,7 @@
1111
from data_models.monitor_options import AlertOptions, CountRule, IssueOptions, PriorityLevels
1212
from models import Alert, AlertPriority, AlertStatus, Issue, IssueStatus, Monitor
1313
from tests.test_utils import assert_message_in_log, assert_message_not_in_log
14-
from utils.time import time_since
14+
from utils.time import now, time_since
1515

1616
pytestmark = pytest.mark.asyncio(loop_scope="session")
1717

@@ -1100,6 +1100,27 @@ async def do_nothing(monitor): ...
11001100
assert sample_monitor.active_alerts[0].id == alert.id
11011101

11021102

1103+
# Test _heartbeat_routine
1104+
1105+
1106+
async def test_heartbeat_routine(monkeypatch, sample_monitor: Monitor):
    """'_heartbeat_routine' should keep updating the monitor's 'last_heartbeat' while it is
    running"""
    monkeypatch.setattr(monitor_handler.configs, "executor_monitor_heartbeat_time", 0.5)

    await sample_monitor.refresh()
    assert sample_monitor.last_heartbeat is None

    heartbeat_task = asyncio.create_task(monitor_handler._heartbeat_routine(sample_monitor))

    try:
        # Yield control once to give the heartbeat task a chance to run before the first check
        await asyncio.sleep(0)
        for _ in range(4):
            await sample_monitor.refresh()
            assert sample_monitor.last_heartbeat > now() - timedelta(seconds=0.1)
            await asyncio.sleep(0.5)
    finally:
        # The routine loops forever, so it must be cancelled even when an assertion fails.
        # Otherwise the task would be left running in the session-scoped event loop
        heartbeat_task.cancel()
1122+
1123+
11031124
# Test run
11041125

11051126

@@ -1158,6 +1179,33 @@ async def test_run_monitor_skip_running(caplog, mocker, sample_monitor: Monitor)
11581179
run_routines_spy.assert_not_called()
11591180

11601181

1182+
@pytest.mark.flaky(reruns=2)
async def test_run_monitor_heartbeat(monkeypatch, sample_monitor: Monitor):
    """'run' should keep updating the monitor's 'last_heartbeat' while the monitor routines are
    executing"""

    async def sleep(monitor, tasks):
        # Stand-in for '_run_routines' that just keeps 'run' busy while the heartbeat is checked
        await asyncio.sleep(2.1)

    monkeypatch.setattr(monitor_handler, "_run_routines", sleep)

    monkeypatch.setattr(monitor_handler.configs, "executor_monitor_heartbeat_time", 0.5)

    await sample_monitor.refresh()
    assert sample_monitor.last_heartbeat is None

    run_task = asyncio.create_task(
        monitor_handler.run({"payload": {"monitor_id": sample_monitor.id, "tasks": ["search"]}})
    )

    try:
        await asyncio.sleep(0.05)
        for _ in range(4):
            await sample_monitor.refresh()
            assert sample_monitor.last_heartbeat > now() - timedelta(seconds=0.1)
            await asyncio.sleep(0.5)

        await run_task
    finally:
        # If an assertion failed, 'run' is still executing in the background. Cancel it so it
        # is not left running in the session-scoped event loop when the test is retried
        if not run_task.done():
            run_task.cancel()
1207+
1208+
11611209
@pytest.mark.parametrize("tasks", [["search"], ["update"], ["search", "update"]])
11621210
async def test_run_monitor_set_running(mocker, sample_monitor: Monitor, tasks):
11631211
"""'run' should set the monitor's 'running' flag to 'True' while running the routines"""

0 commit comments

Comments
 (0)