@@ -3,8 +3,8 @@ use std::sync::Arc;
33use std:: time:: Duration ;
44
55use futures:: channel:: mpsc:: { UnboundedReceiver , UnboundedSender } ;
6- use futures:: stream:: { FuturesUnordered , SelectAll } ;
7- use futures:: StreamExt ;
6+ use futures:: stream:: FuturesUnordered ;
7+ use futures:: { FutureExt , StreamExt } ;
88use log:: { debug, error, info, warn} ;
99use shvclient:: clientapi:: { CallRpcMethodErrorKind , RpcCall , RpcCallDirExists , RpcCallLsList , Subscriber } ;
1010use shvclient:: clientnode:: { METH_DIR , SIG_CHNG } ;
@@ -16,6 +16,7 @@ use shvrpc::util::find_longest_path_prefix;
1616use shvrpc:: { join_path, RpcMessageMetaTags } ;
1717use tokio:: sync:: Semaphore ;
1818use tokio:: time:: timeout;
19+ use tokio_stream:: StreamMap ;
1920
2021use crate :: alarm:: { collect_alarms, collect_state_alarms, Alarm } ;
2122use crate :: getlog:: { getlog_handler} ;
@@ -343,12 +344,44 @@ pub(crate) fn parse_notification(msg: &shvrpc::RpcMessage, sites_info: &BTreeMap
343344}
344345
345346struct SitesTaskState {
346- mntchng_subscribers : SelectAll < Subscriber > ,
347- subscribers : SelectAll < Subscriber > ,
347+ mntchng_subscribers : StreamMap < String , Subscriber > ,
348+ subscribers : StreamMap < String , Subscriber > ,
348349 online_status_channels : BTreeMap < String , UnboundedSender < SiteOnlineStatus > > ,
349350 online_status_task : Option < tokio:: task:: JoinHandle < ( ) > > ,
350351}
351352
353+ async fn init_subscribers (
354+ old_subscribers : & mut StreamMap < String , Subscriber > ,
355+ client_cmd_tx : & ClientCommandSender ,
356+ subscriptions : impl IntoIterator < Item = ( String , String ) > ,
357+ ) -> StreamMap < String , Subscriber > {
358+ let mut subscribers = StreamMap :: new ( ) ;
359+ let mut new_subscribers = subscriptions
360+ . into_iter ( )
361+ . filter_map ( |( path, signal) | {
362+ let key = format ! ( "{path}:*:{signal}" ) ;
363+ match old_subscribers. remove ( & key) {
364+ Some ( subscriber) => {
365+ subscribers. insert ( key, subscriber) ;
366+ None
367+ }
368+ None => {
369+ Some ( async move {
370+ let subscriber = subscribe ( client_cmd_tx, & path, & signal) . await ;
371+ ( key, subscriber)
372+ } )
373+ }
374+ }
375+ } )
376+ . collect :: < FuturesUnordered < _ > > ( ) ;
377+
378+ while let Some ( ( key, subscriber) ) = new_subscribers. next ( ) . await {
379+ subscribers. insert ( key, subscriber) ;
380+ }
381+
382+ subscribers
383+ }
384+
352385enum PeriodicSyncCommand {
353386 Enable ,
354387 Disable ,
@@ -359,6 +392,7 @@ async fn reload_sites(
359392 shv_api_version : shvclient:: clientapi:: ShvApiVersion ,
360393 client_cmd_tx : & ClientCommandSender ,
361394 app_state : & Arc < State > ,
395+ old_state : & mut SitesTaskState ,
362396) -> Option < SitesTaskState >
363397{
364398 let _reload_guard = crate :: ReloadGuard :: new ( & app_state. sites_reload_in_progress ) ;
@@ -424,44 +458,35 @@ async fn reload_sites(
424458 . join( "\n " )
425459 ) ;
426460
427- // Subscribe mntchng
428- let mntchng_subscribers = sites_info
461+ let sites_without_pushlog = sites_info
429462 . iter ( )
430463 . filter ( |( _, site) | sub_hps
431464 . get ( & site. sub_hp )
432465 . is_some_and ( |sub_hp| !matches ! ( sub_hp, SubHpInfo :: PushLog ) )
433- )
434- . map ( |( path, _) | {
435- subscribe ( client_cmd_tx, subscription_prefix_path ( join_path ! ( "shv" , path) , & shv_api_version) , "mntchng" )
436- } )
437- . collect :: < FuturesUnordered < _ > > ( )
438- . collect :: < SelectAll < _ > > ( )
439- . await ;
466+ ) ;
467+
468+ // Subscribe mntchng
469+ let mntchng_subscribers = init_subscribers (
470+ & mut old_state. mntchng_subscribers ,
471+ client_cmd_tx,
472+ sites_without_pushlog. clone ( ) . map ( |( path, _) | ( subscription_prefix_path ( join_path ! ( "shv" , path) , & shv_api_version) , "mntchng" . to_string ( ) ) )
473+ ) . await ;
440474
441475 log:: info!( "Loading typeinfo" ) ;
442- let subscribers = sites_info
443- . iter ( )
444- . filter ( |( _, site) | sub_hps
445- . get ( & site. sub_hp )
446- . is_some_and ( |sub_hp| !matches ! ( sub_hp, SubHpInfo :: PushLog ) )
447- )
448- . flat_map ( |( path, _) | {
449- let shv_path = join_path ! ( "shv" , path) ;
450- let sub_chng = subscribe ( client_cmd_tx, subscription_prefix_path ( & shv_path, & shv_api_version) , SIG_CHNG ) ;
451- const SIG_CMDLOG : & str = "cmdlog" ;
452- let sub_cmdlog = subscribe ( client_cmd_tx, subscription_prefix_path ( & shv_path, & shv_api_version) , SIG_CMDLOG ) ;
453- [ sub_chng, sub_cmdlog]
454- } )
455- . collect :: < FuturesUnordered < _ > > ( )
456- . collect :: < SelectAll < _ > > ( )
457- . await ;
476+ let subscribers = init_subscribers (
477+ & mut old_state. subscribers ,
478+ client_cmd_tx,
479+ sites_without_pushlog. clone ( ) . flat_map ( |( path, _) | {
480+ let shv_path = join_path ! ( "shv" , path) ;
481+ const SIG_CMDLOG : & str = "cmdlog" ;
482+ [
483+ ( subscription_prefix_path ( & shv_path, & shv_api_version) , SIG_CHNG . to_string ( ) ) ,
484+ ( subscription_prefix_path ( & shv_path, & shv_api_version) , SIG_CMDLOG . to_string ( ) ) ,
485+ ]
486+ } )
487+ ) . await ;
458488
459- let typeinfos = sites_info
460- . iter ( )
461- . filter ( |( _, site) | sub_hps
462- . get ( & site. sub_hp )
463- . is_some_and ( |sub_hp| !matches ! ( sub_hp, SubHpInfo :: PushLog ) )
464- )
489+ let typeinfos = sites_without_pushlog
465490 . map ( |( path, _) | {
466491 let client_cmd_tx = ClientCommandSender :: clone ( client_cmd_tx) ;
467492 async move {
@@ -611,13 +636,12 @@ pub(crate) async fn sites_task(
611636 let mut client_evt_rx = std:: pin:: pin!( client_evt_rx. fuse( ) . peekable( ) ) ;
612637
613638 let mut state = SitesTaskState {
614- mntchng_subscribers : SelectAll :: < Subscriber > :: default ( ) ,
615- subscribers : SelectAll :: < Subscriber > :: default ( ) ,
639+ mntchng_subscribers : StreamMap :: new ( ) ,
640+ subscribers : StreamMap :: new ( ) ,
616641 online_status_channels : BTreeMap :: new ( ) ,
617642 online_status_task : None ,
618643 } ;
619644
620-
621645 let ( periodic_sync_tx, mut periodic_sync_rx) = futures:: channel:: mpsc:: unbounded ( ) ;
622646 let periodic_sync_task = {
623647 let app_state = app_state. clone ( ) ;
@@ -668,13 +692,20 @@ pub(crate) async fn sites_task(
668692 let shv_api_version = loop {
669693 match client_evt_rx. next ( ) . await {
670694 Some ( ClientEvent :: Connected ( shv_api_version) ) => break shv_api_version,
671- Some ( ClientEvent :: ConnectionFailed ( _) ) | Some ( ClientEvent :: Disconnected ) => ( ) ,
695+ Some ( ClientEvent :: ConnectionFailed ( _) ) | Some ( ClientEvent :: Disconnected ) => {
696+ state = SitesTaskState {
697+ mntchng_subscribers : StreamMap :: new ( ) ,
698+ subscribers : StreamMap :: new ( ) ,
699+ online_status_channels : BTreeMap :: new ( ) ,
700+ online_status_task : None ,
701+ } ;
702+ } ,
672703 None => break ' main_loop,
673704 }
674705 } ;
675706
676707 ' sites_loop: loop {
677- let mut state = match reload_sites ( shv_api_version. clone ( ) , & client_cmd_tx, & app_state) . await {
708+ state = match reload_sites ( shv_api_version. clone ( ) , & client_cmd_tx, & app_state, & mut state ) . await {
678709 Some ( s) => s,
679710 None => continue ' main_loop,
680711 } ;
@@ -691,7 +722,10 @@ pub(crate) async fn sites_task(
691722 continue ' sites_loop;
692723 } ,
693724 } ,
694- mntchng_frame = state. mntchng_subscribers. select_next_some( ) => {
725+ mntchng_frame = state. mntchng_subscribers. next( ) . fuse( ) => {
726+ let Some ( ( _, mntchng_frame) ) = mntchng_frame else {
727+ continue ;
728+ } ;
695729 let msg = match mntchng_frame. to_rpcmesage( ) {
696730 Ok ( msg) => msg,
697731 Err ( err) => {
@@ -723,7 +757,10 @@ pub(crate) async fn sites_task(
723757 }
724758 }
725759 }
726- notification_frame = state. subscribers. select_next_some( ) => {
760+ notification_frame = state. subscribers. next( ) . fuse( ) => {
761+ let Some ( ( _, notification_frame) ) = notification_frame else {
762+ continue ;
763+ } ;
727764 let msg = match notification_frame. to_rpcmesage( ) {
728765 Ok ( msg) => msg,
729766 Err ( err) => {
@@ -862,6 +899,15 @@ mod tests {
862899 }
863900 }
864901
902+ #[ async_trait:: async_trait]
903+ impl TestStep < SitesTaskTestState > for SitesCommand {
904+ async fn exec ( & self , _client_command_reciever : & mut UnboundedReceiver < ClientCommand > , _subscriptions : & mut HashMap < String , UnboundedSender < RpcFrame > > , state : & mut SitesTaskTestState ) {
905+ match self {
906+ SitesCommand :: ReloadSites => state. sites_cmd_tx . unbounded_send ( SitesCommand :: ReloadSites ) . unwrap ( ) ,
907+ }
908+ }
909+ }
910+
865911 #[ derive( Debug ) ]
866912 enum ExpectDirtylogCommand {
867913 ProcessNotification ,
@@ -1034,6 +1080,45 @@ mod tests {
10341080 Box :: new ( ExpectSignal ( "node" , "onlinestatuschng" , 0 . into ( ) ) ) ,
10351081 ] ,
10361082 } ,
1083+ TestCase {
1084+ name : "Reload sites reuses subscriptions" ,
1085+ steps : & [
1086+ Box :: new ( ClientEvent :: Connected ( shvclient:: clientapi:: ShvApiVersion :: V3 ) ) ,
1087+ Box :: new ( ExpectCall ( "sites" , "getSites" , Ok ( some_broker ( ) ) ) ) ,
1088+ Box :: new ( ExpectSubscription ( "shv/legacy_sync_path_device/*:*:mntchng" . try_into ( ) . unwrap ( ) ) ) ,
1089+ Box :: new ( ExpectSubscription ( "shv/node/*:*:mntchng" . try_into ( ) . unwrap ( ) ) ) ,
1090+ Box :: new ( ExpectSubscription ( "shv/node_with_hp_meta/*:*:mntchng" . try_into ( ) . unwrap ( ) ) ) ,
1091+ Box :: new ( ExpectSubscription ( "shv/legacy_sync_path_device/*:*:chng" . try_into ( ) . unwrap ( ) ) ) ,
1092+ Box :: new ( ExpectSubscription ( "shv/legacy_sync_path_device/*:*:cmdlog" . try_into ( ) . unwrap ( ) ) ) ,
1093+ Box :: new ( ExpectSubscription ( "shv/node/*:*:chng" . try_into ( ) . unwrap ( ) ) ) ,
1094+ Box :: new ( ExpectSubscription ( "shv/node/*:*:cmdlog" . try_into ( ) . unwrap ( ) ) ) ,
1095+ Box :: new ( ExpectSubscription ( "shv/node_with_hp_meta/*:*:chng" . try_into ( ) . unwrap ( ) ) ) ,
1096+ Box :: new ( ExpectSubscription ( "shv/node_with_hp_meta/*:*:cmdlog" . try_into ( ) . unwrap ( ) ) ) ,
1097+ Box :: new ( ExpectCall ( "sites/legacy_sync_path_device/_files" , "ls" , Ok ( shvproto:: List :: new ( ) . into ( ) ) ) ) ,
1098+ Box :: new ( ExpectCall ( "sites/node/_files" , "ls" , Ok ( shvproto:: List :: new ( ) . into ( ) ) ) ) ,
1099+ Box :: new ( ExpectCall ( "sites/node_with_hp_meta/_files" , "ls" , Ok ( shvproto:: List :: new ( ) . into ( ) ) ) ) ,
1100+ Box :: new ( ExpectSyncCommand :: SyncAll ) ,
1101+ Box :: new ( SitesCommand :: ReloadSites ) ,
1102+ Box :: new ( ExpectCall ( "sites" , "getSites" , Ok ( some_broker ( ) ) ) ) ,
1103+ Box :: new ( ExpectCall ( "sites/legacy_sync_path_device/_files" , "ls" , Ok ( shvproto:: List :: new ( ) . into ( ) ) ) ) ,
1104+ Box :: new ( ExpectCall ( "sites/node/_files" , "ls" , Ok ( shvproto:: List :: new ( ) . into ( ) ) ) ) ,
1105+ Box :: new ( ExpectCall ( "sites/node_with_hp_meta/_files" , "ls" , Ok ( shvproto:: List :: new ( ) . into ( ) ) ) ) ,
1106+ Box :: new ( ExpectSyncCommand :: SyncAll ) ,
1107+ ] ,
1108+ starting_files : vec ! [ ] ,
1109+ expected_file_paths : vec ! [ ] ,
1110+ cleanup_steps : & [
1111+ Box :: new ( ExpectUnsubscription ) ,
1112+ Box :: new ( ExpectUnsubscription ) ,
1113+ Box :: new ( ExpectUnsubscription ) ,
1114+ Box :: new ( ExpectUnsubscription ) ,
1115+ Box :: new ( ExpectUnsubscription ) ,
1116+ Box :: new ( ExpectUnsubscription ) ,
1117+ Box :: new ( ExpectUnsubscription ) ,
1118+ Box :: new ( ExpectUnsubscription ) ,
1119+ Box :: new ( ExpectUnsubscription ) ,
1120+ ] ,
1121+ } ,
10371122 TestCase {
10381123 name : "Periodic sync" ,
10391124 steps : & [
@@ -1077,4 +1162,3 @@ mod tests {
10771162 Ok ( ( ) )
10781163 }
10791164}
1080-
0 commit comments