Skip to content

Commit 2d86bc7

Browse files
committed
Add alarmLog to all nodes
1 parent 75da50a commit 2d86bc7

1 file changed

Lines changed: 74 additions & 36 deletions

File tree

src/tree.rs

Lines changed: 74 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::sync::Arc;
77
use async_compression::tokio::write::GzipEncoder;
88
use chrono::TimeZone;
99
use futures::io::{BufReader, BufWriter};
10+
use futures::stream::FuturesUnordered;
1011
use futures::{Stream, StreamExt, TryStreamExt};
1112
use log::{error, info, warn};
1213
use sha1::Digest;
@@ -19,6 +20,7 @@ use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
1920
use shvrpc::{RpcMessage, RpcMessageMetaTags};
2021
use tokio::fs::DirEntry;
2122
use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
23+
use tokio::sync::Semaphore;
2224
use tokio_stream::wrappers::ReadDirStream;
2325
use tokio_util::compat::TokioAsyncReadCompatExt;
2426
use tokio_util::io::ReaderStream;
@@ -319,6 +321,7 @@ async fn root_request_handler(
319321
nodes.append(&mut children_on_path(&app_state.sites_data.read().await.sites_info, ROOT_PATH).unwrap_or_default());
320322
Ok(nodes.into())
321323
}
324+
METH_ALARM_LOG => alarmlog_handler(rq, app_state).await,
322325
_ => Ok("Not implemented".into()),
323326
}
324327
}
@@ -957,44 +960,74 @@ async fn alarmlog_handler(
957960
let params = AlarmLogParams::try_from(rq.param().unwrap_or_default())
958961
.map_err(|err| RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmLog parameters: {err}")))?;
959962

960-
let site_path = rq.shv_path().unwrap_or_default();
961-
962-
let getlog_params = GetLog2Params {
963-
since: GetLog2Since::DateTime(params.since),
964-
until: Some(params.until),
965-
with_snapshot: true,
966-
..Default::default()
967-
};
968-
969963
#[derive(ToRpcValue)]
970964
struct AlarmLog {
971965
snapshot: Vec<AlarmWithTimestamp>,
972966
events: Vec<AlarmWithTimestamp>,
973967
}
974968

975-
let typeinfos = &app_state.sites_data.read().await.typeinfos;
976-
let Some(Ok(type_info)) = typeinfos.get(site_path) else {
977-
return Ok(BTreeMap::from([(site_path.to_string(), RpcValue::from(shvproto::List::new()))]).into());
978-
};
969+
let typeinfos = app_state.sites_data.read().await.typeinfos.clone();
979970

980-
let mut tmp_alarms = Vec::<AlarmWithTimestamp>::new();
971+
let valid_typeinfos = typeinfos.iter().filter_map(|(site_path, type_info)| type_info.as_ref().ok().map(|type_info| (site_path, type_info)));
981972

982-
let log = getlog_handler(site_path, &getlog_params, app_state.clone()).await?;
983-
for entry in log.snapshot_entries {
984-
update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
985-
}
973+
let site_path_prefix = rq.shv_path().unwrap_or_default();
974+
let getlog_params = Arc::new(GetLog2Params {
975+
since: GetLog2Since::DateTime(params.since),
976+
until: Some(params.until),
977+
with_snapshot: true,
978+
..Default::default()
979+
});
980+
981+
const MAX_SYNC_TASKS_DEFAULT: usize = 8;
982+
let semaphore = Arc::new(Semaphore::new(MAX_SYNC_TASKS_DEFAULT));
983+
let alarmlog_results = valid_typeinfos
984+
.filter(|(site_path, _)| site_path.starts_with(site_path_prefix) && (site_path.len() == site_path_prefix.len() || site_path.as_bytes()[site_path_prefix.len()] == b'/'))
985+
.map(|(site_path, type_info)| {
986+
let app_state = app_state.clone();
987+
let getlog_params = getlog_params.clone();
988+
let semaphore = semaphore.clone();
989+
async move {
990+
let permit = semaphore
991+
.clone()
992+
.acquire_owned()
993+
.await
994+
.unwrap_or_else(|e| panic!("Cannot acquire semaphore: {e}"));
995+
let log = getlog_handler(site_path.as_str(), getlog_params.clone().as_ref(), app_state).await;
996+
drop(permit);
997+
(site_path, log, type_info)
998+
}
999+
})
1000+
.collect::<FuturesUnordered<_>>()
1001+
.collect::<Vec<_>>()
1002+
.await
1003+
.into_iter()
1004+
.filter_map(|(site_path, result, type_info)|
1005+
result
1006+
.inspect_err(|err| warn!("alarmlog: Failed to fetch log for site '{site_path}': {err}"))
1007+
.ok()
1008+
.map(|result| (site_path, result, type_info))
1009+
)
1010+
.map(|(site_path, log, type_info)| {
1011+
let mut tmp_alarms = Vec::<AlarmWithTimestamp>::new();
1012+
for entry in log.snapshot_entries {
1013+
update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
1014+
}
9861015

987-
let snapshot = tmp_alarms.clone();
1016+
let snapshot = tmp_alarms.clone();
9881017

989-
let events = log.event_entries
990-
.into_iter()
991-
.flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
992-
.collect::<Vec<_>>();
1018+
let events = log.event_entries
1019+
.into_iter()
1020+
.flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
1021+
.collect::<Vec<_>>();
1022+
1023+
(site_path.to_string(), AlarmLog {
1024+
events,
1025+
snapshot,
1026+
})
1027+
})
1028+
.collect::<BTreeMap<_,_>>();
9931029

994-
Ok(AlarmLog {
995-
events,
996-
snapshot,
997-
}.into())
1030+
Ok(alarmlog_results.into())
9981031
}
9991032

10001033
type JournalEntryStream = Pin<Box<dyn Stream<Item = Result<JournalEntry, Box<dyn Error + Send + Sync>>> + Send + Sync>>;
@@ -1285,6 +1318,7 @@ pub(crate) async fn methods_getter(
12851318
signals: &[],
12861319
description: "",
12871320
},
1321+
&META_METHOD_ALARM_LOG,
12881322
// TODO: All root node methods:
12891323
//
12901324
// appName
@@ -1302,18 +1336,22 @@ pub(crate) async fn methods_getter(
13021336
let children = children_on_path(&sites_data.sites_info, &path)?;
13031337
if children.is_empty() {
13041338
// `path` is a site path
1305-
let meta_methods = if sites_data.sites_info
1306-
.get(&path)
1339+
let is_pushlog = sites_data.sites_info.get(&path)
13071340
.and_then(|site_info| sites_data.sub_hps.get(&site_info.sub_hp))
1308-
.is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog)) {
1309-
MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG])
1310-
} else {
1311-
MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG])
1312-
};
1313-
Some(meta_methods)
1341+
.is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog));
1342+
1343+
if is_pushlog {
1344+
return Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG]))
1345+
}
1346+
1347+
if sites_data.typeinfos.get(&path).is_some_and(Result::is_ok) {
1348+
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG]))
1349+
} else {
1350+
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_LOG]))
1351+
}
13141352
} else {
13151353
// `path` is a dir in the middle of the tree
1316-
Some(MetaMethods::from(&[]))
1354+
Some(MetaMethods::from(&[&META_METHOD_ALARM_LOG]))
13171355
}
13181356
}
13191357
}

0 commit comments

Comments
 (0)