Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions docs/plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,22 @@ Actions are used as custom behaviors to requests received by sentinela. If senti

Actions must have the following signature:
```python
async def action_name(message_payload: dict[Any, Any]):
from data_models.request_payload import RequestPayload


async def action_name(message_payload: RequestPayload):
```

The `RequestPayload` object contains the action name and the parameters sent by the request. The parameters will vary depending on the action.

An example of the action call made by Sentinela is:
```python
await plugin.my_plugin.actions.action_name({"key": "value"})
from data_models.request_payload import RequestPayload


await plugin.my_plugin.actions.action_name(
RequestPayload(action="my_plugin.action_name", params={"key": "value"})
)
```

## Notifications
Expand Down
8 changes: 4 additions & 4 deletions src/commands/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def alert_acknowledge(alert_id: int) -> None:
type="request",
payload={
"action": "alert_acknowledge",
"target_id": alert_id,
"params": {"target_id": alert_id},
},
)

Expand All @@ -51,7 +51,7 @@ async def alert_lock(alert_id: int) -> None:
type="request",
payload={
"action": "alert_lock",
"target_id": alert_id,
"params": {"target_id": alert_id},
},
)

Expand All @@ -62,7 +62,7 @@ async def alert_solve(alert_id: int) -> None:
type="request",
payload={
"action": "alert_solve",
"target_id": alert_id,
"params": {"target_id": alert_id},
},
)

Expand All @@ -73,6 +73,6 @@ async def issue_drop(issue_id: int) -> None:
type="request",
payload={
"action": "issue_drop",
"target_id": issue_id,
"params": {"target_id": issue_id},
},
)
26 changes: 18 additions & 8 deletions src/components/executor/monitor_handler.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import asyncio
import json
import logging
import traceback
from datetime import datetime
from typing import Any, cast
from typing import Any, Literal, cast

import prometheus_client
from pydantic import ValidationError

import registry as registry
from base_exception import BaseSentinelaException
from data_models.process_monitor_payload import ProcessMonitorPayload
from internal_database import get_session
from models import Alert, Issue, Monitor
from utils.async_tools import do_concurrently
Expand Down Expand Up @@ -279,7 +282,7 @@ async def _alerts_routine(monitor: Monitor) -> None:
await do_concurrently(*[alert.update() for alert in monitor.active_alerts])


async def _run_routines(monitor: Monitor, tasks: list[str]) -> None:
async def _run_routines(monitor: Monitor, tasks: list[Literal["search", "update"]]) -> None:
"""Run all routines for a monitor, based on a list of tasks"""
# Monitor instrumentation metrics
prometheus_labels = {
Expand Down Expand Up @@ -321,9 +324,16 @@ async def _run_routines(monitor: Monitor, tasks: list[str]) -> None:
async def run(message: dict[Any, Any]) -> None:
"""Process a message with type 'process_monitor', loading the monitor and executing it's
routines, while also detecting errors and reporting them accordingly"""
message_payload = message["payload"]
try:
message_payload = ProcessMonitorPayload(**message["payload"])
except KeyError:
_logger.error(f"Message '{json.dumps(message)}' missing 'payload' field")
return
except ValidationError as e:
_logger.error(f"Invalid payload: {e}")
return

monitor_id = message_payload["monitor_id"]
monitor_id = message_payload.monitor_id
monitor = await Monitor.get_by_id(monitor_id)
if monitor is None:
_logger.error(f"Monitor {monitor_id} not found. Skipping message")
Expand All @@ -340,17 +350,17 @@ async def run(message: dict[Any, Any]) -> None:
"monitor_name": monitor.name,
}

try:
monitor_running = prometheus_monitor_running.labels(**prometheus_labels)
monitor_running.inc()
monitor_running = prometheus_monitor_running.labels(**prometheus_labels)
monitor_running.inc()

try:
monitor.set_running(True)
await monitor.save()

