Skip to content

Commit 403fb18

Browse files
committed
Simplify traits
Remove Facts::Logger, use fixed type Arc<LogFilter> Replace generic param from ClientTransport and ServerTransport with associate type IO, so that don't need to specifiy in ClientPool/FailoverPool The ClientFacts / ServerFacts inherit AsyncIO trait, through Deref and blanket trait
1 parent 4510f6c commit 403fb18

20 files changed

Lines changed: 318 additions & 342 deletions

File tree

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ build: init
5555
cargo build -p occams-rpc-stream-macros
5656
cargo build -p occams-rpc-stream
5757
cargo build -p occams-rpc-tcp
58-
cargo build -p occams-rpc-test
58+
# cargo build -p occams-rpc-test
5959
cargo build
6060

6161
.DEFAULT_GOAL = build

core/src/runtime.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub trait AsyncFdTrait<T: AsRawFd + AsFd + Send + Sync + 'static>:
3939
/// Defines the interface we used from async runtime
4040
///
4141
/// See module level doc: [crate::runtime]
42-
pub trait AsyncIO: Send + 'static {
42+
pub trait AsyncIO: Send + Sync + 'static {
4343
type Interval: TimeInterval;
4444

4545
type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static>: AsyncFdTrait<T>;
@@ -73,6 +73,60 @@ pub trait AsyncIO: Send + 'static {
7373
fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
7474
fd: T,
7575
) -> io::Result<Self::AsyncFd<T>>;
76+
77+
/// You may spawn with globally runtime, or to a owned runtime executor
78+
fn spawn_detach<F, R>(&self, f: F)
79+
where
80+
F: Future<Output = R> + Send + 'static,
81+
R: Send + 'static;
82+
}
83+
84+
impl<F: std::ops::Deref<Target = IO> + Send + Sync + 'static, IO: AsyncIO> AsyncIO for F {
85+
type Interval = IO::Interval;
86+
87+
type AsyncFd<T: AsRawFd + AsFd + Send + Sync + 'static> = IO::AsyncFd<T>;
88+
89+
#[inline]
90+
fn sleep(d: Duration) -> impl Future + Send {
91+
IO::sleep(d)
92+
}
93+
94+
#[inline]
95+
fn tick(d: Duration) -> Self::Interval {
96+
IO::tick(d)
97+
}
98+
99+
fn connect_tcp(
100+
addr: &SocketAddr, timeout: Duration,
101+
) -> impl Future<Output = io::Result<Self::AsyncFd<TcpStream>>> + Send {
102+
IO::connect_tcp(addr, timeout)
103+
}
104+
105+
fn connect_unix(
106+
addr: &PathBuf, timeout: Duration,
107+
) -> impl Future<Output = io::Result<Self::AsyncFd<UnixStream>>> + Send {
108+
IO::connect_unix(addr, timeout)
109+
}
110+
111+
fn to_async_fd_rd<T: AsRawFd + AsFd + Send + Sync + 'static>(
112+
fd: T,
113+
) -> io::Result<Self::AsyncFd<T>> {
114+
IO::to_async_fd_rd(fd)
115+
}
116+
117+
fn to_async_fd_rw<T: AsRawFd + AsFd + Send + Sync + 'static>(
118+
fd: T,
119+
) -> io::Result<Self::AsyncFd<T>> {
120+
IO::to_async_fd_rw(fd)
121+
}
122+
123+
fn spawn_detach<FR, R>(&self, f: FR)
124+
where
125+
FR: Future<Output = R> + Send + 'static,
126+
R: Send + 'static,
127+
{
128+
IO::spawn_detach(self.deref(), f)
129+
}
76130
}
77131

78132
/// Defines the universal interval/ticker trait

runtime/smol/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ with a low-level streaming interface, and high-level remote API call interface.
1818
[dependencies]
1919
futures = "0.3"
2020
occams-rpc-core = { path = "../../core/", version="0" }
21+
occams-rpc-stream = { path = "../../stream", version="0" }
2122
async-io = "2.6"
2223
async-executor = "1"
2324
smol = {version="2", optional=true}

