Skip to content

Commit ae71a93

Browse files
committed
Reuse old subscribers
1 parent 60c05c0 commit ae71a93

1 file changed

Lines changed: 147 additions & 44 deletions

File tree

src/sites.rs

Lines changed: 147 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::sync::Arc;
33
use std::time::Duration;
44

55
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6-
use futures::stream::{FuturesUnordered, SelectAll};
7-
use futures::StreamExt;
6+
use futures::stream::FuturesUnordered;
7+
use futures::{FutureExt, StreamExt};
88
use log::{debug, error, info, warn};
99
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, Subscriber};
1010
use shvclient::clientnode::{METH_DIR, SIG_CHNG};
@@ -16,6 +16,7 @@ use shvrpc::util::find_longest_path_prefix;
1616
use shvrpc::{join_path, RpcMessageMetaTags};
1717
use tokio::sync::Semaphore;
1818
use tokio::time::timeout;
19+
use tokio_stream::StreamMap;
1920

2021
use crate::alarm::{collect_alarms, collect_state_alarms, Alarm};
2122
use crate::getlog::{getlog_handler};
@@ -343,12 +344,44 @@ pub(crate) fn parse_notification(msg: &shvrpc::RpcMessage, sites_info: &BTreeMap
343344
}
344345

345346
struct SitesTaskState {
346-
mntchng_subscribers: SelectAll<Subscriber>,
347-
subscribers: SelectAll<Subscriber>,
347+
mntchng_subscribers: StreamMap<String, Subscriber>,
348+
subscribers: StreamMap<String, Subscriber>,
348349
online_status_channels: BTreeMap<String, UnboundedSender<SiteOnlineStatus>>,
349350
online_status_task: Option<tokio::task::JoinHandle<()>>,
350351
}
351352

353+
async fn init_subscribers(
354+
old_subscribers: &mut StreamMap<String, Subscriber>,
355+
client_cmd_tx: &ClientCommandSender,
356+
subscriptions: impl IntoIterator<Item = (String, String)>,
357+
) -> StreamMap<String, Subscriber> {
358+
let mut subscribers = StreamMap::new();
359+
let mut new_subscribers = subscriptions
360+
.into_iter()
361+
.filter_map(|(path, signal)| {
362+
let key = format!("{path}:*:{signal}");
363+
match old_subscribers.remove(&key) {
364+
Some(subscriber) => {
365+
subscribers.insert(key, subscriber);
366+
None
367+
}
368+
None => {
369+
Some(async move {
370+
let subscriber = subscribe(client_cmd_tx, &path, &signal).await;
371+
(key, subscriber)
372+
})
373+
}
374+
}
375+
})
376+
.collect::<FuturesUnordered<_>>();
377+
378+
while let Some((key, subscriber)) = new_subscribers.next().await {
379+
subscribers.insert(key, subscriber);
380+
}
381+
382+
subscribers
383+
}
384+
352385
enum PeriodicSyncCommand {
353386
Enable,
354387
Disable,
@@ -359,6 +392,7 @@ async fn reload_sites(
359392
shv_api_version: shvclient::clientapi::ShvApiVersion,
360393
client_cmd_tx: &ClientCommandSender,
361394
app_state: &Arc<State>,
395+
old_state: &mut SitesTaskState,
362396
) -> Option<SitesTaskState>
363397
{
364398
let _reload_guard = crate::ReloadGuard::new(&app_state.sites_reload_in_progress);
@@ -424,44 +458,35 @@ async fn reload_sites(
424458
.join("\n")
425459
);
426460

427-
// Subscribe mntchng
428-
let mntchng_subscribers = sites_info
461+
let sites_without_pushlog = sites_info
429462
.iter()
430463
.filter(|(_, site)| sub_hps
431464
.get(&site.sub_hp)
432465
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
433-
)
434-
.map(|(path, _)| {
435-
subscribe(client_cmd_tx, subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng")
436-
})
437-
.collect::<FuturesUnordered<_>>()
438-
.collect::<SelectAll<_>>()
439-
.await;
466+
);
467+
468+
// Subscribe mntchng
469+
let mntchng_subscribers = init_subscribers(
470+
&mut old_state.mntchng_subscribers,
471+
client_cmd_tx,
472+
sites_without_pushlog.clone().map(|(path, _)| (subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng".to_string()))
473+
).await;
440474

441475
log::info!("Loading typeinfo");
442-
let subscribers = sites_info
443-
.iter()
444-
.filter(|(_, site)| sub_hps
445-
.get(&site.sub_hp)
446-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
447-
)
448-
.flat_map(|(path, _)| {
449-
let shv_path = join_path!("shv", path);
450-
let sub_chng = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG);
451-
const SIG_CMDLOG: &str = "cmdlog";
452-
let sub_cmdlog = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG);
453-
[sub_chng, sub_cmdlog]
454-
})
455-
.collect::<FuturesUnordered<_>>()
456-
.collect::<SelectAll<_>>()
457-
.await;
476+
let subscribers = init_subscribers(
477+
&mut old_state.subscribers,
478+
client_cmd_tx,
479+
sites_without_pushlog.clone().flat_map(|(path, _)| {
480+
let shv_path = join_path!("shv", path);
481+
const SIG_CMDLOG: &str = "cmdlog";
482+
[
483+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG.to_string()),
484+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG.to_string()),
485+
]
486+
})
487+
).await;
458488

