Skip to content

Commit 187fc03

Browse files
committed
Put RT as associate type of ClientTransport & ServerTransport
Let FailoverPool store the runtime handle (so that we can reduce one generic parameter). update_addr does not need rt argument as well.
1 parent 8af7703 commit 187fc03

11 files changed

Lines changed: 55 additions & 45 deletions

File tree

docs/api_design.md

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ A Service in `razor-rpc` follows these principles:
5555
- Client and server share the same trait definition for compile-time checks
5656
- Compatible with GRPC naming conventions (`service` in PascalCase, `method` in snake_case)
5757
- Methods should be `async fn` or return `impl Future`
58-
- Methods can return **custom error type**. All method should return `Result<T, RpcError<E>>` where `E: RpcErrCodec`, refer to doc: [error module](crate::error).
58+
- Methods can return **custom error type**. All method should return `Result<T, RpcError<E>>` where `E: RpcErrCodec`, refer to doc: [error module](crate::error).
5959

6060
We supports rust 1.75 `AFIT` (Async fn in Traits) `RPITIT` (Return Position Impl Trait in Traits), and legacy `#[async_trait]`.
6161

@@ -198,7 +198,7 @@ async fn use_client(server_addr: &str) {
198198
let rt = RT::new_multi_thread(8);
199199
let factory = APIFact::<Codec>::new(client_config);
200200
// 9. Create client connection pool
201-
let pool: APIConnPool<Codec, ClientProto> = factory.new_conn_pool::<ClientProto, RT>(&rt, server_addr);
201+
let pool: APIConnPool<Codec, ClientProto> = factory.new_conn_pool::<ClientProto>(&rt, server_addr);
202202
let client = CalculatorClient::new(pool);
203203
// You will have to import CalculatorService trait to call its methods
204204
use CalculatorService;
@@ -282,9 +282,9 @@ endpoint_client!(KVClient);
282282
#[endpoint_async(KVClient)]
283283
pub trait KVService {
284284
// Note: endpoint_async macro requires exactly one parameter besides &self
285-
fn put(&self, kv: (String, String))
285+
fn put(&self, kv: (String, String))
286286
-> impl Future<Output = Result<(), RpcError<ClusterErr>>> + Send;
287-
fn get(&self, key: String)
287+
fn get(&self, key: String)
288288
-> impl Future<Output = Result<Option<String>, RpcError<String>>> + Send;
289289
}
290290

@@ -295,13 +295,13 @@ type FailoverCaller = razor_rpc::client::APIFailoverPool<Codec, TcpClient<RT>>;
295295

296296
impl KVClient<FailoverCaller> {
297297
pub fn new_cluster_client(
298-
config: ClientConfig,
299-
addrs: Vec<String>,
298+
config: ClientConfig,
299+
addrs: Vec<String>,
300300
rt: &RT
301301
) -> Self {
302302
let fact = APIFact::<Codec>::new(config);
303303
// stateless=false: maintain leader affinity for stateful service
304-
let pool = fact.new_failover::<TcpClient<RT>, RT>(rt, addrs, false, 3);
304+
let pool = fact.new_failover::<TcpClient<RT>>(rt, addrs, false, 3);
305305
KVClient::new(pool)
306306
}
307307
}
@@ -315,12 +315,12 @@ async fn example() {
315315
"127.0.0.1:8081".to_string(),
316316
"127.0.0.1:8082".to_string(),
317317
];
318-
318+
319319
let client = KVClient::new_cluster_client(config, addrs, &rt);
320-
320+
321321
// Write goes to leader (with automatic redirect if needed)
322322
client.put(("key1".to_string(), "value1".to_string())).await.unwrap();
323-
323+
324324
// Read can go to any node
325325
let value = client.get("key1".to_string()).await.unwrap();
326326
}

src/client/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
pub mod task;
22
use captains_log::filter::LogFilter;
33
use crossfire::oneshot::oneshot;
4-
use orb::AsyncRuntime;
54
pub use razor_rpc_macros::{endpoint_async, endpoint_client};
65
pub use razor_stream::client::ClientCaller;
76
pub use task::*;
@@ -33,16 +32,16 @@ impl<C: Codec> APIFact<C> {
3332
self.logger.set_level(level);
3433
}
3534

