Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
925 changes: 565 additions & 360 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
[package]
name = "historyprovider"
version = "2.0.3"
version = "2.1.0"
edition = "2024"

[[bin]]
name = "hp"

[dependencies]
tokio = { version = "1.48.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
shvproto = { git = "https://github.com/silicon-heaven/libshvproto-rs", branch = "master", version = "3.6" }
shvrpc = { git = "https://github.com/silicon-heaven/libshvrpc-rs", branch = "master", version = "3.11" }
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", branch = "main", version = "0.11", features = ["tokio"] }
shvproto = { git = "https://github.com/silicon-heaven/libshvproto-rs", tag = "3.6.22" }
shvrpc = { git = "https://github.com/silicon-heaven/libshvrpc-rs", tag = "5.0.0" }
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.20.0", features = ["tokio"] }
futures = "0.3.31"
log = "0.4.28"
clap = { version = "4.5.53", features = ["derive"] }
Expand Down Expand Up @@ -42,5 +42,5 @@ humantime = "2.3.0"
[dev-dependencies]
async-broadcast = "0.7.2"
async-trait = "0.1.80"
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", branch = "main", features = ["mocking"] }
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.20.0", features = ["mocking"] }
tempfile = "3.23.0"
3 changes: 1 addition & 2 deletions src/alarmlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::warn;
use shvclient::AppState;
use shvproto::{FromRpcValue, ToRpcValue};
use tokio::sync::Semaphore;

Expand All @@ -30,7 +29,7 @@ pub(crate) struct AlarmLog {
pub(crate) async fn alarmlog_impl(
site_path_prefix: &str,
params: &AlarmLogParams,
app_state: AppState<State>,
app_state: Arc<State>,
) -> BTreeMap<String, AlarmLog>
{
let typeinfos = app_state.sites_data.read().await.typeinfos.clone();
Expand Down
15 changes: 8 additions & 7 deletions src/dirtylog.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;

use futures::channel::mpsc::UnboundedReceiver;
use futures::channel::oneshot::Sender as OneshotSender;
use futures::io::BufReader;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::{debug, error, info, warn};
use shvclient::{AppState, ClientEventsReceiver};
use shvclient::{ClientCommandSender, ClientEventsReceiver};
use shvproto::DateTime as ShvDateTime;
use shvrpc::metamethod::AccessLevel;
use tokio_util::compat::TokioAsyncReadCompatExt;
Expand All @@ -18,7 +19,7 @@ use crate::journalentry::JournalEntry;
use crate::journalrw::{JournalReaderLog2, JournalWriterLog2, VALUE_FLAG_SPONTANEOUS_BIT};
use crate::sites::ParsedNotification;
use crate::util::{get_files, is_log2_file};
use crate::{ClientCommandSender, State};
use crate::State;


#[derive(Debug)]
Expand All @@ -36,7 +37,7 @@ pub(crate) enum DirtyLogCommand {
pub(crate) async fn dirtylog_task(
_client_cmd_tx: ClientCommandSender,
_client_evt_rx: ClientEventsReceiver,
app_state: AppState<State>,
app_state: Arc<State>,
mut cmd_rx: UnboundedReceiver<DirtyLogCommand>,
) {
// Per site request
Expand Down Expand Up @@ -296,11 +297,11 @@ mod tests {

use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use log::debug;
use shvclient::client::ClientCommand;
use shvclient::clientapi::ClientCommand;
use shvproto::DateTime;
use shvrpc::rpcframe::RpcFrame;

use crate::{State, datachange::DataChange, dirtylog::{DirtyLogCommand, dirtylog_task}, journalentry::JournalEntry, sync::SyncCommand, util::{DedupReceiver, init_logger, testing::{PrettyJoinError, TestStep, run_test}}};
use crate::{datachange::DataChange, dirtylog::{DirtyLogCommand, dirtylog_task}, journalentry::JournalEntry, sync::SyncCommand, util::{DedupReceiver, init_logger, testing::{PrettyJoinError, TestStep, run_test}}};

struct DirtylogTaskTestState {
sender: UnboundedSender<DirtyLogCommand>,
Expand All @@ -311,7 +312,7 @@ mod tests {

#[async_trait::async_trait]
impl TestStep<DirtylogTaskTestState> for TestDirtyLogCommand {
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand<State>>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
let cmd = match &self.0 {
DirtyLogCommand::ProcessNotification(msg) => DirtyLogCommand::ProcessNotification(msg.clone()),
DirtyLogCommand::Trim { site } => DirtyLogCommand::Trim { site: site.clone() },
Expand All @@ -329,7 +330,7 @@ mod tests {

#[async_trait::async_trait]
impl TestStep<DirtylogTaskTestState> for TestGetDirtyLog {
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand<State>>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
debug!(target: "test-driver", "Sending DirtyLogCommand::Get");
let (sender, receiver) = futures::channel::oneshot::channel();
state.sender.unbounded_send(DirtyLogCommand::Get { site: self.site.clone(), response_tx: sender }).expect("Sending DirtyLogCommands should succeed");
Expand Down
3 changes: 1 addition & 2 deletions src/getlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use chrono::TimeZone;
use futures::io::BufReader;
use futures::{Stream, StreamExt, TryStreamExt};
use log::{error, info, warn};
use shvclient::AppState;
use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
use tokio::fs::DirEntry;
use tokio_util::compat::TokioAsyncReadCompatExt;
Expand Down Expand Up @@ -47,7 +46,7 @@ fn file_name_to_file_msec(filename: &str) -> Result<i64, String> {
pub(crate) async fn getlog_handler(
site_path: &str,
params: &GetLog2Params,
app_state: AppState<State>,
app_state: Arc<State>,
) -> Result<GetLogResult, RpcError> {
if !app_state.sites_data.read().await.sites_info.contains_key(site_path) {
return Err(RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong getLog path: {site_path}")));
Expand Down
17 changes: 7 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use futures::channel::mpsc::{unbounded, UnboundedSender};
use log::info;
use serde::Deserialize;
use shvclient::AppState;
use shvproto::RpcValue;
use shvrpc::client::ClientConfig;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -79,9 +79,6 @@ struct State {
dirtylog_cmd_tx: UnboundedSender<dirtylog::DirtyLogCommand>,
}

type ClientCommandSender = shvclient::ClientCommandSender<State>;
type Subscriber = shvclient::client::Subscriber<State>;

pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::Result<()> {
info!("Setting up journal dir: {}", &hp_config.journal_dir);
std::fs::create_dir_all(&hp_config.journal_dir)?;
Expand All @@ -90,7 +87,7 @@ pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::
let (sync_cmd_tx, sync_cmd_rx) = crate::util::dedup_channel();
let (dirtylog_cmd_tx, dirtylog_cmd_rx) = unbounded();

let app_state = AppState::new(State {
let app_state = Arc::new(State {
start_time: std::time::Instant::now(),
sites_data: RwLock::default(),
sync_info: Default::default(),
Expand All @@ -103,11 +100,11 @@ pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::
});

shvclient::Client::new()
.mount_dynamic("",
shvclient::MethodsGetter::new(tree::methods_getter),
shvclient::RequestHandler::stateful(tree::request_handler)
)
.with_app_state(app_state.clone())
.mount_dynamic("", {
let app_state = app_state.clone();
move |rq, cmd_sender|
tree::request_handler(rq, cmd_sender, app_state.clone())
})
.run_with_init(client_config, |client_cmd_tx, client_evt_rx| {
tokio::spawn(sites::sites_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone()));
tokio::spawn(sync::sync_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone(), sync_cmd_rx));
Expand Down
4 changes: 2 additions & 2 deletions src/pushlog.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::path::Path;
use std::sync::Arc;

use futures::io::{BufReader, BufWriter};
use futures::StreamExt as _;
use log::{error, info, warn};
use shvclient::AppState;
use shvproto::RpcValue;
use tokio_util::compat::TokioAsyncReadCompatExt as _;

Expand All @@ -30,7 +30,7 @@ impl From<PushLogResult> for RpcValue {
pub(crate) async fn pushlog_impl(
log_reader: Log2Reader,
site_path: &str,
app_state: AppState<State>,
app_state: Arc<State>,
) -> PushLogResult
{
info!("pushLog handler, site: {site_path}, log header: {header}", header = log_reader.header);
Expand Down
33 changes: 17 additions & 16 deletions src/sites.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@ use futures::channel::mpsc::UnboundedReceiver;
use futures::stream::{FuturesUnordered, SelectAll};
use futures::StreamExt;
use log::{debug, error, warn};
use shvclient::client::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList};
use shvclient::clientnode::{find_longest_path_prefix, METH_DIR, SIG_CHNG};
use shvclient::{AppState, ClientEventsReceiver};
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, Subscriber};
use shvclient::clientnode::{METH_DIR, SIG_CHNG};
use shvclient::{ClientCommandSender, ClientEventsReceiver};
use shvproto::{DateTime, RpcValue};
use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
use shvrpc::util::find_longest_path_prefix;
use shvrpc::{join_path, RpcMessageMetaTags};
use tokio::time::timeout;

use crate::alarm::{collect_alarms, collect_state_alarms, Alarm};
use crate::getlog::getlog_handler;
use crate::typeinfo::TypeInfo;
use crate::util::{subscribe, subscription_prefix_path};
use crate::{AlarmWithTimestamp, ClientCommandSender, State, Subscriber};
use crate::{AlarmWithTimestamp, State};

#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub(crate) enum SiteOnlineStatus {
Expand Down Expand Up @@ -202,7 +203,7 @@ async fn set_online_status(
site: impl AsRef<str>,
new_status: SiteOnlineStatus,
client_commands: &ClientCommandSender,
app_state: &AppState<State>
app_state: &Arc<State>
)
{
let site = site.as_ref();
Expand Down Expand Up @@ -268,7 +269,7 @@ fn online_status_worker(
site: impl Into<String>,
mut events: UnboundedReceiver<SiteOnlineStatus>,
client_commands: ClientCommandSender,
app_state: AppState<State>
app_state: Arc<State>
) -> impl Future<Output = ()>
{
const ONLINE_TIMER: Duration = Duration::from_secs(10);
Expand Down Expand Up @@ -301,7 +302,7 @@ fn online_status_worker(
set_online_status(&site, SiteOnlineStatus::Online, &client_commands, &app_state).await;
} else if let Err(err) = dir_result
&& let CallRpcMethodErrorKind::RpcError(RpcError { code, .. }) = err.error()
&& (*code == RpcErrorCode::MethodCallTimeout || *code == RpcErrorCode::MethodNotFound)
&& (*code == RpcErrorCode::MethodCallTimeout.into() || *code == RpcErrorCode::MethodNotFound.into())
{
set_online_status(&site, SiteOnlineStatus::Offline, &client_commands, &app_state).await;
}
Expand Down Expand Up @@ -338,7 +339,7 @@ pub(crate) fn parse_notification(msg: &shvrpc::RpcMessage, sites_info: &BTreeMap
pub(crate) async fn sites_task(
client_cmd_tx: ClientCommandSender,
client_evt_rx: ClientEventsReceiver,
app_state: AppState<State>,
app_state: Arc<State>,
)
{
let mut client_evt_rx = client_evt_rx.fuse();
Expand Down Expand Up @@ -667,11 +668,11 @@ mod tests {

use async_broadcast::Sender;
use futures::{channel::mpsc::{UnboundedReceiver, UnboundedSender}, StreamExt};
use shvclient::{client::ClientCommand, ClientEvent};
use shvclient::{clientapi::ClientCommand, ClientEvent};
use shvproto::RpcValue;
use shvrpc::rpcframe::RpcFrame;

use crate::{dirtylog::DirtyLogCommand, sites::{sites_task, SiteInfo}, sync::SyncCommand, util::{init_logger, testing::{run_test, ExpectCall, ExpectSignal, ExpectSubscription, ExpectUnsubscription, PrettyJoinError, SendSignal, TestStep}, DedupReceiver}, State};
use crate::{dirtylog::DirtyLogCommand, sites::{sites_task, SiteInfo}, sync::SyncCommand, util::{init_logger, testing::{run_test, ExpectCall, ExpectSignal, ExpectSubscription, ExpectUnsubscription, PrettyJoinError, SendSignal, TestStep}, DedupReceiver}};

#[test]
fn parse_notification() {
Expand Down Expand Up @@ -731,7 +732,7 @@ mod tests {

#[async_trait::async_trait]
impl TestStep<SitesTaskTestState> for ClientEvent {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
let x = state.sender.clone();
x.broadcast(self.clone()).await.expect("Sending ClientEvents must work");
}
Expand All @@ -744,7 +745,7 @@ mod tests {

#[async_trait::async_trait]
impl TestStep<SitesTaskTestState> for ExpectDirtylogCommand {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
let event = state.dirtylog_cmd_rx.select_next_some().await;
match (event, self) {
(DirtyLogCommand::ProcessNotification(..), ExpectDirtylogCommand::ProcessNotification) => {
Expand All @@ -765,7 +766,7 @@ mod tests {

#[async_trait::async_trait]
impl TestStep<SitesTaskTestState> for ExpectSyncCommand {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
let Some(event) = state.sync_cmd_rx.next().await else {
panic!("Expected a SyncCommand, but got none");
};
Expand Down Expand Up @@ -861,7 +862,7 @@ mod tests {
TestCase {
name: "Empty sites",
steps: &[
Box::new(ClientEvent::Connected(shvclient::client::ShvApiVersion::V3)),
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
Box::new(ExpectCall("sites", "getSites", Ok(no_sites()))),
],
starting_files: vec![],
Expand All @@ -871,7 +872,7 @@ mod tests {
TestCase {
name: "Test everything",
steps: &[
Box::new(ClientEvent::Connected(shvclient::client::ShvApiVersion::V3)),
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:mntchng".try_into().unwrap())),
Box::new(ExpectSubscription("shv/node/*:*:mntchng".try_into().unwrap())),
Expand Down Expand Up @@ -911,7 +912,7 @@ mod tests {
TestCase {
name: "Periodic sync",
steps: &[
Box::new(ClientEvent::Connected(shvclient::client::ShvApiVersion::V3)),
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
Box::new(ExpectCall("sites", "getSites", Ok(no_sites()))),
Box::new(ExpectSyncCommand::SyncAll),
Box::new(ExpectSyncCommand::SyncAll),
Expand Down
10 changes: 5 additions & 5 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ use futures::io::{BufReader, BufWriter};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use log::{debug, error, info, warn};
use shvclient::client::RpcCall;
use shvclient::clientapi::RpcCall;
use shvclient::clientnode::METH_DIR;
use shvclient::{AppState, ClientEventsReceiver};
use shvclient::{ClientCommandSender, ClientEventsReceiver};
use shvproto::RpcValue;
use shvrpc::join_path;
use time::format_description::well_known::Iso8601;
Expand All @@ -25,7 +25,7 @@ use crate::journalrw::{GetLog2Params, GetLog2Since, JournalReaderLog2, JournalWr
use crate::sites::{SitesData, SubHpInfo};
use crate::tree::{FileType, LsFilesEntry, METH_READ};
use crate::util::{get_files, is_log2_file, DedupReceiver};
use crate::{ClientCommandSender, State, MAX_JOURNAL_DIR_SIZE_DEFAULT};
use crate::{State, MAX_JOURNAL_DIR_SIZE_DEFAULT};

#[derive(Default)]
pub(crate) struct SyncInfo {
Expand Down Expand Up @@ -292,7 +292,7 @@ async fn sync_site_by_download(
let file_list = match file_list {
Some(file_list) => file_list,
None => &RpcCall::new(remote_journal_path, "lsfiles")
.exec::<_, Vec<LsFilesEntry>, _>(&client_cmd_tx)
.exec::<Vec<LsFilesEntry>, _>(&client_cmd_tx)
.await
.map(|file_list| file_list.into_iter().filter(|file| matches!(file.ftype, FileType::File) && file.name.ends_with(".log2")).collect::<Vec<_>>())
.map(|mut file_list| { file_list.sort_by(|file_a, file_b| file_a.name.cmp(&file_b.name)); file_list })
Expand Down Expand Up @@ -702,7 +702,7 @@ const MAX_SYNC_TASKS_DEFAULT: usize = 8;
pub(crate) async fn sync_task(
client_cmd_tx: ClientCommandSender,
_client_evt_rx: ClientEventsReceiver,
app_state: AppState<State>,
app_state: Arc<State>,
mut sync_cmd_rx: DedupReceiver<SyncCommand>,
)
{
Expand Down
6 changes: 3 additions & 3 deletions src/sync/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::collections::HashMap;

use crate::dirtylog::DirtyLogCommand;
use crate::util::{dedup_channel, testing::*};
use crate::{sync::{sync_task, SyncCommand}, util::{init_logger, DedupSender}, State};
use crate::{sync::{sync_task, SyncCommand}, util::{init_logger, DedupSender}};
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use log::debug;
use shvclient::client::ClientCommand;
use shvclient::clientapi::ClientCommand;
use shvproto::{make_list, make_map, RpcValue};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::rpcmessage::RpcError;
Expand All @@ -17,7 +17,7 @@ struct SyncTaskTestState {

#[async_trait::async_trait]
impl TestStep<SyncTaskTestState> for SyncCommand {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SyncTaskTestState) {
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SyncTaskTestState) {
let cmd = self.clone();
debug!(target: "test-driver", "Sending SyncCommand::{cmd:?}");
state.dedup_sender.send(cmd).expect("Sending SyncCommands should succeed");
Expand Down
Loading