Skip to content

Commit 90971ad

Browse files
committed
Add event handler support to legacy client pool
Enable tracking connection events (new, closed, error, timeout) in the legacy client pool via an EventHandler. Expose PoolKey and event types for user statistics. Update Builder to accept an event handler. Notify on relevant connection lifecycle events. Signed-off-by: Nicola Bonelli <nicola.bonelli@huawei-partners.com>
1 parent cecf97c commit 90971ad

File tree

5 files changed

+216
-72
lines changed

5 files changed

+216
-72
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ members = [
88
"orion-lib",
99
"orion-proxy",
1010
"orion-xds",
11+
"orion-client",
1112
]
1213

1314

@@ -31,6 +32,7 @@ orion-error = { path = "orion-error" }
3132
orion-format = { path = "orion-format" }
3233
orion-lib = { path = "orion-lib" }
3334
orion-xds = { path = "orion-xds" }
35+
orion-client = { path = "orion-client" }
3436

3537
abort-on-drop = "0.2"
3638
bytes = "1"

orion-client/Cargo.toml

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
[package]
2+
authors = [
3+
"Sean McArthur <sean@seanmonstar.com>, Nicola Bonelli <nicola.bonelli@huawei.com>",
4+
]
5+
categories = [
6+
"network-programming",
7+
"web-programming::http-client",
8+
"web-programming::http-server",
9+
]
10+
description = "hyper utilities (orion-client)"
11+
documentation = "https://docs.rs/hyper-util"
12+
edition = "2021"
13+
homepage = "https://hyper.rs"
14+
keywords = ["http", "hyper", "hyperium"]
15+
license = "MIT"
216
name = "hyper-util"
3-
version = "0.1.15"
4-
description = "hyper utilities"
517
readme = "README.md"
6-
homepage = "https://hyper.rs"
7-
documentation = "https://docs.rs/hyper-util"
818
repository = "https://github.com/hyperium/hyper-util"
9-
license = "MIT"
10-
authors = ["Sean McArthur <sean@seanmonstar.com>"]
11-
keywords = ["http", "hyper", "hyperium"]
12-
categories = ["network-programming", "web-programming::http-client", "web-programming::http-server"]
13-
edition = "2021"
1419
rust-version = "1.63"
20+
version = "0.1.15"
1521

1622
[package.metadata.docs.rs]
17-
features = ["full"]
23+
features = ["full"]
1824
rustdoc-args = ["--cfg", "docsrs"]
1925

2026
[dependencies]
@@ -30,19 +36,24 @@ ipnet = { version = "2.9", optional = true }
3036
libc = { version = "0.2", optional = true }
3137
percent-encoding = { version = "2.3", optional = true }
3238
pin-project-lite = "0.2.4"
39+
scopeguard = "1.2.0"
3340
socket2 = { version = "0.5.9", optional = true, features = ["all"] }
34-
tracing = { version = "0.1", default-features = false, features = ["std"], optional = true }
35-
tokio = { version = "1", optional = true, default-features = false }
41+
tokio = { version = "1", optional = true, default-features = false }
3642
tower-service = { version = "0.3", optional = true }
43+
tracing = { version = "0.1", default-features = false, features = [
44+
"std",
45+
], optional = true }
3746

3847
[dev-dependencies]
39-
hyper = { version = "1.4.0", features = ["full"] }
4048
bytes = "1"
41-
futures-util = { version = "0.3.16", default-features = false, features = ["alloc"] }
49+
futures-util = { version = "0.3.16", default-features = false, features = [
50+
"alloc",
51+
] }
4252
http-body-util = "0.1.0"
43-
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
44-
tokio-test = "0.4"
53+
hyper = { version = "1.4.0", features = ["full"] }
4554
pretty_env_logger = "0.5"
55+
tokio = { version = "1", features = ["macros", "signal", "test-util"] }
56+
tokio-test = "0.4"
4657

4758
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
4859
pnet_datalink = "0.35.0"
@@ -55,30 +66,40 @@ windows-registry = { version = "0.5", optional = true }
5566

