Skip to content

Commit 77bda3c

Browse files
committed
Reuse old subscribers
1 parent 60c05c0 commit 77bda3c

1 file changed

Lines changed: 136 additions & 47 deletions

File tree

src/sites.rs

Lines changed: 136 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use std::time::Duration;
44

55
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6-
use futures::stream::{FuturesUnordered, SelectAll};
6+
use futures::stream::{FusedStream, FuturesUnordered};
77
use futures::StreamExt;
88
use log::{debug, error, info, warn};
99
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, Subscriber};
@@ -16,6 +16,7 @@ use shvrpc::util::find_longest_path_prefix;
1616
use shvrpc::{join_path, RpcMessageMetaTags};
1717
use tokio::sync::Semaphore;
1818
use tokio::time::timeout;
19+
use tokio_stream::StreamMap;
1920

2021
use crate::alarm::{collect_alarms, collect_state_alarms, Alarm};
2122
use crate::getlog::{getlog_handler};
@@ -343,12 +344,41 @@ pub(crate) fn parse_notification(msg: &shvrpc::RpcMessage, sites_info: &BTreeMap
343344
}
344345

345346
struct SitesTaskState {
346-
mntchng_subscribers: SelectAll<Subscriber>,
347-
subscribers: SelectAll<Subscriber>,
347+
mntchng_subscribers: StreamMap<String, Subscriber>,
348+
subscribers: StreamMap<String, Subscriber>,
348349
online_status_channels: BTreeMap<String, UnboundedSender<SiteOnlineStatus>>,
349350
online_status_task: Option<tokio::task::JoinHandle<()>>,
350351
}
351352

353+
async fn init_subscribers(
354+
old_subscribers: &mut StreamMap<String, Subscriber>,
355+
client_cmd_tx: &ClientCommandSender,
356+
subscriptions: impl IntoIterator<Item = (String, String)>,
357+
) -> StreamMap<String, Subscriber> {
358+
let mut subscribers = StreamMap::new();
359+
let mut new_sub_futures = vec![];
360+
for (path, signal) in subscriptions {
361+
let key = format!("{path}:*:{signal}");
362+
363+
if let Some(subscriber) = old_subscribers.remove(&key) {
364+
// Keep existing subscriber
365+
subscribers.insert(key, subscriber);
366+
} else {
367+
// Queue up creation for missing subscriber
368+
new_sub_futures.push(async move {
369+
let subscriber = subscribe(client_cmd_tx, &path, &signal).await;
370+
(key, subscriber)
371+
});
372+
}
373+
}
374+
375+
for (key, subscriber) in futures::future::join_all(new_sub_futures).await {
376+
subscribers.insert(key, subscriber);
377+
}
378+
379+
subscribers
380+
}
381+
352382
enum PeriodicSyncCommand {
353383
Enable,
354384
Disable,
@@ -359,6 +389,7 @@ async fn reload_sites(
359389
shv_api_version: shvclient::clientapi::ShvApiVersion,
360390
client_cmd_tx: &ClientCommandSender,
361391
app_state: &Arc<State>,
392+
old_state: &mut SitesTaskState,
362393
) -> Option<SitesTaskState>
363394
{
364395
let _reload_guard = crate::ReloadGuard::new(&app_state.sites_reload_in_progress);
@@ -424,44 +455,35 @@ async fn reload_sites(
424455
.join("\n")
425456
);
426457

427-
// Subscribe mntchng
428-
let mntchng_subscribers = sites_info
458+
let sites_without_pushlog = sites_info
429459
.iter()
430460
.filter(|(_, site)| sub_hps
431461
.get(&site.sub_hp)
432462
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
433-
)
434-
.map(|(path, _)| {
435-
subscribe(client_cmd_tx, subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng")
436-
})
437-
.collect::<FuturesUnordered<_>>()
438-
.collect::<SelectAll<_>>()
439-
.await;
463+
);
464+
465+
// Subscribe mntchng
466+
let mntchng_subscribers = init_subscribers(
467+
&mut old_state.mntchng_subscribers,
468+
client_cmd_tx,
469+
sites_without_pushlog.clone().map(|(path, _)| (subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng".to_string()))
470+
).await;
440471

441472
log::info!("Loading typeinfo");
442-
let subscribers = sites_info
443-
.iter()
444-
.filter(|(_, site)| sub_hps
445-
.get(&site.sub_hp)
446-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
447-
)
448-
.flat_map(|(path, _)| {
449-
let shv_path = join_path!("shv", path);
450-
let sub_chng = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG);
451-
const SIG_CMDLOG: &str = "cmdlog";
452-
let sub_cmdlog = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG);
453-
[sub_chng, sub_cmdlog]
454-
})
455-
.collect::<FuturesUnordered<_>>()
456-
.collect::<SelectAll<_>>()
457-
.await;
473+
let subscribers = init_subscribers(
474+
&mut old_state.subscribers,
475+
client_cmd_tx,
476+
sites_without_pushlog.clone().flat_map(|(path, _)| {
477+
let shv_path = join_path!("shv", path);
478+
const SIG_CMDLOG: &str = "cmdlog";
479+
[
480+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG.to_string()),
481+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG.to_string()),
482+
]
483+
})
484+
).await;
458485

