Skip to content

Commit 3898cea

Browse files
authored
Fix/core latency measurement (#104)
* fix: BenchmarkResults::new() correctly sets one_way/round_trip flags - Added one_way and round_trip parameters to BenchmarkResults::new() - Updated TestConfiguration initialization to use passed parameters instead of hardcoded false values - Updated all call sites in benchmark.rs, benchmark_blocking.rs, main.rs, results.rs, results_blocking.rs to pass config.one_way and config.round_trip (or true, true in test code) - This ensures JSON output test_config correctly reflects actual test configuration Cherry-picked from container-to-container-ipc branch (f6783d8). AI-assisted-by: Claude claude-4.6-opus-high-thinking (Anthropic) Made-with: Cursor * fix: Fix timestamp and payload validation bugs across IPC mechanisms - SHM blocking: update timestamp immediately before ring buffer write to prevent stale timestamps when backpressure delays writes - SHM blocking: add write_data_polling fallback with timestamp_offset support for accurate timing under fallback conditions - SHM blocking: replace pthread_cond_wait with timed variant (500us) and add broken-condvar detection with polling fallback - PMQ blocking: validate payload size against queue capacity before sending to prevent silent truncation - IPC mod: add one_way_latency_ns field to Message struct for carrying server-measured latency in responses Cherry-picked from container-to-container-ipc branch (6e2d41b). SHM-direct changes excluded (require futex/cross-container infrastructure not present in standalone mode). AI-assisted-by: Claude claude-4.6-opus-high-thinking (Anthropic) Made-with: Cursor * perf: Optimize async SHM and PMQ timestamp accuracy - Async SHM: replace byte-by-byte ring buffer copy with bulk memcpy for 100x+ speedup on large messages (4KB-64KB now achieve 3-25us vs previous 1-3ms) - Async SHM: update timestamp immediately before IPC write for accurate latency measurement (excludes async scheduling overhead) - Async PMQ: capture timestamps inside spawn_blocking (immediately before/after syscalls) to exclude async scheduling overhead from measured latency - Async PMQ: pre-compute timestamp offset for efficient in-place updates Note: Reverted adaptive spin optimization (spin_loop + yield_now) back to sleep(10us) as it caused cross-process SHM round-trip failures. The spin loop doesn't yield to the OS scheduler, starving the server process. Cherry-picked from container-to-container-ipc branch (8217998). AI-assisted-by: Claude claude-4.6-opus-high-thinking (Anthropic) Made-with: Cursor * docs: Improve documentation for latency measurement changes - BenchmarkResults::new(): document all 10 parameters including new one_way/round_trip flags, and explain JSON propagation - write_data_blocking(): update doc to reflect pthread_cond_timedwait (was incorrectly referencing pthread_cond_wait), document timestamp_offset parameter and broken-condvar fallback behavior - write_data_polling(): add parameter docs and explain polling interval and timeout - read_data_blocking(): minor doc clarification - send_message() in async SHM: fix duplicate summary line, replace incorrect "adaptive spinning" with actual "poll-and-sleep" behavior - receive_message() in async SHM: same adaptive spinning correction - PMQ receive(): fix missing blank line between doc sections All 254 tests passing, clippy clean on Rust 1.94.0. AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor * fix: Address code review findings from latency measurement changes Critical fix: - write_data_polling() in shared_memory_blocking.rs was missing pthread_cond_signal after writing data. A reader blocked on the condvar path would never be woken when the writer used the polling fallback, causing a potential deadlock. Dead code removal: - Remove unused _receive_time_ns capture in shared_memory.rs receive_message() and posix_message_queue.rs receive(). Latency is computed by the benchmark caller, not inside the transport layer. Documentation fixes: - shared_memory.rs receive_message(): remove inaccurate claim that receive timestamp is used for latency calculation - posix_message_queue.rs receive(): remove incorrect claim that latency is stored in message's one_way_latency_ns field; replace with accurate polling strategy description - main.rs: restore buffer_size=0 comments in failure handlers - mod.rs: remove println! calls from timestamp_offset test All 254 tests passing, clippy clean on Rust 1.94.0. AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor * test: Add config flag propagation tests for BenchmarkResults - Add assertions to test_benchmark_results_new verifying one_way_enabled and round_trip_enabled reflect constructor arguments - Add test_benchmark_results_config_flags_false verifying flags are correctly set to false (not hardcoded to true) - Add test_benchmark_results_config_flags_mixed verifying one_way-only and round_trip-only combinations propagate correctly - Total test count: 256 passed, 0 failed AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor * test: Add ring buffer wrap-around and config flag coverage tests - Add test_ring_buffer_wrap_around to exercise the two-part copy_nonoverlapping code path when data wraps past the end of the shared memory ring buffer - Add test_benchmark_results_config_flags_false verifying one_way/round_trip flags correctly set to false - Add test_benchmark_results_config_flags_mixed verifying one_way-only and round_trip-only combinations - Add flag propagation assertions to test_benchmark_results_new - Total test count: 257 passed, 0 failed AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor * test: Add write_data_polling coverage test for blocking SHM - Add test_write_data_polling_and_read exercising the polling-based write fallback path directly: - Validates basic write + read round-trip via polling path - Validates in-place timestamp update when timestamp_offset is provided - Validates shutdown detection when buffer is full - Total test count: 258 passed, 0 failed AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor * fix: Fix Windows CI build errors in new tests - Add #[cfg(unix)] to test_write_data_polling_and_read since write_data_polling() is unix-only (uses pthread primitives) - Replace IpcMechanism::UnixDomainSocket with SharedMemory in test_benchmark_results_config_flags_mixed since UDS variant is #[cfg(unix)] only AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor * fix: Remove container-IPC scope creep from blocking SHM write path - Revert write_data_blocking() to simple pthread_cond_wait, removing timed condvar, broken-condvar detection, and polling fallback logic that was intended for container-to-container IPC support - Remove write_data_polling() method and associated #[allow(dead_code)] - Remove test_write_data_polling_and_read test covering the removed polling fallback - Retain timestamp_offset parameter in write_data_blocking() for accurate latency measurement (core to this PR) - Apply cargo fmt whitespace fixes - All tests passing (257 passed, 0 failed), clippy clean AI-assisted-by: Claude claude-4.6-opus-high-thinking Made-with: Cursor * fix: Remove out-of-scope write_data_polling test - Remove test_write_data_polling_and_read from blocking SHM tests - This test exercises write_data_polling() which is a container-specific fallback path and does not belong in this PR (per reviewer feedback) - The function and test will be reintroduced in the container-to-container IPC branch where it is needed AI-assisted-by: Claude Sonnet 4 (claude-sonnet-4-20250514) Made-with: Cursor * docs: Document why TCP/UDS skip timestamp refresh before send - Add doc comments to write_message() in tcp_socket.rs and unix_domain_socket.rs explaining that the timestamp is not refreshed after serialization (unlike SHM and PMQ) - The gap between to_bytes() and the kernel write_all() is only nanoseconds of CPU work with no userspace backpressure loop or async scheduling hop, making timestamp error negligible - Addresses reviewer feedback on PR #104 regarding TCP/UDS timestamp handling relative to issue #98 AI-assisted-by: Claude Sonnet 4 (claude-sonnet-4-20250514) Made-with: Cursor
1 parent c4a7962 commit 3898cea

12 files changed

Lines changed: 420 additions & 73 deletions

src/benchmark.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,8 @@ impl BenchmarkRunner {
494494
self.config.msg_count,
495495
self.config.duration,
496496
self.config.warmup_iterations,
497+
self.config.one_way,
498+
self.config.round_trip,
497499
);
498500

499501
// Run warmup if configured

src/benchmark_blocking.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,8 @@ impl BlockingBenchmarkRunner {
720720
self.config.msg_count,
721721
self.config.duration,
722722
self.config.warmup_iterations,
723+
self.config.one_way,
724+
self.config.round_trip,
723725
);
724726

725727
// Run warmup if configured

src/ipc/mod.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,3 +1574,35 @@ mod tests {
15741574
);
15751575
}
15761576
}
1577+
1578+
#[test]
1579+
fn test_timestamp_offset_update_in_serialized_buffer() {
1580+
use super::*;
1581+
1582+
// Create a message with timestamp 0
1583+
let mut msg = Message::new(42, vec![1, 2, 3, 4], MessageType::OneWay);
1584+
msg.timestamp = 0;
1585+
1586+
// Serialize it
1587+
let mut serialized = bincode::serialize(&msg).unwrap();
1588+
1589+
// Verify timestamp is 0 in bytes 8-15
1590+
let extracted_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap());
1591+
assert_eq!(extracted_ts, 0, "Timestamp should be 0 initially");
1592+
1593+
// Now update the timestamp bytes
1594+
let new_ts: u64 = 123456789;
1595+
let ts_bytes = new_ts.to_le_bytes();
1596+
serialized[8..16].copy_from_slice(&ts_bytes);
1597+
1598+
// Verify it was updated
1599+
let updated_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap());
1600+
assert_eq!(updated_ts, new_ts, "Timestamp should be updated");
1601+
1602+
// Deserialize and verify
1603+
let deserialized: Message = bincode::deserialize(&serialized).unwrap();
1604+
assert_eq!(
1605+
deserialized.timestamp, new_ts,
1606+
"Deserialized timestamp should match"
1607+
);
1608+
}

