Skip to content

Commit 1a317d7

Browse files
committed
Rename ServerFactory->ServerFact and decouple ServerTransport
1 parent e11db27 commit 1a317d7

5 files changed

Lines changed: 116 additions & 129 deletions

File tree

stream/src/server/dispatch.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use super::task::*;
33
use occams_rpc_core::Codec;
44
use std::marker::PhantomData;
55

6-
/// ReqDispatch should be a user-defined struct initialized for every connection, by ServerFactory::new_dispatcher.
6+
/// ReqDispatch should be a user-defined struct initialized for every connection, by ServerFact::new_dispatcher.
77
///
88
/// ReqDispatch must have Sync, because the connection reader and writer access concurrently.
99
///
@@ -31,8 +31,8 @@ pub trait ReqDispatch<R: ServerTaskResp>: Send + Sync + Sized + 'static {
3131
/// # Example
3232
///
3333
/// ```no_compile,ignore
34-
/// use occams_rpc_stream::server::{ServerFactory, ReqDispatch};
35-
/// impl ServerFactory for YourServer {
34+
/// use occams_rpc_stream::server::{ServerFact, ReqDispatch};
35+
/// impl ServerFact for YourServer {
3636
///
3737
/// ...
3838
///

stream/src/server/mod.rs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,19 @@ use occams_rpc_core::{Codec, error::*, io::*, runtime::AsyncIO};
1919
use std::{fmt, future::Future, io, sync::Arc};
2020

2121
/// A central hub defined by the user for the server-side, to define the customizable plugin.
22-
pub trait ServerFactory: Sync + Send + 'static + Sized {
22+
pub trait ServerFact: Sync + Send + 'static + Sized {
2323
/// A [captains-log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/index.html) implementation.
2424
/// The type that new_logger returns.
2525
///
2626
/// maybe a `Arc<LogFilter> or KeyFilter<Arc<LogFilter>>`
27-
type Logger: Filter + Send + 'static;
28-
29-
/// Define the transport layer protocol
30-
///
31-
/// Refers to [ServerTransport]
32-
type Transport: ServerTransport<Self>;
27+
type Logger: Filter + Send + Sync + 'static;
3328

3429
/// Define the adaptor of async runtime
3530
///
3631
/// Refers to [occams_rpc_core::runtime::AsyncIO](https://docs.rs/occams-rpc-core/latest/occams_rpc_core/runtime/index.html)
3732
type IO: AsyncIO;
3833

39-
/// You should keep ServerConfig inside ServerFactory, get_config() will return the reference.
34+
/// You should keep ServerConfig inside ServerFact, get_config() will return the reference.
4035
fn get_config(&self) -> &ServerConfig;
4136

4237
/// Construct a [captains_log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/trait.Filter.html) to oganize log of a client
@@ -65,37 +60,36 @@ pub trait ServerFactory: Sync + Send + 'static + Sized {
6560
/// The implementation can be found on:
6661
///
6762
/// - [occams-rpc-tcp](https://docs.rs/occams-rpc-tcp): For TCP and Unix socket
68-
pub trait ServerTransport<F: ServerFactory>: Send + Sync + Sized + 'static + fmt::Debug {
63+
pub trait ServerTransport<IO: AsyncIO>: Send + Sync + Sized + 'static + fmt::Debug {
6964
type Listener: AsyncListener;
7065

71-
/// The ServerTransport holds a logger, the server will use it by reference.
72-
fn get_logger(&self) -> &F::Logger;
73-
7466
/// The implementation is expected to store the conn_count until dropped
7567
fn new_conn(
76-
stream: <Self::Listener as AsyncListener>::Conn, f: &F, conn_count: Arc<()>,
68+
stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
7769
) -> Self;
7870

7971
/// Read a request from the socket
80-
fn read_req<'a>(
81-
&'a self, close_ch: &crossfire::MAsyncRx<()>,
72+
fn read_req<'a, F: ServerFact>(
73+
&'a self, logger: &F::Logger, close_ch: &crossfire::MAsyncRx<()>,
8274
) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
8375

