Skip to content

Commit 3332bb8

Browse files
Fix #127: Capture async receive timestamp before deserialization
The async standalone server was calling get_monotonic_time_ns() after transport.receive().await returned, which includes both Tokio executor scheduling delay and deserialization time in the one-way latency measurement. The blocking path already used receive_blocking_timed() which timestamps between raw I/O and deserialization. Add receive_timed() to the async IpcTransport trait with a default implementation, then override it in TCP and UDS transports to capture the monotonic timestamp immediately after read_exact() completes but before Message::from_bytes() deserialization runs. Update the standalone server to use receive_timed() for one-way latency measurement. - Add async receive_timed() default method to IpcTransport trait - Implement read_message_timed() in TcpSocketTransport - Implement read_message_timed() in UnixDomainSocketTransport - Override receive_timed() in both TCP and UDS async transports - Update standalone_server async one-way path to use receive_timed() - Remove unused get_monotonic_time_ns import from standalone_server - All tests passing, clippy clean AI-assisted-by: Claude Opus 4.6 Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent d586320 commit 3332bb8

4 files changed

Lines changed: 129 additions & 7 deletions

File tree

src/ipc/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,28 @@ pub trait IpcTransport: Send + Sync {
730730
/// - Async implementation allows cancellation
731731
async fn receive(&mut self) -> Result<Message>;
732732

733+
/// Receive a message and capture a monotonic timestamp immediately
734+
/// after the raw bytes are read but before deserialization.
735+
///
736+
/// This provides more accurate one-way latency measurement by
737+
/// excluding deserialization overhead from the receive timestamp,
738+
/// mirroring the blocking transport's `receive_blocking_timed()`.
739+
///
740+
/// The default implementation captures the timestamp after
741+
/// `receive()` returns (including deserialization). Transport
742+
/// implementations should override this to place the timestamp
743+
/// between raw I/O and deserialization for better accuracy.
744+
///
745+
/// # Returns
746+
///
747+
/// A tuple of (Message, timestamp_ns) where timestamp_ns is a
748+
/// monotonic clock value captured as close to I/O completion as
749+
/// possible.
750+
async fn receive_timed(&mut self) -> Result<(Message, u64)> {
751+
let msg = self.receive().await?;
752+
Ok((msg, get_monotonic_time_ns()))
753+
}
754+
733755
/// Close the transport
734756
///
735757
/// Cleanly shuts down the transport, releasing all resources

src/ipc/tcp_socket.rs

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use super::{ConnectionId, IpcError, IpcTransport, Message, TransportConfig, TransportState};
1+
use super::{
2+
get_monotonic_time_ns, ConnectionId, IpcError, IpcTransport, Message, TransportConfig,
3+
TransportState,
4+
};
25
use anyhow::{anyhow, Result};
36
use async_trait::async_trait;
47
use std::collections::HashMap;
@@ -69,6 +72,28 @@ impl TcpSocketTransport {
6972
Message::from_bytes(&message_data)
7073
}
7174

75+
/// Read a message and capture a monotonic timestamp immediately after
76+
/// the raw bytes are read but before deserialization. This excludes
77+
/// deserialization overhead from one-way latency measurements.
78+
async fn read_message_timed(stream: &mut TcpStream) -> Result<(Message, u64)> {
79+
let mut len_bytes = [0u8; 4];
80+
stream.read_exact(&mut len_bytes).await?;
81+
let message_len = u32::from_le_bytes(len_bytes) as usize;
82+
83+
if message_len > 16 * 1024 * 1024 {
84+
return Err(anyhow!("Message too large: {} bytes", message_len));
85+
}
86+
87+
let mut message_data = vec![0u8; message_len];
88+
stream.read_exact(&mut message_data).await?;
89+
90+
// Capture timestamp between raw I/O and deserialization
91+
let receive_time_ns = get_monotonic_time_ns();
92+
93+
let message = Message::from_bytes(&message_data)?;
94+
Ok((message, receive_time_ns))
95+
}
96+
7297
/// Write a message to the TCP stream.
7398
///
7499
/// Unlike SHM and PMQ, the timestamp is not refreshed
@@ -340,6 +365,36 @@ impl IpcTransport for TcpSocketTransport {
340365
}
341366
}
342367

