Skip to content

Commit 393bb13

Browse files
committed
Reuse old subscribers
1 parent 9d62fe8 commit 393bb13

1 file changed

Lines changed: 128 additions & 43 deletions

File tree

src/sites.rs

Lines changed: 128 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::sync::Arc;
33
use std::time::Duration;
44

55
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6-
use futures::stream::{FuturesUnordered, SelectAll};
7-
use futures::StreamExt;
6+
use futures::stream::FuturesUnordered;
7+
use futures::{FutureExt, StreamExt};
88
use log::{debug, error, info, warn};
99
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, Subscriber};
1010
use shvclient::clientnode::{METH_DIR, SIG_CHNG};
@@ -16,6 +16,7 @@ use shvrpc::util::find_longest_path_prefix;
1616
use shvrpc::{join_path, RpcMessageMetaTags};
1717
use tokio::sync::Semaphore;
1818
use tokio::time::timeout;
19+
use tokio_stream::StreamMap;
1920

2021
use crate::alarm::{collect_alarms, collect_state_alarms, Alarm};
2122
use crate::getlog::{getlog_handler};
@@ -343,12 +344,44 @@ pub(crate) fn parse_notification(msg: &shvrpc::RpcMessage, sites_info: &BTreeMap
343344
}
344345

345346
struct SitesTaskState {
346-
mntchng_subscribers: SelectAll<Subscriber>,
347-
subscribers: SelectAll<Subscriber>,
347+
mntchng_subscribers: StreamMap<String, Subscriber>,
348+
subscribers: StreamMap<String, Subscriber>,
348349
online_status_channels: BTreeMap<String, UnboundedSender<SiteOnlineStatus>>,
349350
online_status_task: Option<tokio::task::JoinHandle<()>>,
350351
}
351352

353+
async fn init_subscribers(
354+
old_subscribers: &mut StreamMap<String, Subscriber>,
355+
client_cmd_tx: &ClientCommandSender,
356+
subscriptions: impl IntoIterator<Item = (String, String)>,
357+
) -> StreamMap<String, Subscriber> {
358+
let mut subscribers = StreamMap::new();
359+
let mut new_subscribers = subscriptions
360+
.into_iter()
361+
.filter_map(|(path, signal)| {
362+
let key = format!("{path}:*:{signal}");
363+
match old_subscribers.remove(&key) {
364+
Some(subscriber) => {
365+
subscribers.insert(key, subscriber);
366+
None
367+
}
368+
None => {
369+
Some(async move {
370+
let subscriber = subscribe(client_cmd_tx, &path, &signal).await;
371+
(key, subscriber)
372+
})
373+
}
374+
}
375+
})
376+
.collect::<FuturesUnordered<_>>();
377+
378+
while let Some((key, subscriber)) = new_subscribers.next().await {
379+
subscribers.insert(key, subscriber);
380+
}
381+
382+
subscribers
383+
}
384+
352385
enum PeriodicSyncCommand {
353386
Enable,
354387
Disable,
@@ -359,6 +392,7 @@ async fn reload_sites(
359392
shv_api_version: shvclient::clientapi::ShvApiVersion,
360393
client_cmd_tx: &ClientCommandSender,
361394
app_state: &Arc<State>,
395+
old_state: &mut SitesTaskState,
362396
) -> Option<SitesTaskState>
363397
{
364398
let _reload_guard = crate::ReloadGuard::new(&app_state.sites_reload_in_progress);
@@ -424,44 +458,36 @@ async fn reload_sites(
424458
.join("\n")
425459
);
426460

427-
// Subscribe mntchng
428-
let mntchng_subscribers = sites_info
461+
let sites_without_pushlog = sites_info
429462
.iter()
430463
.filter(|(_, site)| sub_hps
431464
.get(&site.sub_hp)
432465
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
433-
)
434-
.map(|(path, _)| {
435-
subscribe(client_cmd_tx, subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng")
436-
})
437-
.collect::<FuturesUnordered<_>>()
438-
.collect::<SelectAll<_>>()
439-
.await;
466+
);
467+
468+
469+
// Subscribe mntchng
470+
let mntchng_subscribers = init_subscribers(
471+
&mut old_state.mntchng_subscribers,
472+
client_cmd_tx,
473+
sites_without_pushlog.clone().map(|(path, _)| (subscription_prefix_path(join_path!("shv", path), &shv_api_version), "mntchng".to_string()))
474+
).await;
440475

441476
log::info!("Loading typeinfo");
442-
let subscribers = sites_info
443-
.iter()
444-
.filter(|(_, site)| sub_hps
445-
.get(&site.sub_hp)
446-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
447-
)
448-
.flat_map(|(path, _)| {
449-
let shv_path = join_path!("shv", path);
450-
let sub_chng = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG);
451-
const SIG_CMDLOG: &str = "cmdlog";
452-
let sub_cmdlog = subscribe(client_cmd_tx, subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG);
453-
[sub_chng, sub_cmdlog]
454-
})
455-
.collect::<FuturesUnordered<_>>()
456-
.collect::<SelectAll<_>>()
457-
.await;
477+
let subscribers = init_subscribers(
478+
&mut old_state.subscribers,
479+
client_cmd_tx,
480+
sites_without_pushlog.clone().flat_map(|(path, _)| {
481+
let shv_path = join_path!("shv", path);
482+
const SIG_CMDLOG: &str = "cmdlog";
483+
[
484+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CHNG.to_string()),
485+
(subscription_prefix_path(&shv_path, &shv_api_version), SIG_CMDLOG.to_string()),
486+
]
487+
})
488+
).await;
458489

