Skip to content

Commit a926899

Browse files
authored
Merge pull request #117 from silicon-heaven/rpccall-userid
Add user_id to RpcCall
2 parents ddf1ba8 + 2803100 commit a926899

4 files changed

Lines changed: 84 additions & 41 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name = "shvclient"
33
description = "A Rust framework for Silicon Heaven RPC devices"
44
license = "MIT"
55
repository = "https://github.com/silicon-heaven/libshvclient-rs"
6-
version = "5.1.4"
6+
version = "5.2.0"
77
edition = "2024"
88

99
[lib]

examples/simple_device_tokio.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ pub(crate) async fn main() -> shvrpc::Result<()> {
217217
Some(Ok(param.into()))
218218
}
219219
"setVecString" [IsSetter, Write, "List", ""] (param: Vec<String>) => {
220-
println!("param data: {:?}", &param);
220+
println!("param data: {param:?}");
221221
Some(Ok(().into()))
222222
}
223223
"42" [IsGetter, Browse, "", ""] => {

src/client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ mod tests {
825825
) {
826826
let mut conn_mock = init_connection(&conn_evt_tx, &mut cli_evt_rx, SHV_API_VERSION_DEFAULT).await;
827827
let mut resp_rx = cli_cmd_tx
828-
.do_rpc_call("path/to/resource", "get", None, None)
828+
.do_rpc_call("path/to/resource", "get", None, None, None)
829829
.expect("RpcCall command send");
830830

831831
let req = conn_mock.expect_send_message().await;
@@ -843,7 +843,7 @@ mod tests {
843843
) {
844844
let mut conn_mock = init_connection(&conn_evt_tx, &mut cli_evt_rx, SHV_API_VERSION_DEFAULT).await;
845845
let mut resp_rx = cli_cmd_tx
846-
.do_rpc_call("path/to/resource", "get", None, Some(Duration::from_millis(100)))
846+
.do_rpc_call("path/to/resource", "get", None, Some(Duration::from_millis(100)), None)
847847
.expect("RpcCall command send");
848848

849849
let _req = conn_mock.expect_send_message().await;
@@ -861,7 +861,7 @@ mod tests {
861861
) {
862862
let mut conn_mock = init_connection(&conn_evt_tx, &mut cli_evt_rx, SHV_API_VERSION_DEFAULT).await;
863863
let mut resp_rx = cli_cmd_tx
864-
.do_rpc_call("path/to/resource", "get", None, Some(Duration::from_millis(100)))
864+
.do_rpc_call("path/to/resource", "get", None, Some(Duration::from_millis(100)), None)
865865
.expect("RpcCall command send");
866866

867867
let req = conn_mock.expect_send_message().await;
@@ -903,7 +903,7 @@ mod tests {
903903
mut _cli_evt_rx: ClientEventsReceiver,
904904
) {
905905
let mut resp_rx = cli_cmd_tx
906-
.do_rpc_call("path/to/resource", "get", None, None)
906+
.do_rpc_call("path/to/resource", "get", None, None, None)
907907
.expect("RpcCall command send");
908908
receive_rpc_msg(&mut resp_rx).timeout(Duration::from_millis(1000)).await.expect_err("Unexpected method call response");
909909
}

src/clientapi.rs

Lines changed: 78 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use shvproto::RpcValue;
77
use shvrpc::rpc::ShvRI;
88
use shvrpc::rpcdiscovery::{DirParam, DirResult, LsParam, LsResult, MethodInfo};
99
use shvrpc::rpcmessage::RpcError;
10-
use shvrpc::{RpcFrame, RpcMessage};
10+
use shvrpc::{RpcFrame, RpcMessage, RpcMessageMetaTags};
1111

1212
use private::next_subscription_id;
1313

@@ -171,39 +171,45 @@ impl ClientCommandSender {
171171
method: impl AsRef<str>,
172172
param: Option<RpcValue>,
173173
timeout: Option<Duration>,
174+
user_id: Option<String>,
174175
) -> Result<Receiver<RpcFrame>, futures::channel::mpsc::TrySendError<ClientCommand>>
175176
{
177+
let mut request = RpcMessage::new_request(shvpath, method).with_param(param);
178+
if let Some(user_id) = user_id {
179+
request.set_user_id(user_id);
180+
}
181+
176182
let (response_sender, response_receiver) = futures::channel::mpsc::unbounded();
177183
self.sender.unbounded_send(ClientCommand::RpcCall {
178-
request: RpcMessage::new_request(shvpath, method).with_param(param),
184+
request,
179185
response_sender,
180186
timeout,
181187
})
182188
.map(|()| response_receiver)
183189
}
184190

185-
pub async fn call_dir(&self, path: &str, param: DirParam, timeout: Option<Duration>) -> Result<DirResult, CallRpcMethodError> {
186-
self.call_dir_into(path, param, timeout).await
191+
pub async fn call_dir(&self, path: &str, param: DirParam, timeout: Option<Duration>, user_id: Option<String>) -> Result<DirResult, CallRpcMethodError> {
192+
self.call_dir_into(path, param, timeout, user_id).await
187193
}
188194

189-
pub async fn call_dir_brief(&self, path: &str, timeout: Option<Duration>) -> Result<Vec<MethodInfo>, CallRpcMethodError> {
190-
self.call_dir_into(path, DirParam::Brief, timeout).await
195+
pub async fn call_dir_brief(&self, path: &str, timeout: Option<Duration>, user_id: Option<String>) -> Result<Vec<MethodInfo>, CallRpcMethodError> {
196+
self.call_dir_into(path, DirParam::Brief, timeout, user_id).await
191197
}
192198

193-
pub async fn call_dir_full(&self, path: &str, timeout: Option<Duration>) -> Result<Vec<MethodInfo>, CallRpcMethodError> {
194-
self.call_dir_into(path, DirParam::Full, timeout).await
199+
pub async fn call_dir_full(&self, path: &str, timeout: Option<Duration>, user_id: Option<String>) -> Result<Vec<MethodInfo>, CallRpcMethodError> {
200+
self.call_dir_into(path, DirParam::Full, timeout, user_id).await
195201
}
196202

197-
pub async fn call_dir_exists(&self, path: &str, method: &str, timeout: Option<Duration>) -> Result<bool, CallRpcMethodError> {
198-
self.call_dir_into(path, DirParam::Exists(method.into()), timeout).await
203+
pub async fn call_dir_exists(&self, path: &str, method: &str, timeout: Option<Duration>, user_id: Option<String>) -> Result<bool, CallRpcMethodError> {
204+
self.call_dir_into(path, DirParam::Exists(method.into()), timeout, user_id).await
199205
}
200206

201-
async fn call_dir_into<R, E>(&self, path: &str, param: DirParam, timeout: Option<Duration>) -> Result<R, CallRpcMethodError>
207+
async fn call_dir_into<R, E>(&self, path: &str, param: DirParam, timeout: Option<Duration>, user_id: Option<String>) -> Result<R, CallRpcMethodError>
202208
where
203209
R: TryFrom<DirResult, Error = E>,
204210
E: std::fmt::Display,
205211
{
206-
self.call_rpc_method(path, METH_DIR, Some(RpcValue::from(param)), timeout, None::<fn(_)>)
212+
self.call_rpc_method(path, METH_DIR, Some(RpcValue::from(param)), timeout, user_id, None::<fn(_)>)
207213
.await
208214
.and_then(|dir_res|
209215
R::try_from(dir_res).map_err(|e|
@@ -216,24 +222,24 @@ impl ClientCommandSender {
216222
)
217223
}
218224

219-
pub async fn call_ls(&self, path: &str, param: LsParam, timeout: Option<Duration>) -> Result<LsResult, CallRpcMethodError> {
220-
self.call_ls_into(path, param, timeout).await
225+
pub async fn call_ls(&self, path: &str, param: LsParam, timeout: Option<Duration>, user_id: Option<String>) -> Result<LsResult, CallRpcMethodError> {
226+
self.call_ls_into(path, param, timeout, user_id).await
221227
}
222228

223-
pub async fn call_ls_exists(&self, path: &str, dirname: &str, timeout: Option<Duration>) -> Result<bool, CallRpcMethodError> {
224-
self.call_ls_into(path, LsParam::Exists(dirname.into()), timeout).await
229+
pub async fn call_ls_exists(&self, path: &str, dirname: &str, timeout: Option<Duration>, user_id: Option<String>) -> Result<bool, CallRpcMethodError> {
230+
self.call_ls_into(path, LsParam::Exists(dirname.into()), timeout, user_id).await
225231
}
226232

227-
pub async fn call_ls_list(&self, path: &str, timeout: Option<Duration>) -> Result<Vec<String>, CallRpcMethodError> {
228-
self.call_ls_into(path, LsParam::List, timeout).await
233+
pub async fn call_ls_list(&self, path: &str, timeout: Option<Duration>, user_id: Option<String>) -> Result<Vec<String>, CallRpcMethodError> {
234+
self.call_ls_into(path, LsParam::List, timeout, user_id).await
229235
}
230236

231-
async fn call_ls_into<R, E>(&self, path: &str, param: LsParam, timeout: Option<Duration>) -> Result<R, CallRpcMethodError>
237+
async fn call_ls_into<R, E>(&self, path: &str, param: LsParam, timeout: Option<Duration>, user_id: Option<String>) -> Result<R, CallRpcMethodError>
232238
where
233239
R: TryFrom<LsResult, Error = E>,
234240
E: std::fmt::Display,
235241
{
236-
self.call_rpc_method(path, METH_LS, Some(RpcValue::from(param)), timeout, None::<fn(_)>)
242+
self.call_rpc_method(path, METH_LS, Some(RpcValue::from(param)), timeout, user_id, None::<fn(_)>)
237243
.await
238244
.and_then(|ls_res|
239245
R::try_from(ls_res).map_err(|e|
@@ -252,6 +258,7 @@ impl ClientCommandSender {
252258
method: impl AsRef<str>,
253259
param: Option<RpcValue>,
254260
timeout: Option<Duration>,
261+
user_id: Option<String>,
255262
) -> Pin<Box<dyn Stream<Item = Result<RpcCallResponse<R>, CallRpcMethodError>> + Send>>
256263
where
257264
R: for<'a> TryFrom<&'a RpcValue, Error = E> + Send + 'static,
@@ -271,7 +278,7 @@ impl ClientCommandSender {
271278
if self.sender.is_closed() {
272279
return Box::pin(futures::stream::empty());
273280
}
274-
let call = self.do_rpc_call(path, method, param, timeout)
281+
let call = self.do_rpc_call(path, method, param, timeout, user_id)
275282
.map_err(|err| {
276283
warn!("Cannot send RPC request to the client core. \
277284
Path: `{path}`, method: `{method}`, error: {err}");
@@ -308,6 +315,7 @@ impl ClientCommandSender {
308315
method: impl AsRef<str>,
309316
param: Option<RpcValue>,
310317
timeout: Option<Duration>,
318+
user_id: Option<String>,
311319
progress_notifier: Option<F>,
312320
) -> Result<R, CallRpcMethodError>
313321
where
@@ -318,7 +326,7 @@ impl ClientCommandSender {
318326
let path = path.as_ref();
319327
let method = method.as_ref();
320328

321-
let mut receiver = self.call_rpc_method_stream(path, method, param, timeout);
329+
let mut receiver = self.call_rpc_method_stream(path, method, param, timeout, user_id);
322330
while let Some(result) = receiver.next().await {
323331
match result? {
324332
RpcCallResponse::Delay(progress) => {
@@ -410,11 +418,12 @@ pub struct RpcCall<'a> {
410418
method: &'a str,
411419
param: Option<RpcValue>,
412420
timeout: Option<Duration>,
421+
user_id: Option<String>,
413422
}
414423

415424
impl<'a> RpcCall<'a> {
416425
pub fn new(path: &'a str, method: &'a str) -> Self {
417-
Self { path, method, param: None, timeout: None }
426+
Self { path, method, param: None, timeout: None, user_id: None }
418427
}
419428

420429
#[must_use]
@@ -429,28 +438,34 @@ impl<'a> RpcCall<'a> {
429438
self
430439
}
431440

441+
#[must_use]
442+
pub fn user_id(mut self, user_id: impl Into<String>) -> Self {
443+
self.user_id = Some(user_id.into());
444+
self
445+
}
446+
432447
pub async fn exec<R, E>(self, client_cmd_sender: &ClientCommandSender) -> Result<R, CallRpcMethodError>
433448
where
434449
R: for<'r> TryFrom<&'r RpcValue, Error = E> + Send + 'static,
435450
E: std::fmt::Display,
436451
{
437-
client_cmd_sender.call_rpc_method(self.path, self.method, self.param, self.timeout, None::<fn(_)>).await
452+
client_cmd_sender.call_rpc_method(self.path, self.method, self.param, self.timeout, self.user_id, None::<fn(_)>).await
438453
}
439454

440455
pub async fn exec_with_progress<R, E>(self, client_cmd_sender: &ClientCommandSender, progress_notifier: impl Fn(f64) + Send + 'static) -> Result<R, CallRpcMethodError>
441456
where
442457
R: for<'r> TryFrom<&'r RpcValue, Error = E> + Send + 'static,
443458
E: std::fmt::Display,
444459
{
445-
client_cmd_sender.call_rpc_method(self.path, self.method, self.param, self.timeout, Some(progress_notifier)).await
460+
client_cmd_sender.call_rpc_method(self.path, self.method, self.param, self.timeout, self.user_id, Some(progress_notifier)).await
446461
}
447462

448463
pub fn stream<R, E>(self, client_cmd_sender: &ClientCommandSender) -> Pin<Box<dyn Stream<Item = Result<RpcCallResponse<R>, CallRpcMethodError>> + Send>>
449464
where
450465
R: for<'r> TryFrom<&'r RpcValue, Error = E> + Send + 'static,
451466
E: std::fmt::Display,
452467
{
453-
client_cmd_sender.call_rpc_method_stream(self.path, self.method, self.param, self.timeout)
468+
client_cmd_sender.call_rpc_method_stream(self.path, self.method, self.param, self.timeout, self.user_id)
454469
}
455470

456471
}
@@ -459,11 +474,12 @@ impl<'a> RpcCall<'a> {
459474
pub struct RpcCallLsList<'a> {
460475
path: &'a str,
461476
timeout: Option<Duration>,
477+
user_id: Option<String>,
462478
}
463479

464480
impl<'a> RpcCallLsList<'a> {
465481
pub fn new(path: &'a str) -> Self {
466-
Self { path, timeout: None }
482+
Self { path, timeout: None, user_id: None }
467483
}
468484

469485
#[must_use]
@@ -472,8 +488,14 @@ impl<'a> RpcCallLsList<'a> {
472488
self
473489
}
474490

491+
#[must_use]
492+
pub fn user_id(mut self, user_id: impl Into<String>) -> Self {
493+
self.user_id = Some(user_id.into());
494+
self
495+
}
496+
475497
pub async fn exec(self, client_cmd_sender: &ClientCommandSender) -> Result<Vec<String>, CallRpcMethodError> {
476-
client_cmd_sender.call_ls_list(self.path, self.timeout).await
498+
client_cmd_sender.call_ls_list(self.path, self.timeout, self.user_id).await
477499
}
478500
}
479501

@@ -482,11 +504,12 @@ pub struct RpcCallLsExists<'a> {
482504
path: &'a str,
483505
dirname: &'a str,
484506
timeout: Option<Duration>,
507+
user_id: Option<String>,
485508
}
486509

487510
impl<'a> RpcCallLsExists<'a> {
488511
pub fn new(path: &'a str, dirname: &'a str) -> Self {
489-
Self { path, dirname, timeout: None }
512+
Self { path, dirname, timeout: None, user_id: None }
490513
}
491514

492515
#[must_use]
@@ -495,20 +518,27 @@ impl<'a> RpcCallLsExists<'a> {
495518
self
496519
}
497520

521+
#[must_use]
522+
pub fn user_id(mut self, user_id: impl Into<String>) -> Self {
523+
self.user_id = Some(user_id.into());
524+
self
525+
}
526+
498527
pub async fn exec(self, client_cmd_sender: &ClientCommandSender) -> Result<bool, CallRpcMethodError> {
499-
client_cmd_sender.call_ls_exists(self.path, self.dirname, self.timeout).await
528+
client_cmd_sender.call_ls_exists(self.path, self.dirname, self.timeout, self.user_id).await
500529
}
501530
}
502531

503532
#[derive(Debug)]
504533
pub struct RpcCallDirList<'a> {
505534
path: &'a str,
506535
timeout: Option<Duration>,
536+
user_id: Option<String>,
507537
}
508538

509539
impl<'a> RpcCallDirList<'a> {
510540
pub fn new(path: &'a str) -> Self {
511-
Self { path, timeout: None }
541+
Self { path, timeout: None, user_id: None }
512542
}
513543

514544
#[must_use]
@@ -517,12 +547,18 @@ impl<'a> RpcCallDirList<'a> {
517547
self
518548
}
519549

550+
#[must_use]
551+
pub fn user_id(mut self, user_id: impl Into<String>) -> Self {
552+
self.user_id = Some(user_id.into());
553+
self
554+
}
555+
520556
pub async fn exec_brief(self, client_cmd_sender: &ClientCommandSender) -> Result<Vec<MethodInfo>, CallRpcMethodError> {
521-
client_cmd_sender.call_dir_brief(self.path, self.timeout).await
557+
client_cmd_sender.call_dir_brief(self.path, self.timeout, self.user_id).await
522558
}
523559

524560
pub async fn exec_full(self, client_cmd_sender: &ClientCommandSender) -> Result<Vec<MethodInfo>, CallRpcMethodError> {
525-
client_cmd_sender.call_dir_full(self.path, self.timeout).await
561+
client_cmd_sender.call_dir_full(self.path, self.timeout, self.user_id).await
526562
}
527563
}
528564

@@ -531,11 +567,12 @@ pub struct RpcCallDirExists<'a> {
531567
path: &'a str,
532568
method: &'a str,
533569
timeout: Option<Duration>,
570+
user_id: Option<String>,
534571
}
535572

536573
impl<'a> RpcCallDirExists<'a> {
537574
pub fn new(path: &'a str, method: &'a str) -> Self {
538-
Self { path, method, timeout: None }
575+
Self { path, method, timeout: None, user_id: None }
539576
}
540577

541578
#[must_use]
@@ -544,8 +581,14 @@ impl<'a> RpcCallDirExists<'a> {
544581
self
545582
}
546583

584+
#[must_use]
585+
pub fn user_id(mut self, user_id: impl Into<String>) -> Self {
586+
self.user_id = Some(user_id.into());
587+
self
588+
}
589+
547590
pub async fn exec(self, client_cmd_sender: &ClientCommandSender) -> Result<bool, CallRpcMethodError> {
548-
client_cmd_sender.call_dir_exists(self.path, self.method, self.timeout).await
591+
client_cmd_sender.call_dir_exists(self.path, self.method, self.timeout, self.user_id).await
549592
}
550593
}
551594

0 commit comments

Comments
 (0)