Skip to content

Commit 8e3269b

Browse files
committed
Reuse old subscribers
1 parent 1cf8c54 commit 8e3269b

1 file changed

Lines changed: 134 additions & 40 deletions

File tree

src/sites.rs

Lines changed: 134 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::sync::Arc;
33
use std::time::Duration;
44

55
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6-
use futures::stream::{FuturesUnordered, SelectAll};
7-
use futures::StreamExt;
6+
use futures::stream::FuturesUnordered;
7+
use futures::{FutureExt, StreamExt};
88
use log::{debug, error, info, warn};
99
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, Subscriber};
1010
use shvclient::clientnode::{METH_DIR, SIG_CHNG};
@@ -16,6 +16,7 @@ use shvrpc::util::find_longest_path_prefix;
1616
use shvrpc::{join_path, RpcMessageMetaTags};
1717
use tokio::sync::Semaphore;
1818
use tokio::time::timeout;
19+
use tokio_stream::StreamMap;
1920

2021
use crate::alarm::{collect_alarms, collect_state_alarms, Alarm};
2122
use crate::getlog::{getlog_handler};
@@ -343,12 +344,44 @@ pub(crate) fn parse_notification(msg: &shvrpc::RpcMessage, sites_info: &BTreeMap
343344
}
344345

345346
struct SitesTaskState {
346-
mntchng_subscribers: SelectAll<Subscriber>,
347-
subscribers: SelectAll<Subscriber>,
347+
mntchng_subscribers: StreamMap<String, Subscriber>,
348+
subscribers: StreamMap<String, Subscriber>,
348349
online_status_channels: BTreeMap<String, UnboundedSender<SiteOnlineStatus>>,
349350
online_status_task: Option<tokio::task::JoinHandle<()>>,
350351
}
351352

