Skip to content

Commit 5726853

Browse files
feat(s2n-quic-dc): Expose acceptor timing to application (#3006)
1 parent 4ca17fa commit 5726853

4 files changed

Lines changed: 62 additions & 32 deletions

File tree

dc/s2n-quic-dc/src/stream/application.rs

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,38 +27,54 @@ pub struct Builder<Sub: event::Subscriber> {
2727
pub app_queue_time: Option<Timestamp>,
2828
}
2929

30+
/// Carries timestamps of events before the stream is returned to the application in accept().
31+
#[non_exhaustive]
32+
pub struct AcceptInfo {
33+
/// How long the stream spent inside the dcQUIC acceptor before being enqueued for the
34+
/// application.
35+
pub dc_quic_accept_time: Duration,
36+
/// How long the stream spent enqueued for the application.
37+
pub app_queue_sojourn_time: Duration,
38+
}
39+
3040
impl<Sub> Builder<Sub>
3141
where
3242
Sub: event::Subscriber,
3343
{
3444
/// Builds the stream and emits an event indicating that the stream was built
3545
#[inline]
36-
pub(crate) fn accept(self) -> io::Result<(Stream<Sub>, Duration)> {
37-
let sojourn_time = {
38-
let remote_address = self.shared.remote_addr();
39-
let remote_address = &remote_address;
40-
let creds = self.shared.credentials();
41-
let credential_id = &*creds.id;
42-
let stream_id = creds.key_id.as_u64();
43-
let now = self.shared.common.clock.get_time();
44-
let total_sojourn_time = now.saturating_duration_since(self.kernel_start_time);
45-
let queue_sojourn_time =
46-
now.saturating_duration_since(self.app_queue_time.expect("set by accept_stream"));
47-
48-
self.shared
49-
.endpoint_publisher(now)
50-
.on_acceptor_stream_dequeued(event::builder::AcceptorStreamDequeued {
51-
remote_address,
52-
credential_id,
53-
stream_id,
54-
sojourn_time: total_sojourn_time,
55-
queue_sojourn_time,
56-
});
57-
58-
queue_sojourn_time
59-
};
60-
61-
self.build().map(|stream| (stream, sojourn_time))
46+
pub(crate) fn accept(self) -> io::Result<(Stream<Sub>, AcceptInfo)> {
47+
let kernel_start_time = self.kernel_start_time;
48+
let app_queue_time = self.app_queue_time.expect("set by accept_stream");
49+
let remote_address = self.shared.remote_addr();
50+
let remote_address = &remote_address;
51+
let creds = self.shared.credentials();
52+
let credential_id = &*creds.id;
53+
let stream_id = creds.key_id.as_u64();
54+
let now = self.shared.common.clock.get_time();
55+
let total_sojourn_time = now.saturating_duration_since(self.kernel_start_time);
56+
let queue_sojourn_time = now.saturating_duration_since(app_queue_time);
57+
58+
self.shared
59+
.endpoint_publisher(now)
60+
.on_acceptor_stream_dequeued(event::builder::AcceptorStreamDequeued {
61+
remote_address,
62+
credential_id,
63+
stream_id,
64+
sojourn_time: total_sojourn_time,
65+
queue_sojourn_time,
66+
});
67+
68+
self.build().map(|stream| {
69+
(
70+
stream,
71+
AcceptInfo {
72+
dc_quic_accept_time: app_queue_time
73+
.saturating_duration_since(kernel_start_time),
74+
app_queue_sojourn_time: now.saturating_duration_since(app_queue_time),
75+
},
76+
)
77+
})
6278
}
6379

6480
#[inline]

dc/s2n-quic-dc/src/stream/server/accept.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use crate::{
55
event,
66
stream::{
7-
application::{Builder as StreamBuilder, Stream},
7+
application::{AcceptInfo, Builder as StreamBuilder, Stream},
88
server::stats,
99
},
1010
sync::mpmc as channel,
@@ -37,7 +37,7 @@ where
3737
pub async fn accept<Sub>(
3838
streams: &Receiver<Sub>,
3939
stats: &stats::Sender,
40-
) -> io::Result<(Stream<Sub>, SocketAddr)>
40+
) -> io::Result<(Stream<Sub>, AcceptInfo, SocketAddr)>
4141
where
4242
Sub: event::Subscriber,
4343
{
@@ -49,10 +49,10 @@ where
4949
})?;
5050

5151
// build the stream inside the application context
52-
let (stream, sojourn_time) = stream.accept()?;
53-
stats.send(sojourn_time);
52+
let (stream, info) = stream.accept()?;
53+
stats.send(info.app_queue_sojourn_time);
5454

5555
let remote_addr = stream.peer_addr()?;
5656

57-
Ok((stream, remote_addr))
57+
Ok((stream, info, remote_addr))
5858
}

dc/s2n-quic-dc/src/stream/server/tokio.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ impl<H: Handshake + Clone, S: event::Subscriber + Clone> Server<H, S> {
100100

101101
#[inline]
102102
pub async fn accept(&self) -> io::Result<(crate::stream::application::Stream<S>, SocketAddr)> {
103+
let (stream, _info, addr) = accept::accept(&self.streams, &self.stats).await?;
104+
Ok((stream, addr))
105+
}
106+
107+
#[inline]
108+
pub async fn accept_with_info(
109+
&self,
110+
) -> io::Result<(
111+
crate::stream::application::Stream<S>,
112+
crate::stream::application::AcceptInfo,
113+
SocketAddr,
114+
)> {
103115
accept::accept(&self.streams, &self.stats).await
104116
}
105117

dc/s2n-quic-dc/src/stream/testing.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,9 @@ impl Server {
607607
}
608608

609609
pub async fn accept(&self) -> io::Result<(Stream, SocketAddr)> {
610-
stream_server::accept::accept(&self.receiver, &self.stats).await
610+
let (stream, _info, addr) =
611+
stream_server::accept::accept(&self.receiver, &self.stats).await?;
612+
Ok((stream, addr))
611613
}
612614

613615
pub fn subscriber(&self) -> Arc<testing::Subscriber> {

0 commit comments

Comments
 (0)