Skip to content

Commit 32f577d

Browse files
authored
Merge pull request #72 from GabrielSalla/reload-updated-monitors
Reload only updated monitors
2 parents 8d394cc + 45c072a commit 32f577d

File tree

10 files changed

+380
-53
lines changed

10 files changed

+380
-53
lines changed

migrations/versions/202411181208_247390255aee_create_code_modules_table.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def upgrade() -> None:
2424
sa.Column("monitor_id", sa.Integer(), unique=True),
2525
sa.Column("code", sa.String(), nullable=True),
2626
sa.Column("additional_files", sa.JSON, nullable=True),
27+
sa.Column("registered_at", sa.DateTime(timezone=True), nullable=True),
2728

2829
sa.ForeignKeyConstraint(("monitor_id",), ["Monitors.id"]),
2930
)

src/components/monitors_loader/monitors_loader.py

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from configs import configs
1414
from data_models.monitor_options import ReactionOptions
1515
from models import CodeModule, Monitor
16+
from utils.async_tools import do_concurrently
1617
from utils.exception_handling import catch_exceptions
1718
from utils.time import now, time_since, time_until_next_trigger
1819

@@ -94,6 +95,7 @@ async def register_monitor(
9495
code_module = await CodeModule.get_or_create(monitor_id=monitor.id)
9596
code_module.code = monitor_code
9697
code_module.additional_files = additional_files or {}
98+
code_module.registered_at = now()
9799
await code_module.save()
98100

99101
return monitor
@@ -190,43 +192,84 @@ def _configure_monitor(monitor_module: MonitorModule) -> None:
190192
monitor_module.reaction_options[event_name].extend(reactions)
191193

192194

193-
async def _load_monitors() -> None:
194-
"""Load all enabled monitors from the database and add them to the registry. If any of the
195-
monitor's modules fails to load, the monitor will not be added to the registry"""
196-
registry.monitors_ready.clear()
195+
async def _disable_monitor(monitor: Monitor) -> None:
196+
"""Disable a monitor"""
197+
await monitor.set_enabled(False)
198+
_logger.warning(f"Monitor '{monitor}' has no code module, it will be disabled")
199+
200+
201+
async def _disable_monitors_without_code_modules() -> None:
202+
"""Disable all monitors that don't have a code module"""
203+
enabled_monitors = await Monitor.get_raw([Monitor.id], [Monitor.enabled.is_(True)])
204+
monitors_ids = {monitor_id for (monitor_id,) in enabled_monitors}
205+
206+
code_modules = await CodeModule.get_raw([CodeModule.monitor_id], [CodeModule.code.is_not(None)])
207+
code_modules_monitor_ids = {monitor_id for (monitor_id,) in code_modules}
208+
209+
monitors_to_disable = await Monitor.get_all(
210+
Monitor.id.in_(monitors_ids - code_modules_monitor_ids)
211+
)
212+
await do_concurrently(*[_disable_monitor(monitor) for monitor in monitors_to_disable])
197213

214+
215+
async def _get_monitors_to_load(
216+
last_load_time: datetime | None,
217+
) -> tuple[dict[int, Monitor], list[CodeModule]]:
218+
"""Get all the monitors that need to be loaded"""
219+
# Get all enabled monitors
198220
loaded_monitors = await Monitor.get_all(Monitor.enabled.is_(True))
199-
monitors_ids = [monitor.id for monitor in loaded_monitors]
221+
monitors = {monitor.id: monitor for monitor in loaded_monitors}
222+
223+
# Get all code modules that were updated since the last load time
224+
# Add a time delta to have some room for code modules that updated right before the last load
225+
if last_load_time is None:
226+
reference_timestamp = None
227+
else:
228+
reference_timestamp = last_load_time - timedelta(seconds=15)
229+
230+
updated_code_modules = await CodeModule.get_updated_code_modules(
231+
monitors_ids=list(monitors.keys()),
232+
reference_timestamp=reference_timestamp,
233+
)
234+
code_modules = [code_module for code_module in updated_code_modules]
235+
236+
# Add monitors that are enabled but aren't in the registry
237+
registry_monitors_ids = set(registry.get_monitors_ids())
238+
update_monitors_ids = {code_module.monitor_id for code_module in updated_code_modules}
239+
pending_monitors = set(monitors.keys()) - registry_monitors_ids - update_monitors_ids
240+
if len(pending_monitors) > 0:
241+
code_modules.extend(await CodeModule.get_all(CodeModule.monitor_id.in_(pending_monitors)))
242+
243+
return monitors, code_modules
200244

201-
code_modules = await CodeModule.get_all(CodeModule.monitor_id.in_(monitors_ids))
202-
code_modules_map = {code_module.monitor_id: code_module for code_module in code_modules}
203245

204-
_logger.info(f"Monitors found: {len(loaded_monitors)}")
246+
async def _load_monitors(last_load_time: datetime | None) -> None:
247+
"""Load all enabled monitors from the database and add them to the registry. If any of the
248+
monitor's modules fails to load, the monitor will not be added to the registry"""
249+
registry.monitors_ready.clear()
250+
251+
monitors, code_modules = await _get_monitors_to_load(last_load_time)
252+
_logger.info(f"Monitors to load: {len(code_modules)}")
205253

206254
# To load the monitors safely, first create all the files and then import them
207255
# Loading right after writing the files can result in an error where the Monitor module is not
208256
# found
209257
monitors_paths = {}
210-
for monitor in loaded_monitors:
258+
for code_module in code_modules:
211259
with catch_exceptions(_logger):
212-
code_module = code_modules_map.get(monitor.id)
213-
if code_module is None:
214-
await monitor.set_enabled(False)
215-
_logger.warning(f"Monitor '{monitor.name}' has no code module, it will be disabled")
216-
continue
260+
monitor = monitors[code_module.monitor_id]
217261

218262
monitors_paths[monitor.id] = module_loader.create_module_files(
219263
module_name=monitor.name,
220264
module_code=code_module.code,
221265
additional_files=code_module.additional_files,
222266
)
223267

224-
for monitor in loaded_monitors:
268+
for code_module in code_modules:
225269
with catch_exceptions(_logger):
226-
monitor_path = monitors_paths.get(monitor.id)
227-
if monitor_path is None:
228-
continue
270+
monitor = monitors[code_module.monitor_id]
229271

272+
monitor_path = monitors_paths[monitor.id]
230273
monitor_module = cast(MonitorModule, module_loader.load_module_from_file(monitor_path))
231274
_configure_monitor(monitor_module)
232275

@@ -238,11 +281,12 @@ async def _load_monitors() -> None:
238281

239282
async def _run() -> None:
240283
"""Monitors loading loop, loading them recurrently. Stops automatically when the app stops"""
241-
last_load_time: datetime
284+
last_load_time: datetime | None = None
242285

243286
while app.running():
244287
with catch_exceptions(_logger):
245-
await _load_monitors()
288+
await _disable_monitors_without_code_modules()
289+
await _load_monitors(last_load_time)
246290
last_load_time = now()
247291

248292
# The sleep task will start seconds earlier to try to load all monitors before the

src/models/code_module.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
from sqlalchemy import ForeignKey, Integer, String
1+
from datetime import datetime
2+
from typing import Sequence
3+
4+
from sqlalchemy import DateTime, ForeignKey, Integer, String
25
from sqlalchemy.dialects import postgresql
36
from sqlalchemy.ext.mutable import MutableDict
47
from sqlalchemy.orm import Mapped, mapped_column
@@ -16,6 +19,25 @@ class CodeModule(Base):
1619
MutableDict.as_mutable(postgresql.JSON), # type: ignore[arg-type]
1720
nullable=True,
1821
)
22+
registered_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=True)
1923