monitor_execution_time = prometheus_monitor_execution_time.labels(**prometheus_labels)
with monitor_execution_time.time():
await asyncio.wait_for(
_run_routines(monitor, message_payload["tasks"]), monitor.options.execution_timeout
_run_routines(monitor, message_payload.tasks), monitor.options.execution_timeout
)
except asyncio.TimeoutError:
monitor_timeout_count = prometheus_monitor_timeout_count.labels(**prometheus_labels)
Expand Down
11 changes: 10 additions & 1 deletion src/components/executor/reaction_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any

import prometheus_client
from pydantic import ValidationError

import registry as registry
from base_exception import BaseSentinelaException
Expand Down Expand Up @@ -35,7 +36,15 @@
async def run(message: dict[Any, Any]) -> None:
"""Process a message with type 'event' using the monitor's defined list of reactions for the
event. The execution timeout is for each function individually"""
event_payload = EventPayload(**message["payload"])
try:
event_payload = EventPayload(**message["payload"])
except KeyError:
_logger.error(f"Message '{json.dumps(message)}' missing 'payload' field")
return
except ValidationError as e:
_logger.error(f"Invalid payload: {e}")
return

monitor_id = event_payload.event_source_monitor_id
event_name = event_payload.event_name

Expand Down
42 changes: 27 additions & 15 deletions src/components/executor/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
from typing import Any, Callable, Coroutine, cast

import prometheus_client
from pydantic import ValidationError

import plugins
import registry as registry
from base_exception import BaseSentinelaException
from configs import configs
from data_models.request_payload import RequestPayload
from models import Alert, Issue

_logger = logging.getLogger("request_handler")
Expand All @@ -31,9 +33,9 @@
)


async def alert_acknowledge(message_payload: dict[Any, Any]) -> None:
async def alert_acknowledge(message_payload: RequestPayload) -> None:
"""Acknowledge an alert"""
alert_id = message_payload["target_id"]
alert_id = message_payload.params["target_id"]
alert = await Alert.get_by_id(alert_id)
if alert is None:
_logger.info(f"Alert '{alert_id}' not found")
Expand All @@ -42,9 +44,9 @@ async def alert_acknowledge(message_payload: dict[Any, Any]) -> None:
await alert.acknowledge()


async def alert_lock(message_payload: dict[Any, Any]) -> None:
async def alert_lock(message_payload: RequestPayload) -> None:
"""Lock an alert"""
alert_id = message_payload["target_id"]
alert_id = message_payload.params["target_id"]
alert = await Alert.get_by_id(alert_id)
if alert is None:
_logger.info(f"Alert '{alert_id}' not found")
Expand All @@ -53,9 +55,9 @@ async def alert_lock(message_payload: dict[Any, Any]) -> None:
await alert.lock()


async def alert_solve(message_payload: dict[Any, Any]) -> None:
async def alert_solve(message_payload: RequestPayload) -> None:
"""Solve all alert's issues"""
alert_id = message_payload["target_id"]
alert_id = message_payload.params["target_id"]
alert = await Alert.get_by_id(alert_id)
if alert is None:
_logger.info(f"Alert '{alert_id}' not found")
Expand All @@ -64,9 +66,9 @@ async def alert_solve(message_payload: dict[Any, Any]) -> None:
await alert.solve_issues()


async def issue_drop(message_payload: dict[Any, Any]) -> None:
async def issue_drop(message_payload: RequestPayload) -> None:
"""Drop an issue"""
issue_id = message_payload["target_id"]
issue_id = message_payload.params["target_id"]
issue = await Issue.get_by_id(issue_id)
if issue is None:
_logger.info(f"Issue '{issue_id}' not found")
Expand All @@ -83,7 +85,7 @@ async def issue_drop(message_payload: dict[Any, Any]) -> None:
}