5667
[features]
5768
default = []
58-
59-
# Shorthand to enable everything
69+
# enable everything
6070
full = [
61-
"client",
62-
"client-legacy",
63-
"client-proxy",
64-
"client-proxy-system",
65-
"server",
66-
"server-auto",
67-
"server-graceful",
68-
"service",
69-
"http1",
70-
"http2",
71-
"tokio",
72-
"tracing",
71+
"client",
72+
"client-legacy",
73+
"client-proxy",
74+
"client-proxy-system",
75+
"http1",
76+
"http2",
77+
"server",
78+
"server-auto",
79+
"server-graceful",
80+
"service",
81+
"tokio",
82+
"tracing",
7383
]
7484

75-
client = ["hyper/client", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
76-
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
85+
client = [
86+
"dep:futures-channel",
87+
"dep:tower-service",
88+
"dep:tracing",
89+
"hyper/client",
90+
]
91+
client-legacy = [
92+
"client",
93+
"dep:futures-util",
94+
"dep:libc",
95+
"dep:socket2",
96+
"tokio/sync",
97+
]
7798
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
7899
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]
79100

80-
server = ["hyper/server"]
81-
server-auto = ["server", "http1", "http2"]
101+
server = ["hyper/server"]
102+
server-auto = ["http1", "http2", "server"]
82103
server-graceful = ["server", "tokio/sync"]
83104

84105
service = ["dep:tower-service"]
@@ -89,18 +110,3 @@ http2 = ["hyper/http2"]
89110
tokio = ["dep:tokio", "tokio/net", "tokio/rt", "tokio/time"]
90111

91112
tracing = ["dep:tracing"]
92-
93-
# internal features used in CI
94-
__internal_happy_eyeballs_tests = []
95-
96-
[[example]]
97-
name = "client"
98-
required-features = ["client-legacy", "http1", "tokio"]
99-
100-
[[example]]
101-
name = "server"
102-
required-features = ["server", "http1", "tokio"]
103-
104-
[[example]]
105-
name = "server_graceful"
106-
required-features = ["tokio", "server-graceful", "server-auto"]

orion-client/src/client/legacy/client.rs

Lines changed: 72 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@ use super::pool::{self, Ver};
2828
use crate::common::future::poll_fn;
2929
use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
3030

31+
use crate::client::legacy::pool::{ConnectionEvent, EventHandler};
32+
3133
type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
3234

