Skip to content

Commit cb7ffb9

Browse files
committed
tree: Add stateAlarm
1 parent ebe5b56 commit cb7ffb9

5 files changed

Lines changed: 71 additions & 24 deletions

File tree

src/alarm.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,11 @@ impl AlarmGetter for StateAlarm {
142142
}
143143
}
144144

145-
pub fn collect_alarms(type_info: &TypeInfo, shv_path: impl AsRef<str>, value: &RpcValue) -> Vec<Alarm> {
145+
pub fn collect_alarms(type_info: &TypeInfo, shv_path: &str, value: &RpcValue) -> Vec<Alarm> {
146146
impl_collect_alarms::<CommonAlarm>(type_info, shv_path, value)
147147
}
148148

149-
pub fn collect_state_alarms(type_info: &TypeInfo, shv_path: impl AsRef<str>, value: &RpcValue) -> Vec<Alarm> {
149+
pub fn collect_state_alarms(type_info: &TypeInfo, shv_path: &str, value: &RpcValue) -> Vec<Alarm> {
150150
impl_collect_alarms::<StateAlarm>(type_info, shv_path, value)
151151
}
152152

src/alarmlog.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use shvclient::AppState;
88
use shvproto::{FromRpcValue, ToRpcValue};
99
use tokio::sync::Semaphore;
1010

11+
use crate::alarm::collect_alarms;
1112
use crate::getlog::getlog_handler;
1213
use crate::journalrw::{GetLog2Params, GetLog2Since};
1314
use crate::sites::update_alarms;
@@ -84,14 +85,14 @@ pub(crate) async fn alarmlog_impl(
8485
.map(|(site_path, log, type_info)| {
8586
let mut tmp_alarms = Vec::<AlarmWithTimestamp>::new();
8687
for entry in log.snapshot_entries {
87-
update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
88+
update_alarms(collect_alarms, &mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
8889
}
8990

9091
let snapshot = tmp_alarms.clone();
9192

9293
let events = log.event_entries
9394
.into_iter()
94-
.flat_map(|entry| update_alarms(&mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
95+
.flat_map(|entry| update_alarms(collect_alarms, &mut tmp_alarms, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec)))
9596
.collect::<Vec<_>>();
9697

9798
(site_path.to_string(), AlarmLog {

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ struct State {
7272
sites_data: RwLock<sites::SitesData>,
7373
sync_info: sync::SyncInfo,
7474
alarms: RwLock<BTreeMap<String, Vec<AlarmWithTimestamp>>>,
75+
state_alarms: RwLock<BTreeMap<String, Vec<AlarmWithTimestamp>>>,
7576
online_states: RwLock<BTreeMap<String, SiteOnlineStatus>>,
7677
config: HpConfig,
7778
sync_cmd_tx: DedupSender<sync::SyncCommand>,
@@ -94,6 +95,7 @@ pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::
9495
sites_data: RwLock::default(),
9596
sync_info: Default::default(),
9697
alarms: Default::default(),
98+
state_alarms: Default::default(),
9799
online_states: Default::default(),
98100
config: hp_config.clone(),
99101
sync_cmd_tx,

src/sites.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
1414
use shvrpc::{join_path, RpcMessageMetaTags};
1515
use tokio::time::timeout;
1616

17-
use crate::alarm::{collect_alarms, Alarm};
17+
use crate::alarm::{collect_alarms, collect_state_alarms, Alarm};
1818
use crate::getlog::getlog_handler;
1919
use crate::typeinfo::TypeInfo;
2020
use crate::util::{subscribe, subscription_prefix_path};
@@ -163,8 +163,9 @@ fn collect_sub_hps(
163163
.collect()
164164
}
165165

166-
pub(crate) fn update_alarms(alarms_for_site: &mut Vec<AlarmWithTimestamp>, type_info: &TypeInfo, property_path: &str, value: &RpcValue, timestamp: shvproto::DateTime) -> Vec<AlarmWithTimestamp> {
167-
let new_alarms = collect_alarms(type_info, property_path, value)
166+
type AlarmCollector = fn(type_info: &TypeInfo, shv_path: &str, value: &RpcValue) -> Vec<Alarm>;
167+
pub(crate) fn update_alarms(alarms_colector: AlarmCollector, alarms_for_site: &mut Vec<AlarmWithTimestamp>, type_info: &TypeInfo, property_path: &str, value: &RpcValue, timestamp: shvproto::DateTime) -> Vec<AlarmWithTimestamp> {
168+
let new_alarms = alarms_colector(type_info, property_path, value)
168169
.into_iter()
169170
.map(|alarm| AlarmWithTimestamp {
170171
alarm,
@@ -524,6 +525,7 @@ pub(crate) async fn sites_task(
524525
..Default::default()
525526
};
526527
let mut alarms = BTreeMap::<String, Vec<AlarmWithTimestamp>>::new();
528+
let mut state_alarms = BTreeMap::<String, Vec<AlarmWithTimestamp>>::new();
527529
for site_path in sites_info.keys() {
528530
let Some(Ok(type_info)) = typeinfos.get(site_path) else {
529531
// No typeinfo for this site - skip
@@ -538,12 +540,17 @@ pub(crate) async fn sites_task(
538540
}
539541
};
540542

541-
let alarms_for_site = alarms.entry(site_path.to_string()).or_default();
542-
543543
let chained_entries = log.snapshot_entries.iter().map(Arc::as_ref).chain(log.event_entries.iter().map(Arc::as_ref));
544-
for entry in chained_entries {
545-
update_alarms(alarms_for_site, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
546-
}
544+
let impl_update_alarms = |alarm_table: &mut BTreeMap<String, Vec<AlarmWithTimestamp>>, alarm_collector| {
545+
let alarms_for_site = alarm_table.entry(site_path.to_string()).or_default();
546+
547+
for entry in chained_entries.clone() {
548+
update_alarms(alarm_collector, alarms_for_site, type_info, &entry.path, &entry.value, shvproto::DateTime::from_epoch_msec(entry.epoch_msec));
549+
}
550+
};
551+
552+
impl_update_alarms(&mut alarms, collect_alarms);
553+
impl_update_alarms(&mut state_alarms, collect_state_alarms);
547554
}
548555

549556
*app_state.alarms.write().await = alarms;
@@ -634,14 +641,18 @@ pub(crate) async fn sites_task(
634641
continue;
635642
};
636643

637-
let alarms = &mut *app_state.alarms.write().await;
638-
let alarms_for_site = alarms.entry(parsed_notification.site_path.clone()).or_default();
644+
let impl_update_alarms = |alarm_table: &mut BTreeMap<String, Vec<AlarmWithTimestamp>>, alarm_collector| {
645+
let alarms_for_site = alarm_table.entry(parsed_notification.site_path.clone()).or_default();
639646

640-
let updated = update_alarms(alarms_for_site, type_info, &parsed_notification.property_path, &parsed_notification.param, shvproto::DateTime::now());
641-
if !updated.is_empty() {
642-
client_cmd_tx.send_message(shvrpc::RpcMessage::new_signal(&parsed_notification.site_path, "alarmmod", None))
643-
.unwrap_or_else(|err| log::error!("alarms: Cannot send signal ({err})"));
644-
}
647+
let updated = update_alarms(alarm_collector, alarms_for_site, type_info, &parsed_notification.property_path, &parsed_notification.param, shvproto::DateTime::now());
648+
if !updated.is_empty() {
649+
client_cmd_tx.send_message(shvrpc::RpcMessage::new_signal(&parsed_notification.site_path, "alarmmod", None))
650+
.unwrap_or_else(|err| log::error!("alarms: Cannot send signal ({err})"));
651+
}
652+
};
653+
654+
impl_update_alarms(&mut *app_state.alarms.write().await, collect_alarms);
655+
impl_update_alarms(&mut *app_state.state_alarms.write().await, collect_state_alarms);
645656
}
646657
complete => break,
647658
}

src/tree.rs

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::BTreeMap;
12
use std::sync::Arc;
23

34
use async_compression::tokio::write::GzipEncoder;
@@ -11,6 +12,7 @@ use shvrpc::metamethod::MetaMethod;
1112
use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
1213
use shvrpc::{RpcMessage, RpcMessageMetaTags};
1314
use tokio::io::{AsyncBufRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
15+
use tokio::sync::RwLockReadGuard;
1416
use tokio_stream::wrappers::ReadDirStream;
1517
use tokio_util::io::ReaderStream;
1618

@@ -20,11 +22,12 @@ use crate::getlog::getlog_handler;
2022
use crate::journalrw::{journal_entries_to_rpcvalue, GetLog2Params, Log2Header, Log2Reader};
2123
use crate::pushlog::pushlog_impl;
2224
use crate::sites::SubHpInfo;
23-
use crate::{ClientCommandSender, HpConfig, State, MAX_JOURNAL_DIR_SIZE_DEFAULT};
25+
use crate::{AlarmWithTimestamp, ClientCommandSender, HpConfig, State, MAX_JOURNAL_DIR_SIZE_DEFAULT};
2426

2527
// History site node methods
2628
const METH_GET_LOG: &str = "getLog";
2729
const METH_ALARM_TABLE: &str = "alarmTable";
30+
const METH_STATE_ALARM_TABLE: &str = "stateAlarmTable";
2831
const METH_ALARM_LOG: &str = "alarmLog";
2932
const METH_PUSH_LOG: &str = "pushLog";
3033

@@ -48,6 +51,16 @@ const META_METHOD_ALARM_TABLE: MetaMethod = MetaMethod {
4851
description: "",
4952
};
5053

54+
const META_METHOD_STATE_ALARM_TABLE: MetaMethod = MetaMethod {
55+
name: METH_STATE_ALARM_TABLE,
56+
flags: 0,
57+
access: shvrpc::metamethod::AccessLevel::Read,
58+
param: "RpcValue",
59+
result: "RpcValue",
60+
signals: &[("statealarmmod", Some("Null"))],
61+
description: "",
62+
};
63+
5164
const META_METHOD_ALARM_LOG: MetaMethod = MetaMethod {
5265
name: METH_ALARM_LOG,
5366
flags: 0,
@@ -688,14 +701,33 @@ async fn getlog_handler_rq(
688701
Ok(result)
689702
}
690703

691-
async fn alarmtable_handler(
704+
trait AlarmGetter {
705+
async fn alarm_getter(f: &State) -> RwLockReadGuard<'_, BTreeMap<String, Vec<AlarmWithTimestamp>>>;
706+
}
707+
708+
struct CommonAlarm;
709+
impl AlarmGetter for CommonAlarm {
710+
async fn alarm_getter(f: &State) -> RwLockReadGuard<'_, BTreeMap<String, Vec<AlarmWithTimestamp>>> {
711+
f.alarms.read().await
712+
}
713+
}
714+
715+
struct StateAlarm;
716+
impl AlarmGetter for StateAlarm {
717+
async fn alarm_getter(f: &State) -> RwLockReadGuard<'_, BTreeMap<String, Vec<AlarmWithTimestamp>>> {
718+
f.state_alarms.read().await
719+
}
720+
}
721+
722+
async fn alarmtable_handler<Getter: AlarmGetter>(
692723
site_path: &str,
693724
app_state: AppState<State>,
694725
) -> RpcRequestResult {
695726
if !app_state.sites_data.read().await.sites_info.contains_key(site_path) {
696727
return Err(RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmTable path: {site_path}")));
697728
}
698-
match app_state.alarms.read().await.get(site_path) {
729+
730+
match Getter::alarm_getter(&app_state).await.get(site_path) {
699731
Some(alarms_for_site) => Ok(alarms_for_site.clone().into()),
700732
None => Ok(Vec::<RpcValue>::new().into()),
701733
}
@@ -725,7 +757,8 @@ async fn history_request_handler(
725757
match method {
726758
METH_LS => Ok(children.into()),
727759
METH_GET_LOG => getlog_handler_rq(path, &param, app_state).await,
728-
METH_ALARM_TABLE => alarmtable_handler(path, app_state).await,
760+
METH_ALARM_TABLE => alarmtable_handler::<CommonAlarm>(path, app_state).await,
761+
METH_STATE_ALARM_TABLE => alarmtable_handler::<StateAlarm>(path, app_state).await,
729762
METH_ALARM_LOG => alarmlog_handler(path, &param, app_state).await,
730763
METH_PUSH_LOG => pushlog_handler(path, param, app_state).await,
731764
_ => Err(rpc_error_unknown_method(method)),
@@ -815,7 +848,7 @@ pub(crate) async fn methods_getter(
815848
}
816849

817850
if sites_data.typeinfos.get(&path).is_some_and(Result::is_ok) {
818-
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_ALARM_LOG]))
851+
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_TABLE, &META_METHOD_STATE_ALARM_TABLE, &META_METHOD_ALARM_LOG]))
819852
} else {
820853
Some(MetaMethods::from(&[&META_METHOD_GET_LOG, &META_METHOD_ALARM_LOG]))
821854
}

0 commit comments

Comments
 (0)