Skip to content

Commit c80525d

Browse files
committed
Implement reloadSites
1 parent ed33e7e commit c80525d

3 files changed

Lines changed: 64 additions & 10 deletions

File tree

src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ pub fn make_client(hp_config: &HpConfig, tasks: &mut AppTasks) -> shvrpc::Result
135135

136136
let (sync_cmd_tx, sync_cmd_rx) = crate::util::dedup_channel();
137137
let (dirtylog_cmd_tx, dirtylog_cmd_rx) = unbounded();
138+
let (sites_cmd_tx, sites_cmd_rx) = unbounded();
138139

139140
let app_state = Arc::new(State {
140141
start_time: std::time::Instant::now(),
@@ -153,7 +154,7 @@ pub fn make_client(hp_config: &HpConfig, tasks: &mut AppTasks) -> shvrpc::Result
153154
.mount_dynamic("", {
154155
let app_state = app_state.clone();
155156
move |rq, cmd_sender|
156-
tree::request_handler(rq, cmd_sender, app_state.clone())
157+
tree::request_handler(rq, cmd_sender, sites_cmd_tx.clone(), app_state.clone())
157158
});
158159

159160
let init_function = {
@@ -183,7 +184,7 @@ pub fn make_client(hp_config: &HpConfig, tasks: &mut AppTasks) -> shvrpc::Result
183184
handle_signal(SignalKind::interrupt());
184185
handle_signal(SignalKind::terminate());
185186

186-
tasks.sites_task = Some(wrap_task(tokio::spawn(sites::sites_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone()))));
187+
tasks.sites_task = Some(wrap_task(tokio::spawn(sites::sites_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone(), sites_cmd_rx))));
187188
tasks.sync_task = Some(wrap_task(tokio::spawn(sync::sync_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone(), sync_cmd_rx))));
188189
tasks.dirtylog_task = Some(wrap_task(tokio::spawn(dirtylog::dirtylog_task(client_cmd_tx, client_evt_rx, app_state.clone(), dirtylog_cmd_rx))));
189190
}

src/sites.rs

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ use futures::channel::mpsc::UnboundedReceiver;
66
use futures::stream::{FuturesUnordered, SelectAll};
77
use futures::StreamExt;
88
use log::{debug, error, warn};
9-
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, Subscriber};
9+
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, ShvApiVersion, Subscriber};
1010
use shvclient::clientnode::{METH_DIR, SIG_CHNG};
11-
use shvclient::{ClientCommandSender, ClientEventsReceiver};
11+
use shvclient::{ClientCommandSender, ClientEvent, ClientEventsReceiver};
1212
use shvproto::{DateTime, RpcValue};
1313
use shvrpc::rpcmessage::RpcError;
1414
use shvrpc::typeinfo::TypeInfo;
@@ -29,6 +29,10 @@ pub(crate) enum SiteOnlineStatus {
2929
Online = 2,
3030
}
3131