459-
let typeinfos = sites_info
460-
.iter()
461-
.filter(|(_, site)| sub_hps
462-
.get(&site.sub_hp)
463-
.is_some_and(|sub_hp| !matches!(sub_hp, SubHpInfo::PushLog))
464-
)
490+
let typeinfos = sites_without_pushlog
465491
.map(|(path, _)| {
466492
let client_cmd_tx = ClientCommandSender::clone(client_cmd_tx);
467493
async move {
@@ -611,13 +637,12 @@ pub(crate) async fn sites_task(
611637
let mut client_evt_rx = std::pin::pin!(client_evt_rx.fuse().peekable());
612638

613639
let mut state = SitesTaskState {
614-
mntchng_subscribers: SelectAll::<Subscriber>::default(),
615-
subscribers: SelectAll::<Subscriber>::default(),
640+
mntchng_subscribers: StreamMap::new(),
641+
subscribers: StreamMap::new(),
616642
online_status_channels: BTreeMap::new(),
617643
online_status_task: None,
618644
};
619645

620-
621646
let (periodic_sync_tx, mut periodic_sync_rx) = futures::channel::mpsc::unbounded();
622647
let periodic_sync_task = {
623648
let app_state = app_state.clone();
@@ -668,13 +693,20 @@ pub(crate) async fn sites_task(
668693
let shv_api_version = loop {
669694
match client_evt_rx.next().await {
670695
Some(ClientEvent::Connected(shv_api_version)) => break shv_api_version,
671-
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => (),
696+
Some(ClientEvent::ConnectionFailed(_)) | Some(ClientEvent::Disconnected) => {
697+
state = SitesTaskState {
698+
mntchng_subscribers: StreamMap::new(),
699+
subscribers: StreamMap::new(),
700+
online_status_channels: BTreeMap::new(),
701+
online_status_task: None,
702+
};
703+
},
672704
None => break 'main_loop,
673705
}
674706
};
675707

676708
'sites_loop: loop {
677-
let mut state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state).await {
709+
state = match reload_sites(shv_api_version.clone(), &client_cmd_tx, &app_state, &mut state).await {
678710
Some(s) => s,
679711
None => continue 'main_loop,
680712
};
@@ -691,7 +723,10 @@ pub(crate) async fn sites_task(
691723
continue 'sites_loop;
692724
},
693725
},
694-
mntchng_frame = state.mntchng_subscribers.select_next_some() => {
726+
mntchng_frame = state.mntchng_subscribers.next().fuse() => {
727+
let Some((_, mntchng_frame)) = mntchng_frame else {
728+
continue;
729+
};
695730
let msg = match mntchng_frame.to_rpcmesage() {
696731
Ok(msg) => msg,
697732
Err(err) => {
@@ -723,7 +758,10 @@ pub(crate) async fn sites_task(
723758
}
724759
}
725760
}
726-
notification_frame = state.subscribers.select_next_some() => {
761+
notification_frame = state.subscribers.next().fuse() => {
762+
let Some((_, notification_frame)) = notification_frame else {
763+
continue;
764+
};
727765
let msg = match notification_frame.to_rpcmesage() {
728766
Ok(msg) => msg,
729767
Err(err) => {
@@ -862,6 +900,15 @@ mod tests {
862900
}
863901
}
864902

903+
#[async_trait::async_trait]
904+
impl TestStep<SitesTaskTestState> for SitesCommand {
905+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
906+
match self {
907+
SitesCommand::ReloadSites => state.sites_cmd_tx.unbounded_send(SitesCommand::ReloadSites).unwrap(),
908+
}
909+
}
910+
}
911+
865912
#[derive(Debug)]
866913
enum ExpectDirtylogCommand {
867914
ProcessNotification,
@@ -1034,6 +1081,45 @@ mod tests {
10341081
Box::new(ExpectSignal("node", "onlinestatuschng", 0.into())),
10351082
],
10361083
},
1084+
TestCase {
1085+
name: "Reload sites reuses subscriptions",
1086+
steps: &[
1087+
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
1088+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1089+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:mntchng".try_into().unwrap())),
1090+
Box::new(ExpectSubscription("shv/node/*:*:mntchng".try_into().unwrap())),
1091+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:mntchng".try_into().unwrap())),
1092+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:chng".try_into().unwrap())),
1093+
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:cmdlog".try_into().unwrap())),
1094+
Box::new(ExpectSubscription("shv/node/*:*:chng".try_into().unwrap())),
1095+
Box::new(ExpectSubscription("shv/node/*:*:cmdlog".try_into().unwrap())),
1096+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:chng".try_into().unwrap())),
1097+
Box::new(ExpectSubscription("shv/node_with_hp_meta/*:*:cmdlog".try_into().unwrap())),
1098+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1099+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1100+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1101+
Box::new(ExpectSyncCommand::SyncAll),
1102+
Box::new(SitesCommand::ReloadSites),
1103+
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
1104+
Box::new(ExpectCall("sites/legacy_sync_path_device/_files", "ls", Ok(shvproto::List::new().into()))),
1105+
Box::new(ExpectCall("sites/node/_files", "ls", Ok(shvproto::List::new().into()))),
1106+
Box::new(ExpectCall("sites/node_with_hp_meta/_files", "ls", Ok(shvproto::List::new().into()))),
1107+
Box::new(ExpectSyncCommand::SyncAll),
1108+
],
1109+
starting_files: vec![],
1110+
expected_file_paths: vec![],
1111+
cleanup_steps: &[
1112+
Box::new(ExpectUnsubscription),
1113+
Box::new(ExpectUnsubscription),
1114+
Box::new(ExpectUnsubscription),
1115+
Box::new(ExpectUnsubscription),
1116+
Box::new(ExpectUnsubscription),
1117+
Box::new(ExpectUnsubscription),
1118+
Box::new(ExpectUnsubscription),
1119+
Box::new(ExpectUnsubscription),
1120+
Box::new(ExpectUnsubscription),
1121+
],
1122+
},
10371123
TestCase {
10381124
name: "Periodic sync",
10391125
steps: &[
@@ -1077,4 +1163,3 @@ mod tests {
10771163
Ok(())
10781164
}
10791165
}
1080-

0 commit comments

Comments
 (0)