runtime/smol/src/lib.rs

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
//! It implements the [`AsyncIO`](https://docs.rs/occams-rpc-core/latest/occams_rpc_core/runtime/index.html) trait to support `async-io` of `smol`.
88
99
use async_executor::Executor;
10+
use async_io::{Async, Timer};
1011
use occams_rpc_core::io::*;
1112
use occams_rpc_core::runtime::{AsyncFdTrait, AsyncIO, TimeInterval};
1213
use std::future::Future;
@@ -22,8 +23,6 @@ use std::sync::Arc;
2223
use std::task::*;
2324
use std::time::{Duration, Instant};
2425

25-
use async_io::{Async, Timer};
26-
2726
/// The main struct for async-io, assign this type to AsyncIO trait when used:
2827
///
2928
/// - [ClientFactory::IO](https://occams-rpc-stream/latest/occams-rpc-stream/client/trait.ClientFactory.html)
@@ -42,25 +41,6 @@ impl SmolRT {
4241
pub fn new(executor: Arc<Executor<'static>>) -> Self {
4342
Self(Some(executor))
4443
}
45-
46-
#[inline]
47-
pub fn spawn_detach<F, R>(&self, f: F)
48-
where
49-
F: Future<Output = R> + Send + 'static,
50-
R: Send + 'static,
51-
{
52-
if let Some(executor) = self.0.as_ref() {
53-
executor.spawn(f).detach();
54-
} else {
55-
#[cfg(feature = "global")]
56-
{
57-
smol::spawn(f).detach();
58-
return;
59-
}
60-
#[cfg(not(feature = "global"))]
61-
unreachable!();
62-
}
63-
}
6444
}
6545

6646
impl AsyncIO for SmolRT {
@@ -112,6 +92,25 @@ impl AsyncIO for SmolRT {
11292
) -> io::Result<Self::AsyncFd<T>> {
11393
Ok(SmolFD(Async::new(fd)?))
11494
}
95+
96+
#[inline]
97+
fn spawn_detach<F, R>(&self, f: F)
98+
where
99+
F: Future<Output = R> + Send + 'static,
100+
R: Send + 'static,
101+
{
102+
if let Some(executor) = self.0.as_ref() {
103+
executor.spawn(f).detach();
104+
} else {
105+
#[cfg(feature = "global")]
106+
{
107+
smol::spawn(f).detach();
108+
return;
109+
}
110+
#[cfg(not(feature = "global"))]
111+
unreachable!();
112+
}
113+
}
115114
}
116115

117116
/// Associate type for SmolRT
@@ -153,3 +152,5 @@ impl<T: AsRawFd + AsFd + Send + Sync + 'static> Deref for SmolFD<T> {
153152
self.0.get_ref()
154153
}
155154
}
155+
156+
pub type ClientDefault<T, C> = occams_rpc_stream::client::ClientDefault<T, SmolRT, C>;

runtime/tokio/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ with a low-level streaming interface, and high-level remote API call interface.
1818
[dependencies]
1919
tokio = { version = "1", features = ["net", "time", "rt"] }
2020
occams-rpc-core = { path = "../../core/", version="0" }
21+
occams-rpc-stream = { path = "../../stream", version="0" }
2122

2223
[package.metadata.docs.rs]
2324
all-features = true

runtime/tokio/src/lib.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,6 @@ impl TokioRT {
3333
pub fn new(handle: Handle) -> Self {
3434
Self(handle)
3535
}
36-
37-
#[inline]
38-
pub fn spawn_detach<F, R>(&self, f: F)
39-
where
40-
F: Future<Output = R> + Send + 'static,
41-
R: Send + 'static,
42-
{
43-
self.0.spawn(f);
44-
}
4536
}
4637

