Skip to content

Commit 1b3c863

Browse files
committed
Switch to libshvclient 0.20 method handlers API
Replace deprecated split methods_getter/request_handler API with the new one introduced in libshvclient 0.20. Each dynamic node implements one request handler that resolves its methods and returns requested method handler in the result.
1 parent 540b59a commit 1b3c863

12 files changed

Lines changed: 976 additions & 985 deletions

File tree

Cargo.lock

Lines changed: 578 additions & 366 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
[package]
22
name = "historyprovider"
3-
version = "2.0.2"
3+
version = "2.1.0"
44
edition = "2024"
55

66
[[bin]]
77
name = "hp"
88

99
[dependencies]
1010
tokio = { version = "1.48.0", features = ["macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
11-
shvproto = { git = "https://github.com/silicon-heaven/libshvproto-rs", branch = "master", version = "3.6" }
12-
shvrpc = { git = "https://github.com/silicon-heaven/libshvrpc-rs", branch = "master", version = "3.11" }
13-
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", branch = "main", version = "0.11", features = ["tokio"] }
11+
shvproto = { git = "https://github.com/silicon-heaven/libshvproto-rs", tag = "3.6.22" }
12+
shvrpc = { git = "https://github.com/silicon-heaven/libshvrpc-rs", tag = "5.0.0" }
13+
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.20.0", features = ["tokio"] }
1414
futures = "0.3.31"
1515
log = "0.4.28"
1616
clap = { version = "4.5.53", features = ["derive"] }
@@ -42,5 +42,5 @@ humantime = "2.3.0"
4242
[dev-dependencies]
4343
async-broadcast = "0.7.2"
4444
async-trait = "0.1.80"
45-
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", branch = "main", features = ["mocking"] }
45+
shvclient = { git = "https://github.com/silicon-heaven/libshvclient-rs", tag = "0.20.0", features = ["mocking"] }
4646
tempfile = "3.23.0"

src/alarmlog.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::sync::Arc;
44
use futures::stream::FuturesUnordered;
55
use futures::StreamExt;
66
use log::warn;
7-
use shvclient::AppState;
87
use shvproto::{FromRpcValue, ToRpcValue};
98
use tokio::sync::Semaphore;
109

@@ -30,7 +29,7 @@ pub(crate) struct AlarmLog {
3029
pub(crate) async fn alarmlog_impl(
3130
site_path_prefix: &str,
3231
params: &AlarmLogParams,
33-
app_state: AppState<State>,
32+
app_state: Arc<State>,
3433
) -> BTreeMap<String, AlarmLog>
3534
{
3635
let typeinfos = app_state.sites_data.read().await.typeinfos.clone();

src/dirtylog.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
use std::collections::{HashMap, HashSet, VecDeque};
22
use std::path::{Path, PathBuf};
33
use std::pin::Pin;
4+
use std::sync::Arc;
45

56
use futures::channel::mpsc::UnboundedReceiver;
67
use futures::channel::oneshot::Sender as OneshotSender;
78
use futures::io::BufReader;
89
use futures::stream::FuturesUnordered;
910
use futures::StreamExt;
1011
use log::{debug, error, info, warn};
11-
use shvclient::{AppState, ClientEventsReceiver};
12+
use shvclient::{ClientCommandSender, ClientEventsReceiver};
1213
use shvproto::DateTime as ShvDateTime;
1314
use shvrpc::metamethod::AccessLevel;
1415
use tokio_util::compat::TokioAsyncReadCompatExt;
@@ -18,7 +19,7 @@ use crate::journalentry::JournalEntry;
1819
use crate::journalrw::{JournalReaderLog2, JournalWriterLog2, VALUE_FLAG_SPONTANEOUS_BIT};
1920
use crate::sites::ParsedNotification;
2021
use crate::util::{get_files, is_log2_file};
21-
use crate::{ClientCommandSender, State};
22+
use crate::State;
2223

2324

2425
#[derive(Debug)]
@@ -36,7 +37,7 @@ pub(crate) enum DirtyLogCommand {
3637
pub(crate) async fn dirtylog_task(
3738
_client_cmd_tx: ClientCommandSender,
3839
_client_evt_rx: ClientEventsReceiver,
39-
app_state: AppState<State>,
40+
app_state: Arc<State>,
4041
mut cmd_rx: UnboundedReceiver<DirtyLogCommand>,
4142
) {
4243
// Per site request
@@ -296,11 +297,11 @@ mod tests {
296297

297298
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
298299
use log::debug;
299-
use shvclient::client::ClientCommand;
300+
use shvclient::clientapi::ClientCommand;
300301
use shvproto::DateTime;
301302
use shvrpc::rpcframe::RpcFrame;
302303

303-
use crate::{State, datachange::DataChange, dirtylog::{DirtyLogCommand, dirtylog_task}, journalentry::JournalEntry, sync::SyncCommand, util::{DedupReceiver, init_logger, testing::{PrettyJoinError, TestStep, run_test}}};
304+
use crate::{datachange::DataChange, dirtylog::{DirtyLogCommand, dirtylog_task}, journalentry::JournalEntry, sync::SyncCommand, util::{DedupReceiver, init_logger, testing::{PrettyJoinError, TestStep, run_test}}};
304305

305306
struct DirtylogTaskTestState {
306307
sender: UnboundedSender<DirtyLogCommand>,
@@ -311,7 +312,7 @@ mod tests {
311312

312313
#[async_trait::async_trait]
313314
impl TestStep<DirtylogTaskTestState> for TestDirtyLogCommand {
314-
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand<State>>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
315+
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
315316
let cmd = match &self.0 {
316317
DirtyLogCommand::ProcessNotification(msg) => DirtyLogCommand::ProcessNotification(msg.clone()),
317318
DirtyLogCommand::Trim { site } => DirtyLogCommand::Trim { site: site.clone() },
@@ -329,7 +330,7 @@ mod tests {
329330

330331
#[async_trait::async_trait]
331332
impl TestStep<DirtylogTaskTestState> for TestGetDirtyLog {
332-
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand<State>>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
333+
async fn exec(&self, _client_command_receiver: &mut UnboundedReceiver<ClientCommand>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut DirtylogTaskTestState) {
333334
debug!(target: "test-driver", "Sending DirtyLogCommand::Get");
334335
let (sender, receiver) = futures::channel::oneshot::channel();
335336
state.sender.unbounded_send(DirtyLogCommand::Get { site: self.site.clone(), response_tx: sender }).expect("Sending DirtyLogCommands should succeed");

src/getlog.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use chrono::TimeZone;
88
use futures::io::BufReader;
99
use futures::{Stream, StreamExt, TryStreamExt};
1010
use log::{error, info, warn};
11-
use shvclient::AppState;
1211
use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
1312
use tokio::fs::DirEntry;
1413
use tokio_util::compat::TokioAsyncReadCompatExt;
@@ -47,7 +46,7 @@ fn file_name_to_file_msec(filename: &str) -> Result<i64, String> {
4746
pub(crate) async fn getlog_handler(
4847
site_path: &str,
4948
params: &GetLog2Params,
50-
app_state: AppState<State>,
49+
app_state: Arc<State>,
5150
) -> Result<GetLogResult, RpcError> {
5251
if !app_state.sites_data.read().await.sites_info.contains_key(site_path) {
5352
return Err(RpcError::new(RpcErrorCode::InvalidParam, format!("Wrong getLog path: {site_path}")));

src/lib.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::collections::BTreeMap;
2+
use std::sync::Arc;
23

34
use futures::channel::mpsc::{unbounded, UnboundedSender};
45
use log::info;
56
use serde::Deserialize;
6-
use shvclient::AppState;
77
use shvproto::RpcValue;
88
use shvrpc::client::ClientConfig;
99
use tokio::sync::RwLock;
@@ -79,9 +79,6 @@ struct State {
7979
dirtylog_cmd_tx: UnboundedSender<dirtylog::DirtyLogCommand>,
8080
}
8181

82-
type ClientCommandSender = shvclient::ClientCommandSender<State>;
83-
type Subscriber = shvclient::client::Subscriber<State>;
84-
8582
pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::Result<()> {
8683
info!("Setting up journal dir: {}", &hp_config.journal_dir);
8784
std::fs::create_dir_all(&hp_config.journal_dir)?;
@@ -90,7 +87,7 @@ pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::
9087
let (sync_cmd_tx, sync_cmd_rx) = crate::util::dedup_channel();
9188
let (dirtylog_cmd_tx, dirtylog_cmd_rx) = unbounded();
9289

93-
let app_state = AppState::new(State {
90+
let app_state = Arc::new(State {
9491
start_time: std::time::Instant::now(),
9592
sites_data: RwLock::default(),
9693
sync_info: Default::default(),
@@ -103,11 +100,11 @@ pub async fn run(hp_config: &HpConfig, client_config: &ClientConfig) -> shvrpc::
103100
});
104101

105102
shvclient::Client::new()
106-
.mount_dynamic("",
107-
shvclient::MethodsGetter::new(tree::methods_getter),
108-
shvclient::RequestHandler::stateful(tree::request_handler)
109-
)
110-
.with_app_state(app_state.clone())
103+
.mount_dynamic("", {
104+
let app_state = app_state.clone();
105+
move |rq, cmd_sender|
106+
tree::request_handler(rq, cmd_sender, app_state.clone())
107+
})
111108
.run_with_init(client_config, |client_cmd_tx, client_evt_rx| {
112109
tokio::spawn(sites::sites_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone()));
113110
tokio::spawn(sync::sync_task(client_cmd_tx.clone(), client_evt_rx.clone(), app_state.clone(), sync_cmd_rx));

src/pushlog.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::path::Path;
2+
use std::sync::Arc;
23

34
use futures::io::{BufReader, BufWriter};
45
use futures::StreamExt as _;
56
use log::{error, info, warn};
6-
use shvclient::AppState;
77
use shvproto::RpcValue;
88
use tokio_util::compat::TokioAsyncReadCompatExt as _;
99

@@ -30,7 +30,7 @@ impl From<PushLogResult> for RpcValue {
3030
pub(crate) async fn pushlog_impl(
3131
log_reader: Log2Reader,
3232
site_path: &str,
33-
app_state: AppState<State>,
33+
app_state: Arc<State>,
3434
) -> PushLogResult
3535
{
3636
info!("pushLog handler, site: {site_path}, log header: {header}", header = log_reader.header);

src/sites.rs

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,20 @@ use futures::channel::mpsc::UnboundedReceiver;
66
use futures::stream::{FuturesUnordered, SelectAll};
77
use futures::StreamExt;
88
use log::{debug, error, warn};
9-
use shvclient::client::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList};
10-
use shvclient::clientnode::{find_longest_path_prefix, METH_DIR, SIG_CHNG};
11-
use shvclient::{AppState, ClientEventsReceiver};
9+
use shvclient::clientapi::{CallRpcMethodErrorKind, RpcCall, RpcCallDirExists, RpcCallLsList, Subscriber};
10+
use shvclient::clientnode::{METH_DIR, SIG_CHNG};
11+
use shvclient::{ClientCommandSender, ClientEventsReceiver};
1212
use shvproto::{DateTime, RpcValue};
1313
use shvrpc::rpcmessage::{RpcError, RpcErrorCode};
14+
use shvrpc::util::find_longest_path_prefix;
1415
use shvrpc::{join_path, RpcMessageMetaTags};
1516
use tokio::time::timeout;
1617

1718
use crate::alarm::{collect_alarms, collect_state_alarms, Alarm};
1819
use crate::getlog::getlog_handler;
1920
use crate::typeinfo::TypeInfo;
2021
use crate::util::{subscribe, subscription_prefix_path};
21-
use crate::{AlarmWithTimestamp, ClientCommandSender, State, Subscriber};
22+
use crate::{AlarmWithTimestamp, State};
2223

2324
#[derive(Clone, Copy, Debug, Default, PartialEq)]
2425
pub(crate) enum SiteOnlineStatus {
@@ -202,7 +203,7 @@ async fn set_online_status(
202203
site: impl AsRef<str>,
203204
new_status: SiteOnlineStatus,
204205
client_commands: &ClientCommandSender,
205-
app_state: &AppState<State>
206+
app_state: &Arc<State>
206207
)
207208
{
208209
let site = site.as_ref();
@@ -268,7 +269,7 @@ fn online_status_worker(
268269
site: impl Into<String>,
269270
mut events: UnboundedReceiver<SiteOnlineStatus>,
270271
client_commands: ClientCommandSender,
271-
app_state: AppState<State>
272+
app_state: Arc<State>
272273
) -> impl Future<Output = ()>
273274
{
274275
const ONLINE_TIMER: Duration = Duration::from_secs(10);
@@ -301,7 +302,7 @@ fn online_status_worker(
301302
set_online_status(&site, SiteOnlineStatus::Online, &client_commands, &app_state).await;
302303
} else if let Err(err) = dir_result
303304
&& let CallRpcMethodErrorKind::RpcError(RpcError { code, .. }) = err.error()
304-
&& (*code == RpcErrorCode::MethodCallTimeout || *code == RpcErrorCode::MethodNotFound)
305+
&& (*code == RpcErrorCode::MethodCallTimeout.into() || *code == RpcErrorCode::MethodNotFound.into())
305306
{
306307
set_online_status(&site, SiteOnlineStatus::Offline, &client_commands, &app_state).await;
307308
}
@@ -338,7 +339,7 @@ pub(crate) fn parse_notification(msg: &shvrpc::RpcMessage, sites_info: &BTreeMap
338339
pub(crate) async fn sites_task(
339340
client_cmd_tx: ClientCommandSender,
340341
client_evt_rx: ClientEventsReceiver,
341-
app_state: AppState<State>,
342+
app_state: Arc<State>,
342343
)
343344
{
344345
let mut client_evt_rx = client_evt_rx.fuse();
@@ -667,11 +668,11 @@ mod tests {
667668

668669
use async_broadcast::Sender;
669670
use futures::{channel::mpsc::{UnboundedReceiver, UnboundedSender}, StreamExt};
670-
use shvclient::{client::ClientCommand, ClientEvent};
671+
use shvclient::{clientapi::ClientCommand, ClientEvent};
671672
use shvproto::RpcValue;
672673
use shvrpc::rpcframe::RpcFrame;
673674

674-
use crate::{dirtylog::DirtyLogCommand, sites::{sites_task, SiteInfo}, sync::SyncCommand, util::{init_logger, testing::{run_test, ExpectCall, ExpectSignal, ExpectSubscription, ExpectUnsubscription, PrettyJoinError, SendSignal, TestStep}, DedupReceiver}, State};
675+
use crate::{dirtylog::DirtyLogCommand, sites::{sites_task, SiteInfo}, sync::SyncCommand, util::{init_logger, testing::{run_test, ExpectCall, ExpectSignal, ExpectSubscription, ExpectUnsubscription, PrettyJoinError, SendSignal, TestStep}, DedupReceiver}};
675676

676677
#[test]
677678
fn parse_notification() {
@@ -731,7 +732,7 @@ mod tests {
731732

732733
#[async_trait::async_trait]
733734
impl TestStep<SitesTaskTestState> for ClientEvent {
734-
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
735+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
735736
let x = state.sender.clone();
736737
x.broadcast(self.clone()).await.expect("Sending ClientEvents must work");
737738
}
@@ -744,7 +745,7 @@ mod tests {
744745

745746
#[async_trait::async_trait]
746747
impl TestStep<SitesTaskTestState> for ExpectDirtylogCommand {
747-
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
748+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
748749
let event = state.dirtylog_cmd_rx.select_next_some().await;
749750
match (event, self) {
750751
(DirtyLogCommand::ProcessNotification(..), ExpectDirtylogCommand::ProcessNotification) => {
@@ -765,7 +766,7 @@ mod tests {
765766

766767
#[async_trait::async_trait]
767768
impl TestStep<SitesTaskTestState> for ExpectSyncCommand {
768-
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
769+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>,_subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SitesTaskTestState) {
769770
let Some(event) = state.sync_cmd_rx.next().await else {
770771
panic!("Expected a SyncCommand, but got none");
771772
};
@@ -861,7 +862,7 @@ mod tests {
861862
TestCase {
862863
name: "Empty sites",
863864
steps: &[
864-
Box::new(ClientEvent::Connected(shvclient::client::ShvApiVersion::V3)),
865+
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
865866
Box::new(ExpectCall("sites", "getSites", Ok(no_sites()))),
866867
],
867868
starting_files: vec![],
@@ -871,7 +872,7 @@ mod tests {
871872
TestCase {
872873
name: "Test everything",
873874
steps: &[
874-
Box::new(ClientEvent::Connected(shvclient::client::ShvApiVersion::V3)),
875+
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
875876
Box::new(ExpectCall("sites", "getSites", Ok(some_broker()))),
876877
Box::new(ExpectSubscription("shv/legacy_sync_path_device/*:*:mntchng".try_into().unwrap())),
877878
Box::new(ExpectSubscription("shv/node/*:*:mntchng".try_into().unwrap())),
@@ -911,7 +912,7 @@ mod tests {
911912
TestCase {
912913
name: "Periodic sync",
913914
steps: &[
914-
Box::new(ClientEvent::Connected(shvclient::client::ShvApiVersion::V3)),
915+
Box::new(ClientEvent::Connected(shvclient::clientapi::ShvApiVersion::V3)),
915916
Box::new(ExpectCall("sites", "getSites", Ok(no_sites()))),
916917
Box::new(ExpectSyncCommand::SyncAll),
917918
Box::new(ExpectSyncCommand::SyncAll),

src/sync.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use futures::io::{BufReader, BufWriter};
88
use futures::stream::FuturesUnordered;
99
use futures::StreamExt;
1010
use log::{debug, error, info, warn};
11-
use shvclient::client::RpcCall;
11+
use shvclient::clientapi::RpcCall;
1212
use shvclient::clientnode::METH_DIR;
13-
use shvclient::{AppState, ClientEventsReceiver};
13+
use shvclient::{ClientCommandSender, ClientEventsReceiver};
1414
use shvproto::RpcValue;
1515
use shvrpc::join_path;
1616
use time::format_description::well_known::Iso8601;
@@ -25,7 +25,7 @@ use crate::journalrw::{GetLog2Params, GetLog2Since, JournalReaderLog2, JournalWr
2525
use crate::sites::{SitesData, SubHpInfo};
2626
use crate::tree::{FileType, LsFilesEntry, METH_READ};
2727
use crate::util::{get_files, is_log2_file, DedupReceiver};
28-
use crate::{ClientCommandSender, State, MAX_JOURNAL_DIR_SIZE_DEFAULT};
28+
use crate::{State, MAX_JOURNAL_DIR_SIZE_DEFAULT};
2929

3030
#[derive(Default)]
3131
pub(crate) struct SyncInfo {
@@ -292,7 +292,7 @@ async fn sync_site_by_download(
292292
let file_list = match file_list {
293293
Some(file_list) => file_list,
294294
None => &RpcCall::new(remote_journal_path, "lsfiles")
295-
.exec::<_, Vec<LsFilesEntry>, _>(&client_cmd_tx)
295+
.exec::<Vec<LsFilesEntry>, _>(&client_cmd_tx)
296296
.await
297297
.map(|file_list| file_list.into_iter().filter(|file| matches!(file.ftype, FileType::File) && file.name.ends_with(".log2")).collect::<Vec<_>>())
298298
.map(|mut file_list| { file_list.sort_by(|file_a, file_b| file_a.name.cmp(&file_b.name)); file_list })
@@ -702,7 +702,7 @@ const MAX_SYNC_TASKS_DEFAULT: usize = 8;
702702
pub(crate) async fn sync_task(
703703
client_cmd_tx: ClientCommandSender,
704704
_client_evt_rx: ClientEventsReceiver,
705-
app_state: AppState<State>,
705+
app_state: Arc<State>,
706706
mut sync_cmd_rx: DedupReceiver<SyncCommand>,
707707
)
708708
{

src/sync/tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ use std::collections::HashMap;
22

33
use crate::dirtylog::DirtyLogCommand;
44
use crate::util::{dedup_channel, testing::*};
5-
use crate::{sync::{sync_task, SyncCommand}, util::{init_logger, DedupSender}, State};
5+
use crate::{sync::{sync_task, SyncCommand}, util::{init_logger, DedupSender}};
66
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
77
use log::debug;
8-
use shvclient::client::ClientCommand;
8+
use shvclient::clientapi::ClientCommand;
99
use shvproto::{make_list, make_map, RpcValue};
1010
use shvrpc::rpcframe::RpcFrame;
1111
use shvrpc::rpcmessage::RpcError;
@@ -17,7 +17,7 @@ struct SyncTaskTestState {
1717

1818
#[async_trait::async_trait]
1919
impl TestStep<SyncTaskTestState> for SyncCommand {
20-
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand<State>>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SyncTaskTestState) {
20+
async fn exec(&self, _client_command_reciever: &mut UnboundedReceiver<ClientCommand>, _subscriptions: &mut HashMap<String, UnboundedSender<RpcFrame>>, state: &mut SyncTaskTestState) {
2121
let cmd = self.clone();
2222
debug!(target: "test-driver", "Sending SyncCommand::{cmd:?}");
2323
state.dedup_sender.send(cmd).expect("Sending SyncCommands should succeed");

0 commit comments

Comments
 (0)