Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def upgrade() -> None:
sa.Column("monitor_id", sa.Integer(), unique=True),
sa.Column("code", sa.String(), nullable=True),
sa.Column("additional_files", sa.JSON, nullable=True),
sa.Column("registered_at", sa.DateTime(timezone=True), nullable=True),

sa.ForeignKeyConstraint(("monitor_id",), ["Monitors.id"]),
)
Expand Down
84 changes: 64 additions & 20 deletions src/components/monitors_loader/monitors_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from configs import configs
from data_models.monitor_options import ReactionOptions
from models import CodeModule, Monitor
from utils.async_tools import do_concurrently
from utils.exception_handling import catch_exceptions
from utils.time import now, time_since, time_until_next_trigger

Expand Down Expand Up @@ -94,6 +95,7 @@ async def register_monitor(
code_module = await CodeModule.get_or_create(monitor_id=monitor.id)
code_module.code = monitor_code
code_module.additional_files = additional_files or {}
code_module.registered_at = now()
await code_module.save()

return monitor
Expand Down Expand Up @@ -190,43 +192,84 @@ def _configure_monitor(monitor_module: MonitorModule) -> None:
monitor_module.reaction_options[event_name].extend(reactions)


async def _load_monitors() -> None:
"""Load all enabled monitors from the database and add them to the registry. If any of the
monitor's modules fails to load, the monitor will not be added to the registry"""
registry.monitors_ready.clear()
async def _disable_monitor(monitor: Monitor) -> None:
"""Disable a monitor"""
await monitor.set_enabled(False)
_logger.warning(f"Monitor '{monitor}' has no code module, it will be disabled")


async def _disable_monitors_without_code_modules() -> None:
"""Disable all monitors that don't have a code module"""
enabled_monitors = await Monitor.get_raw([Monitor.id], [Monitor.enabled.is_(True)])
monitors_ids = {monitor_id for (monitor_id,) in enabled_monitors}

code_modules = await CodeModule.get_raw([CodeModule.monitor_id], [CodeModule.code.is_not(None)])
code_modules_monitor_ids = {monitor_id for (monitor_id,) in code_modules}

monitors_to_disable = await Monitor.get_all(
Monitor.id.in_(monitors_ids - code_modules_monitor_ids)
)
await do_concurrently(*[_disable_monitor(monitor) for monitor in monitors_to_disable])


async def _get_monitors_to_load(
last_load_time: datetime | None,
) -> tuple[dict[int, Monitor], list[CodeModule]]:
"""Get all the monitors that need to be loaded"""
# Get all enabled monitors
loaded_monitors = await Monitor.get_all(Monitor.enabled.is_(True))
monitors_ids = [monitor.id for monitor in loaded_monitors]
monitors = {monitor.id: monitor for monitor in loaded_monitors}

# Get all code modules that were updated since the last load time
# Add a time delta to have some room for code modules that updated right before the last load
if last_load_time is None:
reference_timestamp = None
else:
reference_timestamp = last_load_time - timedelta(seconds=15)

updated_code_modules = await CodeModule.get_updated_code_modules(
monitors_ids=list(monitors.keys()),
reference_timestamp=reference_timestamp,
)
code_modules = [code_module for code_module in updated_code_modules]

# Add monitors that are enabled but aren't in the registry
registry_monitors_ids = set(registry.get_monitors_ids())
update_monitors_ids = {code_module.monitor_id for code_module in updated_code_modules}
pending_monitors = set(monitors.keys()) - registry_monitors_ids - update_monitors_ids
if len(pending_monitors) > 0:
code_modules.extend(await CodeModule.get_all(CodeModule.monitor_id.in_(pending_monitors)))

return monitors, code_modules

code_modules = await CodeModule.get_all(CodeModule.monitor_id.in_(monitors_ids))
code_modules_map = {code_module.monitor_id: code_module for code_module in code_modules}

_logger.info(f"Monitors found: {len(loaded_monitors)}")
async def _load_monitors(last_load_time: datetime | None) -> None:
"""Load all enabled monitors from the database and add them to the registry. If any of the
monitor's modules fails to load, the monitor will not be added to the registry"""
registry.monitors_ready.clear()

monitors, code_modules = await _get_monitors_to_load(last_load_time)
_logger.info(f"Monitors to load: {len(code_modules)}")

# To load the monitors safely, first create all the files and then import them
# Loading right after writing the files can result in an error where the Monitor module is not
# found
monitors_paths = {}
for monitor in loaded_monitors:
for code_module in code_modules:
with catch_exceptions(_logger):
code_module = code_modules_map.get(monitor.id)
if code_module is None:
await monitor.set_enabled(False)
_logger.warning(f"Monitor '{monitor.name}' has no code module, it will be disabled")
continue
monitor = monitors[code_module.monitor_id]

monitors_paths[monitor.id] = module_loader.create_module_files(
module_name=monitor.name,
module_code=code_module.code,
additional_files=code_module.additional_files,
)

for monitor in loaded_monitors:
for code_module in code_modules:
with catch_exceptions(_logger):
monitor_path = monitors_paths.get(monitor.id)
if monitor_path is None:
continue
monitor = monitors[code_module.monitor_id]

monitor_path = monitors_paths[monitor.id]
monitor_module = cast(MonitorModule, module_loader.load_module_from_file(monitor_path))
_configure_monitor(monitor_module)

Expand All @@ -238,11 +281,12 @@ async def _load_monitors() -> None:

async def _run() -> None:
"""Monitors loading loop, loading them recurrently. Stops automatically when the app stops"""
last_load_time: datetime
last_load_time: datetime | None = None

while app.running():
with catch_exceptions(_logger):
await _load_monitors()
await _disable_monitors_without_code_modules()
await _load_monitors(last_load_time)
last_load_time = now()

# The sleep task will start seconds earlier to try to load all monitors before the
Expand Down
24 changes: 23 additions & 1 deletion src/models/code_module.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from sqlalchemy import ForeignKey, Integer, String
from datetime import datetime
from typing import Sequence

from sqlalchemy import DateTime, ForeignKey, Integer, String
from sqlalchemy.dialects import postgresql
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import Mapped, mapped_column
Expand All @@ -16,6 +19,25 @@ class CodeModule(Base):
MutableDict.as_mutable(postgresql.JSON), # type: ignore[arg-type]
nullable=True,
)
registered_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=True)

