Skip to content

Commit 98d0c3f

Browse files
committed
create heartbeat component
1 parent 9fe8a52 commit 98d0c3f

File tree

4 files changed

+118
-0
lines changed

4 files changed

+118
-0
lines changed

docs/monitoring_sentinela.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,6 @@ The Prometheus metrics provided by Sentinela are:
4848
- Labels: `action_name`
4949
- `executor_request_execution_seconds`: Summary - Time to run the request
5050
- Labels: `action_name`
51+
- `heartbeat_average_time`: Gauge - Average time between heartbeats in seconds
5152
- `registry_monitors_ready_timeout_count`: Counter - Count of times the application timed out waiting for monitors to be ready
5253
- `registry_monitor_not_registered_count`: Counter - Count of times a monitor is not registered after a load attempt
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
"""Public interface of the heartbeat package: re-exports ``run`` from ``.heartbeat``."""

from .heartbeat import run

# Only the heartbeat entry point is part of the public API
__all__ = ["run"]
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import logging
2+
import time
3+
from collections import deque
4+
from itertools import pairwise
5+
6+
import prometheus_client
7+
8+
import utils.app as app
9+
from configs import configs
10+
11+
# Logger used for the heartbeat delay warnings
_logger = logging.getLogger("heartbeat")

# Gauge holding the most recently computed average interval between heartbeats
prometheus_heartbeat_average_time = prometheus_client.Gauge(
    name="heartbeat_average_time",
    documentation="Average time between heartbeats in seconds",
)
16+
17+
18+
def _is_heartbeat_delayed(timestamps: deque[float], threshold: float) -> bool:
19+
"""Determine if the heartbeat is delayed based on the average latency between timestamps"""
20+
if len(timestamps) < 2:
21+
return False
22+
23+
latencies = [b - a for a, b in pairwise(timestamps)]
24+
average_latency = sum(latencies) / len(latencies)
25+
prometheus_heartbeat_average_time.set(average_latency)
26+
return average_latency > threshold
27+
28+
29+
# Factor applied to the configured heartbeat time to obtain the delay threshold
_DELAY_TOLERANCE = 1.05
# Minimum number of seconds between two consecutive delay warnings
_WARNING_COOLDOWN_SECONDS = 10


async def run() -> None:
    """Create a heartbeat for the application to detect when some tasks are not yielding control
    back to the event loop. If the heartbeat is delayed, a warning message is logged.

    On every iteration the current time is appended to a window of the last 10 timestamps and the
    average interval of that window is compared against 'configs.heartbeat_time' multiplied by the
    tolerance factor. The loop runs while 'app.running()' is truthy and waits
    'configs.heartbeat_time' between iterations through 'app.sleep'.

    Timestamps come from the monotonic clock, so adjustments to the system clock cannot distort
    the measured intervals."""
    timestamps: deque[float] = deque(maxlen=10)
    # The monotonic clock has an arbitrary reference point, so start from negative infinity to
    # guarantee that the very first warning is never suppressed by the cooldown
    last_warning_timestamp = float("-inf")

    while app.running():
        timestamp = time.monotonic()
        timestamps.append(timestamp)
        heartbeat_delayed = _is_heartbeat_delayed(
            timestamps, configs.heartbeat_time * _DELAY_TOLERANCE
        )

        # Prevent warning messages from being sent too frequently
        can_warn = timestamp - last_warning_timestamp > _WARNING_COOLDOWN_SECONDS
        if can_warn and heartbeat_delayed:
            _logger.warning(
                "High average heartbeat interval. "
                "Blocking operations are preventing tasks from executing"
            )
            last_warning_timestamp = timestamp

        await app.sleep(configs.heartbeat_time)
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import asyncio
2+
import time
3+
from collections import deque
4+
from unittest.mock import MagicMock
5+
6+
import pytest
7+
8+
import components.heartbeat.heartbeat as heartbeat
9+
import utils.app as app
10+
from configs import configs
11+
12+
# Run every test in this module as an asyncio test on the session-scoped event loop
pytestmark = pytest.mark.asyncio(loop_scope="session")
13+
14+
15+
@pytest.mark.parametrize(
    ("timestamps", "threshold", "expected_result"),
    [
        # A single timestamp has no interval to measure
        (deque([100]), 1, False),
        # An average equal to the threshold is not considered delayed
        (deque([100, 101]), 1, False),
        (deque([100, 102]), 1, True),
        (deque([1, 3, 5, 7, 9]), 2, False),
        # One longer interval is enough to push the average above the threshold
        (deque([1, 3, 5, 7, 10]), 2, True),
    ],
)
async def test_is_heartbeat_delayed(timestamps, threshold, expected_result):
    """'_is_heartbeat_delayed' should return True only when the average latency is above the
    threshold"""
    is_delayed = heartbeat._is_heartbeat_delayed(timestamps, threshold)

    assert is_delayed is expected_result
28+
29+
30+
async def test_run(mocker, monkeypatch):
    """'run' should log a warning only after the event loop has been blocked"""
    monkeypatch.setattr(configs, "heartbeat_time", 0.05)
    heartbeat_logger_warning_spy: MagicMock = mocker.spy(heartbeat._logger, "warning")

    task = asyncio.create_task(heartbeat.run())

    try:
        # The event loop is free during this sleep, so no warning is expected yet
        await asyncio.sleep(0.1)
        assert heartbeat_logger_warning_spy.call_count == 0
        # Block the event loop so the next heartbeat is late
        time.sleep(0.1)
        # Yield control once so the heartbeat task gets a chance to run
        await asyncio.sleep(0)
        assert heartbeat_logger_warning_spy.call_count == 1
    finally:
        # Always stop the heartbeat so a failed assertion cannot leave the task running
        app.stop()
        await asyncio.wait_for(task, timeout=0.1)
45+
46+
47+
async def test_run_cooldown(mocker, monkeypatch):
    """'run' should respect the cooldown period between warnings when heartbeat is delayed"""
    monkeypatch.setattr(configs, "heartbeat_time", 0.05)
    heartbeat_logger_warning_spy: MagicMock = mocker.spy(heartbeat._logger, "warning")

    task = asyncio.create_task(heartbeat.run())

    try:
        await asyncio.sleep(0.1)
        # First blocking period: the delayed heartbeat should log one warning
        time.sleep(0.1)
        await asyncio.sleep(0)
        assert heartbeat_logger_warning_spy.call_count == 1
        # Second blocking period: the call count is expected to stay at one
        time.sleep(0.1)
        await asyncio.sleep(0)
        assert heartbeat_logger_warning_spy.call_count == 1
    finally:
        # Always stop the heartbeat so a failed assertion cannot leave the task running
        app.stop()
        await asyncio.wait_for(task, timeout=0.1)

0 commit comments

Comments
 (0)