Skip to content

Commit 481c077

Browse files
committed
feat(request): allow to use custom id generator on client
1 parent 9272591 commit 481c077

File tree

6 files changed

+124
-18
lines changed

6 files changed

+124
-18
lines changed

client/http-client/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ where
276276

277277
Ok(HttpClient {
278278
transport,
279-
id_manager: Arc::new(RequestIdManager::new(id_kind)),
279+
id_manager: Arc::new(RequestIdManager::new(id_kind, None)),
280280
request_timeout,
281281
request_guard,
282282
})

core/src/client/async_client/mod.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ mod helpers;
3030
mod manager;
3131
mod utils;
3232

33-
use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse};
33+
use crate::JsonRawValue;
34+
use crate::client::async_client::helpers::{InnerBatchResponse, process_subscription_close_response};
3435
use crate::client::async_client::utils::MaybePendingFutures;
3536
use crate::client::{
3637
BatchMessage, BatchResponse, ClientT, Error, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
@@ -40,31 +41,30 @@ use crate::error::RegisterMethodError;
4041
use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
4142
use crate::tracing::client::{rx_log_from_json, tx_log_from_str};
4243
use crate::traits::ToRpcParams;
43-
use crate::JsonRawValue;
4444
use std::borrow::Cow as StdCow;
4545

4646
use core::time::Duration;
4747
use helpers::{
4848
build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification,
4949
process_single_response, process_subscription_response, stop_subscription,
5050
};
51-
use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero};
51+
use jsonrpsee_types::{IdGeneratorFn, InvalidRequestId, ResponseSuccess, TwoPointZero};
5252
use manager::RequestManager;
5353
use std::sync::Arc;
5454

5555
use async_trait::async_trait;
5656
use futures_timer::Delay;
57+
use futures_util::Stream;
5758
use futures_util::future::{self, Either};
5859
use futures_util::stream::StreamExt;
59-
use futures_util::Stream;
6060
use jsonrpsee_types::response::{ResponsePayload, SubscriptionError};
6161
use jsonrpsee_types::{NotificationSer, RequestSer, Response, SubscriptionResponse};
6262
use serde::de::DeserializeOwned;
6363
use tokio::sync::{mpsc, oneshot};
6464
use tracing::instrument;
6565

6666
use self::utils::{InactivityCheck, IntervalStream};
67-
use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};
67+
use super::{FrontToBack, IdKind, RequestIdManager, generate_batch_id_range, subscription_channel};
6868

6969
pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<serde_json::Value>>;
7070

@@ -346,7 +346,7 @@ impl ClientBuilder {
346346
to_back: to_back.clone(),
347347
request_timeout: self.request_timeout,
348348
error: ErrorFromBack::new(to_back, disconnect_reason),
349-
id_manager: RequestIdManager::new(self.id_kind),
349+
id_manager: RequestIdManager::new(self.id_kind, None),
350350
max_log_length: self.max_log_length,
351351
on_exit: Some(client_dropped_tx),
352352
}
@@ -454,6 +454,11 @@ impl Client {
454454
pub fn request_timeout(&self) -> Duration {
455455
self.request_timeout
456456
}
457+
458+
/// Registers a custom request id generator
459+
pub fn with_request_id_generator(&mut self, request_id_generator: IdGeneratorFn) {
460+
self.id_manager.custom_id_generator = Some(request_id_generator);
461+
}
457462
}
458463

459464
impl Drop for Client {

core/src/client/mod.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use crate::traits::ToRpcParams;
4747
use async_trait::async_trait;
4848
use core::marker::PhantomData;
4949
use futures_util::stream::{Stream, StreamExt};
50-
use jsonrpsee_types::{ErrorObject, Id, SubscriptionId};
50+
use jsonrpsee_types::{ErrorObject, Id, IdGeneratorFn, SubscriptionId};
5151
use serde::de::DeserializeOwned;
5252
use serde_json::Value as JsonValue;
5353
use tokio::sync::{mpsc, oneshot};
@@ -320,11 +320,7 @@ impl<Notif> Subscription<Notif> {
320320
return None;
321321
}
322322

323-
if lagged {
324-
Some(SubscriptionCloseReason::Lagged)
325-
} else {
326-
Some(SubscriptionCloseReason::ConnectionClosed)
327-
}
323+
if lagged { Some(SubscriptionCloseReason::Lagged) } else { Some(SubscriptionCloseReason::ConnectionClosed) }
328324
}
329325
}
330326

@@ -459,17 +455,23 @@ pub struct RequestIdManager {
459455
current_id: CurrentId,
460456
/// Request ID type.
461457
id_kind: IdKind,
458+
/// Custom ID generator.
459+
custom_id_generator: Option<IdGeneratorFn>,
462460
}
463461