459-
let typeinfos = sites_info
460-
.iter()
461-
.filter(|(_, site)| sub_hps
462-
.get(&site.sub_hp)
463-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
464-
)
486+
let typeinfos = sites_without_pushlog
465487
.map(|(path, _)| {
466488
let client_cmd_tx = ClientCommandSender::clone(client_cmd_tx);
467489
async move {
@@ -611,13 +633,12 @@ pub(crate) async fn sites_task(
611633
let mut client_evt_rx = std::pin::pin!(client_evt_rx.fuse().peekable());
612634

613635
let mut state = SitesTaskState {
614-
mntchng_subscribers: SelectAll::<Subscriber>::default(),
615-
subscribers: SelectAll::<Subscriber>::default(),
636+
mntchng_subscribers: StreamMap::new(),
637+
subscribers: StreamMap::new(),
616638
online_status_channels: BTreeMap::new(),
617639
online_status_task: None,
618640
};
619641

620-
621642
let (periodic_sync_tx, mut periodic_sync_rx) = futures::channel::mpsc::unbounded();
622643
let periodic_sync_task = {
623644
let app_state = app_state.clone();
@@ -668,13 +689,27 @@ pub(crate) async fn sites_task(
668689
let shv_api_version = loop {
669690
match client_evt_rx.next().await {
670691
Some(ClientEvent::Connected(shv_api_version)) => break shv_api_version,
671-
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => (),
692+
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => {
693+
if let Some(online_status_task) = state.online_status_task {
694+
state.online_status_channels.clear();
695+
if let Err(err) = online_status_task.await {
696+
log::error!("Failed to join online_status_task: {err}")
697+
};
698+
}
699+
700+
state = SitesTaskState {
701+
mntchng_subscribers: StreamMap::new(),
702+
subscribers: StreamMap::new(),
703+
online_status_channels: BTreeMap::new(),
704+
online_status_task: None,
705+
};
706+
},
672707
None => break 'main_loop,
673708
}
674709
};
675710

676711
'sites_loop: loop {
677-
let mut state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state).await {
712+
state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state, &mut state).await {
678713
Some(s) => s,
679714
None => continue 'main_loop,
680715
};
@@ -684,14 +719,18 @@ pub(crate) async fn sites_task(
684719
.unwrap_or_else(|e| log::error!("Cannot send periodic sync enable command: {e}"));
685720

686721
loop {
687-
futures::select! {
722+
tokio::select! {
688723
_event = client_evt_rx.as_mut().peek() => continue 'main_loop,
689-
sites_command = sites_cmd_rx.select_next_some() => match sites_command {
690-
SitesCommand::ReloadSites => {
724+
sites_command = sites_cmd_rx.next(), if !sites_cmd_rx.is_terminated() => match sites_command {
725+
Some(SitesCommand::ReloadSites) => {
691726
continue 'sites_loop;
692727
},
728+
None => (),
693729
},
694-
mntchng_frame = state.mntchng_subscribers.select_next_some() => {
730+
mntchng_frame = state.mntchng_subscribers.next(), if !state.mntchng_subscribers.is_empty() => {
731+
let Some((_, mntchng_frame)) = mntchng_frame else {
732+
continue;
733+
};
695734
let msg = match mntchng_frame.to_rpcmesage() {
696735
Ok(msg) => msg,
697736
Err(err) => {
@@ -723,7 +762,10 @@ pub(crate) async fn sites_task(
723762
}
724763
}
725764
}
726-
notification_frame = state.subscribers.select_next_some() => {
765+
notification_frame = state.subscribers.next(), if !state.subscribers.is_empty() => {
766+
let Some((_, notification_frame)) = notification_frame else {
767+
continue;
768+
};
727769
let msg = match notification_frame.to_rpcmesage() {
728770
Ok(msg) => msg,
729771
Err(err) => {
@@ -760,7 +802,7 @@ pub(crate) async fn sites_task(
760802
impl_update_alarms(&mut *app_state.alarms.write().await, collect_alarms, "alarmmod");
761803
impl_update_alarms(&mut *app_state.state_alarms.write().await, collect_state_alarms, "statealarmmod");
762804
}
763-
complete => break 'main_loop,
805+
else => break 'main_loop,
764806
}
765807
}
766808
}
@@ -862,6 +904,15 @@ mod tests {
862904
}
863905
}
864906

907+
#[async_trait::async_trait]
908+
impl TestStep<SitesTaskTestState> for SitesCommand {
909+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
910+
match self {
911+
SitesCommand::ReloadSites => state.sites_cmd_tx.unbounded_send(SitesCommand::ReloadSites).unwrap(),
912+
}
913+
}
914+
}
915+
865916
#[derive(Debug)]
866917
enum ExpectDirtylogCommand {
867918
ProcessNotification,
@@ -1021,6 +1072,46 @@ mod tests {
10211072
],
10221073
starting_files: vec![],
10231074
expected_file_paths: vec![],
1075+
cleanup_steps: &[
1076+
Box::new(ExpectSignal("node", "onlinestatuschng", 0.into())),
1077+
Box::new(ExpectUnsubscription),
1078+
Box::new(ExpectUnsubscription),
1079+
Box::new(ExpectUnsubscription),
1080+
Box::new(ExpectUnsubscription),
1081+
Box::new(ExpectUnsubscription),
1082+
Box::new(ExpectUnsubscription),
1083+
Box::new(ExpectUnsubscription),
1084+
Box::new(ExpectUnsubscription),
1085+
Box::new(ExpectUnsubscription),
1086+
],
1087+
},
1088+
TestCase {
1089+
name: "Reload sites reuses subscriptions",
1090+
steps: &[
1091+
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
1092+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1093+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:mntchng".try_into().unwrap())),
1094+
Box::new(ExpectSubscription("shv/node/*:*:mntchng".try_into().unwrap())),
1095+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:mntchng".try_into().unwrap())),
1096+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:chng".try_into().unwrap())),
1097+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:cmdlog".try_into().unwrap())),
1098+
Box::new(ExpectSubscription("shv/node/*:*:chng".try_into().unwrap())),
1099+
Box::new(ExpectSubscription("shv/node/*:*:cmdlog".try_into().unwrap())),
1100+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:chng".try_into().unwrap())),
1101+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:cmdlog".try_into().unwrap())),
1102+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1103+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1104+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1105+
Box::new(ExpectSyncCommand::SyncAll),
1106+
Box::new(SitesCommand::ReloadSites),
1107+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1108+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1109+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1110+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1111+
Box::new(ExpectSyncCommand::SyncAll),
1112+
],
1113+
starting_files: vec![],
1114+
expected_file_paths: vec![],
10241115
cleanup_steps: &[
10251116
Box::new(ExpectUnsubscription),
10261117
Box::new(ExpectUnsubscription),
@@ -1031,7 +1122,6 @@ mod tests {
10311122
Box::new(ExpectUnsubscription),
10321123
Box::new(ExpectUnsubscription),
10331124
Box::new(ExpectUnsubscription),
1034-
Box::new(ExpectSignal("node", "onlinestatuschng", 0.into())),
10351125
],
10361126
},
10371127
TestCase {
@@ -1077,4 +1167,3 @@ mod tests {
10771167
Ok(())
10781168
}
10791169
}
1080-

0 commit comments

Comments
 (0)