4738
impl AsyncIO for TokioRT {
@@ -94,6 +85,15 @@ impl AsyncIO for TokioRT {
9485
use tokio::io::Interest;
9586
Ok(TokioFD(io::unix::AsyncFd::with_interest(fd, Interest::READABLE | Interest::WRITABLE)?))
9687
}
88+
89+
#[inline]
90+
fn spawn_detach<F, R>(&self, f: F)
91+
where
92+
F: Future<Output = R> + Send + 'static,
93+
R: Send + 'static,
94+
{
95+
self.0.spawn(f);
96+
}
9797
}
9898

9999
/// Associate type for TokioRT
@@ -134,3 +134,5 @@ impl<T: AsRawFd + AsFd + Send + Sync + 'static> Deref for TokioFD<T> {
134134
self.0.get_ref()
135135
}
136136
}
137+
138+
pub type ClientDefault<T, C> = occams_rpc_stream::client::ClientDefault<T, TokioRT, C>;

src/client/facts.rs

Lines changed: 0 additions & 52 deletions
This file was deleted.

src/client/mod.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
#[cfg(any(feature = "tokio", feature = "smol"))]
2-
mod facts;
3-
#[cfg(any(feature = "tokio", feature = "smol"))]
4-
pub use facts::*;
1+
#[cfg(feature = "tokio")]
2+
pub type APIClientDefault<C: Codec> = occams_rpc_tokio::ClientDefault<C, APIClientReq>;
3+
#[cfg(all(not(feature = "tokio"), feature = "smol"))]
4+
pub type APIClientDefault<C: Codec> = occams_rpc_smol::ClientDefault<C, APIClientReq>;
55

66
mod task;
77
pub use task::APIClientReq;
@@ -17,14 +17,16 @@ use occams_rpc_stream::client::{
1717
use std::fmt;
1818
use std::sync::Arc;
1919

20+
pub type ClientDefault<IO, C> = occams_rpc_stream::client::ClientDefault<APIClientReq, IO, C>;
21+
2022
pub trait APIClientFacts: ClientFacts<Task = APIClientReq> {
21-
fn create_endpoint_async<T: ClientTransport<<Self as ClientFacts>::IO>>(
23+
fn create_endpoint_async<T: ClientTransport>(
2224
self: Arc<Self>, addr: &str,
2325
) -> AsyncEndpoint<ClientPool<Self, T>> {
2426
return AsyncEndpoint::new(ClientPool::new(self.clone(), addr, 0));
2527
}
2628

27-
fn create_endpoint_async_failover<T: ClientTransport<<Self as ClientFacts>::IO>>(
29+
fn create_endpoint_async_failover<T: ClientTransport>(
2830
self: Arc<Self>, addrs: Vec<String>, round_robin: bool, retry_limit: usize,
2931
) -> AsyncEndpoint<Arc<FailoverPool<Self, T>>> {
3032
return AsyncEndpoint::new(Arc::new(FailoverPool::new(
@@ -36,13 +38,13 @@ pub trait APIClientFacts: ClientFacts<Task = APIClientReq> {
3638
)));
3739
}
3840

39-
fn create_endpoint_blocking<T: ClientTransport<<Self as ClientFacts>::IO>>(
41+
fn create_endpoint_blocking<T: ClientTransport>(
4042
self: Arc<Self>, addr: &str,
4143
) -> BlockingEndpoint<ClientPool<Self, T>> {
4244
return BlockingEndpoint::new(ClientPool::new(self.clone(), addr, 0));
4345
}
4446

45-
fn create_endpoint_blocking_failover<T: ClientTransport<<Self as ClientFacts>::IO>>(
47+
fn create_endpoint_blocking_failover<T: ClientTransport>(
4648
self: Arc<Self>, addrs: Vec<String>, round_robin: bool, retry_limit: usize,
4749
) -> BlockingEndpoint<Arc<FailoverPool<Self, T>>> {
4850
return BlockingEndpoint::new(Arc::new(FailoverPool::new(

0 commit comments

Comments
 (0)