368+
async fn receive_timed(&mut self) -> Result<(Message, u64)> {
369+
if self.state != TransportState::Connected {
370+
return Err(anyhow!("Transport not connected"));
371+
}
372+
373+
if self.stream.is_none() {
374+
if let Some(listener) = self.listener.as_ref() {
375+
let (stream, client_addr) = listener.accept().await?;
376+
debug!(
377+
"TCP Socket server accepted connection from: {}",
378+
client_addr
379+
);
380+
let std_stream = stream.into_std()?;
381+
let socket = socket2::Socket::from(std_stream.try_clone()?);
382+
socket.set_nodelay(true)?;
383+
socket.set_recv_buffer_size(self.buffer_size)?;
384+
socket.set_send_buffer_size(self.buffer_size)?;
385+
self.stream = Some(TcpStream::from_std(std_stream)?);
386+
}
387+
}
388+
389+
if let Some(ref mut stream) = self.stream {
390+
let (message, ts) = Self::read_message_timed(stream).await?;
391+
debug!("Received message {} via TCP Socket", message.id);
392+
Ok((message, ts))
393+
} else {
394+
Err(anyhow!("No active stream available"))
395+
}
396+
}
397+
343398
async fn close(&mut self) -> Result<()> {
344399
debug!("Closing TCP Socket transport");
345400

src/ipc/unix_domain_socket.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use super::{ConnectionId, IpcError, IpcTransport, Message, TransportConfig, TransportState};
1+
use super::{
2+
get_monotonic_time_ns, ConnectionId, IpcError, IpcTransport, Message, TransportConfig,
3+
TransportState,
4+
};
25
use anyhow::{anyhow, Result};
36
use async_trait::async_trait;
47
use std::collections::HashMap;
@@ -66,6 +69,28 @@ impl UnixDomainSocketTransport {
6669
Message::from_bytes(&message_data)
6770
}
6871

72+
/// Read a message and capture a monotonic timestamp immediately after
73+
/// the raw bytes are read but before deserialization. This excludes
74+
/// deserialization overhead from one-way latency measurements.
75+
async fn read_message_timed(stream: &mut UnixStream) -> Result<(Message, u64)> {
76+
let mut len_bytes = [0u8; 4];
77+
stream.read_exact(&mut len_bytes).await?;
78+
let message_len = u32::from_le_bytes(len_bytes) as usize;
79+
80+
if message_len > 16 * 1024 * 1024 {
81+
return Err(anyhow!("Message too large: {} bytes", message_len));
82+
}
83+
84+
let mut message_data = vec![0u8; message_len];
85+
stream.read_exact(&mut message_data).await?;
86+
87+
// Capture timestamp between raw I/O and deserialization
88+
let receive_time_ns = get_monotonic_time_ns();
89+
90+
let message = Message::from_bytes(&message_data)?;
91+
Ok((message, receive_time_ns))
92+
}
93+
6994
/// Write a message to the Unix stream.
7095
///
7196
/// Unlike SHM and PMQ, the timestamp is not refreshed
@@ -320,6 +345,27 @@ impl IpcTransport for UnixDomainSocketTransport {
320345
}
321346
}
322347

348+
async fn receive_timed(&mut self) -> Result<(Message, u64)> {
349+
if self.state != TransportState::Connected {
350+
return Err(anyhow!("Transport not connected"));
351+
}
352+
353+
if self.stream.is_none() {
354+
if let Some(listener) = self.listener.as_ref() {
355+
let (stream, _) = listener.accept().await?;
356+
self.stream = Some(stream);
357+
}
358+
}
359+
360+
if let Some(ref mut stream) = self.stream {
361+
let (message, ts) = Self::read_message_timed(stream).await?;
362+
debug!("Received message {} via Unix Domain Socket", message.id);
363+
Ok((message, ts))
364+
} else {
365+
Err(anyhow!("No active stream available"))
366+
}
367+
}
368+
323369
async fn close(&mut self) -> Result<()> {
324370
debug!("Closing Unix Domain Socket transport");
325371

src/standalone_server.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use tracing_subscriber::filter::LevelFilter;
1919
use crate::benchmark::BenchmarkConfig;
2020
use crate::cli::{Args, IpcMechanism};
2121
use crate::ipc::{
22-
get_monotonic_time_ns, BlockingTransport, BlockingTransportFactory, Message, MessageType,
23-
TransportConfig, TransportFactory,
22+
BlockingTransport, BlockingTransportFactory, Message, MessageType, TransportConfig,
23+
TransportFactory,
2424
};
2525
use crate::logging::ColorizedFormatter;
2626
use crate::metrics::{LatencyType, MetricsCollector};
@@ -747,15 +747,14 @@ pub async fn run_standalone_server_async_single(
747747
info!("Shutdown signal received, exiting");
748748
break;
749749
}
750-
match transport.receive().await {
751-
Ok(msg) => {
750+
match transport.receive_timed().await {
751+
Ok((msg, receive_time_ns)) => {
752752
if msg.message_type == MessageType::Shutdown {
753753
debug!("Server received shutdown message, exiting");
754754
break;
755755
}
756756

757757
if msg.message_type == MessageType::OneWay && msg.id != u64::MAX {
758-
let receive_time_ns = get_monotonic_time_ns();
759758
let latency_ns = receive_time_ns.saturating_sub(msg.timestamp);
760759
let latency = std::time::Duration::from_nanos(latency_ns);
761760
one_way_metrics.record_message(config.message_size, Some(latency))?;

0 commit comments

Comments
 (0)