Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 74 additions & 36 deletions src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::Arc;
use async_compression::tokio::write::GzipEncoder;
use chrono::TimeZone;
use futures::io::{BufReader, BufWriter};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt, TryStreamExt};
use log::{error, info, warn};
use sha1::Digest;
Expand All @@ -19,6 +20,7 @@ use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use tokio::fs::DirEntry;
use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::sync::Semaphore;
use tokio_stream::wrappers::ReadDirStream;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tokio_util::io::ReaderStream;
Expand Down Expand Up @@ -319,6 +321,7 @@ async fn root_request_handler(
nodes.append(&mut children_on_path(&app_state.sites_data.read().await.sites_info, ROOT_PATH).unwrap_or_default());
Ok(nodes.into())
}
METH_ALARM_LOG => alarmlog_handler(rq, app_state).await,
_ => Ok("Not implemented".into()),
}
}
Expand Down Expand Up @@ -957,44 +960,74 @@ async fn alarmlog_handler(
let params = AlarmLogParams::try_from(rq.param().unwrap_or_default())
.map_err(|err| RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmLog parameters: {err}")))?;

let site_path = rq.shv_path().unwrap_or_default();

let getlog_params = GetLog2Params {
since: GetLog2Since::DateTime(params.since),
until: Some(params.until),
with_snapshot: true,
..Default::default()
};

#[derive(ToRpcValue)]
struct AlarmLog {
snapshot: Vec<AlarmWithTimestamp>,
events: Vec<AlarmWithTimestamp>,
}

let typeinfos = &app_state.sites_data.read().await.typeinfos;
let Some(Ok(type_info)) = typeinfos.get(site_path) else {
return Ok(BTreeMap::from([(site_path.to_string(), RpcValue::from(shvproto::List::new()))]).into());
};
let typeinfos = app_state.sites_data.read().await.typeinfos.clone();

let mut tmp_alarms = Vec::<AlarmWithTimestamp>::new();
let valid_typeinfos = typeinfos.iter().filter_map(|(site_path, type_info)| type_info.as_ref().ok().map(|type_info| (site_path, type_info)));

let log = getlog_handler(site_path, &getlog_params, app_state.clone()).await?;
for entry in log.snapshot_entries {
update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
}
let site_path_prefix = rq.shv_path().unwrap_or_default();
let getlog_params = Arc::new(GetLog2Params {
since: GetLog2Since::DateTime(params.since),
until: Some(params.until),
with_snapshot: true,
..Default::default()
});

const MAX_SYNC_TASKS_DEFAULT: usize = 8;
let semaphore = Arc::new(Semaphore::new(MAX_SYNC_TASKS_DEFAULT));
let alarmlog_results = valid_typeinfos
.filter(|(site_path, _)| site_path.starts_with(site_path_prefix) && (site_path.len() == site_path_prefix.len() || site_path.as_bytes()[site_path_prefix.len()] == b'/'))
.map(|(site_path, type_info)| {
let app_state = app_state.clone();
let getlog_params = getlog_params.clone();
let semaphore = semaphore.clone();
async move {
let permit = semaphore
.clone()
.acquire_owned()
.await
.unwrap_or_else(|e| panic!("Cannot acquire semaphore: {e}"));
let log = getlog_handler(site_path.as_str(), getlog_params.clone().as_ref(), app_state).await;
drop(permit);
(site_path, log, type_info)
}
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await
.into_iter()
.filter_map(|(site_path, result, type_info)|
result
.inspect_err(|err| warn!("alarmlog: Failed to fetch log for site '{site_path}': {err}"))
.ok()
.map(|result| (site_path, result, type_info))
)
.map(|(site_path, log, type_info)| {
let mut tmp_alarms = Vec::<AlarmWithTimestamp>::new();
for entry in log.snapshot_entries {
update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
}

let snapshot = tmp_alarms.clone();
let snapshot = tmp_alarms.clone();

let events = log.event_entries
.into_iter()
.flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
.collect::<Vec<_>>();
let events = log.event_entries
.into_iter()
.flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
.collect::<Vec<_>>();

(site_path.to_string(), AlarmLog {
events,
snapshot,
})
})
.collect::<BTreeMap<_,_>>();

Ok(AlarmLog {
events,
snapshot,
}.into())
Ok(alarmlog_results.into())
}

type JournalEntryStream = Pin<Box<dyn Stream<Item = Result<JournalEntry, Box<dyn Error + Send + Sync>>> + Send + Sync>>;
Expand Down Expand Up @@ -1285,6 +1318,7 @@ pub(crate) async fn methods_getter(
signals: &[],
description: "",
},
&META_METHOD_ALARM_LOG,
// TODO: All root node methods:
//
// appName
Expand All @@ -1302,18 +1336,22 @@ pub(crate) async fn methods_getter(
let children = children_on_path(&sites_data.sites_info, &path)?;
if children.is_empty() {
// `path` is a site path
let meta_methods = if sites_data.sites_info
.get(&path)
let is_pushlog = sites_data.sites_info.get(&path)
.and_then(|site_info| sites_data.sub_hps.get(&site_info.sub_hp))
.is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog)) {
MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG])
} else {
MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG])
};
Some(meta_methods)
.is_some_and(|sub_hp_info| matches!(sub_hp_info, SubHpInfo::PushLog));

if is_pushlog {
return Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_PUSH_LOG]))
}

if sites_data.typeinfos.get(&path).is_some_and(Result::is_ok) {
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG]))
} else {
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_LOG]))
}
} else {
// `path` is a dir in the middle of the tree
Some(MetaMethods::from(&[]))
Some(MetaMethods::from(&[&META_METHOD_ALARM_LOG]))
}
}
}
Expand Down