Skip to content

Commit d5f8d96

Browse files
committed
Add alarmLog to all nodes
1 parent 7d5481c commit d5f8d96

1 file changed

Lines changed: 74 additions & 36 deletions

File tree

src/tree.rs

Lines changed: 74 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use async_compression::tokio::write::GzipEncoder;
88
use chrono::TimeZone;
99
use futures::io::{BufReader, BufWriter};
10+
use futures::stream::FuturesUnordered;
1011
use futures::{Stream, StreamExt, TryStreamExt};
1112
use log::{error, info, warn};
1213
use sha1::Digest;
@@ -19,6 +20,7 @@ use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
1920
use shvrpc::{RpcMessage, RpcMessageMetaTags};
2021
use tokio::fs::DirEntry;
2122
use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
23+
use tokio::sync::Semaphore;
2224
use tokio_stream::wrappers::ReadDirStream;
2325
use tokio_util::compat::TokioAsyncReadCompatExt;
2426
use tokio_util::io::ReaderStream;
@@ -319,6 +321,7 @@ async fn root_request_handler(
319321
nodes.append(&mut children_on_path(&app_state.sites_data.read().await.sites_info, ROOT_PATH).unwrap_or_default());
320322
Ok(nodes.into())
321323
}
324+
METH_ALARM_LOG => alarmlog_handler(rq, app_state).await,
322325
_ => Ok("Not implemented".into()),
323326
}
324327
}
@@ -933,44 +936,74 @@ async fn alarmlog_handler(
933936
let params = AlarmLogParams::try_from(rq.param().unwrap_or_default())
934937
.map_err(|err| RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmLog parameters: {err}")))?;
935938

936-
let site_path = rq.shv_path().unwrap_or_default();
937-
938-
let getlog_params = GetLog2Params {
939-
since: GetLog2Since::DateTime(params.since),
940-
until: Some(params.until),
941-
with_snapshot: true,
942-
..Default::default()
943-
};
944-
945939
#[derive(ToRpcValue)]
946940
struct AlarmLog {
947941
snapshot: Vec<AlarmWithTimestamp>,
948942
events: Vec<AlarmWithTimestamp>,
949943
}
950944

951-
let typeinfos = &app_state.sites_data.read().await.typeinfos;
952-
let Some(Ok(type_info)) = typeinfos.get(site_path) else {
953-
return Ok(BTreeMap::from([(site_path.to_string(), RpcValue::from(shvproto::List::new()))]).into());
954-
};
945+
let typeinfos = app_state.sites_data.read().await.typeinfos.clone();
955946

956-
let mut tmp_alarms = Vec::<AlarmWithTimestamp>::new();
947+
let valid_typeinfos = typeinfos.iter().filter_map(|(site_path, type_info)| type_info.as_ref().ok().map(|type_info| (site_path, type_info)));
957948

958-
let log = getlog_handler(site_path, &getlog_params, app_state.clone()).await?;
959-
for entry in log.snapshot_entries {
960-
update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
961-
}
949+
let site_path_prefix = rq.shv_path().unwrap_or_default();
950+
let getlog_params = Arc::new(GetLog2Params {
951+
since: GetLog2Since::DateTime(params.since),
952+
until: Some(params.until),
953+
with_snapshot: true,
954+
..Default::default()
955+
});
956+
957+
const MAX_SYNC_TASKS_DEFAULT: usize = 8;
958+
let semaphore = Arc::new(Semaphore::new(MAX_SYNC_TASKS_DEFAULT));
959+
let getlog_results = valid_typeinfos
960+
.filter(|(site_path, _)| site_path.starts_with(site_path_prefix))
961+
.map(|(site_path, type_info)| {
962+
let app_state = app_state.clone();
963+
let getlog_params = getlog_params.clone();
964+
let semaphore = semaphore.clone();
965+
async move {
966+
let permit = semaphore
967+
.clone()
968+
.acquire_owned()
969+
.await
970+
.unwrap_or_else(|e| panic!("Cannot acquire semaphore: {e}"));
971+
let log = getlog_handler(site_path.as_str(), getlog_params.clone().as_ref(), app_state).await;
972+
drop(permit);
973+
(site_path, log, type_info)
974+
}
975+
})
976+
.collect::<FuturesUnordered<_>>()
977+
.collect::<Vec<_>>()
978+
.await
979+
.into_iter()
980+
.filter_map(|(site_path, result, type_info)|
981+
result
982+
.inspect_err(|err| warn!("alarmlog: Failed to fetch log for site '{site_path}': {err}"))
983+
.ok()
984+
.map(|result| (site_path, result, type_info))
985+
)
986+
.map(|(site_path, log, type_info)| {
987+
let mut tmp_alarms = Vec::<AlarmWithTimestamp>::new();
988+
for entry in log.snapshot_entries {
989+
update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
990+
}
962991

963-
let snapshot = tmp_alarms.clone();
992+
let snapshot = tmp_alarms.clone();
964993

965-
let events = log.event_entries
966-
.into_iter()
967-
.flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
968-
.collect::<Vec<_>>();
994+
let events = log.event_entries
995+
.into_iter()
996+
.flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
997+
.collect::<Vec<_>>();
969998

970-
Ok(AlarmLog {
971-
events,
972-
snapshot,
973-
}.into())
999+
(site_path.to_string(), AlarmLog {
1000+
events,
1001+
snapshot,
1002+
})
1003+
})
1004+
.collect::<BTreeMap<_,_>>();
1005+
1006+
Ok(getlog_results.into())
9741007
}
9751008

9761009
type JournalEntryStream = Pin<Box<dyn Stream<Item = Result<JournalEntry, Box<dyn Error + Send + Sync>>> + Send + Sync>>;
@@ -1261,6 +1294,7 @@ pub(crate) async fn methods_getter(
12611294
signals: &[],
12621295
description: "",
12631296
},
1297+
&META_METHOD_ALARM_LOG,
12641298
// TODO: All root node methods:
12651299
//
12661300
// appName
@@ -1278,18 +1312,22 @@ pub(crate) async fn methods_getter(
12781312
let children = children_on_path(&sites_data.sites_info, &path)?;
12791313
if children.is_empty() {
12801314
// `path` is a site path
1281-
let meta_methods = if sites_data.sites_info
1282-
.get(&path)
1315+
let is_pushlog = sites_data.sites_info.get(&path)
12831316
.and_then(|site_info| sites_data.sub_hps.get(&site_info.sub_hp))
1284-
.is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog)) {
1285-
MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG])
1286-
} else {
1287-
MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG])
1288-
};
1289-
Some(meta_methods)
1317+
.is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog));
1318+
1319+
if is_pushlog {
1320+
return Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG]))
1321+
}
1322+
1323+
if sites_data.typeinfos.get(&path).is_some_and(Result::is_ok) {
1324+
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG]))
1325+
} else {
1326+
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_LOG]))
1327+
}
12901328
} else {
12911329
// `path` is a dir in the middle of the tree
1292-
Some(MetaMethods::from(&[]))
1330+
Some(MetaMethods::from(&[&META_METHOD_ALARM_LOG]))
12931331
}
12941332
}
12951333
}

0 commit comments

Comments
 (0)