def get_action(action_name: str) -> Callable[[dict[Any, Any]], Coroutine[Any, Any, None]] | None:
def get_action(action_name: str) -> Callable[[RequestPayload], Coroutine[Any, Any, None]] | None:
"""Get the action function by its name, checking if it is a plugin action"""
if action_name.startswith("plugin."):
plugin_name, action_name = action_name.split(".")[1:3]
Expand All @@ -103,31 +105,41 @@ def get_action(action_name: str) -> Callable[[dict[Any, Any]], Coroutine[Any, An
_logger.warning(f"Action '{plugin_name}.{action_name}' unknown")
return None

return cast(Callable[[dict[Any, Any]], Coroutine[Any, Any, None]], action)
return cast(Callable[[RequestPayload], Coroutine[Any, Any, None]], action)

return actions.get(action_name)


async def run(message: dict[Any, Any]) -> None:
"""Process a received request"""
message_payload = message["payload"]
action_name = message_payload["action"]
try:
message_payload = RequestPayload(**message["payload"])
except KeyError:
_logger.error(f"Message '{json.dumps(message)}' missing 'payload' field")
return
except ValidationError as e:
_logger.error(f"Invalid payload: {e}")
return

action_name = message_payload.action

action = get_action(action_name)

if action is None:
_logger.warning(f"Got request with unknown action '{json.dumps(message_payload)}'")
_logger.warning(
f"Got request with unknown action '{json.dumps(message_payload.to_dict())}'"
)
return

try:
with prometheus_request_execution_time.labels(action_name=action_name).time():
await asyncio.wait_for(action(message_payload), configs.executor_request_timeout)
except asyncio.TimeoutError:
prometheus_request_timeout_count.labels(action_name=action_name).inc()
_logger.error(f"Timed out executing request '{json.dumps(message_payload)}'")
_logger.error(f"Timed out executing request '{json.dumps(message_payload.to_dict())}'")
except BaseSentinelaException as e:
raise e
except Exception:
prometheus_request_error_count.labels(action_name=action_name).inc()
_logger.error(f"Error executing request '{json.dumps(message_payload)}'")
_logger.error(f"Error executing request '{json.dumps(message_payload.to_dict())}'")
_logger.error(traceback.format_exc().strip())
3 changes: 3 additions & 0 deletions src/data_models/process_monitor_payload/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .process_monitor_payload import ProcessMonitorPayload

__all__ = ["ProcessMonitorPayload"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import Literal

from pydantic.dataclasses import dataclass


@dataclass
class ProcessMonitorPayload:
monitor_id: int
tasks: list[Literal["search", "update"]]
3 changes: 3 additions & 0 deletions src/data_models/request_payload/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .request_payload import RequestPayload

__all__ = ["RequestPayload"]
16 changes: 16 additions & 0 deletions src/data_models/request_payload/request_payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any

from pydantic.dataclasses import dataclass


@dataclass
class RequestPayload:
action: str
params: dict[str, Any]

def to_dict(self) -> dict[str, Any]:
return {
field: value
for field in self.__dataclass_fields__
if (value := getattr(self, field)) is not None
}
2 changes: 1 addition & 1 deletion src/plugins/slack/services/pattern_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def resend_notifications(
type="request",
payload={
"action": "plugin.slack.resend_notifications",
"slack_channel": context["channel"],
"params": {"slack_channel": context["channel"]},
},
)

Expand Down
8 changes: 4 additions & 4 deletions tests/commands/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ async def test_alert_acknowledge(clear_queue, target_id):
"type": "request",
"payload": {
"action": "alert_acknowledge",
"target_id": target_id,
"params": {"target_id": target_id},
},
}
)
Expand All @@ -114,7 +114,7 @@ async def test_alert_lock(clear_queue, target_id):
"type": "request",
"payload": {
"action": "alert_lock",
"target_id": target_id,
"params": {"target_id": target_id},
},
}
)
Expand All @@ -134,7 +134,7 @@ async def test_alert_solve(clear_queue, target_id):
"type": "request",
"payload": {
"action": "alert_solve",
"target_id": target_id,
"params": {"target_id": target_id},
},
}
)
Expand All @@ -154,7 +154,7 @@ async def test_issue_drop(clear_queue, target_id):
"type": "request",
"payload": {
"action": "issue_drop",
"target_id": target_id,
"params": {"target_id": target_id},
},
}
)
Expand Down
Loading