Skip to content

Commit e11db27

Browse files
committed
Rename ClientFactory->ClientFact
1 parent a2e00ae commit e11db27

9 files changed

Lines changed: 119 additions & 118 deletions

File tree

src/client/factory.rs

Lines changed: 4 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@ use super::task::APIClientReq;
22
use super::{AsyncEndpoint, BlockingEndpoint};
33
use captains_log::filter::LogFilter;
44
use occams_rpc_core::{ClientConfig, Codec};
5-
use occams_rpc_stream::client::{ClientFactory, ClientPool, ClientTransport, FailoverPool};
5+
use occams_rpc_stream::client::{ClientFactory, ClientPool, ClientTransport};
66
use std::sync::Arc;
77

88
/// An example factory for API Clients
9-
pub struct APIClientFactory<C: Codec> {
9+
pub struct APIClientFactDefault<C: Codec> {
1010
pub logger: Arc<LogFilter>,
1111
config: ClientConfig,
1212
rt: crate::RT,
1313
_phan: std::marker::PhantomData<fn(&C)>,
1414
}
1515

16-
impl<C: Codec> APIClientFactory<C> {
16+
impl<C: Codec> APIClientFactDefault<C> {
1717
pub fn new(config: ClientConfig, rt: crate::RT) -> Arc<Self> {
1818
Arc::new(Self { logger: Arc::new(LogFilter::new()), config, rt, _phan: Default::default() })
1919
}
@@ -22,45 +22,9 @@ impl<C: Codec> APIClientFactory<C> {
2222
pub fn set_log_level(&self, level: log::Level) {
2323
self.logger.set_level(level);
2424
}
25-
26-
pub fn create_endpoint_async<T: ClientTransport<crate::RT>>(
27-
self: Arc<Self>, addr: &str,
28-
) -> AsyncEndpoint<ClientPool<Self, T>> {
29-
return AsyncEndpoint::new(ClientPool::new(self.clone(), addr, 0));
30-
}
31-
32-
pub fn create_endpoint_async_failover<T: ClientTransport<crate::RT>>(
33-
self: Arc<Self>, addrs: Vec<String>, round_robin: bool, retry_limit: usize,
34-
) -> AsyncEndpoint<Arc<FailoverPool<Self, T>>> {
35-
return AsyncEndpoint::new(Arc::new(FailoverPool::new(
36-
self.clone(),
37-
addrs,
38-
round_robin,
39-
retry_limit,
40-
0,
41-
)));
42-
}
43-
44-
pub fn create_endpoint_blocking<T: ClientTransport<crate::RT>>(
45-
self: Arc<Self>, addr: &str,
46-
) -> BlockingEndpoint<ClientPool<Self, T>> {
47-
return BlockingEndpoint::new(ClientPool::new(self.clone(), addr, 0));
48-
}
49-
50-
pub fn create_endpoint_blocking_failover<T: ClientTransport<crate::RT>>(
51-
self: Arc<Self>, addrs: Vec<String>, round_robin: bool, retry_limit: usize,
52-
) -> BlockingEndpoint<Arc<FailoverPool<Self, T>>> {
53-
return BlockingEndpoint::new(Arc::new(FailoverPool::new(
54-
self.clone(),
55-
addrs,
56-
round_robin,
57-
retry_limit,
58-
0,
59-
)));
60-
}
6125
}
6226

63-
impl<C: Codec> ClientFactory for APIClientFactory<C> {
27+
impl<C: Codec> ClientFactory for APIClientDefault<C> {
6428
type Logger = Arc<LogFilter>;
6529
type Codec = C;
6630
type Task = APIClientReq;

src/client/mod.rs

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,61 @@ pub use occams_rpc_stream::client::ClientCaller;
1111

1212
use occams_rpc_core::Codec;
1313
use occams_rpc_core::error::{EncodedErr, RpcErrCodec, RpcError, RpcIntErr};
14-
use occams_rpc_stream::client::{ClientCallerBlocking, ClientFactory};
14+
use occams_rpc_stream::client::{
15+
ClientCallerBlocking, ClientFact, ClientPool, ClientTransport, FailoverPool,
16+
};
1517
use std::fmt;
18+
use std::sync::Arc;
19+
20+
pub trait APIClientFact: ClientFact<Task = APIClientReq> {
21+
fn create_endpoint_async<T: ClientTransport<<Self as ClientFact>::IO>>(
22+
self: Arc<Self>, addr: &str,
23+
) -> AsyncEndpoint<ClientPool<Self, T>> {
24+
return AsyncEndpoint::new(ClientPool::new(self.clone(), addr, 0));
25+
}
26+
27+
fn create_endpoint_async_failover<T: ClientTransport<<Self as ClientFact>::IO>>(
28+
self: Arc<Self>, addrs: Vec<String>, round_robin: bool, retry_limit: usize,
29+
) -> AsyncEndpoint<Arc<FailoverPool<Self, T>>> {
30+
return AsyncEndpoint::new(Arc::new(FailoverPool::new(
31+
self.clone(),
32+
addrs,
33+
round_robin,
34+
retry_limit,
35+
0,
36+
)));
37+
}
38+
39+
fn create_endpoint_blocking<T: ClientTransport<<Self as ClientFact>::IO>>(
40+
self: Arc<Self>, addr: &str,
41+
) -> BlockingEndpoint<ClientPool<Self, T>> {
42+
return BlockingEndpoint::new(ClientPool::new(self.clone(), addr, 0));
43+
}
44+
45+
fn create_endpoint_blocking_failover<T: ClientTransport<<Self as ClientFact>::IO>>(
46+
self: Arc<Self>, addrs: Vec<String>, round_robin: bool, retry_limit: usize,
47+
) -> BlockingEndpoint<Arc<FailoverPool<Self, T>>> {
48+
return BlockingEndpoint::new(Arc::new(FailoverPool::new(
49+
self.clone(),
50+
addrs,
51+
round_robin,
52+
retry_limit,
53+
0,
54+
)));
55+
}
56+
}
1657

1758
pub struct AsyncEndpoint<C>
1859
where
19-
C: ClientCaller<Factory: ClientFactory<Task = APIClientReq>>,
60+
C: ClientCaller<Factory: ClientFact<Task = APIClientReq>>,
2061
{
2162
caller: C,
22-
codec: <C::Factory as ClientFactory>::Codec,
63+
codec: <C::Factory as ClientFact>::Codec,
2364
}
2465

2566
impl<C> AsyncEndpoint<C>
2667
where
27-
C: ClientCaller<Factory: ClientFactory<Task = APIClientReq>>,
68+
C: ClientCaller<Factory: ClientFact<Task = APIClientReq>>,
2869
{
2970
pub fn new(caller: C) -> Self {
3071
Self { caller, codec: Default::default() }
@@ -48,7 +89,7 @@ where
4889

4990
impl<C> Clone for AsyncEndpoint<C>
5091
where
51-
C: Clone + ClientCaller<Factory: ClientFactory<Task = APIClientReq>>,
92+
C: Clone + ClientCaller<Factory: ClientFact<Task = APIClientReq>>,
5293
{
5394
fn clone(&self) -> Self {
5495
Self::new(self.caller.clone())
@@ -57,7 +98,7 @@ where
5798

5899
impl<C> std::ops::Deref for AsyncEndpoint<C>
59100
where
60-
C: ClientCaller<Factory: ClientFactory<Task = APIClientReq>>,
101+
C: ClientCaller<Factory: ClientFact<Task = APIClientReq>>,
61102
{
62103
type Target = C;
63104

@@ -68,15 +109,15 @@ where
68109

69110
pub struct BlockingEndpoint<C>
70111
where
71-
C: ClientCallerBlocking<Factory: ClientFactory<Task = APIClientReq>>,
112+
C: ClientCallerBlocking<Factory: ClientFact<Task = APIClientReq>>,
72113
{
73114
caller: C,
74-
codec: <C::Factory as ClientFactory>::Codec,
115+
codec: <C::Factory as ClientFact>::Codec,
75116
}
76117

77118
impl<C> BlockingEndpoint<C>
78119
where
79-
C: ClientCallerBlocking<Factory: ClientFactory<Task = APIClientReq>>,
120+
C: ClientCallerBlocking<Factory: ClientFact<Task = APIClientReq>>,
80121
{
81122
fn new(caller: C) -> Self {
82123
Self { caller, codec: Default::default() }
@@ -99,7 +140,7 @@ where
99140

100141
impl<C> Clone for BlockingEndpoint<C>
101142
where
102-
C: Clone + ClientCallerBlocking<Factory: ClientFactory<Task = APIClientReq>>,
143+
C: Clone + ClientCallerBlocking<Factory: ClientFact<Task = APIClientReq>>,
103144
{
104145
fn clone(&self) -> Self {
105146
Self::new(self.caller.clone())
@@ -108,7 +149,7 @@ where
108149

109150
impl<C> std::ops::Deref for BlockingEndpoint<C>
110151
where
111-
C: ClientCallerBlocking<Factory: ClientFactory<Task = APIClientReq>>,
152+
C: ClientCallerBlocking<Factory: ClientFact<Task = APIClientReq>>,
112153
{
113154
type Target = C;
114155

stream/src/client/failover.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use crate::client::task::*;
2-
use crate::client::{
3-
ClientCaller, ClientCallerBlocking, ClientFactory, ClientPool, ClientTransport,
4-
};
2+
use crate::client::{ClientCaller, ClientCallerBlocking, ClientFact, ClientPool, ClientTransport};
53
use crate::proto::RpcAction;
64
use arc_swap::ArcSwapOption;
75
use crossfire::*;
@@ -25,12 +23,12 @@ use std::sync::{
2523
/// don't clone FailoverPool as it has custom drop. FailoverPool should be put in Arc for usage.
2624
pub struct FailoverPool<F, P>(Arc<FailoverPoolInner<F, P>>)
2725
where
28-
F: ClientFactory,
26+
F: ClientFact,
2927
P: ClientTransport<F::IO>;
3028

3129
struct FailoverPoolInner<F, P>
3230
where
33-
F: ClientFactory,
31+
F: ClientFact,
3432
P: ClientTransport<F::IO>,
3533
{
3634
pools: ArcSwapOption<ClusterConfig<F, P>>,
@@ -45,7 +43,7 @@ where
4543

4644
struct ClusterConfig<F, P>
4745
where
48-
F: ClientFactory,
46+
F: ClientFact,
4947
P: ClientTransport<F::IO>,
5048
{
5149
pools: Vec<ClientPool<FailoverPoolInner<F, P>, P>>,
@@ -54,7 +52,7 @@ where
5452

5553
impl<F, P> FailoverPool<F, P>
5654
where
57-
F: ClientFactory,
55+
F: ClientFact,
5856
P: ClientTransport<F::IO>,
5957
{
6058
pub fn new(
@@ -118,7 +116,7 @@ where
118116

119117
impl<F, P> ClusterConfig<F, P>
120118
where
121-
F: ClientFactory,
119+
F: ClientFact,
122120
P: ClientTransport<F::IO>,
123121
{
124122
#[inline]
@@ -148,7 +146,7 @@ where
148146

149147
impl<F, P> FailoverPoolInner<F, P>
150148
where
151-
F: ClientFactory,
149+
F: ClientFact,
152150
P: ClientTransport<F::IO>,
153151
{
154152
async fn retry_worker(
@@ -187,7 +185,7 @@ where
187185

188186
impl<F, P> Drop for FailoverPool<F, P>
189187
where
190-
F: ClientFactory,
188+
F: ClientFact,
191189
P: ClientTransport<F::IO>,
192190
{
193191
fn drop(&mut self) {
@@ -196,9 +194,9 @@ where
196194
}
197195
}
198196

199-
impl<F, P> ClientFactory for FailoverPoolInner<F, P>
197+
impl<F, P> ClientFact for FailoverPoolInner<F, P>
200198
where
201-
F: ClientFactory,
199+
F: ClientFact,
202200
P: ClientTransport<F::IO>,
203201
{
204202
type Logger = F::Logger;
@@ -244,7 +242,7 @@ where
244242

245243
impl<F, P> ClientCaller for FailoverPool<F, P>
246244
where
247-
F: ClientFactory,
245+
F: ClientFact,
248246
P: ClientTransport<F::IO>,
249247
{
250248
type Factory = F;
@@ -275,7 +273,7 @@ where
275273

276274
impl<F, P> ClientCallerBlocking for FailoverPool<F, P>
277275
where
278-
F: ClientFactory,
276+
F: ClientFact,
279277
P: ClientTransport<F::IO>,
280278
{
281279
type Factory = F;

stream/src/client/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ use std::sync::Arc;
2424
use std::{fmt, io};
2525

2626
/// A trait implemented by the user for the client-side, to define the customizable plugin.
27-
pub trait ClientFactory: Send + Sync + Sized + 'static {
27+
pub trait ClientFact: Send + Sync + Sized + 'static {
2828
/// A [captains-log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/index.html) implementation
2929
/// The type that new_logger returns.
3030
///
31-
/// maybe a `Arc<LogFilter> or KeyFilter<Arc<LogFilter>>`
31+
/// maybe a `Arc<LogFilter> or KeyFilter<Arc<LogFilter>>, or DummyFilter`
3232
type Logger: Filter + Send + Sync + 'static;
3333

3434
/// Define the codec to serialization and deserialization
@@ -81,21 +81,21 @@ pub trait ClientFactory: Send + Sync + Sized + 'static {
8181
}
8282

8383
pub trait ClientCaller: Send {
84-
type Factory: ClientFactory;
84+
type Factory: ClientFact;
8585
fn send_req(
86-
&self, task: <Self::Factory as ClientFactory>::Task,
86+
&self, task: <Self::Factory as ClientFact>::Task,
8787
) -> impl Future<Output = ()> + Send;
8888
}
8989

9090
pub trait ClientCallerBlocking: Send {
91-
type Factory: ClientFactory;
92-
fn send_req_blocking(&self, task: <Self::Factory as ClientFactory>::Task);
91+
type Factory: ClientFact;
92+
fn send_req_blocking(&self, task: <Self::Factory as ClientFact>::Task);
9393
}
9494

9595
impl<C: ClientCaller + Send + Sync> ClientCaller for Arc<C> {
9696
type Factory = C::Factory;
9797
#[inline(always)]
98-
async fn send_req(&self, task: <Self::Factory as ClientFactory>::Task) {
98+
async fn send_req(&self, task: <Self::Factory as ClientFact>::Task) {
9999
self.as_ref().send_req(task).await
100100
}
101101
}
@@ -104,7 +104,7 @@ impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
104104
type Factory = C::Factory;
105105

106106
#[inline(always)]
107-
fn send_req_blocking(&self, task: <Self::Factory as ClientFactory>::Task) {
107+
fn send_req_blocking(&self, task: <Self::Factory as ClientFact>::Task) {
108108
self.as_ref().send_req_blocking(task);
109109
}
110110
}
@@ -115,7 +115,7 @@ impl<C: ClientCallerBlocking + Send + Sync> ClientCallerBlocking for Arc<C> {
115115
///
116116
/// - [occams-rpc-tcp](https://docs.rs/occams-rpc-tcp): For TCP and Unix socket
117117
///
118-
/// NOTE: we use IO in generic param instead of ClientFactory to break cycle dep.
118+
/// NOTE: we use IO in generic param instead of ClientFact to break cycle dep.
119119
/// because FailoverPool will rewrap the factory into its own.
120120
pub trait ClientTransport<IO: AsyncIO>: fmt::Debug + Send + Sized + 'static {
121121
/// How to establish an async connection.
@@ -126,20 +126,20 @@ pub trait ClientTransport<IO: AsyncIO>: fmt::Debug + Send + Sized + 'static {
126126
) -> impl Future<Output = Result<Self, RpcIntErr>> + Send;
127127

128128
/// Shutdown the write direction of the connection
129-
fn close_conn<F: ClientFactory>(&self, logger: &F::Logger) -> impl Future<Output = ()> + Send;
129+
fn close_conn<F: ClientFact>(&self, logger: &F::Logger) -> impl Future<Output = ()> + Send;
130130

131131
/// Flush the request for the socket writer, if the transport has buffering logic
132-
fn flush_req<F: ClientFactory>(
132+
fn flush_req<F: ClientFact>(
133133
&self, logger: &F::Logger,
134134
) -> impl Future<Output = io::Result<()>> + Send;
135135

136136
/// Write out the encoded request task
137-
fn write_req<'a, F: ClientFactory>(
137+
fn write_req<'a, F: ClientFact>(
138138
&'a self, logger: &F::Logger, buf: &'a [u8], blob: Option<&'a [u8]>, need_flush: bool,
139139
) -> impl Future<Output = io::Result<()>> + Send;
140140

141141
/// Read the response and decode it from the socket, find and notify the registered ClientTask
142-
fn read_resp<F: ClientFactory>(
142+
fn read_resp<F: ClientFact>(
143143
&self, factory: &F, logger: &F::Logger, codec: &F::Codec, close_ch: Option<&MAsyncRx<()>>,
144144
task_reg: &mut ClientTaskTimer<F>,
145145
) -> impl std::future::Future<Output = Result<bool, RpcIntErr>> + Send;

0 commit comments

Comments
 (0)