# Code modules won't trigger events when they are created
_enable_creation_event: bool = False

@classmethod
async def get_updated_code_modules(
cls: type["CodeModule"],
monitors_ids: list[int],
reference_timestamp: datetime | None,
) -> Sequence["CodeModule"]:
"""Get all code modules that were updated after a reference timestamp"""
if not monitors_ids:
return []

if reference_timestamp is None:
return await cls.get_all(cls.monitor_id.in_(monitors_ids))

return await cls.get_all(
cls.monitor_id.in_(monitors_ids),
cls.registered_at > reference_timestamp,
)
2 changes: 2 additions & 0 deletions src/registry/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
add_monitor,
get_monitor_module,
get_monitors,
get_monitors_ids,
init,
is_monitor_registered,
monitors_pending,
Expand All @@ -16,6 +17,7 @@
"MonitorsLoadError",
"add_monitor",
"get_monitor_module",
"get_monitors_ids",
"get_monitors",
"init",
"is_monitor_registered",
Expand Down
5 changes: 5 additions & 0 deletions src/registry/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ async def wait_monitor_loaded(monitor_id: int) -> None:
raise MonitorNotRegisteredError(f"Monitor '{monitor_id}' not registered")


def get_monitors_ids() -> list[int]:
"""Get all the monitors"""
return list(_monitors.keys())


def get_monitors() -> list[MonitorInfo]:
"""Get all the monitors"""
return list(_monitors.values())
Expand Down
8 changes: 4 additions & 4 deletions tests/components/controller/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async def test_run(monkeypatch, clear_queue, clear_database):
assert len(queue_items) == 0

# Load the monitors and wait for a while
await monitors_loader._load_monitors()
await monitors_loader._load_monitors(None)
await asyncio.sleep(0.2)

# Stop the app and wait for the controller task
Expand Down Expand Up @@ -399,7 +399,7 @@ async def test_run_monitors_not_ready(caplog, monkeypatch, mocker):

# Run the controller for a while then stop it
await monitors_loader._register_monitors()
await monitors_loader._load_monitors()
await monitors_loader._load_monitors(None)
registry.monitors_ready.clear()

controller_task = asyncio.create_task(controller.run())
Expand All @@ -419,7 +419,7 @@ async def test_run_monitors_not_registered(caplog, monkeypatch, mocker):

# Run the controller for a while then stop it
await monitors_loader._register_monitors()
await monitors_loader._load_monitors()
await monitors_loader._load_monitors(None)

controller_task = asyncio.create_task(controller.run())
await asyncio.sleep(0.2)
Expand All @@ -442,7 +442,7 @@ def error(*args):
# Run the controller for a while then stop it
await monitors_loader._register_monitors()
controller_task = asyncio.create_task(controller.run())
await monitors_loader._load_monitors()
await monitors_loader._load_monitors(None)
await asyncio.sleep(0.3)

assert_message_in_log(caplog, "ValueError: Not able to get the monitors")
Expand Down
Loading