36-
pub fn new_conn_pool<P: ClientTransport, RT: AsyncRuntime + Clone>(
37-
self: Arc<Self>, rt: &RT, addr: &str,
35+
pub fn new_conn_pool<P: ClientTransport>(
36+
self: Arc<Self>, rt: &P::RT, addr: &str,
3837
) -> APIConnPool<C, P> {
39-
ConnPool::<APIFact<C>, P>::new::<RT>(self.clone(), rt, addr, 0)
38+
ConnPool::<APIFact<C>, P>::new(self.clone(), rt, addr, 0)
4039
}
4140

42-
pub fn new_failover<P: ClientTransport, RT: AsyncRuntime + Clone>(
43-
self: Arc<Self>, rt: &RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
41+
pub fn new_failover<P: ClientTransport>(
42+
self: Arc<Self>, rt: &P::RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
4443
) -> APIFailoverPool<C, P> {
45-
FailoverPool::<APIFact<C>, P>::new::<RT>(self.clone(), rt, addrs, stateless, retry_limit, 0)
44+
FailoverPool::<APIFact<C>, P>::new(self.clone(), rt, addrs, stateless, retry_limit, 0)
4645
}
4746
}
4847

stream/src/client/failover.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use ahash::AHashMap;
1111
use arc_swap::ArcSwap;
1212
use captains_log::filter::LogFilter;
1313
use crossfire::{AsyncRx, MTx, SendError, mpsc};
14-
use orb::AsyncRuntime;
14+
use orb::prelude::AsyncExec;
1515
use std::fmt;
1616
use std::sync::{
1717
Arc, Weak,
@@ -57,6 +57,7 @@ where
5757
next_node: AtomicUsize,
5858
pool_channel_size: usize,
5959
facts: Arc<FailoverFacts<F>>,
60+
rt: P::RT,
6061
}
6162

6263
struct ClusterConfig<F, P>
@@ -76,8 +77,8 @@ where
7677
/// Initiate the pool with multiple addresses.
7778
/// When stateless == true, all addresses in the pool will be selected with equal chance (round-robin);
7879
/// When stateless == false, the leader address will always be picked unless error happens.
79-
pub fn new<RT: AsyncRuntime + Clone>(
80-
facts: Arc<F>, rt: &RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
80+
pub fn new(
81+
facts: Arc<F>, rt: &P::RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
8182
pool_channel_size: usize,
8283
) -> Self {
8384
let (retry_tx, retry_rx) = mpsc::unbounded_async();
@@ -86,7 +87,7 @@ where
8687
Arc::new(FailoverFacts { retry_limit, retry_tx, logger: facts.new_logger(), facts });
8788
let mut pools = Vec::with_capacity(addrs.len());
8889
for addr in addrs.iter() {
89-
let pool = ConnPool::new::<RT>(wrapped_facts.clone(), rt, addr, pool_channel_size);
90+
let pool = ConnPool::new(wrapped_facts.clone(), rt, addr, pool_channel_size);
9091
pools.push(pool);
9192
}
9293
// NOTE: the ConnPool has cycle reference with FailoverPoolInner
@@ -97,6 +98,7 @@ where
9798
ver: AtomicU64::new(1),
9899
next_node: AtomicUsize::new(0),
99100
pool_channel_size,
101+
rt: rt.clone(),
100102
});
101103
let weak_self = Arc::downgrade(&inner);
102104
rt.spawn_detach(async move {
@@ -174,7 +176,7 @@ where
174176
}
175177
}
176178

177-
pub fn update_addrs<RT: AsyncRuntime + Clone>(&self, addrs: Vec<String>, rt: &RT) {
179+
pub fn update_addrs(&self, addrs: Vec<String>) {
178180
let inner = &self.inner;
179181
let old_pools = inner.pools.load_full();
180182
let mut new_pools: Vec<ConnPool<FailoverFacts<F>, P>> = Vec::with_capacity(addrs.len());
@@ -190,7 +192,7 @@ where
190192
} else {
191193
// Create a new pool for the new address
192194
let new_pool =
193-
ConnPool::new::<RT>(inner.facts.clone(), rt, &addr, inner.pool_channel_size);
195+
ConnPool::new(inner.facts.clone(), &inner.rt, &addr, inner.pool_channel_size);
194196
new_pools.push(new_pool);
195197
}
196198
}

stream/src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use crate::{Codec, error::RpcIntErr};
44
use captains_log::filter::LogFilter;
55
use crossfire::{AsyncRx, mpsc};
6+
use orb::AsyncRuntime;
67
use std::future::Future;
78
use std::sync::Arc;
89
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -146,6 +147,8 @@ impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
146147
///
147148
/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
148149
pub trait ClientTransport: fmt::Debug + Send + Sized + 'static {
150+
type RT: AsyncRuntime + Clone;
151+
149152
/// How to establish an async connection.
150153
///
151154
/// conn_id: used for log fmt, can by the same of addr.

stream/src/client/pool.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::client::{
55
use crate::error::RpcIntErr;
66
use captains_log::filter::LogFilter;
77
use crossfire::{MAsyncRx, MAsyncTx, MTx, RecvTimeoutError, mpmc};
8-
use orb::AsyncRuntime;
8+
use orb::prelude::{AsyncExec, AsyncTime};
99
use std::fmt;
1010
use std::marker::PhantomData;
1111
use std::sync::Arc;
@@ -65,9 +65,7 @@ struct ConnPoolInner<F: ClientFacts, P: ClientTransport> {
6565
const ONE_SEC: Duration = Duration::from_secs(1);
6666

6767
impl<F: ClientFacts, P: ClientTransport> ConnPool<F, P> {
68-
pub fn new<RT: AsyncRuntime + Clone>(
69-
facts: Arc<F>, rt: &RT, addr: &str, mut channel_size: usize,
70-
) -> Self {
68+
pub fn new(facts: Arc<F>, rt: &P::RT, addr: &str, mut channel_size: usize) -> Self {
7169
let config = facts.get_config();
7270
if config.thresholds > 0 {
7371
if channel_size < config.thresholds {
@@ -91,7 +89,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPool<F, P> {
9189
_phan: Default::default(),
9290
});
9391
let s = Self { tx_async, tx, inner };
94-
s.spawn::<RT>(rt);
92+
s.spawn(rt);
9593
s
9694
}
9795

@@ -118,7 +116,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPool<F, P> {
118116
/// by default there's one worker thread after initiation, but you can pre-spawn more thread if
119117
/// the connection is not enough to achieve desired throughput.
120118
#[inline]
121-
pub fn spawn<RT: AsyncRuntime + Clone>(&self, rt: &RT) {
119+
pub fn spawn(&self, rt: &P::RT) {
122120
let worker_id = self.inner.worker_count.fetch_add(1, Acquire);
123121
self.inner.clone().spawn_worker(rt, worker_id);
124122
}
@@ -155,7 +153,7 @@ impl<F: ClientFacts, P: ClientTransport> fmt::Display for ConnPoolInner<F, P> {
155153
}
156154

157155
impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
158-
fn spawn_worker<RT: AsyncRuntime + Clone>(self: Arc<Self>, rt: &RT, worker_id: usize) {
156+
fn spawn_worker(self: Arc<Self>, rt: &P::RT, worker_id: usize) {
159157
let _rt = rt.clone();
160158
rt.spawn_detach(async move {
161159
logger_trace!(&self.logger, "{} worker_id={} running", self, worker_id);
@@ -181,7 +179,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
181179
}
182180

183181
#[inline]
184-
async fn connect<RT: AsyncRuntime>(&self, rt: &RT) -> Result<ClientStream<F, P>, RpcIntErr> {
182+
async fn connect(&self, rt: &P::RT) -> Result<ClientStream<F, P>, RpcIntErr> {
185183
ClientStream::connect(self.facts.clone(), rt, &self.addr, &self.conn_id, None).await
186184
}
187185

@@ -220,23 +218,27 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
220218
/// connection attempts happens after we spawn.
221219
/// If the address is dead, the thread might exit after multiple attempts, and later re-spawn
222220
/// when the needs arrives.
223-
async fn run<RT: AsyncRuntime + Clone>(self: &Arc<Self>, rt: RT, mut worker_id: usize) {
221+
async fn run(self: &Arc<Self>, rt: P::RT, mut worker_id: usize) {
224222
'CONN_LOOP: loop {
225-
match self.connect::<RT>(&rt).await {
223+
match self.connect(&rt).await {
226224
Ok(mut stream) => {
227225
logger_trace!(self.logger, "{} worker={} connected", self, worker_id);
228226
if worker_id == 0 {
229227
// act as monitor
230228
'MONITOR: loop {
231229
if self.get_workers() > 1 {
232-
RT::sleep(ONE_SEC).await;
230+
<P::RT as AsyncTime>::sleep(ONE_SEC).await;
233231
if stream.ping().await.is_err() {
234232
self.set_err();
235233
// don't cleanup the channel unless only one worker left
236234
continue 'CONN_LOOP;
237235
}
238236
} else {
239-
match self.rx.recv_with_timer(RT::sleep(ONE_SEC)).await {
237+
match self
238+
.rx
239+
.recv_with_timer(<P::RT as AsyncTime>::sleep(ONE_SEC))
240+
.await
241+
{
240242
Err(RecvTimeoutError::Disconnected) => {
241243
return;
242244
}
@@ -258,13 +260,13 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
258260
// there's might be a lag to connect,
259261
// so we are spawning identity with new worker,
260262
worker_id = 1;
261-
self.clone().spawn_worker::<RT>(&rt, 0);
263+
self.clone().spawn_worker(&rt, 0);
262264
}
263265
if stream.send_task(task, true).await.is_err() {
264266
self.set_err();
265267
if worker_id == 0 {
266268
self.cleanup();
267-
RT::sleep(ONE_SEC).await;
269+
<P::RT as AsyncTime>::sleep(ONE_SEC).await;
268270
continue 'CONN_LOOP;
269271
} else {
270272
return;
@@ -297,7 +299,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
297299
self.set_err();
298300
error!("connect failed to {}: {}", self.addr, e);
299301
self.cleanup();
300-
RT::sleep(ONE_SEC).await;
302+
<P::RT as AsyncTime>::sleep(ONE_SEC).await;
301303
}
302304
}
303305
}

stream/src/server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ pub trait ServerFacts: Sync + Send + 'static + Sized {
6565
///
6666
/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
6767
pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
68+
type RT: AsyncRuntime + Clone;
69+
6870
type Listener: AsyncListener;
6971

7072
fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;

test-suite/src/api/basic/test_failover_redirect.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use orb::prelude::{AsyncExec, AsyncTime};
1+
use orb::prelude::AsyncExec;
22
use parking_lot::Mutex;
33
use razor_rpc::client::{APIFact, ClientConfig, endpoint_async, endpoint_client};
44
use razor_rpc::error::{EncodedErr, RpcErrCodec, RpcError};
@@ -13,7 +13,6 @@ use std::collections::HashMap;
1313
use std::fmt;
1414
use std::future::Future;
1515
use std::sync::Arc;
16-
use std::sync::atomic::{AtomicUsize, Ordering};
1716

1817
use crate::{TestRunner, logfn, runner};
1918

@@ -179,7 +178,7 @@ pub type FailoverCaller = razor_rpc::client::APIFailoverPool<MsgpCodec, TcpClien
179178
impl DataClient<FailoverCaller> {
180179
pub fn new_failover_client(config: ClientConfig, addrs: Vec<String>, rt: &crate::RT) -> Self {
181180
let fact = APIFact::<MsgpCodec>::new(config);
182-
let pool = fact.new_failover::<TcpClient<crate::RT>, crate::RT>(rt, addrs, false, 3);
181+
let pool = fact.new_failover::<TcpClient<crate::RT>>(rt, addrs, false, 3);
183182
DataClient::new(pool)
184183
}
185184
}

test-suite/src/api/basic/test_multi_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub type PoolCaller = APIConnPool<MsgpCodec, TcpClient<crate::RT>>;
3838
impl MyClient<PoolCaller> {
3939
pub fn new_client(config: ClientConfig, addr: &str, rt: &crate::RT) -> Self {
4040
let facts = APIFact::<MsgpCodec>::new(config);
41-
let pool = facts.new_conn_pool::<TcpClient<crate::RT>, crate::RT>(rt, addr);
41+
let pool = facts.new_conn_pool::<TcpClient<crate::RT>>(rt, addr);
4242
MyClient::new(pool)
4343
}
4444
}

test-suite/src/stream/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ pub async fn init_failover_client(
165165
// since client may be drop by test logic, it's not allow
166166
// to drop a tokio runtime inside async code.
167167
let facts = MyClient::new(config);
168-
FailoverPool::new::<crate::RT>(
168+
FailoverPool::new(
169169
facts,
170170
rt,
171171
addrs,

transport/tcp/src/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ impl<RT: AsyncRuntime> TcpClient<RT> {
203203
}
204204
}
205205

206-
impl<RT: AsyncRuntime> ClientTransport for TcpClient<RT> {
206+
impl<RT: AsyncRuntime + Clone> ClientTransport for TcpClient<RT> {
207+
type RT = RT;
208+
207209
async fn connect(addr: &str, conn_id: &str, config: &ClientConfig) -> Result<Self, RpcIntErr> {
208210
let stream: UnifyStream<RT> =
209211
match UnifyStream::<RT>::connect_timeout(addr, config.connect_timeout).await {

0 commit comments

Comments
 (0)