353+
async fn init_subscribers(
354+
old_subscribers: &mut StreamMap<String, Subscriber>,
355+
client_cmd_tx: &ClientCommandSender,
356+
subscriptions: impl IntoIterator<Item = (String, String)>,
357+
) -> StreamMap<String, Subscriber> {
358+
let mut subscribers = StreamMap::new();
359+
let mut new_subscribers = subscriptions
360+
.into_iter()
361+
.filter_map(|(path, signal)| {
362+
let key = format!("{path}:*:{signal}");
363+
match old_subscribers.remove(&key) {
364+
Some(subscriber) => {
365+
subscribers.insert(key, subscriber);
366+
None
367+
}
368+
None => {
369+
Some(async move {
370+
let subscriber = subscribe(client_cmd_tx, &path, &signal).await;
371+
(key, subscriber)
372+
})
373+
}
374+
}
375+
})
376+
.collect::<FuturesUnordered<_>>();
377+
378+
while let Some((key, subscriber)) = new_subscribers.next().await {
379+
subscribers.insert(key, subscriber);
380+
}
381+
382+
subscribers
383+
}
384+
352385
enum PeriodicSyncCommand {
353386
Enable,
354387
Disable,
@@ -359,6 +392,7 @@ async fn reload_sites(
359392
shv_api_version: shvclient::clientapi::ShvApiVersion,
360393
client_cmd_tx: &ClientCommandSender,
361394
app_state: &Arc<State>,
395+
old_state: &mut SitesTaskState,
362396
) -> Option<SitesTaskState>
363397
{
364398
let _reload_guard = crate::ReloadGuard::new(&app_state.sites_reload_in_progress);
@@ -425,36 +459,37 @@ async fn reload_sites(
425459
);
426460

427461
// Subscribe mntchng
428-
let mntchng_subscribers = sites_info
429-
.iter()
430-
.filter(|(_, site)| sub_hps
431-
.get(&site.sub_hp)
432-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
433-
)
434-
.map(|(path, _)| {
435-
subscribe(client_cmd_tx, subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng")
436-
})
437-
.collect::<FuturesUnordered<_>>()
438-
.collect::<SelectAll<_>>()
439-
.await;
462+
let mntchng_subscribers = init_subscribers(
463+
&mut old_state.mntchng_subscribers,
464+
client_cmd_tx,
465+
sites_info
466+
.iter()
467+
.filter(|(_, site)| sub_hps
468+
.get(&site.sub_hp)
469+
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
470+
)
471+
.map(|(path, _)| (subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng".to_string()))
472+
).await;
440473

441474
log::info!("Loading typeinfo");
442-
let subscribers = sites_info
443-
.iter()
444-
.filter(|(_, site)| sub_hps
445-
.get(&site.sub_hp)
446-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
447-
)
448-
.flat_map(|(path, _)| {
449-
let shv_path = join_path!("shv", path);
450-
let sub_chng = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG);
451-
const SIG_CMDLOG: &str = "cmdlog";
452-
let sub_cmdlog = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG);
453-
[sub_chng, sub_cmdlog]
454-
})
455-
.collect::<FuturesUnordered<_>>()
456-
.collect::<SelectAll<_>>()
457-
.await;
475+
let subscribers = init_subscribers(
476+
&mut old_state.subscribers,
477+
client_cmd_tx,
478+
sites_info
479+
.iter()
480+
.filter(|(_, site)| sub_hps
481+
.get(&site.sub_hp)
482+
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
483+
)
484+
.flat_map(|(path, _)| {
485+
let shv_path = join_path!("shv", path);
486+
const SIG_CMDLOG: &str = "cmdlog";
487+
[
488+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG.to_string()),
489+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG.to_string()),
490+
]
491+
})
492+
).await;
458493

459494
let typeinfos = sites_info
460495
.iter()
@@ -611,13 +646,12 @@ pub(crate) async fn sites_task(
611646
let mut client_evt_rx = std::pin::pin!(client_evt_rx.fuse().peekable());
612647

613648
let mut state = SitesTaskState {
614-
mntchng_subscribers: SelectAll::<Subscriber>::default(),
615-
subscribers: SelectAll::<Subscriber>::default(),
649+
mntchng_subscribers: StreamMap::new(),
650+
subscribers: StreamMap::new(),
616651
online_status_channels: BTreeMap::new(),
617652
online_status_task: None,
618653
};
619654

620-
621655
let (periodic_sync_tx, mut periodic_sync_rx) = futures::channel::mpsc::unbounded();
622656
let periodic_sync_task = {
623657
let app_state = app_state.clone();
@@ -668,13 +702,20 @@ pub(crate) async fn sites_task(
668702
let shv_api_version = loop {
669703
match client_evt_rx.next().await {
670704
Some(ClientEvent::Connected(shv_api_version)) => break shv_api_version,
671-
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => (),
705+
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => {
706+
state = SitesTaskState {
707+
mntchng_subscribers: StreamMap::new(),
708+
subscribers: StreamMap::new(),
709+
online_status_channels: BTreeMap::new(),
710+
online_status_task: None,
711+
};
712+
},
672713
None => break 'main_loop,
673714
}
674715
};
675716

676717
'sites_loop: loop {
677-
let mut state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state).await {
718+
state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state, &mut state).await {
678719
Some(s) => s,
679720
None => continue 'main_loop,
680721
};
@@ -691,7 +732,10 @@ pub(crate) async fn sites_task(
691732
continue 'sites_loop;
692733
},
693734
},
694-
mntchng_frame = state.mntchng_subscribers.select_next_some() => {
735+
mntchng_frame = state.mntchng_subscribers.next().fuse() => {
736+
let Some((_, mntchng_frame)) = mntchng_frame else {
737+
continue;
738+
};
695739
let msg = match mntchng_frame.to_rpcmesage() {
696740
Ok(msg) => msg,
697741
Err(err) => {
@@ -723,7 +767,10 @@ pub(crate) async fn sites_task(
723767
}
724768
}
725769
}
726-
notification_frame = state.subscribers.select_next_some() => {
770+
notification_frame = state.subscribers.next().fuse() => {
771+
let Some((_, notification_frame)) = notification_frame else {
772+
continue;
773+
};
727774
let msg = match notification_frame.to_rpcmesage() {
728775
Ok(msg) => msg,
729776
Err(err) => {
@@ -862,6 +909,15 @@ mod tests {
862909
}
863910
}
864911

912+
#[async_trait::async_trait]
913+
impl TestStep<SitesTaskTestState> for SitesCommand {
914+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
915+
match self {
916+
SitesCommand::ReloadSites => state.sites_cmd_tx.unbounded_send(SitesCommand::ReloadSites).unwrap(),
917+
}
918+
}
919+
}
920+
865921
#[derive(Debug)]
866922
enum ExpectDirtylogCommand {
867923
ProcessNotification,
@@ -1034,6 +1090,45 @@ mod tests {
10341090
Box::new(ExpectSignal("node", "onlinestatuschng", 0.into())),
10351091
],
10361092
},
1093+
TestCase {
1094+
name: "Reload sites reuses subscriptions",
1095+
steps: &[
1096+
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
1097+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1098+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:mntchng".try_into().unwrap())),
1099+
Box::new(ExpectSubscription("shv/node/*:*:mntchng".try_into().unwrap())),
1100+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:mntchng".try_into().unwrap())),
1101+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:chng".try_into().unwrap())),
1102+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:cmdlog".try_into().unwrap())),
1103+
Box::new(ExpectSubscription("shv/node/*:*:chng".try_into().unwrap())),
1104+
Box::new(ExpectSubscription("shv/node/*:*:cmdlog".try_into().unwrap())),
1105+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:chng".try_into().unwrap())),
1106+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:cmdlog".try_into().unwrap())),
1107+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1108+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1109+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1110+
Box::new(ExpectSyncCommand::SyncAll),
1111+
Box::new(SitesCommand::ReloadSites),
1112+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1113+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1114+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1115+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1116+
Box::new(ExpectSyncCommand::SyncAll),
1117+
],
1118+
starting_files: vec![],
1119+
expected_file_paths: vec![],
1120+
cleanup_steps: &[
1121+
Box::new(ExpectUnsubscription),
1122+
Box::new(ExpectUnsubscription),
1123+
Box::new(ExpectUnsubscription),
1124+
Box::new(ExpectUnsubscription),
1125+
Box::new(ExpectUnsubscription),
1126+
Box::new(ExpectUnsubscription),
1127+
Box::new(ExpectUnsubscription),
1128+
Box::new(ExpectUnsubscription),
1129+
Box::new(ExpectUnsubscription),
1130+
],
1131+
},
10371132
TestCase {
10381133
name: "Periodic sync",
10391134
steps: &[
@@ -1077,4 +1172,3 @@ mod tests {
10771172
Ok(())
10781173
}
10791174
}
1080-

0 commit comments

Comments
 (0)