From 2d86bc7e5fb81133f001efc99b3dd2ffca82a4bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20Kubern=C3=A1t?= Date: Thu, 23 Oct 2025 15:50:53 +0200 Subject: [PATCH] Add alarmLog to all nodes --- src/tree.rs | 110 +++++++++++++++++++++++++++++++++++----------------- 1 file changed, 74 insertions(+), 36 deletions(-) diff --git a/src/tree.rs b/src/tree.rs index dc28702..0e04ec7 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use async_compression::tokio::write::GzipEncoder; use chrono::TimeZone; use futures::io::{BufReader, BufWriter}; +use futures::stream::FuturesUnordered; use futures::{Stream, StreamExt, TryStreamExt}; use log::{error, info, warn}; use sha1::Digest; @@ -19,6 +20,7 @@ use shvrpc::rpcmessage::{RpcError, RpcErrorCode}; use shvrpc::{RpcMessage, RpcMessageMetaTags}; use tokio::fs::DirEntry; use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use tokio::sync::Semaphore; use tokio_stream::wrappers::ReadDirStream; use tokio_util::compat::TokioAsyncReadCompatExt; use tokio_util::io::ReaderStream; @@ -319,6 +321,7 @@ async fn root_request_handler( nodes.append(&mut children_on_path(&app_state.sites_data.read().await.sites_info, ROOT_PATH).unwrap_or_default()); Ok(nodes.into()) } + METH_ALARM_LOG => alarmlog_handler(rq, app_state).await, _ => Ok("Not implemented".into()), } } @@ -957,44 +960,74 @@ async fn alarmlog_handler( let params = AlarmLogParams::try_from(rq.param().unwrap_or_default()) .map_err(|err| RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmLog parameters: {err}")))?; - let site_path = rq.shv_path().unwrap_or_default(); - - let getlog_params = GetLog2Params { - since: GetLog2Since::DateTime(params.since), - until: Some(params.until), - with_snapshot: true, - ..Default::default() - }; - #[derive(ToRpcValue)] struct AlarmLog { snapshot: Vec, events: Vec, } - let typeinfos = &app_state.sites_data.read().await.typeinfos; - let Some(Ok(type_info)) = typeinfos.get(site_path) else { - return Ok(BTreeMap::from([(site_path.to_string(), RpcValue::from(shvproto::List::new()))]).into()); - }; + let typeinfos = app_state.sites_data.read().await.typeinfos.clone(); - let mut tmp_alarms = Vec::::new(); + let valid_typeinfos = typeinfos.iter().filter_map(|(site_path, type_info)| type_info.as_ref().ok().map(|type_info| (site_path, type_info))); - let log = getlog_handler(site_path, &getlog_params, app_state.clone()).await?; - for entry in log.snapshot_entries { - update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)); - } + let site_path_prefix = rq.shv_path().unwrap_or_default(); + let getlog_params = Arc::new(GetLog2Params { + since: GetLog2Since::DateTime(params.since), + until: Some(params.until), + with_snapshot: true, + ..Default::default() + }); + + const MAX_SYNC_TASKS_DEFAULT: usize = 8; + let semaphore = Arc::new(Semaphore::new(MAX_SYNC_TASKS_DEFAULT)); + let alarmlog_results = valid_typeinfos + .filter(|(site_path, _)| site_path.starts_with(site_path_prefix) && (site_path.len() == site_path_prefix.len() || site_path.as_bytes()[site_path_prefix.len()] == b'/')) + .map(|(site_path, type_info)| { + let app_state = app_state.clone(); + let getlog_params = getlog_params.clone(); + let semaphore = semaphore.clone(); + async move { + let permit = semaphore + .clone() + .acquire_owned() + .await + .unwrap_or_else(|e| panic!("Cannot acquire semaphore: {e}")); + let log = getlog_handler(site_path.as_str(), getlog_params.clone().as_ref(), app_state).await; + drop(permit); + (site_path, log, type_info) + } + }) + .collect::>() + .collect::>() + .await + .into_iter() + .filter_map(|(site_path, result, type_info)| + result + .inspect_err(|err| warn!("alarmlog: Failed to fetch log for site '{site_path}': {err}")) + .ok() + .map(|result| (site_path, result, type_info)) + ) + .map(|(site_path, log, type_info)| { + let mut tmp_alarms = Vec::::new(); + for entry in log.snapshot_entries { + update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)); + } - let snapshot = tmp_alarms.clone(); + let snapshot = tmp_alarms.clone(); - let events = log.event_entries - .into_iter() - .flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec))) - .collect::>(); + let events = log.event_entries + .into_iter() + .flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec))) + .collect::>(); + + (site_path.to_string(), AlarmLog { + events, + snapshot, + }) + }) + .collect::>(); - Ok(AlarmLog { - events, - snapshot, - }.into()) + Ok(alarmlog_results.into()) } type JournalEntryStream = Pin>> + Send + Sync>>; @@ -1285,6 +1318,7 @@ pub(crate) async fn methods_getter( signals: &[], description: "", }, + &META_METHOD_ALARM_LOG, // TODO: All root node methods: // // appName @@ -1302,18 +1336,22 @@ pub(crate) async fn methods_getter( let children = children_on_path(&sites_data.sites_info, &path)?; if children.is_empty() { // `path` is a site path - let meta_methods = if sites_data.sites_info - .get(&path) + let is_pushlog = sites_data.sites_info.get(&path) .and_then(|site_info| sites_data.sub_hps.get(&site_info.sub_hp)) - .is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog)) { - MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG]) - } else { - MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG]) - }; - Some(meta_methods) + .is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog)); + + if is_pushlog { + return Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG])) + } + + if sites_data.typeinfos.get(&path).is_some_and(Result::is_ok) { + Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG])) + } else { + Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_LOG])) + } } else { // `path` is a dir in the middle of the tree - Some(MetaMethods::from(&[])) + Some(MetaMethods::from(&[&META_METHOD_ALARM_LOG])) } } }