459-
let typeinfos = sites_info
460-
.iter()
461-
.filter(|(_, site)| sub_hps
462-
.get(&site.sub_hp)
463-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
464-
)
489+
let typeinfos = sites_without_pushlog
465490
.map(|(path, _)| {
466491
let client_cmd_tx = ClientCommandSender::clone(client_cmd_tx);
467492
async move {
@@ -611,13 +636,12 @@ pub(crate) async fn sites_task(
611636
let mut client_evt_rx = std::pin::pin!(client_evt_rx.fuse().peekable());
612637

613638
let mut state = SitesTaskState {
614-
mntchng_subscribers: SelectAll::<Subscriber>::default(),
615-
subscribers: SelectAll::<Subscriber>::default(),
639+
mntchng_subscribers: StreamMap::new(),
640+
subscribers: StreamMap::new(),
616641
online_status_channels: BTreeMap::new(),
617642
online_status_task: None,
618643
};
619644

620-
621645
let (periodic_sync_tx, mut periodic_sync_rx) = futures::channel::mpsc::unbounded();
622646
let periodic_sync_task = {
623647
let app_state = app_state.clone();
@@ -668,13 +692,27 @@ pub(crate) async fn sites_task(
668692
let shv_api_version = loop {
669693
match client_evt_rx.next().await {
670694
Some(ClientEvent::Connected(shv_api_version)) => break shv_api_version,
671-
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => (),
695+
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => {
696+
if let Some(online_status_task) = state.online_status_task {
697+
state.online_status_channels.clear();
698+
if let Err(err) = online_status_task.await {
699+
log::error!("Failed to join online_status_task: {err}")
700+
};
701+
}
702+
703+
state = SitesTaskState {
704+
mntchng_subscribers: StreamMap::new(),
705+
subscribers: StreamMap::new(),
706+
online_status_channels: BTreeMap::new(),
707+
online_status_task: None,
708+
};
709+
},
672710
None => break 'main_loop,
673711
}
674712
};
675713

676714
'sites_loop: loop {
677-
let mut state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state).await {
715+
state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state, &mut state).await {
678716
Some(s) => s,
679717
None => continue 'main_loop,
680718
};
@@ -684,14 +722,29 @@ pub(crate) async fn sites_task(
684722
.unwrap_or_else(|e| log::error!("Cannot send periodic sync enable command: {e}"));
685723

686724
loop {
725+
let mut mntchng_next = if !state.mntchng_subscribers.is_empty() {
726+
state.mntchng_subscribers.next().left_future()
727+
} else {
728+
futures::future::pending().right_future()
729+
}.fuse();
730+
731+
let mut subscribers_next = if !state.subscribers.is_empty() {
732+
state.subscribers.next().left_future()
733+
} else {
734+
futures::future::pending().right_future()
735+
}.fuse();
736+
687737
futures::select! {
688738
_event = client_evt_rx.as_mut().peek() => continue 'main_loop,
689739
sites_command = sites_cmd_rx.select_next_some() => match sites_command {
690740
SitesCommand::ReloadSites => {
691741
continue 'sites_loop;
692742
},
693743
},
694-
mntchng_frame = state.mntchng_subscribers.select_next_some() => {
744+
mntchng_frame = mntchng_next => {
745+
let Some((_, mntchng_frame)) = mntchng_frame else {
746+
continue;
747+
};
695748
let msg = match mntchng_frame.to_rpcmesage() {
696749
Ok(msg) => msg,
697750
Err(err) => {
@@ -723,7 +776,10 @@ pub(crate) async fn sites_task(
723776
}
724777
}
725778
}
726-
notification_frame = state.subscribers.select_next_some() => {
779+
notification_frame = subscribers_next => {
780+
let Some((_, notification_frame)) = notification_frame else {
781+
continue;
782+
};
727783
let msg = match notification_frame.to_rpcmesage() {
728784
Ok(msg) => msg,
729785
Err(err) => {
@@ -862,6 +918,15 @@ mod tests {
862918
}
863919
}
864920

921+
#[async_trait::async_trait]
922+
impl TestStep<SitesTaskTestState> for SitesCommand {
923+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
924+
match self {
925+
SitesCommand::ReloadSites => state.sites_cmd_tx.unbounded_send(SitesCommand::ReloadSites).unwrap(),
926+
}
927+
}
928+
}
929+
865930
#[derive(Debug)]
866931
enum ExpectDirtylogCommand {
867932
ProcessNotification,
@@ -1021,6 +1086,46 @@ mod tests {
10211086
],
10221087
starting_files: vec![],
10231088
expected_file_paths: vec![],
1089+
cleanup_steps: &[
1090+
Box::new(ExpectSignal("node", "onlinestatuschng", 0.into())),
1091+
Box::new(ExpectUnsubscription),
1092+
Box::new(ExpectUnsubscription),
1093+
Box::new(ExpectUnsubscription),
1094+
Box::new(ExpectUnsubscription),
1095+
Box::new(ExpectUnsubscription),
1096+
Box::new(ExpectUnsubscription),
1097+
Box::new(ExpectUnsubscription),
1098+
Box::new(ExpectUnsubscription),
1099+
Box::new(ExpectUnsubscription),
1100+
],
1101+
},
1102+
TestCase {
1103+
name: "Reload sites reuses subscriptions",
1104+
steps: &[
1105+
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
1106+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1107+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:mntchng".try_into().unwrap())),
1108+
Box::new(ExpectSubscription("shv/node/*:*:mntchng".try_into().unwrap())),
1109+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:mntchng".try_into().unwrap())),
1110+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:chng".try_into().unwrap())),
1111+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:cmdlog".try_into().unwrap())),
1112+
Box::new(ExpectSubscription("shv/node/*:*:chng".try_into().unwrap())),
1113+
Box::new(ExpectSubscription("shv/node/*:*:cmdlog".try_into().unwrap())),
1114+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:chng".try_into().unwrap())),
1115+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:cmdlog".try_into().unwrap())),
1116+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1117+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1118+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1119+
Box::new(ExpectSyncCommand::SyncAll),
1120+
Box::new(SitesCommand::ReloadSites),
1121+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1122+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1123+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1124+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1125+
Box::new(ExpectSyncCommand::SyncAll),
1126+
],
1127+
starting_files: vec![],
1128+
expected_file_paths: vec![],
10241129
cleanup_steps: &[
10251130
Box::new(ExpectUnsubscription),
10261131
Box::new(ExpectUnsubscription),
@@ -1031,7 +1136,6 @@ mod tests {
10311136
Box::new(ExpectUnsubscription),
10321137
Box::new(ExpectUnsubscription),
10331138
Box::new(ExpectUnsubscription),
1034-
Box::new(ExpectSignal("node", "onlinestatuschng", 0.into())),
10351139
],
10361140
},
10371141
TestCase {
@@ -1077,4 +1181,3 @@ mod tests {
10771181
Ok(())
10781182
}
10791183
}
1080-

0 commit comments

Comments
 (0)