2024
# Code modules won't trigger events when they are created
2125
_enable_creation_event: bool = False
26+
27+
@classmethod
28+
async def get_updated_code_modules(
29+
cls: type["CodeModule"],
30+
monitors_ids: list[int],
31+
reference_timestamp: datetime | None,
32+
) -> Sequence["CodeModule"]:
33+
"""Get all code modules that were updated after a reference timestamp"""
34+
if not monitors_ids:
35+
return []
36+
37+
if reference_timestamp is None:
38+
return await cls.get_all(cls.monitor_id.in_(monitors_ids))
39+
40+
return await cls.get_all(
41+
cls.monitor_id.in_(monitors_ids),
42+
cls.registered_at > reference_timestamp,
43+
)

src/registry/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
add_monitor,
44
get_monitor_module,
55
get_monitors,
6+
get_monitors_ids,
67
init,
78
is_monitor_registered,
89
monitors_pending,
@@ -16,6 +17,7 @@
1617
"MonitorsLoadError",
1718
"add_monitor",
1819
"get_monitor_module",
20+
"get_monitors_ids",
1921
"get_monitors",
2022
"init",
2123
"is_monitor_registered",

src/registry/registry.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ async def wait_monitor_loaded(monitor_id: int) -> None:
7676
raise MonitorNotRegisteredError(f"Monitor '{monitor_id}' not registered")
7777

7878

79+
def get_monitors_ids() -> list[int]:
80+
"""Get all the monitors"""
81+
return list(_monitors.keys())
82+
83+
7984
def get_monitors() -> list[MonitorInfo]:
8085
"""Get all the monitors"""
8186
return list(_monitors.values())

tests/components/controller/test_controller.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ async def test_run(monkeypatch, clear_queue, clear_database):
338338
assert len(queue_items) == 0
339339

340340
# Load the monitors and wait for a while
341-
await monitors_loader._load_monitors()
341+
await monitors_loader._load_monitors(None)
342342
await asyncio.sleep(0.2)
343343

344344
# Stop the app and wait for the controller task
@@ -399,7 +399,7 @@ async def test_run_monitors_not_ready(caplog, monkeypatch, mocker):
399399

400400
# Run the controller for a while then stop it
401401
await monitors_loader._register_monitors()
402-
await monitors_loader._load_monitors()
402+
await monitors_loader._load_monitors(None)
403403
registry.monitors_ready.clear()
404404

405405
controller_task = asyncio.create_task(controller.run())
@@ -419,7 +419,7 @@ async def test_run_monitors_not_registered(caplog, monkeypatch, mocker):
419419

420420
# Run the controller for a while then stop it
421421
await monitors_loader._register_monitors()
422-
await monitors_loader._load_monitors()
422+
await monitors_loader._load_monitors(None)
423423

424424
controller_task = asyncio.create_task(controller.run())
425425
await asyncio.sleep(0.2)
@@ -442,7 +442,7 @@ def error(*args):
442442
# Run the controller for a while then stop it
443443
await monitors_loader._register_monitors()
444444
controller_task = asyncio.create_task(controller.run())
445-
await monitors_loader._load_monitors()
445+
await monitors_loader._load_monitors(None)
446446
await asyncio.sleep(0.3)
447447

448448
assert_message_in_log(caplog, "ValueError: Not able to get the monitors")

0 commit comments

Comments
 (0)