464462
impl RequestIdManager {
465463
/// Create a new `RequestIdGuard` with the provided concurrency limit.
466-
pub fn new(id_kind: IdKind) -> Self {
467-
Self { current_id: CurrentId::new(), id_kind }
464+
pub fn new(id_kind: IdKind, custom_id_generator: Option<IdGeneratorFn>) -> Self {
465+
Self { current_id: CurrentId::new(), id_kind, custom_id_generator }
468466
}
469467

470468
/// Attempts to get the next request ID.
471469
pub fn next_request_id(&self) -> Id<'static> {
472-
self.id_kind.into_id(self.current_id.next())
470+
if let Some(ref generator) = self.custom_id_generator {
471+
generator.call()
472+
} else {
473+
self.id_kind.into_id(self.current_id.next()) // Default behavior
474+
}
473475
}
474476

475477
/// Get a handle to the `IdKind`.
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2+
//
3+
// Permission is hereby granted, free of charge, to any
4+
// person obtaining a copy of this software and associated
5+
// documentation files (the "Software"), to deal in the
6+
// Software without restriction, including without
7+
// limitation the rights to use, copy, modify, merge,
8+
// publish, distribute, sublicense, and/or sell copies of
9+
// the Software, and to permit persons to whom the Software
10+
// is furnished to do so, subject to the following
11+
// conditions:
12+
//
13+
// The above copyright notice and this permission notice
14+
// shall be included in all copies or substantial portions
15+
// of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18+
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19+
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20+
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21+
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22+
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23+
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24+
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25+
// DEALINGS IN THE SOFTWARE.
26+
27+
use std::net::SocketAddr;
28+
use std::time::{SystemTime, UNIX_EPOCH};
29+
30+
use jsonrpsee::client_transport::ws::{Url, WsTransportClientBuilder};
31+
use jsonrpsee::core::client::{Client, ClientBuilder, ClientT};
32+
use jsonrpsee::rpc_params;
33+
use jsonrpsee::server::{RpcModule, Server};
34+
use jsonrpsee::types::{Id, IdGeneratorFn};
35+
36+
#[tokio::main]
37+
async fn main() -> anyhow::Result<()> {
38+
tracing_subscriber::FmtSubscriber::builder()
39+
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
40+
.try_init()
41+
.expect("setting default subscriber failed");
42+
43+
let addr = run_server().await?;
44+
let uri = Url::parse(&format!("ws://{}", addr))?;
45+
46+
let (tx, rx) = WsTransportClientBuilder::default().build(uri).await?;
47+
let mut client: Client = ClientBuilder::default().build_with_tokio(tx, rx);
48+
49+
let custom_generator = Box::new(|| {
50+
let timestamp_in_seconds = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
51+
Id::Number(timestamp_in_seconds)
52+
});
53+
54+
client.with_request_id_generator(IdGeneratorFn::new(custom_generator));
55+
56+
let response: String = client.request("say_hello", rpc_params![]).await?;
57+
tracing::info!("response: {:?}", response);
58+
59+
Ok(())
60+
}
61+
62+
async fn run_server() -> anyhow::Result<SocketAddr> {
63+
let server = Server::builder().build("127.0.0.1:0").await?;
64+
let mut module = RpcModule::new(());
65+
module.register_method("say_hello", |_, _, _| "lo")?;
66+
let addr = server.local_addr()?;
67+
68+
let handle = server.start(module);
69+
70+
// In this example we don't care about doing shutdown so let's it run forever.
71+
// You may use the `ServerHandle` to shut it down or manage it yourself.
72+
tokio::spawn(handle.stopped());
73+
74+
Ok(addr)
75+
}

types/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ pub mod error;
4545

4646
pub use error::{ErrorCode, ErrorObject, ErrorObjectOwned};
4747
pub use params::{Id, InvalidRequestId, Params, ParamsSequence, SubscriptionId, TwoPointZero};
48-
pub use request::{InvalidRequest, Notification, NotificationSer, Request, RequestSer};
48+
pub use request::{IdGeneratorFn, InvalidRequest, Notification, NotificationSer, Request, RequestSer};
4949
pub use response::{Response, ResponsePayload, SubscriptionPayload, SubscriptionResponse, Success as ResponseSuccess};

types/src/request.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727
//! Types to handle JSON-RPC requests according to the [spec](https://www.jsonrpc.org/specification#request-object).
2828
//! Some types come with a "*Ser" variant that implements [`serde::Serialize`]; these are used in the client.
2929
30-
use std::borrow::Cow;
30+
use std::{
31+
borrow::Cow,
32+
fmt::{Debug, Formatter, Result},
33+
};
3134

3235
use crate::{
3336
Params,
@@ -173,6 +176,27 @@ impl<'a> NotificationSer<'a> {
173176
}
174177
}
175178

179+
/// Custom id generator function
180+
pub struct IdGeneratorFn(Box<dyn Fn() -> Id<'static> + Send + Sync>);
181+
182+
impl IdGeneratorFn {
183+
/// Creates a new `IdGeneratorFn`
184+
pub fn new(generator: Box<dyn Fn() -> Id<'static> + Send + Sync>) -> Self {
185+
IdGeneratorFn(generator)
186+
}
187+
188+
/// Calls the id generator function
189+
pub fn call(&self) -> Id<'static> {
190+
(self.0)()
191+
}
192+
}
193+
194+
impl Debug for IdGeneratorFn {
195+
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
196+
f.write_str("<CustomIdGenerator>")
197+
}
198+
}
199+
176200
#[cfg(test)]
177201
mod test {
178202
use super::{Cow, Id, InvalidRequest, Notification, NotificationSer, Request, RequestSer, TwoPointZero};

0 commit comments

Comments
 (0)