8476
/// Write our user task response
85-
fn write_resp<C: Codec, T: ServerTaskEncode>(
86-
&self, codec: &C, task: T,
77+
fn write_resp<F: ServerFact, T: ServerTaskEncode>(
78+
&self, logger: &F::Logger, codec: &impl Codec, task: T,
8779
) -> impl Future<Output = io::Result<()>> + Send;
8880

8981
/// Write out ping resp or error
90-
fn write_resp_internal(
91-
&self, seq: u64, err: Option<RpcIntErr>,
82+
fn write_resp_internal<F: ServerFact>(
83+
&self, logger: &F::Logger, seq: u64, err: Option<RpcIntErr>,
9284
) -> impl Future<Output = io::Result<()>> + Send;
9385

9486
/// Flush the response for the socket writer, if the transport has buffering logic
95-
fn flush_resp(&self) -> impl Future<Output = io::Result<()>> + Send;
87+
fn flush_resp<F: ServerFact>(
88+
&self, logger: &F::Logger,
89+
) -> impl Future<Output = io::Result<()>> + Send;
9690

9791
/// Shutdown the write direction of the connection
98-
fn close_conn(&self) -> impl Future<Output = ()> + Send;
92+
fn close_conn<F: ServerFact>(&self, logger: &F::Logger) -> impl Future<Output = ()> + Send;
9993
}
10094

10195
/// A temporary struct to hold data buffer return by ServerTransport

stream/src/server/server.rs

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use std::io;
66
use std::sync::{Arc, Mutex};
77
use std::time::{Duration, Instant};
88

9-
/// An RpcServer that listen, accept, and server connections, according to ServerFactory interface.
9+
/// An RpcServer that listen, accept, and server connections, according to ServerFact interface.
1010
pub struct RpcServer<F>
1111
where
12-
F: ServerFactory,
12+
F: ServerFact,
1313
{
1414
listeners_abort: Vec<(AbortHandle, String)>,
1515
logger: F::Logger,
@@ -21,7 +21,7 @@ where
2121

2222
impl<F> RpcServer<F>
2323
where
24-
F: ServerFactory,
24+
F: ServerFact,
2525
{
2626
pub fn new(factory: Arc<F>) -> Self {
2727
let (tx, rx) = crossfire::mpmc::unbounded_async();
@@ -35,8 +35,8 @@ where
3535
}
3636
}
3737

38-
pub fn listen(&mut self, addr: &str) -> io::Result<String> {
39-
match <<F::Transport as ServerTransport<F>>::Listener as AsyncListener>::bind(addr) {
38+
pub fn listen<T: ServerTransport<F::IO>>(&mut self, addr: &str) -> io::Result<String> {
39+
match <T::Listener as AsyncListener>::bind(addr) {
4040
Err(e) => {
4141
error!("bind addr {:?} err: {}", addr, e);
4242
return Err(e);
@@ -68,12 +68,12 @@ where
6868
return;
6969
}
7070
Ok(stream) => {
71-
let conn = F::Transport::new_conn(
71+
let conn = T::new_conn(
7272
stream,
73-
&factory,
73+
factory.get_config(),
7474
conn_ref_count.clone(),
7575
);
76-
Self::server_conn(conn, &factory, server_close_rx.clone())
76+
Self::server_conn::<T>(conn, &factory, server_close_rx.clone())
7777
}
7878
}
7979
}
@@ -88,24 +88,34 @@ where
8888
}
8989
}
9090

91-
fn server_conn(conn: F::Transport, factory: &F, server_close_rx: crossfire::MAsyncRx<()>) {
91+
fn server_conn<T: ServerTransport<F::IO>>(
92+
conn: T, factory: &F, server_close_rx: crossfire::MAsyncRx<()>,
93+
) {
9294
let conn = Arc::new(conn);
9395

9496
let dispatch = Arc::new(factory.new_dispatcher());
9597
let (done_tx, done_rx) = crossfire::mpsc::unbounded_async();
9698

9799
let noti = RespNoti(done_tx);
98-
struct Reader<F: ServerFactory, D: ReqDispatch<R>, R: ServerTaskResp> {
100+
struct Reader<
101+
T: ServerTransport<F::IO>,
102+
F: ServerFact,
103+
D: ReqDispatch<R>,
104+
R: ServerTaskResp,
105+
> {
99106
noti: RespNoti<R>,
100-
conn: Arc<F::Transport>,
107+
conn: Arc<T>,
101108
server_close_rx: crossfire::MAsyncRx<()>,
102109
dispatch: Arc<D>,
110+
logger: F::Logger,
103111
}
104112

105-
impl<F: ServerFactory, D: ReqDispatch<R>, R: ServerTaskResp> Reader<F, D, R> {
113+
impl<T: ServerTransport<F::IO>, F: ServerFact, D: ReqDispatch<R>, R: ServerTaskResp>
114+
Reader<T, F, D, R>
115+
{
106116
async fn run(self) -> Result<(), ()> {
107117
loop {
108-
match self.conn.read_req(&self.server_close_rx).await {
118+
match self.conn.read_req::<F>(&self.logger, &self.server_close_rx).await {
109119
Ok(req) => {
110120
if req.action == RpcAction::Num(0) && req.msg.len() == 0 {
111121
// ping request
@@ -129,41 +139,52 @@ where
129139
#[inline]
130140
fn send_quick_resp(&self, seq: u64, err: Option<RpcIntErr>) -> Result<(), ()> {
131141
if self.noti.send_err(seq, err).is_err() {
132-
logger_warn!(
133-
self.conn.get_logger(),
134-
"{:?} reader abort due to writer has err",
135-
self.conn
136-
);
142+
logger_warn!(self.logger, "{:?} reader abort due to writer has err", self.conn);
137143
return Err(());
138144
}
139145
Ok(())
140146
}
141147
}
142-
let reader = Reader::<F, _, _> {
148+
let reader = Reader::<T, F, _, _> {
143149
noti,
144150
conn: conn.clone(),
145151
server_close_rx,
146152
dispatch: dispatch.clone(),
153+
logger: factory.new_logger(),
147154
};
148155
factory.spawn_detach(async move { reader.run().await });
149156

150-
struct Writer<F: ServerFactory, D: ReqDispatch<R>, R: ServerTaskResp> {
157+
struct Writer<
158+
T: ServerTransport<F::IO>,
159+
F: ServerFact,
160+
D: ReqDispatch<R>,
161+
R: ServerTaskResp,
162+
> {
151163
dispatch: Arc<D>,
152164
done_rx: crossfire::AsyncRx<Result<R, (u64, Option<RpcIntErr>)>>,
153-
conn: Arc<F::Transport>,
165+
conn: Arc<T>,
166+
logger: F::Logger,
154167
}
155168

156-
impl<F: ServerFactory, D: ReqDispatch<R>, R: ServerTaskResp> Writer<F, D, R> {
169+
impl<T: ServerTransport<F::IO>, F: ServerFact, D: ReqDispatch<R>, R: ServerTaskResp>
170+
Writer<T, F, D, R>
171+
{
157172
async fn run(self) -> Result<(), io::Error> {
158173
macro_rules! process {
159174
($task: expr) => {{
160175
match $task {
161176
Ok(_task) => {
162-
logger_trace!(self.conn.get_logger(), "write_resp {:?}", _task);
163-
self.conn.write_resp(self.dispatch.get_codec(), _task).await?;
177+
logger_trace!(self.logger, "write_resp {:?}", _task);
178+
self.conn
179+
.write_resp::<F, R>(
180+
&self.logger,
181+
self.dispatch.get_codec(),
182+
_task,
183+
)
184+
.await?;
164185
}
165186
Err((seq, err)) => {
166-
self.conn.write_resp_internal(seq, err).await?;
187+
self.conn.write_resp_internal::<F>(&self.logger, seq, err).await?;
167188
}
168189
}
169190
}};
@@ -173,14 +194,14 @@ where
173194
while let Ok(task) = self.done_rx.try_recv() {
174195
process!(task);
175196
}
176-
self.conn.flush_resp().await?;
197+
self.conn.flush_resp::<F>(&self.logger).await?;
177198
}
178-
logger_trace!(self.conn.get_logger(), "{:?} writer exits", self.conn);
179-
self.conn.close_conn().await;
199+
logger_trace!(self.logger, "{:?} writer exits", self.conn);
200+
self.conn.close_conn::<F>(&self.logger).await;
180201
Ok(())
181202
}
182203
}
183-
let writer = Writer::<F, _, _> { done_rx, conn, dispatch };
204+
let writer = Writer::<T, F, _, _> { done_rx, conn, dispatch, logger: factory.new_logger() };
184205
factory.spawn_detach(async move { writer.run().await });
185206
}
186207

stream/test/src/server.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::client::{FileAction, FileIOReq, FileIOResp, FileOpenReq};
22
use occams_rpc_codec::MsgpCodec;
33
use occams_rpc_stream::server::{dispatch::*, task::*, *};
4+
use occams_rpc_tcp::TcpServer;
45
use std::sync::Arc;
56

67
use captains_log::filter::LogFilter;
@@ -15,7 +16,7 @@ where
1516
{
1617
let factory = Arc::new(FileServer::new(server_handle, config));
1718
let mut server = RpcServer::new(factory);
18-
let local_addr = server.listen(addr)?;
19+
let local_addr = server.listen::<TcpServer<crate::RT>>(addr)?;
1920
Ok((server, local_addr))
2021
}
2122

@@ -39,15 +40,13 @@ where
3940
}
4041
}
4142

42-
impl<H, FH> ServerFactory for FileServer<H, FH>
43+
impl<H, FH> ServerFact for FileServer<H, FH>
4344
where
4445
H: FnOnce(FileServerTask) -> FH + Send + Sync + 'static + Clone,
4546
FH: Future<Output = Result<(), ()>> + Send + 'static,
4647
{
4748
type Logger = Arc<LogFilter>;
4849

49-
type Transport = occams_rpc_tcp::TcpServer<Self>;
50-
5150
type IO = crate::RT;
5251

5352
type RespTask = FileServerTask;

0 commit comments

Comments
 (0)