Skip to content

Commit 4c0eaca

Browse files
committed
convert Server::bind to accept a normal service factory
1 parent 81421c2 commit 4c0eaca

17 files changed

+278
-129
lines changed

actix-server/CHANGES.md

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# Changes
22

33
## Unreleased - 2021-xx-xx
4-
* Rename `Server` to `ServerHandle`. [#???]
5-
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#???]
4+
* Rename `Server` to `ServerHandle`. [#403]
5+
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403]
6+
* Remove wrapper `service::ServiceFactory` trait. [#403]
7+
* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403]
68
* Minimum supported Rust version (MSRV) is now 1.52.
79

8-
[#???]: https://github.com/actix/actix-net/pull/???
10+
[#403]: https://github.com/actix/actix-net/pull/403
911

1012

1113
## 2.0.0-beta.6 - 2021-10-11

actix-server/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,4 @@ actix-rt = "2.0.0"
3838
bytes = "1"
3939
env_logger = "0.9"
4040
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
41-
tokio = { version = "1.5.1", features = ["io-util"] }
41+
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

actix-server/examples/startup-fail.rs

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use std::io;
2+
3+
use actix_rt::net::TcpStream;
4+
use actix_server::Server;
5+
use actix_service::fn_service;
6+
use log::info;
7+
8+
#[actix_rt::main]
9+
async fn main() -> io::Result<()> {
10+
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info"))
11+
.init();
12+
13+
let addr = ("127.0.0.1", 8080);
14+
info!("starting server on port: {}", &addr.0);
15+
16+
Server::build()
17+
.bind(
18+
"startup-fail",
19+
addr,
20+
fn_service(move |mut _stream: TcpStream| async move { Ok::<u32, u32>(42) }),
21+
)?
22+
.workers(2)
23+
.run()
24+
.await
25+
}

actix-server/examples/tcp-echo.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use actix_rt::net::TcpStream;
2121
use actix_server::Server;
2222
use actix_service::{fn_service, ServiceFactoryExt as _};
2323
use bytes::BytesMut;
24-
use futures_util::future::ok;
2524
use log::{error, info};
2625
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2726

@@ -39,11 +38,11 @@ async fn main() -> io::Result<()> {
3938
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
4039
// to return a service *factory*; so it can be created once per worker.
4140
Server::build()
42-
.bind("echo", addr, move || {
41+
.bind("echo", addr, {
4342
let count = Arc::clone(&count);
4443
let num2 = Arc::clone(&count);
4544

46-
fn_service(move |mut stream: TcpStream| {
45+
let svc = fn_service(move |mut stream: TcpStream| {
4746
let count = Arc::clone(&count);
4847

4948
async move {
@@ -78,11 +77,15 @@ async fn main() -> io::Result<()> {
7877
}
7978
})
8079
.map_err(|err| error!("Service Error: {:?}", err))
81-
.and_then(move |(_, size)| {
80+
.and_then_send(move |(_, size)| {
8281
let num = num2.load(Ordering::SeqCst);
8382
info!("[{}] total bytes read: {}", num, size);
84-
ok(size)
85-
})
83+
async move { Ok(size) }
84+
});
85+
86+
let svc2 = svc.clone();
87+
88+
svc2
8689
})?
8790
.workers(1)
8891
.run()

actix-server/src/builder.rs

+48-32
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
fmt,
23
future::Future,
34
io, mem,
45
pin::Pin,
@@ -7,32 +8,25 @@ use std::{
78
};
89

910
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
11+
use actix_service::ServiceFactory;
1012
use log::{error, info};
1113
use tokio::sync::{
1214
mpsc::{unbounded_channel, UnboundedReceiver},
1315
oneshot,
1416
};
1517

16-
use crate::accept::AcceptLoop;
17-
use crate::join_all;
18-
use crate::server::{ServerCommand, ServerHandle};
19-
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
20-
use crate::signals::{Signal, Signals};
21-
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
22-
use crate::socket::{MioTcpListener, MioTcpSocket};
23-
use crate::waker_queue::{WakerInterest, WakerQueue};
24-
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
25-
26-
#[derive(Debug)]
27-
#[non_exhaustive]
28-
pub struct Server;
29-
30-
impl Server {
31-
/// Start server building process.
32-
pub fn build() -> ServerBuilder {
33-
ServerBuilder::default()
34-
}
35-
}
18+
use crate::{
19+
accept::AcceptLoop,
20+
join_all,
21+
server::{ServerCommand, ServerHandle},
22+
service::{InternalServiceFactory, StreamNewService},
23+
signals::{Signal, Signals},
24+
socket::{
25+
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
26+
},
27+
waker_queue::{WakerInterest, WakerQueue},
28+
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer},
29+
};
3630

3731
/// Server builder
3832
pub struct ServerBuilder {
@@ -169,38 +163,48 @@ impl ServerBuilder {
169163
/// Binds to all network interface addresses that resolve from the `addr` argument.
170164
/// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct
171165
/// interfaces at the same time by passing a list of socket addresses.
172-
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
166+
pub fn bind<F, U, InitErr>(
167+
mut self,
168+
name: impl AsRef<str>,
169+
addr: U,
170+
factory: F,
171+
) -> io::Result<Self>
173172
where
174-
F: ServiceFactory<TcpStream>,
173+
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
174+
InitErr: fmt::Debug + Send + 'static,
175175
U: ToSocketAddrs,
176176
{
177177
let sockets = bind_addr(addr, self.backlog)?;
178178

179179
for lst in sockets {
180180
let token = self.next_token();
181+
181182
self.services.push(StreamNewService::create(
182183
name.as_ref().to_string(),
183184
token,
184185
factory.clone(),
185186
lst.local_addr()?,
186187
));
188+
187189
self.sockets
188190
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
189191
}
192+
190193
Ok(self)
191194
}
192195

193196
/// Bind server to existing TCP listener.
194197
///
195198
/// Useful when running as a systemd service and a socket FD can be passed to the process.
196-
pub fn listen<F, N: AsRef<str>>(
199+
pub fn listen<F, InitErr>(
197200
mut self,
198-
name: N,
201+
name: impl AsRef<str>,
199202
lst: StdTcpListener,
200203
factory: F,
201204
) -> io::Result<Self>
202205
where
203-
F: ServiceFactory<TcpStream>,
206+
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
207+
InitErr: fmt::Debug + Send + 'static,
204208
{
205209
lst.set_nonblocking(true)?;
206210

@@ -259,7 +263,7 @@ impl ServerBuilder {
259263
Signals::start(self.server.clone());
260264
}
261265

262-
// start http server actor
266+
// start http server
263267
let server = self.server.clone();
264268
rt::spawn(self);
265269
server
@@ -402,11 +406,19 @@ impl ServerBuilder {
402406
#[cfg(unix)]
403407
impl ServerBuilder {
404408
/// Add new unix domain service to the server.
405-
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
409+
pub fn bind_uds<F, U, InitErr>(
410+
self,
411+
name: impl AsRef<str>,
412+
addr: U,
413+
factory: F,
414+
) -> io::Result<Self>
406415
where
407-
F: ServiceFactory<actix_rt::net::UnixStream>,
408-
N: AsRef<str>,
416+
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
417+
+ Send
418+
+ Clone
419+
+ 'static,
409420
U: AsRef<std::path::Path>,
421+
InitErr: fmt::Debug + Send + 'static,
410422
{
411423
// The path must not exist when we try to bind.
412424
// Try to remove it to avoid bind error.
@@ -424,14 +436,18 @@ impl ServerBuilder {
424436
/// Add new unix domain service to the server.
425437
///
426438
/// Useful when running as a systemd service and a socket FD can be passed to the process.
427-
pub fn listen_uds<F, N: AsRef<str>>(
439+
pub fn listen_uds<F, InitErr>(
428440
mut self,
429-
name: N,
441+
name: impl AsRef<str>,
430442
lst: crate::socket::StdUnixListener,
431443
factory: F,
432444
) -> io::Result<Self>
433445
where
434-
F: ServiceFactory<actix_rt::net::UnixStream>,
446+
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
447+
+ Send
448+
+ Clone
449+
+ 'static,
450+
InitErr: fmt::Debug + Send + 'static,
435451
{
436452
use std::net::{IpAddr, Ipv4Addr};
437453

actix-server/src/lib.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@ mod test_server;
1414
mod waker_queue;
1515
mod worker;
1616

17-
pub use self::builder::{Server, ServerBuilder};
18-
pub use self::server::{ServerHandle};
19-
pub use self::service::ServiceFactory;
17+
pub use self::builder::ServerBuilder;
18+
pub use self::server::{Server, ServerHandle};
2019
pub use self::test_server::TestServer;
2120

2221
#[doc(hidden)]

actix-server/src/server.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,18 @@ use std::task::{Context, Poll};
66
use tokio::sync::mpsc::UnboundedSender;
77
use tokio::sync::oneshot;
88

9-
use crate::signals::Signal;
9+
use crate::{signals::Signal, ServerBuilder};
10+
11+
#[derive(Debug)]
12+
#[non_exhaustive]
13+
pub struct Server;
14+
15+
impl Server {
16+
/// Start server building process.
17+
pub fn build() -> ServerBuilder {
18+
ServerBuilder::default()
19+
}
20+
}
1021

1122
#[derive(Debug)]
1223
pub(crate) enum ServerCommand {

actix-server/src/service.rs

+25-34
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
1-
use std::marker::PhantomData;
2-
use std::net::SocketAddr;
3-
use std::task::{Context, Poll};
4-
5-
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
1+
use std::{
2+
fmt,
3+
marker::PhantomData,
4+
net::SocketAddr,
5+
task::{Context, Poll},
6+
};
7+
8+
use actix_service::{Service, ServiceFactory};
69
use actix_utils::future::{ready, Ready};
710
use futures_core::future::LocalBoxFuture;
811
use log::error;
912

10-
use crate::socket::{FromStream, MioStream};
11-
use crate::worker::WorkerCounterGuard;
12-
13-
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
14-
type Factory: BaseServiceFactory<Stream, Config = ()>;
15-
16-
fn create(&self) -> Self::Factory;
17-
}
13+
use crate::{
14+
socket::{FromStream, MioStream},
15+
worker::WorkerCounterGuard,
16+
};
1817

1918
pub(crate) trait InternalServiceFactory: Send {
2019
fn name(&self, token: usize) -> &str;
@@ -80,17 +79,18 @@ where
8079
}
8180
}
8281

83-
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
82+
pub(crate) struct StreamNewService<F, Io, InitErr> {
8483
name: String,
8584
inner: F,
8685
token: usize,
8786
addr: SocketAddr,
88-
_t: PhantomData<Io>,
87+
_t: PhantomData<(Io, InitErr)>,
8988
}
9089

91-
impl<F, Io> StreamNewService<F, Io>
90+
impl<F, Io, InitErr> StreamNewService<F, Io, InitErr>
9291
where
93-
F: ServiceFactory<Io>,
92+
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
93+
InitErr: fmt::Debug + Send + 'static,
9494
Io: FromStream + Send + 'static,
9595
{
9696
pub(crate) fn create(
@@ -109,9 +109,10 @@ where
109109
}
110110
}
111111

112-
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
112+
impl<F, Io, InitErr> InternalServiceFactory for StreamNewService<F, Io, InitErr>
113113
where
114-
F: ServiceFactory<Io>,
114+
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
115+
InitErr: fmt::Debug + Send + 'static,
115116
Io: FromStream + Send + 'static,
116117
{
117118
fn name(&self, _: usize) -> &str {
@@ -130,28 +131,18 @@ where
130131

131132
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
132133
let token = self.token;
133-
let fut = self.inner.create().new_service(());
134+
let fut = self.inner.new_service(());
134135
Box::pin(async move {
135136
match fut.await {
136137
Ok(inner) => {
137138
let service = Box::new(StreamService::new(inner)) as _;
138139
Ok((token, service))
139140
}
140-
Err(_) => Err(()),
141+
Err(err) => {
142+
error!("{:?}", err);
143+
Err(())
144+
}
141145
}
142146
})
143147
}
144148
}
145-
146-
impl<F, T, I> ServiceFactory<I> for F
147-
where
148-
F: Fn() -> T + Send + Clone + 'static,
149-
T: BaseServiceFactory<I, Config = ()>,
150-
I: FromStream,
151-
{
152-
type Factory = T;
153-
154-
fn create(&self) -> T {
155-
(self)()
156-
}
157-
}

0 commit comments

Comments
 (0)