32+
pub(crate) enum SitesCommand {
33+
ReloadSites,
34+
}
35+
3236
#[derive(Clone,Default)]
3337
pub(crate) struct SitesData {
3438
pub(crate) sites_info: Arc<BTreeMap<String, SiteInfo>>,
@@ -337,9 +341,46 @@ pub(crate) async fn sites_task(
337341
client_cmd_tx: ClientCommandSender,
338342
client_evt_rx: ClientEventsReceiver,
339343
app_state: Arc<State>,
344+
mut sites_cmd_rx: UnboundedReceiver<SitesCommand>,
340345
)
341346
{
342-
let mut client_evt_rx = client_evt_rx.fuse();
347+
enum Event {
348+
ClientEvent(shvclient::ClientEvent),
349+
SitesCommand(SitesCommand, ShvApiVersion),
350+
}
351+
352+
let (evt_tx, mut evt_rx) = futures::channel::mpsc::unbounded::<Event>();
353+
354+
let channel_combiner = tokio::spawn(async move {
355+
let mut client_evt_rx = client_evt_rx.fuse();
356+
let mut current_shv_api_version = None;
357+
loop {
358+
futures::select! {
359+
client_event = client_evt_rx.select_next_some() => {
360+
match &client_event {
361+
shvclient::ClientEvent::Connected(shv_api_version) => {
362+
current_shv_api_version = Some(shv_api_version.clone());
363+
},
364+
shvclient::ClientEvent::Disconnected => {
365+
current_shv_api_version = None;
366+
},
367+
shvclient::ClientEvent::ConnectionFailed(_) => (),
368+
}
369+
370+
evt_tx.unbounded_send(Event::ClientEvent(client_event)).expect("channel_combiner must work");
371+
},
372+
sites_command = sites_cmd_rx.select_next_some() => {
373+
let Some(current_shv_api_version) = &current_shv_api_version else {
374+
warn!("Got a SitesCommand when disconnected");
375+
continue;
376+
};
377+
evt_tx.unbounded_send(Event::SitesCommand(sites_command, current_shv_api_version.clone())).expect("channel_combiner must work");
378+
},
379+
complete => break,
380+
}
381+
}
382+
});
383+
343384
let mut mntchng_subscribers = SelectAll::<Subscriber>::default();
344385
let mut subscribers = SelectAll::<Subscriber>::default();
345386
let mut online_status_channels = BTreeMap::new();
@@ -394,8 +435,8 @@ pub(crate) async fn sites_task(
394435

395436
'main_loop: loop {
396437
futures::select! {
397-
client_event = client_evt_rx.select_next_some() => match client_event {
398-
shvclient::ClientEvent::Connected(shv_api_version) => {
438+
event = evt_rx.select_next_some() => match event {
439+
Event::SitesCommand(SitesCommand::ReloadSites, shv_api_version) | Event::ClientEvent(shvclient::ClientEvent::Connected(shv_api_version)) => {
399440
log::info!("Getting sites info");
400441

401442
let (sites_info, sub_hps) = 'sites_get_loop: loop {
@@ -597,8 +638,8 @@ pub(crate) async fn sites_task(
597638
.unbounded_send(PeriodicSyncCommand::Enable)
598639
.unwrap_or_else(|e| log::error!("Cannot send periodic sync enable command: {e}"));
599640

600-
}
601-
_ => {
641+
},
642+
Event::ClientEvent(ClientEvent::Disconnected) | Event::ClientEvent(ClientEvent::ConnectionFailed(_)) => {
602643
subscribers.clear();
603644
mntchng_subscribers.clear();
604645
online_status_channels.clear();
@@ -692,6 +733,10 @@ pub(crate) async fn sites_task(
692733
};
693734
}
694735

736+
if let Err(err) = channel_combiner.await {
737+
log::error!("Failed to join online_status_task: {err}")
738+
}
739+
695740
log::debug!("sites task finished");
696741
}
697742

@@ -971,7 +1016,8 @@ mod tests {
9711016
dirtylog_cmd_rx,
9721017
sync_cmd_rx,
9731018
};
974-
let sites_task = tokio::spawn(sites_task(ccs, cer, state));
1019+
let (_sites_cmd_tx, sites_cmd_rx) = futures::channel::mpsc::unbounded();
1020+
let sites_task = tokio::spawn(sites_task(ccs, cer, state, sites_cmd_rx));
9751021
(sites_task, task_state)
9761022
},
9771023
|state| {

src/tree.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::path::Path;
33
use std::sync::Arc;
44

55
use async_compression::tokio::write::GzipEncoder;
6+
use futures::channel::mpsc::UnboundedSender;
67
use futures::{StreamExt, TryStreamExt};
78
use log::error;
89
use sha1::Digest;
@@ -576,7 +577,9 @@ async fn ls_empty() -> Result<Vec<String>, RpcError> {
576577
pub(crate) async fn request_handler(
577578
rq: RpcMessage,
578579
client_cmd_tx: ClientCommandSender,
580+
sites_cmd_tx: UnboundedSender<crate::sites::SitesCommand>,
579581
app_state: Arc<State>,
582+
580583
) -> RequestHandlerResult
581584
{
582585
let path = rq.shv_path().map_or_else(String::new, String::from);
@@ -647,6 +650,10 @@ pub(crate) async fn request_handler(
647650
.map_err(|err| RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong alarmLog parameters: {err}")))?;
648651
Ok(alarmlog_impl(&path, &params, app_state).await)
649652
}),
653+
METH_RELOAD_SITES => m.resolve(METHODS, async move || {
654+
sites_cmd_tx.unbounded_send(crate::sites::SitesCommand::ReloadSites).map_err(|err| RpcError::new(RpcErrorCode::InternalError, format!("Couldn't reload sites: {err}")))?;
655+
Ok("Sites queued for a reload. This might take a while.".to_string())
656+
}),
650657
_ => err_unresolved_request(),
651658
}
652659
}

0 commit comments

Comments
 (0)