3335
/// A Client to make outgoing HTTP requests.
3436
///
3537
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
3638
/// underlying connection pool will be reused.
39+
///
3740
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
3841
pub struct Client<C, B> {
3942
config: Config,
@@ -62,7 +65,7 @@ pub struct Error {
6265
}
6366

6467
#[derive(Debug)]
65-
enum ErrorKind {
68+
pub enum ErrorKind {
6669
Canceled,
6770
ChannelClosed,
6871
Connect,
@@ -81,8 +84,9 @@ macro_rules! e {
8184
};
8285
}
8386

84-
// We might change this... :shrug:
85-
type PoolKey = (http::uri::Scheme, http::uri::Authority);
87+
/// PoolKey is a tuple of the Scheme and Authority of the Uri, used to
88+
/// identify a connection pool for a specific destination.
89+
pub type PoolKey = (http::uri::Scheme, http::uri::Authority);
8690

8791
enum TrySendError<B> {
8892
Retryable { error: Error, req: Request<B>, connection_reused: bool },
@@ -477,9 +481,49 @@ where
477481
return Either::Right(future::err(canceled));
478482
},
479483
};
484+
485+
let on_event_error = pool.on_event.clone();
486+
let pool_key_clone = pool_key.clone();
487+
480488
Either::Left(
481-
connector.connect(super::connect::sealed::Internal, dst).map_err(|src| e!(Connect, src)).and_then(
482-
move |io| {
489+
connector
490+
.connect(super::connect::sealed::Internal, dst)
491+
.map_err(move |err| {
492+
let err_box: Box<dyn StdError + Send + Sync> = err.into();
493+
let mut source_err: Option<&dyn StdError> = Some(err_box.as_ref());
494+
let (mut io_error_kind, mut elapsed_error) = (None, false);
495+
496+
while let Some(current_err) = source_err {
497+
use std::io;
498+
if let Some(io_err) = current_err.downcast_ref::<io::Error>() {
499+
io_error_kind = Some(io_err.kind());
500+
} else if current_err.is::<tokio::time::error::Elapsed>() {
501+
elapsed_error = true;
502+
}
503+
504+
source_err = current_err.source();
505+
}
506+
507+
if let Some(ref handler) = on_event_error {
508+
let is_timeout =
509+
elapsed_error || matches!(io_error_kind, Some(std::io::ErrorKind::TimedOut));
510+
if is_timeout {
511+
handler.call(&pool_key_clone, ConnectionEvent::ConnectionTimeout);
512+
} else {
513+
handler.call(&pool_key_clone, ConnectionEvent::ConnectionError);
514+
}
515+
}
516+
517+
// If the connection failed, we need to notify the event connection error.
518+
519+
e!(Connect, err_box)
520+
})
521+
.and_then(move |io| {
522+
// increment the total connection count for this pool key
523+
if let Some(ref handler) = pool.on_event {
524+
handler.call(&pool_key, ConnectionEvent::NewConnection);
525+
}
526+
483527
let connected = io.connected();
484528
// If ALPN is h2 and we aren't http2_only already,
485529
// then we need to convert our pool checkout into
@@ -505,6 +549,14 @@ where
505549
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
506550

507551
Either::Left(Box::pin(async move {
552+
use scopeguard::{guard, ScopeGuard};
553+
let guard = guard((), |_| {
554+
// increment the destroy connection count for this pool key (if still armed)
555+
if let Some(ref handler) = pool.on_event {
556+
handler.call(&pool_key, ConnectionEvent::ConnectionError);
557+
}
558+
});
559+
508560
let tx = if is_h2 {
509561
#[cfg(feature = "http2")]
510562
{
@@ -617,10 +669,12 @@ where
617669
}
618670
};
619671

672+
// “defuse” the guard...
673+
_ = ScopeGuard::into_inner(guard);
674+
620675
Ok(pool.pooled(connecting, PoolClient { conn_info: connected, tx }))
621676
}))
622-
},
623-
),
677+
}),
624678
)
625679
})
626680
}
@@ -961,6 +1015,7 @@ pub struct Builder {
9611015
h2_builder: hyper::client::conn::http2::Builder<Exec>,
9621016
pool_config: pool::Config,
9631017
pool_timer: Option<timer::Timer>,
1018+
event_handler: Option<EventHandler>,
9641019
}
9651020

9661021
impl Builder {
@@ -979,6 +1034,7 @@ impl Builder {
9791034
h2_builder: hyper::client::conn::http2::Builder::new(exec),
9801035
pool_config: pool::Config { idle_timeout: Some(Duration::from_secs(90)), max_idle_per_host: usize::MAX },
9811036
pool_timer: None,
1037+
event_handler: None,
9821038
}
9831039
}
9841040
/// Set an optional timeout for idle sockets being kept-alive.
@@ -1444,6 +1500,13 @@ impl Builder {
14441500
self
14451501
}
14461502

1503+
/// Provide a event handler to be used for updating statistics.
1504+
///
1505+
pub fn event_handler(&mut self, on_update: EventHandler) -> &mut Self {
1506+
self.event_handler = Some(on_update);
1507+
self
1508+
}
1509+
14471510
/// Set the maximum write buffer size for each HTTP/2 stream.
14481511
///
14491512
/// Default is currently 1MB, but may change.
@@ -1510,6 +1573,7 @@ impl Builder {
15101573
{
15111574
let exec = self.exec.clone();
15121575
let timer = self.pool_timer.clone();
1576+
let on_update = self.event_handler.clone();
15131577
Client {
15141578
config: self.client_config,
15151579
exec: exec.clone(),
@@ -1518,7 +1582,7 @@ impl Builder {
15181582
#[cfg(feature = "http2")]
15191583
h2_builder: self.h2_builder.clone(),
15201584
connector,
1521-
pool: pool::Pool::new(self.pool_config, exec, timer),
1585+
pool: pool::Pool::new(self.pool_config, exec, timer, on_update),
15221586
}
15231587
}
15241588
}

orion-client/src/client/legacy/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#[cfg(any(feature = "http1", feature = "http2"))]
22
mod client;
33
#[cfg(any(feature = "http1", feature = "http2"))]
4-
pub use client::{Builder, Client, Error, ResponseFuture};
4+
pub use client::{Builder, Client, Error, PoolKey, ResponseFuture};
55

66
pub mod connect;
77
#[doc(hidden)]

0 commit comments

Comments
 (0)