Skip to content

Commit e71d15d

Browse files
committed
Put RT as associate type of ClientTransport & ServerTransport
Let FailoverPool store the runtime handle (so that we can reduce one generic parameter)
1 parent 75069d6 commit e71d15d

8 files changed

Lines changed: 42 additions & 31 deletions

File tree

src/client/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
pub mod task;
22
use captains_log::filter::LogFilter;
33
use crossfire::oneshot::oneshot;
4-
use orb::AsyncRuntime;
54
pub use razor_rpc_macros::{endpoint_async, endpoint_client};
65
pub use razor_stream::client::ClientCaller;
76
pub use task::*;
@@ -33,16 +32,16 @@ impl<C: Codec> APIFact<C> {
3332
self.logger.set_level(level);
3433
}
3534

36-
pub fn new_conn_pool<P: ClientTransport, RT: AsyncRuntime + Clone>(
37-
self: Arc<Self>, rt: &RT, addr: &str,
35+
pub fn new_conn_pool<P: ClientTransport>(
36+
self: Arc<Self>, rt: &P::RT, addr: &str,
3837
) -> APIConnPool<C, P> {
39-
ConnPool::<APIFact<C>, P>::new::<RT>(self.clone(), rt, addr, 0)
38+
ConnPool::<APIFact<C>, P>::new(self.clone(), rt, addr, 0)
4039
}
4140

42-
pub fn new_failover<P: ClientTransport, RT: AsyncRuntime + Clone>(
43-
self: Arc<Self>, rt: &RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
41+
pub fn new_failover<P: ClientTransport>(
42+
self: Arc<Self>, rt: &P::RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
4443
) -> APIFailoverPool<C, P> {
45-
FailoverPool::<APIFact<C>, P>::new::<RT>(self.clone(), rt, addrs, stateless, retry_limit, 0)
44+
FailoverPool::<APIFact<C>, P>::new(self.clone(), rt, addrs, stateless, retry_limit, 0)
4645
}
4746
}
4847

stream/src/client/failover.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use ahash::AHashMap;
1111
use arc_swap::ArcSwap;
1212
use captains_log::filter::LogFilter;
1313
use crossfire::{AsyncRx, MTx, SendError, mpsc};
14-
use orb::AsyncRuntime;
14+
use orb::prelude::AsyncExec;
1515
use std::fmt;
1616
use std::sync::{
1717
Arc, Weak,
@@ -57,6 +57,7 @@ where
5757
next_node: AtomicUsize,
5858
pool_channel_size: usize,
5959
facts: Arc<FailoverFacts<F>>,
60+
rt: P::RT,
6061
}
6162

6263
struct ClusterConfig<F, P>
@@ -76,8 +77,8 @@ where
7677
/// Initiate the pool with multiple addresses.
7778
/// When stateless == true, all addresses in the pool will be selected with equal chance (round-robin);
7879
/// When stateless == false, the leader address will always be picked unless error happens.
79-
pub fn new<RT: AsyncRuntime + Clone>(
80-
facts: Arc<F>, rt: &RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
80+
pub fn new(
81+
facts: Arc<F>, rt: &P::RT, addrs: Vec<String>, stateless: bool, retry_limit: usize,
8182
pool_channel_size: usize,
8283
) -> Self {
8384
let (retry_tx, retry_rx) = mpsc::unbounded_async();
@@ -86,7 +87,7 @@ where
8687
Arc::new(FailoverFacts { retry_limit, retry_tx, logger: facts.new_logger(), facts });
8788
let mut pools = Vec::with_capacity(addrs.len());
8889
for addr in addrs.iter() {
89-
let pool = ConnPool::new::<RT>(wrapped_facts.clone(), rt, addr, pool_channel_size);
90+
let pool = ConnPool::new(wrapped_facts.clone(), rt, addr, pool_channel_size);
9091
pools.push(pool);
9192
}
9293
// NOTE: the ConnPool has cycle reference with FailoverPoolInner
@@ -97,6 +98,7 @@ where
9798
ver: AtomicU64::new(1),
9899
next_node: AtomicUsize::new(0),
99100
pool_channel_size,
101+
rt: rt.clone(),
100102
});
101103
let weak_self = Arc::downgrade(&inner);
102104
rt.spawn_detach(async move {
@@ -174,7 +176,7 @@ where
174176
}
175177
}
176178

177-
pub fn update_addrs<RT: AsyncRuntime + Clone>(&self, addrs: Vec<String>, rt: &RT) {
179+
pub fn update_addrs(&self, addrs: Vec<String>) {
178180
let inner = &self.inner;
179181
let old_pools = inner.pools.load_full();
180182
let mut new_pools: Vec<ConnPool<FailoverFacts<F>, P>> = Vec::with_capacity(addrs.len());
@@ -190,7 +192,7 @@ where
190192
} else {
191193
// Create a new pool for the new address
192194
let new_pool =
193-
ConnPool::new::<RT>(inner.facts.clone(), rt, &addr, inner.pool_channel_size);
195+
ConnPool::new(inner.facts.clone(), &inner.rt, &addr, inner.pool_channel_size);
194196
new_pools.push(new_pool);
195197
}
196198
}

stream/src/client/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use crate::{Codec, error::RpcIntErr};
44
use captains_log::filter::LogFilter;
55
use crossfire::{AsyncRx, mpsc};
6+
use orb::AsyncRuntime;
67
use std::future::Future;
78
use std::sync::Arc;
89
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -146,6 +147,8 @@ impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
146147
///
147148
/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
148149
pub trait ClientTransport: fmt::Debug + Send + Sized + 'static {
150+
type RT: AsyncRuntime + Clone;
151+
149152
/// How to establish an async connection.
150153
///
151154
/// conn_id: used for log fmt, can by the same of addr.

stream/src/client/pool.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::client::{
55
use crate::error::RpcIntErr;
66
use captains_log::filter::LogFilter;
77
use crossfire::{MAsyncRx, MAsyncTx, MTx, RecvTimeoutError, mpmc};
8-
use orb::AsyncRuntime;
8+
use orb::prelude::{AsyncExec, AsyncTime};
99
use std::fmt;
1010
use std::marker::PhantomData;
1111
use std::sync::Arc;
@@ -65,9 +65,7 @@ struct ConnPoolInner<F: ClientFacts, P: ClientTransport> {
6565
const ONE_SEC: Duration = Duration::from_secs(1);
6666

6767
impl<F: ClientFacts, P: ClientTransport> ConnPool<F, P> {
68-
pub fn new<RT: AsyncRuntime + Clone>(
69-
facts: Arc<F>, rt: &RT, addr: &str, mut channel_size: usize,
70-
) -> Self {
68+
pub fn new(facts: Arc<F>, rt: &P::RT, addr: &str, mut channel_size: usize) -> Self {
7169
let config = facts.get_config();
7270
if config.thresholds > 0 {
7371
if channel_size < config.thresholds {
@@ -91,7 +89,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPool<F, P> {
9189
_phan: Default::default(),
9290
});
9391
let s = Self { tx_async, tx, inner };
94-
s.spawn::<RT>(rt);
92+
s.spawn(rt);
9593
s
9694
}
9795

@@ -118,7 +116,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPool<F, P> {
118116
/// by default there's one worker thread after initiation, but you can pre-spawn more thread if
119117
/// the connection is not enough to achieve desired throughput.
120118
#[inline]
121-
pub fn spawn<RT: AsyncRuntime + Clone>(&self, rt: &RT) {
119+
pub fn spawn(&self, rt: &P::RT) {
122120
let worker_id = self.inner.worker_count.fetch_add(1, Acquire);
123121
self.inner.clone().spawn_worker(rt, worker_id);
124122
}
@@ -155,7 +153,7 @@ impl<F: ClientFacts, P: ClientTransport> fmt::Display for ConnPoolInner<F, P> {
155153
}
156154

157155
impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
158-
fn spawn_worker<RT: AsyncRuntime + Clone>(self: Arc<Self>, rt: &RT, worker_id: usize) {
156+
fn spawn_worker(self: Arc<Self>, rt: &P::RT, worker_id: usize) {
159157
let _rt = rt.clone();
160158
rt.spawn_detach(async move {
161159
logger_trace!(&self.logger, "{} worker_id={} running", self, worker_id);
@@ -181,7 +179,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
181179
}
182180

183181
#[inline]
184-
async fn connect<RT: AsyncRuntime>(&self, rt: &RT) -> Result<ClientStream<F, P>, RpcIntErr> {
182+
async fn connect(&self, rt: &P::RT) -> Result<ClientStream<F, P>, RpcIntErr> {
185183
ClientStream::connect(self.facts.clone(), rt, &self.addr, &self.conn_id, None).await
186184
}
187185

@@ -220,23 +218,27 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
220218
/// connection attempts happens after we spawn.
221219
/// If the address is dead, the thread might exit after multiple attempts, and later re-spawn
222220
/// when the needs arrives.
223-
async fn run<RT: AsyncRuntime + Clone>(self: &Arc<Self>, rt: RT, mut worker_id: usize) {
221+
async fn run(self: &Arc<Self>, rt: P::RT, mut worker_id: usize) {
224222
'CONN_LOOP: loop {
225-
match self.connect::<RT>(&rt).await {
223+
match self.connect(&rt).await {
226224
Ok(mut stream) => {
227225
logger_trace!(self.logger, "{} worker={} connected", self, worker_id);
228226
if worker_id == 0 {
229227
// act as monitor
230228
'MONITOR: loop {
231229
if self.get_workers() > 1 {
232-
RT::sleep(ONE_SEC).await;
230+
<P::RT as AsyncTime>::sleep(ONE_SEC).await;
233231
if stream.ping().await.is_err() {
234232
self.set_err();
235233
// don't cleanup the channel unless only one worker left
236234
continue 'CONN_LOOP;
237235
}
238236
} else {
239-
match self.rx.recv_with_timer(RT::sleep(ONE_SEC)).await {
237+
match self
238+
.rx
239+
.recv_with_timer(<P::RT as AsyncTime>::sleep(ONE_SEC))
240+
.await
241+
{
240242
Err(RecvTimeoutError::Disconnected) => {
241243
return;
242244
}
@@ -258,13 +260,13 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
258260
// there's might be a lag to connect,
259261
// so we are spawning identity with new worker,
260262
worker_id = 1;
261-
self.clone().spawn_worker::<RT>(&rt, 0);
263+
self.clone().spawn_worker(&rt, 0);
262264
}
263265
if stream.send_task(task, true).await.is_err() {
264266
self.set_err();
265267
if worker_id == 0 {
266268
self.cleanup();
267-
RT::sleep(ONE_SEC).await;
269+
<P::RT as AsyncTime>::sleep(ONE_SEC).await;
268270
continue 'CONN_LOOP;
269271
} else {
270272
return;
@@ -297,7 +299,7 @@ impl<F: ClientFacts, P: ClientTransport> ConnPoolInner<F, P> {
297299
self.set_err();
298300
error!("connect failed to {}: {}", self.addr, e);
299301
self.cleanup();
300-
RT::sleep(ONE_SEC).await;
302+
<P::RT as AsyncTime>::sleep(ONE_SEC).await;
301303
}
302304
}
303305
}

stream/src/server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ pub trait ServerFacts: Sync + Send + 'static + Sized {
6565
///
6666
/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
6767
pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
68+
type RT: AsyncRuntime + Clone;
69+
6870
type Listener: AsyncListener;
6971

7072
fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;

test-suite/src/stream/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ pub async fn init_failover_client(
165165
// since client may be drop by test logic, it's not allow
166166
// to drop a tokio runtime inside async code.
167167
let facts = MyClient::new(config);
168-
FailoverPool::new::<crate::RT>(
168+
FailoverPool::new(
169169
facts,
170170
rt,
171171
addrs,

transport/tcp/src/client.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ impl<RT: AsyncRuntime> TcpClient<RT> {
203203
}
204204
}
205205

206-
impl<RT: AsyncRuntime> ClientTransport for TcpClient<RT> {
206+
impl<RT: AsyncRuntime + Clone> ClientTransport for TcpClient<RT> {
207+
type RT = RT;
208+
207209
async fn connect(addr: &str, conn_id: &str, config: &ClientConfig) -> Result<Self, RpcIntErr> {
208210
let stream: UnifyStream<RT> =
209211
match UnifyStream::<RT>::connect_timeout(addr, config.connect_timeout).await {

transport/tcp/src/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ impl<RT: AsyncRuntime> fmt::Debug for TcpServer<RT> {
6262
}
6363
}
6464

65-
impl<RT: AsyncRuntime> ServerTransport for TcpServer<RT> {
65+
impl<RT: AsyncRuntime + Clone> ServerTransport for TcpServer<RT> {
66+
type RT = RT;
6667
type Listener = UnifyListener<RT>;
6768

6869
async fn bind(addr: &str) -> io::Result<Self::Listener> {

0 commit comments

Comments
 (0)