@@ -7,6 +7,7 @@ use std::sync::Arc;
77use async_compression:: tokio:: write:: GzipEncoder ;
88use chrono:: TimeZone ;
99use futures:: io:: { BufReader , BufWriter } ;
10+ use futures:: stream:: FuturesUnordered ;
1011use futures:: { Stream , StreamExt , TryStreamExt } ;
1112use log:: { error, info, warn} ;
1213use sha1:: Digest ;
@@ -19,6 +20,7 @@ use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
1920use shvrpc:: { RpcMessage , RpcMessageMetaTags } ;
2021use tokio:: fs:: DirEntry ;
2122use tokio:: io:: { AsyncBufRead , AsyncReadExt , AsyncSeekExt , AsyncWriteExt } ;
23+ use tokio:: sync:: Semaphore ;
2224use tokio_stream:: wrappers:: ReadDirStream ;
2325use tokio_util:: compat:: TokioAsyncReadCompatExt ;
2426use tokio_util:: io:: ReaderStream ;
@@ -319,6 +321,7 @@ async fn root_request_handler(
319321 nodes. append ( & mut children_on_path ( & app_state. sites_data . read ( ) . await . sites_info , ROOT_PATH ) . unwrap_or_default ( ) ) ;
320322 Ok ( nodes. into ( ) )
321323 }
324+ METH_ALARM_LOG => alarmlog_handler ( rq, app_state) . await ,
322325 _ => Ok ( "Not implemented" . into ( ) ) ,
323326 }
324327}
@@ -957,44 +960,74 @@ async fn alarmlog_handler(
957960 let params = AlarmLogParams :: try_from ( rq. param ( ) . unwrap_or_default ( ) )
958961 . map_err ( |err| RpcError :: new ( RpcErrorCode :: InvalidParam , format ! ( "Wrong alarmLog parameters: {err}" ) ) ) ?;
959962
960- let site_path = rq. shv_path ( ) . unwrap_or_default ( ) ;
961-
962- let getlog_params = GetLog2Params {
963- since : GetLog2Since :: DateTime ( params. since ) ,
964- until : Some ( params. until ) ,
965- with_snapshot : true ,
966- ..Default :: default ( )
967- } ;
968-
969963 #[ derive( ToRpcValue ) ]
970964 struct AlarmLog {
971965 snapshot : Vec < AlarmWithTimestamp > ,
972966 events : Vec < AlarmWithTimestamp > ,
973967 }
974968
975- let typeinfos = & app_state. sites_data . read ( ) . await . typeinfos ;
976- let Some ( Ok ( type_info) ) = typeinfos. get ( site_path) else {
977- return Ok ( BTreeMap :: from ( [ ( site_path. to_string ( ) , RpcValue :: from ( shvproto:: List :: new ( ) ) ) ] ) . into ( ) ) ;
978- } ;
969+ let typeinfos = app_state. sites_data . read ( ) . await . typeinfos . clone ( ) ;
979970
980- let mut tmp_alarms = Vec :: < AlarmWithTimestamp > :: new ( ) ;
971+ let valid_typeinfos = typeinfos . iter ( ) . filter_map ( | ( site_path , type_info ) | type_info . as_ref ( ) . ok ( ) . map ( |type_info| ( site_path , type_info ) ) ) ;
981972
982- let log = getlog_handler ( site_path, & getlog_params, app_state. clone ( ) ) . await ?;
983- for entry in log. snapshot_entries {
984- update_alarms ( & mut tmp_alarms, type_info, & entry. path , & entry. value , shvproto:: DateTime :: from_epoch_msec ( entry. epoch_msec ) ) ;
985- }
973+ let site_path_prefix = rq. shv_path ( ) . unwrap_or_default ( ) ;
974+ let getlog_params = Arc :: new ( GetLog2Params {
975+ since : GetLog2Since :: DateTime ( params. since ) ,
976+ until : Some ( params. until ) ,
977+ with_snapshot : true ,
978+ ..Default :: default ( )
979+ } ) ;
980+
981+ const MAX_SYNC_TASKS_DEFAULT : usize = 8 ;
982+ let semaphore = Arc :: new ( Semaphore :: new ( MAX_SYNC_TASKS_DEFAULT ) ) ;
983+ let getlog_results = valid_typeinfos
984+ . filter ( |( site_path, _) | site_path. starts_with ( site_path_prefix) )
985+ . map ( |( site_path, type_info) | {
986+ let app_state = app_state. clone ( ) ;
987+ let getlog_params = getlog_params. clone ( ) ;
988+ let semaphore = semaphore. clone ( ) ;
989+ async move {
990+ let permit = semaphore
991+ . clone ( )
992+ . acquire_owned ( )
993+ . await
994+ . unwrap_or_else ( |e| panic ! ( "Cannot acquire semaphore: {e}" ) ) ;
995+ let log = getlog_handler ( site_path. as_str ( ) , getlog_params. clone ( ) . as_ref ( ) , app_state) . await ;
996+ drop ( permit) ;
997+ ( site_path, log, type_info)
998+ }
999+ } )
1000+ . collect :: < FuturesUnordered < _ > > ( )
1001+ . collect :: < Vec < _ > > ( )
1002+ . await
1003+ . into_iter ( )
1004+ . filter_map ( |( site_path, result, type_info) |
1005+ result
1006+ . inspect_err ( |err| warn ! ( "alarmlog: Failed to fetch log for site '{site_path}': {err}" ) )
1007+ . ok ( )
1008+ . map ( |result| ( site_path, result, type_info) )
1009+ )
1010+ . map ( |( site_path, log, type_info) | {
1011+ let mut tmp_alarms = Vec :: < AlarmWithTimestamp > :: new ( ) ;
1012+ for entry in log. snapshot_entries {
1013+ update_alarms ( & mut tmp_alarms, type_info, & entry. path , & entry. value , shvproto:: DateTime :: from_epoch_msec ( entry. epoch_msec ) ) ;
1014+ }
9861015
987- let snapshot = tmp_alarms. clone ( ) ;
1016+ let snapshot = tmp_alarms. clone ( ) ;
9881017
989- let events = log. event_entries
990- . into_iter ( )
991- . flat_map ( |entry| update_alarms ( & mut tmp_alarms, type_info, & entry. path , & entry. value , shvproto:: DateTime :: from_epoch_msec ( entry. epoch_msec ) ) )
992- . collect :: < Vec < _ > > ( ) ;
1018+ let events = log. event_entries
1019+ . into_iter ( )
1020+ . flat_map ( |entry| update_alarms ( & mut tmp_alarms, type_info, & entry. path , & entry. value , shvproto:: DateTime :: from_epoch_msec ( entry. epoch_msec ) ) )
1021+ . collect :: < Vec < _ > > ( ) ;
1022+
1023+ ( site_path. to_string ( ) , AlarmLog {
1024+ events,
1025+ snapshot,
1026+ } )
1027+ } )
1028+ . collect :: < BTreeMap < _ , _ > > ( ) ;
9931029
994- Ok ( AlarmLog {
995- events,
996- snapshot,
997- } . into ( ) )
1030+ Ok ( getlog_results. into ( ) )
9981031}
9991032
10001033type JournalEntryStream = Pin < Box < dyn Stream < Item = Result < JournalEntry , Box < dyn Error + Send + Sync > > > + Send + Sync > > ;
@@ -1285,6 +1318,7 @@ pub(crate) async fn methods_getter(
12851318 signals : & [ ] ,
12861319 description : "" ,
12871320 } ,
1321+ & META_METHOD_ALARM_LOG ,
12881322 // TODO: All root node methods:
12891323 //
12901324 // appName
@@ -1302,18 +1336,22 @@ pub(crate) async fn methods_getter(
13021336 let children = children_on_path ( & sites_data. sites_info , & path) ?;
13031337 if children. is_empty ( ) {
13041338 // `path` is a site path
1305- let meta_methods = if sites_data. sites_info
1306- . get ( & path)
1339+ let is_pushlog = sites_data. sites_info . get ( & path)
13071340 . and_then ( |site_info| sites_data. sub_hps . get ( & site_info. sub_hp ) )
1308- . is_some_and ( |sub_hp_info| matches ! ( sub_hp_info, SubHpInfo :: PushLog ) ) {
1309- MetaMethods :: from ( & [ & META_METHOD_GET_LOG , & META_METHOD_PUSH_LOG ] )
1310- } else {
1311- MetaMethods :: from ( & [ & META_METHOD_GET_LOG , & META_METHOD_ALARM_TABLE , & META_METHOD_ALARM_LOG ] )
1312- } ;
1313- Some ( meta_methods)
1341+ . is_some_and ( |sub_hp_info| matches ! ( sub_hp_info, SubHpInfo :: PushLog ) ) ;
1342+
1343+ if is_pushlog {
1344+ return Some ( MetaMethods :: from ( & [ & META_METHOD_GET_LOG , & META_METHOD_PUSH_LOG ] ) )
1345+ }
1346+
1347+ if sites_data. typeinfos . get ( & path) . is_some_and ( Result :: is_ok) {
1348+ Some ( MetaMethods :: from ( & [ & META_METHOD_GET_LOG , & META_METHOD_ALARM_TABLE , & META_METHOD_ALARM_LOG ] ) )
1349+ } else {
1350+ Some ( MetaMethods :: from ( & [ & META_METHOD_GET_LOG , & META_METHOD_ALARM_LOG ] ) )
1351+ }
13141352 } else {
13151353 // `path` is a dir in the middle of the tree
1316- Some ( MetaMethods :: from ( & [ ] ) )
1354+ Some ( MetaMethods :: from ( & [ & META_METHOD_ALARM_LOG ] ) )
13171355 }
13181356 }
13191357 }
0 commit comments