src/ipc/posix_message_queue.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,14 @@ impl IpcTransport for PosixMessageQueueTransport {
524524
/// error, indicating that the message queue is full. The transport will
525525
/// retry with a backoff, and a warning will be logged on the first
526526
/// occurrence.
527+
///
528+
/// ## Timestamp Accuracy
529+
///
530+
/// To ensure accurate one-way latency measurement, the timestamp is updated
531+
/// immediately before the `mq_send` syscall inside the blocking task. This
532+
/// excludes async scheduling overhead from the measured latency.
527533
async fn send(&mut self, message: &Message) -> Result<bool> {
534+
// Pre-serialize with current timestamp (will be updated before send)
528535
let data = message.to_bytes()?;
529536
let fd_ref = self
530537
.mq_fd
@@ -538,15 +545,26 @@ impl IpcTransport for PosixMessageQueueTransport {
538545
// Get the priority from the config, which was set during transport creation.
539546
let priority = self.config.as_ref().map_or(0, |c| c.pmq_priority);
540547

548+
// Pre-compute timestamp offset for efficient in-place updates
549+
let ts_offset = Message::timestamp_offset();
550+
541551
// Use non-blocking send with exponential backoff for queue-full conditions
542552
let mut retry_delay_ms = 1;
543553
let max_retries = 100; // More retries since each one is much faster
544554

545555
for attempt in 0..max_retries {
546556
let start_time = std::time::Instant::now();
547557
let result = tokio::task::spawn_blocking({
548-
let data = data.clone();
558+
let mut data = data.clone();
559+
let ts_range = ts_offset.clone();
549560
move || {
561+
// CRITICAL: Update timestamp immediately before mq_send syscall
562+
// This ensures accurate latency measurement by excluding async
563+
// scheduling overhead from the measured one-way latency.
564+
let ts_now = crate::ipc::get_monotonic_time_ns();
565+
let ts_bytes = ts_now.to_le_bytes();
566+
data[ts_range].copy_from_slice(&ts_bytes);
567+
550568
// Reconstruct MqdT from raw fd for the blocking operation
551569
let fd = unsafe { MqdT::from_raw_fd(raw_fd) };
552570
// std::mem::forget(fd); // Don't close the fd when this MqdT drops
@@ -649,6 +667,11 @@ impl IpcTransport for PosixMessageQueueTransport {
649667
/// - **Temporary**: Queue empty (EAGAIN) - retried automatically
650668
/// - **Permanent**: Invalid queue state or deserialization failure
651669
/// - **Resource**: No queue available or transport not initialized
670+
///
671+
/// ## Polling Strategy
672+
///
673+
/// Uses moderate polling (100–500µs backoff, 1000 retries) to
674+
/// balance latency impact against CPU usage.
652675
async fn receive(&mut self) -> Result<Message> {
653676
let fd_ref = self
654677
.mq_fd
@@ -657,9 +680,10 @@ impl IpcTransport for PosixMessageQueueTransport {
657680
let raw_fd = fd_ref.as_raw_fd();
658681
let max_msg_size = self.max_msg_size;
659682

660-
// Use non-blocking receive with exponential backoff for empty queue conditions
661-
let mut retry_delay_ms = 1;
662-
let max_retries = 100;
683+
// Use non-blocking receive with moderate polling
684+
// Balance between latency impact and CPU usage
685+
let mut retry_delay_us = 100; // Start with 100 microseconds
686+
let max_retries = 1000; // Reasonable retry count
663687

664688
for attempt in 0..max_retries {
665689
let result = tokio::task::spawn_blocking({
@@ -682,7 +706,9 @@ impl IpcTransport for PosixMessageQueueTransport {
682706
"Received message {} bytes via POSIX message queue",
683707
buffer.len()
684708
);
685-
return Message::from_bytes(&buffer);
709+
let message = Message::from_bytes(&buffer)?;
710+
711+
return Ok(message);
686712
}
687713
Err(Errno::EAGAIN) => {
688714
// Queue is empty, wait and retry
@@ -692,10 +718,9 @@ impl IpcTransport for PosixMessageQueueTransport {
692718
max_retries
693719
));
694720
}
695-
// Justification: Short, exponentially increasing delay to wait for a message to arrive
696-
// in an empty queue without busy-waiting.
697-
tokio::time::sleep(Duration::from_millis(retry_delay_ms)).await;
698-
retry_delay_ms = (retry_delay_ms * 2).min(10); // Cap at 10ms
721+
// Use moderate delays (100-500 microseconds) to balance latency and CPU
722+
tokio::time::sleep(Duration::from_micros(retry_delay_us)).await;
723+
retry_delay_us = (retry_delay_us * 2).min(500); // Cap at 500µs
699724
}
700725
Err(e) => {
701726
return Err(anyhow!("Failed to receive message: {}", e));

src/ipc/posix_message_queue_blocking.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -391,18 +391,21 @@ impl BlockingTransport for BlockingPosixMessageQueue {
391391
));
392392
}
393393

394-
// Capture timestamp immediately before send and update bytes in buffer
395-
message_with_timestamp.set_timestamp_now();
396-
let timestamp_bytes = message_with_timestamp.timestamp.to_le_bytes();
394+
// Pre-compute the timestamp offset for efficient in-place updates
397395
let ts_offset = Message::timestamp_offset();
398-
serialized[ts_offset].copy_from_slice(&timestamp_bytes);
399396

400-
// Send immediately - no intervening work
397+
// Send with retry loop - update timestamp on each attempt
401398
// Use a retry loop with timeouts since mq_send can fail with EAGAIN
402399
let start = std::time::Instant::now();
403400
let timeout = Duration::from_secs(5);
404401

405402
loop {
403+
// CRITICAL: Capture timestamp immediately before each send attempt
404+
// This ensures accurate latency measurement even under backpressure
405+
let ts_now = crate::ipc::get_monotonic_time_ns();
406+
let timestamp_bytes = ts_now.to_le_bytes();
407+
serialized[ts_offset.clone()].copy_from_slice(&timestamp_bytes);
408+
406409
match mq_send(fd, &serialized, self.priority) {
407410
Ok(()) => {
408411
trace!("Message ID {} sent successfully", message.id);
@@ -415,7 +418,7 @@ impl BlockingTransport for BlockingPosixMessageQueue {
415418
"Timeout: message queue full, possible backpressure"
416419
));
417420
}
418-
// Yield and retry
421+
// Yield and retry - timestamp will be updated on next iteration
419422
std::thread::yield_now();
420423
std::thread::sleep(Duration::from_millis(10));
421424
}

0 commit comments

Comments
 (0)