55from typing import Any , Callable , Coroutine , cast
66
77import prometheus_client
8+ from pydantic import ValidationError
89
910import plugins
1011import registry as registry
1112from base_exception import BaseSentinelaException
1213from configs import configs
14+ from data_models .request_payload import RequestPayload
1315from models import Alert , Issue
1416
1517_logger = logging .getLogger ("request_handler" )
3133)
3234
3335
34- async def alert_acknowledge (message_payload : dict [ Any , Any ] ) -> None :
36+ async def alert_acknowledge (message_payload : RequestPayload ) -> None :
3537 """Acknowledge an alert"""
36- alert_id = message_payload ["target_id" ]
38+ alert_id = message_payload . params ["target_id" ]
3739 alert = await Alert .get_by_id (alert_id )
3840 if alert is None :
3941 _logger .info (f"Alert '{ alert_id } ' not found" )
@@ -42,9 +44,9 @@ async def alert_acknowledge(message_payload: dict[Any, Any]) -> None:
4244 await alert .acknowledge ()
4345
4446
45- async def alert_lock (message_payload : dict [ Any , Any ] ) -> None :
47+ async def alert_lock (message_payload : RequestPayload ) -> None :
4648 """Lock an alert"""
47- alert_id = message_payload ["target_id" ]
49+ alert_id = message_payload . params ["target_id" ]
4850 alert = await Alert .get_by_id (alert_id )
4951 if alert is None :
5052 _logger .info (f"Alert '{ alert_id } ' not found" )
@@ -53,9 +55,9 @@ async def alert_lock(message_payload: dict[Any, Any]) -> None:
5355 await alert .lock ()
5456
5557
56- async def alert_solve (message_payload : dict [ Any , Any ] ) -> None :
58+ async def alert_solve (message_payload : RequestPayload ) -> None :
5759 """Solve all alert's issues"""
58- alert_id = message_payload ["target_id" ]
60+ alert_id = message_payload . params ["target_id" ]
5961 alert = await Alert .get_by_id (alert_id )
6062 if alert is None :
6163 _logger .info (f"Alert '{ alert_id } ' not found" )
@@ -64,9 +66,9 @@ async def alert_solve(message_payload: dict[Any, Any]) -> None:
6466 await alert .solve_issues ()
6567
6668
67- async def issue_drop (message_payload : dict [ Any , Any ] ) -> None :
69+ async def issue_drop (message_payload : RequestPayload ) -> None :
6870 """Drop an issue"""
69- issue_id = message_payload ["target_id" ]
71+ issue_id = message_payload . params ["target_id" ]
7072 issue = await Issue .get_by_id (issue_id )
7173 if issue is None :
7274 _logger .info (f"Issue '{ issue_id } ' not found" )
@@ -83,7 +85,7 @@ async def issue_drop(message_payload: dict[Any, Any]) -> None:
8385}
8486
8587
86- def get_action (action_name : str ) -> Callable [[dict [ Any , Any ] ], Coroutine [Any , Any , None ]] | None :
88+ def get_action (action_name : str ) -> Callable [[RequestPayload ], Coroutine [Any , Any , None ]] | None :
8789 """Get the action function by its name, checking if it is a plugin action"""
8890 if action_name .startswith ("plugin." ):
8991 plugin_name , action_name = action_name .split ("." )[1 :3 ]
@@ -103,31 +105,41 @@ def get_action(action_name: str) -> Callable[[dict[Any, Any]], Coroutine[Any, An
103105 _logger .warning (f"Action '{ plugin_name } .{ action_name } ' unknown" )
104106 return None
105107
106- return cast (Callable [[dict [ Any , Any ] ], Coroutine [Any , Any , None ]], action )
108+ return cast (Callable [[RequestPayload ], Coroutine [Any , Any , None ]], action )
107109
108110 return actions .get (action_name )
109111
110112
111113async def run (message : dict [Any , Any ]) -> None :
112114 """Process a received request"""
113- message_payload = message ["payload" ]
114- action_name = message_payload ["action" ]
115+ try :
116+ message_payload = RequestPayload (** message ["payload" ])
117+ except KeyError :
118+ _logger .error (f"Message '{ json .dumps (message )} ' missing 'payload' field" )
119+ return
120+ except ValidationError as e :
121+ _logger .error (f"Invalid payload: { e } " )
122+ return
123+
124+ action_name = message_payload .action
115125
116126 action = get_action (action_name )
117127
118128 if action is None :
119- _logger .warning (f"Got request with unknown action '{ json .dumps (message_payload )} '" )
129+ _logger .warning (
130+ f"Got request with unknown action '{ json .dumps (message_payload .to_dict ())} '"
131+ )
120132 return
121133
122134 try :
123135 with prometheus_request_execution_time .labels (action_name = action_name ).time ():
124136 await asyncio .wait_for (action (message_payload ), configs .executor_request_timeout )
125137 except asyncio .TimeoutError :
126138 prometheus_request_timeout_count .labels (action_name = action_name ).inc ()
127- _logger .error (f"Timed out executing request '{ json .dumps (message_payload )} '" )
139+ _logger .error (f"Timed out executing request '{ json .dumps (message_payload . to_dict () )} '" )
128140 except BaseSentinelaException as e :
129141 raise e
130142 except Exception :
131143 prometheus_request_error_count .labels (action_name = action_name ).inc ()
132- _logger .error (f"Error executing request '{ json .dumps (message_payload )} '" )
144+ _logger .error (f"Error executing request '{ json .dumps (message_payload . to_dict () )} '" )
133145 _logger .error (traceback .format_exc ().strip ())
0 commit comments