Skip to content

Commit 50db84c

Browse files
sberg-rhclaude
andauthored
feat(cli): Add standalone --server and --client modes (#110)
* feat(cli): Add standalone --server and --client modes with full reporting Add the ability to run the benchmark server and client as independent processes, enabling cross-environment IPC testing (e.g., host and container). Relates to #11. Standalone mode features: - --server flag starts a server that listens for client connections - --client flag connects to a running server with retry logic (100ms backoff, 30s timeout) - Both async (Tokio) and blocking (std) execution modes supported - Duration (-d) and message-count (-i) modes both supported - Default transport endpoints work without extra flags - Endpoint flags (--socket-path, --shared-memory-name, --message-queue-name) promoted to user-facing Reporting integration: - Full ResultsManager/MetricsCollector integration for structured output (JSON, streaming CSV, console summary with HDR percentiles) - Server-side one-way latency measurement using monotonic clock (accurate for same-host and container scenarios) - Round-trip latency with per-message streaming support Code quality: - Shared helpers: dispatch_server_message(), retry constants - 25 tests covering CLI parsing, transport config, server dispatch, connection retry, shutdown, duration mode, one-way, round-trip - Explicit MessageType::Shutdown on client disconnect Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: Support send_delay and include_first_message in standalone client - Add --send-delay support: inserts a configurable pause after each message send (blocking uses thread::sleep, async uses tokio::sleep) - Add --include-first-message support: when false (default), sends a canary message before measurement to warm up the connection, matching the existing BenchmarkRunner behavior - Applied to both one-way and round-trip tests in both blocking and async client paths Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * perf: Eliminate per-message heap allocation in standalone client Reuse a single Message struct across loop iterations instead of calling Message::new() with payload.clone() on every send. The message id and timestamp are updated in-place before each send. This removes one Vec<u8> heap allocation per message in the measurement loop, reducing allocation overhead that can skew latency results, especially for small messages. Applied to both one-way and round-trip tests in both blocking and async client paths. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: Add concurrency support for standalone client/server Server-side multi-accept: - TCP and UDS servers now accept multiple concurrent connections, spawning a handler thread per client with its own MetricsCollector - Grace period after first client prevents premature server exit - SHM and PMQ fall back to single-client mode with a warning - Server aggregates one-way latency metrics across all handlers Client-side multi-threaded execution: - Blocking client spawns N worker threads, each with its own transport connection, MetricsCollector, and message loop - Async client uses tokio::task::JoinSet for concurrent workers - Results aggregated via MetricsCollector::aggregate_worker_metrics() - Per-message streaming disabled for concurrent mode (aggregated only) Transport additions: - BlockingTcpSocket::from_stream() wraps pre-accepted TcpStream - BlockingUnixDomainSocket::from_stream() wraps pre-accepted UnixStream Shared helpers: - handle_client_connection() -- per-client message dispatch and metrics - aggregate_and_print_server_metrics() -- shared aggregation logic Tests: - test_standalone_concurrent_tcp_round_trip (3 concurrent clients) - test_handle_client_connection_round_trip (dispatch correctness) - test_handle_client_connection_one_way_metrics (metrics recording) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: Add coverage for concurrency, from_stream, and aggregation - test_standalone_concurrent_tcp_one_way: multi-accept server with 2 concurrent one-way clients, verifying server-side metrics recording - test_tcp_from_stream_send_receive: BlockingTcpSocket::from_stream() full send/receive round-trip - test_uds_from_stream_send_receive: BlockingUnixDomainSocket::from_stream() full send/receive round-trip (unix-only) - test_concurrency_forced_to_one_for_shm: CLI parsing for SHM with concurrency > 1 - test_aggregate_and_print_empty_collectors: empty input edge case - test_aggregate_and_print_single_collector: single collector with data Total binary tests: 34. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Add async multi-accept server and remove unused parameter - Add multi-accept support for async TCP and UDS servers, matching the blocking server's concurrency support. Uses tokio::net listeners with spawn_blocking for per-client handler threads. - Remove unused _args parameter from run_standalone_server_async - Replace inline latency printing in async server with shared print_server_one_way_latency helper Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Use OS-assigned ports in tests and document grace period - Replace all 12 hardcoded test ports (18301-18314) with OS-assigned ports via get_free_port() helper (binds to port 0, extracts assigned port). Prevents port conflicts in parallel test runs and with other processes. - Extract 2-second multi-accept grace period into SERVER_ACCEPT_GRACE_PERIOD constant with documentation explaining the behavior and limitation. - Document the grace period in --server CLI help text so users know concurrent clients should connect within 2 seconds of each other. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Set streams to blocking mode after tokio into_std conversion tokio::net::TcpStream::into_std() leaves the stream in non-blocking mode (set by tokio for epoll/kqueue). The blocking transport's read_exact/write_all calls then fail with WouldBlock errors, causing immediate disconnection. Fix: call set_nonblocking(false) on streams after into_std() in both TCP and UDS async multi-accept servers. Add test_standalone_async_concurrent_tcp_round_trip to exercise the async multi-accept path (tokio accept + spawn_blocking + from_stream + handle_client_connection). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: Strengthen test assertions for real-world correctness - test_standalone_blocking_tcp_one_way: verify server received exact message count with correct sequential IDs, add shutdown message - test_standalone_blocking_tcp_duration_round_trip: verify response IDs match requests, assert count > 10 for 200ms test, add shutdown - test_standalone_blocking_tcp_duration_one_way: verify server received exact count with sequential IDs, assert count > 10 for 200ms test - test_concurrency_forced_to_one_for_shm: test actual concurrency forcing logic instead of just CLI parsing - test_standalone_concurrent_tcp_one_way: assert exact message count per handler instead of just "greater than zero" Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Address round 2 review feedback - Clean up garbled doc comment on async concurrent test (editing artifacts from multiple rewrites) - Replace silent panic swallowing in async multi-accept servers: try_join_next().transpose() silently dropped JoinErrors from panicked handler tasks. Now logs warnings via warn!(). - Extract effective_concurrency() helper to deduplicate the concurrency-forcing logic (was copied in blocking client, async client, and test). Test now calls the actual helper instead of reimplementing the logic inline. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: Add real-world scenario tests for coverage and correctness - test_standalone_large_payload_integrity: 4KB payloads with recognizable byte pattern, server echoes back, client verifies content byte-for-byte to catch corruption - test_handle_client_connection_filters_canary: verifies warmup canary messages (id=u64::MAX) are excluded from one-way metrics - test_handle_client_connection_mixed_message_types: interleaved OneWay and Request messages on a single connection, verifies correct metrics recording and response dispatch - test_aggregate_and_print_multiple_collectors: aggregation across 2 collectors with different latency distributions - test_effective_concurrency_all_mechanisms: covers UDS, PMQ, SHM, TCP, and concurrency=1 edge case Total binary tests: 40. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Set accepted streams to blocking mode in multi-accept servers Accepted TCP/UDS streams inherit non-blocking mode from the listener (set for the accept poll loop). The handler threads need blocking mode for the transport's read_exact/write_all operations. This is the blocking-server equivalent of the async into_std fix in commit 8723429. Without this fix, standalone server handlers immediately disconnect from clients. Applies to both run_standalone_server_blocking_multi_accept_tcp and _uds. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Address review feedback - grace period, quiet flag, mutex safety - Fix grace period bug: reset timer on every new connection, not just the first. Prevents premature server exit between one-way and round-trip test phases when using concurrency > 1. Applied to all four multi-accept servers (blocking TCP/UDS, async TCP/UDS). - Honor --quiet flag in standalone server and client. When set, suppress all tracing output to stderr. - Handle poisoned mutex gracefully: use unwrap_or_else(|e| e.into_inner()) instead of unwrap() on mutex locks. If a handler thread panics while holding the lock, other threads can still push their metrics instead of cascade-panicking. - Add defensive --shm-direct guard in standalone server and client: returns error if --shm-direct is used without --blocking. This is normally enforced by main() but the guard protects against future refactoring that might change the dispatch order. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Address review feedback - multiple correctness and quality fixes - Fix grace period bug: reset timer on every new connection, not just the first. Prevents premature server exit between one-way and round-trip test phases when using concurrency > 1. Applied to all four multi-accept servers (blocking TCP/UDS, async TCP/UDS). - Honor --quiet flag in standalone server and client. When set, suppress all tracing output to stderr. - Handle poisoned mutex gracefully: use unwrap_or_else(|e| e.into_inner()) instead of unwrap() on mutex locks in handler threads. - Add defensive --shm-direct guard in standalone server and client. - Add socket buffer tuning (recv/send buffer sizes) to multi-accept TCP servers to match normal transport behavior. - Fix integer division remainder: last worker now receives any extra messages when msg_count is not evenly divisible by concurrency. - Document empty response payloads as intentional design matching existing benchmark runner behavior. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: Add receive_blocking_timed for accurate one-way latency Add receive_blocking_timed() to the BlockingTransport trait that captures a monotonic timestamp after raw bytes are read but before bincode deserialization. This excludes deserialization overhead from one-way latency measurements. - Add default implementation on BlockingTransport trait (backward compatible -- captures timestamp after full receive) - Override in TCP, UDS, and SHM blocking transports to place timestamp between raw I/O read and deserialization - SHM-direct uses default (no bincode deserialization to exclude) - Update handle_client_connection and standalone single-client server to use receive_blocking_timed Impact is most visible with large payloads where deserialization is non-trivial. 64KB one-way TCP test shows min latency dropped from 41us (post-deserialize) to 14us (pre-deserialize), a ~27us improvement representing the bincode deserialization time excluded from measurement. Mean dropped 28% (73us to 52us) and P99 dropped 14% (132us to 113us). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Update MessageLatencyRecord calls for PR #105 compatibility PR #105 (Fix/streaming timestamps) added send_timestamp_ns parameter to MessageLatencyRecord::new(). Update all 4 call sites in standalone client to capture wall-clock timestamp at send time and pass it as the 6th argument. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * refactor: Move standalone logic from main.rs to library crate Move ~3100 lines of standalone client/server code from the binary crate (main.rs) into two new library modules, following the existing flat-file convention (benchmark.rs/benchmark_blocking.rs pattern). Structure: - standalone_server.rs (1982 lines): constants, shared helpers, server dispatch, multi-accept TCP/UDS, async server paths - standalone_client.rs (1146 lines): retry helpers, client dispatch, single/concurrent blocking and async paths - main.rs reduced from ~4200 to ~1120 lines (thin dispatch layer) Additional changes: - Promote logging.rs from binary-private to library-public module - Move set_affinity() to utils.rs as pub function - All standalone functions now pub for tarpaulin coverage measurement and integration test access No behavioral changes. All 374 tests pass. Benchmark comparison across 3 runs confirms no performance regression (mean latencies within 2-5% run-to-run variance). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Improve server resilience and error visibility - Socket configuration failures (set_nonblocking, set_nodelay) in multi-accept servers now log a warning and skip the bad connection instead of crashing the entire server with ? - Thread join panics in blocking multi-accept servers now logged with warn! instead of silently dropped with let _ = - Streaming latency record failures in client now logged with debug! instead of silently swallowed Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: Comprehensive coverage improvements for standalone modules Server tests (standalone_server.rs, 82.7% coverage): - test_multi_accept_tcp_server_direct: exercises multi-accept TCP directly - test_single_server_direct: exercises blocking single-client directly - test_server_blocking_dispatch: exercises dispatch logic - test_server_blocking_dispatch_uds: exercises UDS dispatch branch - test_async_multi_accept_tcp_full: exercises async multi-accept TCP - test_async_single_server_path: exercises async single-client - test_async_single_server_one_way_metrics: async one-way metrics - test_async_multi_accept_uds_full: exercises async UDS multi-accept - test_multi_accept_server_with_delayed_client: slow sender resilience - test_multi_accept_server_duration_one_way: duration mode with multi-accept - test_async_multi_accept_server_duration_one_way: async duration mode - test_handle_client_connection_send_failure: client disconnect error path - test_single_server_client_disconnect: single server send error path - test_multi_accept_server_survives_bad_client: garbage input resilience - test_handle_client_connection_garbage_input: deserialization error path - test_run_standalone_server_full_dispatch: full entry point dispatch - test_run_standalone_server_rejects_all_via_dispatch: 'all' validation - test_run_standalone_server_rejects_shm_direct: shm-direct guard - test_run_standalone_server_verbose: -vv logging level branches - test_aggregate_server_metrics_from_handlers: real handler data - test_print_server_one_way_latency_with_data/zero: print paths Client tests (standalone_client.rs, 86.3% coverage): - test_client_blocking_tcp_round_trip/one_way: single client paths - test_client_blocking_tcp_duration_round_trip/one_way: duration mode - test_client_blocking_tcp_concurrent_round_trip/one_way: concurrent - test_client_async_single_round_trip/one_way: async single - test_client_async_duration_round_trip/one_way: async duration - test_client_async_concurrent_round_trip/one_way: async concurrent - test_client_blocking_with_send_delay: send_delay round-trip branch - test_client_blocking_one_way_with_send_delay: send_delay one-way branch - test_client_blocking_with_streaming_output: JSON streaming - test_client_blocking_combined_streaming: combined mode streaming - test_client_blocking_csv_streaming: CSV streaming - test_client_blocking_concurrent_duration_one_way: concurrent duration - test_client_async_concurrent_duration_one_way: async concurrent duration - test_run_standalone_client_full_dispatch: full entry point dispatch - test_run_standalone_client_rejects_all_via_dispatch: 'all' validation - test_run_standalone_client_rejects_shm_direct: shm-direct guard - test_connect_async_with_retry_succeeds: async retry path Also: changed tracing .init() to .try_init() with eprintln fallback in both server and client for test compatibility. Coverage: standalone_server 82.7%, standalone_client 86.3%, combined 84.8% Total lib tests: 355. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * perf: Reduce multi-accept server polling interval from 50ms to 5ms Reduce the non-blocking accept loop sleep from 50ms to 5ms in both TCP and UDS multi-accept servers. This cuts connection acceptance latency by 10x with no portability concerns. Discovered during hands-on validation testing of standalone concurrent mode, where the 50ms polling interval was the primary contributor to elevated tail latency under multi-client workloads. Improvement with -c 4 concurrent clients: - RT P95: -46% (65.9us -> 35.5us) - RT P99: -49% (91.4us -> 46.9us) - Throughput: +66% (94.9 -> 157.1 MB/s) Single-client workloads also benefit from faster initial connection acceptance (P99 improved 4-7% across all test modes). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Address PR #110 review feedback (issues 1-11) - Add 60s idle timeout to prevent server hang when no client connects - Log canary/shutdown send errors (warn for canary, debug for shutdown) - Wire concurrent streaming: workers collect MessageLatencyRecord and stream through ResultsManager after completion - Reject multiple mechanisms in standalone mode with clear error - Add debug logging on server receive errors for diagnostics - Add per-worker warmup in concurrent mode (fixes metadata mismatch) - Document two-phase reconnection behavior in concurrent mode - Add SIGINT/SIGTERM signal handler via ctrlc crate for graceful server shutdown and resource cleanup - Add receive_blocking_timed override for PMQ transport to exclude deserialization from one-way latency measurement - Add 11 integration tests spawning separate server/client processes - Fix misleading test comment on garbage input handler Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: CI compatibility for MSRV and lint checks - Drop ctrlc "termination" feature (requires Rust 1.75+ due to nix static zeroed); SIGINT still handled, SIGTERM uses default OS behavior - Remove unused BlockingTcpSocket import in canary failure test Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Pin ctrlc to 3.4.5 for MSRV 1.70 compatibility ctrlc 3.5+ uses std::mem::zeroed() in statics which requires Rust 1.75+. Version 3.4.5 does not have this issue and supports Rust 1.69+. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Use cross-platform mechanisms in multiple-mechanism rejection tests Tests used "tcp uds" but UDS is not a valid mechanism on Windows, causing clap to reject the args before reaching our validation code. Changed to "tcp shm" which exists on all platforms. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Use platform-independent temp dir in output file integration test The test hardcoded /tmp/ which doesn't exist on Windows. Use std::env::temp_dir() for cross-platform compatibility. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: Address PR #110 follow-up review (4 additional findings) - Add sentinel connection in concurrent client to prevent server from exiting between one-way and round-trip phases (keeps a connection open across both phases so the server's grace period check never triggers prematurely) - Surface accept-loop errors: upgrade from debug to warn logging and return Err when accept fails before any client connects - Fix duration precision loss: use as_secs_f64() instead of as_secs() when passing duration to spawned server process - Propagate latency file write errors instead of silently discarding - Add integration tests for concurrent both-tests scenario Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * style: Fix formatting issues caught by CI Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 1435ca2 commit 50db84c

18 files changed

Lines changed: 6881 additions & 63 deletions

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ thiserror = "2.0.16"
5353
tokio-macros = "=2.5.0" # tokio-macros 2.6.0+ requires Rust 1.71+
5454
mio = "=1.0.4" # mio 1.1.0+ requires Rust 1.71+
5555
core_affinity = "0.8.3"
56+
# MSRV: ctrlc 3.5+ uses zeroed() in statics (requires Rust 1.75+)
57+
ctrlc = "=3.4.5"
5658
os_pipe = "1.1.5"
5759
# MSRV: zmij 1.0.20+ requires Rust 1.71+
5860
zmij = "=1.0.19"

examples/blocking_basic.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ fn main() -> Result<()> {
102102
buffer_size: None,
103103
pmq_priority: 0,
104104

105+
// Standalone mode flags
106+
server: false,
107+
client: false,
108+
105109
// Internal flag (not for external use)
106110
internal_run_as_server: false,
107111

examples/blocking_comparison.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,8 @@ fn create_args(
287287
client_affinity: None,
288288
buffer_size: None,
289289
pmq_priority: 0,
290+
server: false,
291+
client: false,
290292
internal_run_as_server: false,
291293
verbose: 0,
292294
quiet: false,

src/benchmark.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,8 @@ impl BenchmarkConfig {
377377
/// # pmq_priority: 0,
378378
/// # include_first_message: false,
379379
/// # blocking: false,
380+
/// # server: false,
381+
/// # client: false,
380382
/// # internal_run_as_server: false,
381383
/// # socket_path: None,
382384
/// # shared_memory_name: None,
@@ -766,7 +768,7 @@ impl BenchmarkRunner {
766768
cmd.arg("-s").arg(self.config.message_size.to_string());
767769

768770
if let Some(duration) = self.config.duration {
769-
cmd.arg("-d").arg(format!("{}s", duration.as_secs()));
771+
cmd.arg("-d").arg(format!("{}s", duration.as_secs_f64()));
770772
} else if let Some(count) = self.config.msg_count {
771773
cmd.arg("-i").arg(count.to_string());
772774
}

src/cli.rs

Lines changed: 128 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ const TIMING: &str = "Timing";
8282
const CONCURRENCY: &str = "Concurrency";
8383
const OUTPUT_AND_LOGGING: &str = "Output and Logging";
8484
const ADVANCED: &str = "Advanced";
85+
const STANDALONE: &str = "Standalone Mode";
8586

8687
/// Returns the default IPC mechanism based on the target OS.
8788
///
@@ -366,27 +367,87 @@ pub struct Args {
366367
#[arg(long, default_value_t = false, help_heading = ADVANCED)]
367368
pub shm_direct: bool,
368369

369-
/// (Internal) Run the process in server-only mode.
370+
/// Run in standalone server mode.
370371
///
371-
/// This is a hidden flag used by the benchmark runner to spawn a child
372-
/// process that acts only as the server. It is not intended for direct
373-
/// use by users.
374-
#[arg(long, hide = true)]
375-
pub internal_run_as_server: bool,
372+
/// Starts the process as a server that listens for incoming client
373+
/// connections. The server waits indefinitely for a client to connect,
374+
/// receives messages, and responds to round-trip requests.
375+
///
376+
/// For TCP and UDS, the server accepts multiple concurrent connections
377+
/// (one handler thread per client). After the first client connects,
378+
/// the server waits 2 seconds before checking if all clients have
379+
/// disconnected. Additional clients should connect within this window.
380+
///
381+
/// This enables running the server and client as independent processes,
382+
/// potentially on different hosts or in different environments (e.g.,
383+
/// host and container).
384+
///
385+
/// Mutually exclusive with --client.
386+
///
387+
/// # Examples
388+
///
389+
/// ```bash
390+
/// # Terminal 1: start server
391+
/// ipc-benchmark --server -m uds --blocking
392+
///
393+
/// # Terminal 2: start client
394+
/// ipc-benchmark --client -m uds --blocking -i 10000
395+
/// ```
396+
#[arg(long, conflicts_with = "client", help_heading = STANDALONE)]
397+
pub server: bool,
376398

377-
// --- Transport-specific arguments for internal use ---
378-
/// (Internal) Specifies the exact socket path for UDS.
379-
#[arg(long, hide = true)]
399+
/// Run in standalone client mode.
400+
///
401+
/// Connects to an already-running server and executes the benchmark
402+
/// workload. If the server is not yet available, the client retries
403+
/// the connection with backoff for up to 30 seconds before failing.
404+
///
405+
/// Mutually exclusive with --server.
406+
///
407+
/// # Examples
408+
///
409+
/// ```bash
410+
/// # Terminal 1: start server
411+
/// ipc-benchmark --server -m tcp --blocking
412+
///
413+
/// # Terminal 2: start client (retries until server is ready)
414+
/// ipc-benchmark --client -m tcp --blocking -i 5000
415+
/// ```
416+
#[arg(long, conflicts_with = "server", help_heading = STANDALONE)]
417+
pub client: bool,
418+
419+
/// Socket path for Unix Domain Sockets in standalone mode.
420+
///
421+
/// Specifies the filesystem path for the UDS socket. Both server
422+
/// and client must use the same path. If not specified, a default
423+
/// path in the system temp directory is used.
424+
#[arg(long, help_heading = STANDALONE)]
380425
pub socket_path: Option<String>,
381426

382-
/// (Internal) Specifies the exact name for Shared Memory.
383-
#[arg(long, hide = true)]
427+
/// Shared memory segment name for standalone mode.
428+
///
429+
/// Specifies the name of the POSIX shared memory segment. Both
430+
/// server and client must use the same name. If not specified,
431+
/// defaults to "ipc_benchmark_shm".
432+
#[arg(long, help_heading = STANDALONE)]
384433
pub shared_memory_name: Option<String>,
385434

386-
/// (Internal) Specifies the exact name for the POSIX Message Queue.
387-
#[arg(long, hide = true)]
435+
/// POSIX Message Queue name for standalone mode.
436+
///
437+
/// Specifies the name of the message queue. Both server and client
438+
/// must use the same name. If not specified, defaults to
439+
/// "ipc_benchmark_pmq".
440+
#[arg(long, help_heading = STANDALONE)]
388441
pub message_queue_name: Option<String>,
389442

443+
/// (Internal) Run the process in server-only mode.
444+
///
445+
/// This is a hidden flag used by the benchmark runner to spawn a child
446+
/// process that acts only as the server. It is not intended for direct
447+
/// use by users.
448+
#[arg(long, hide = true)]
449+
pub internal_run_as_server: bool,
450+
390451
/// (Internal) File path for server to write measured latencies.
391452
///
392453
/// Used internally when spawning the server process to communicate
@@ -993,4 +1054,58 @@ mod tests {
9931054
assert_eq!(args.message_size, 1024);
9941055
assert_eq!(args.msg_count, 1000);
9951056
}
1057+
1058+
#[test]
1059+
fn test_server_flag() {
1060+
let args = Args::parse_from(["ipc-benchmark", "--server", "-m", "tcp"]);
1061+
assert!(args.server);
1062+
assert!(!args.client);
1063+
}
1064+
1065+
#[test]
1066+
fn test_client_flag() {
1067+
let args = Args::parse_from(["ipc-benchmark", "--client", "-m", "tcp"]);
1068+
assert!(args.client);
1069+
assert!(!args.server);
1070+
}
1071+
1072+
#[test]
1073+
fn test_server_client_mutually_exclusive() {
1074+
let result = Args::try_parse_from(["ipc-benchmark", "--server", "--client", "-m", "tcp"]);
1075+
assert!(result.is_err(), "--server and --client should conflict");
1076+
}
1077+
1078+
#[test]
1079+
fn test_standalone_defaults() {
1080+
let args = Args::parse_from(["ipc-benchmark", "-m", "tcp"]);
1081+
assert!(!args.server);
1082+
assert!(!args.client);
1083+
}
1084+
1085+
#[test]
1086+
fn test_standalone_endpoint_flags() {
1087+
let args = Args::parse_from([
1088+
"ipc-benchmark",
1089+
"--server",
1090+
"-m",
1091+
"tcp",
1092+
"--socket-path",
1093+
"/tmp/test.sock",
1094+
"--shared-memory-name",
1095+
"my_shm",
1096+
"--message-queue-name",
1097+
"my_pmq",
1098+
]);
1099+
assert!(args.server);
1100+
assert_eq!(args.socket_path, Some("/tmp/test.sock".to_string()));
1101+
assert_eq!(args.shared_memory_name, Some("my_shm".to_string()));
1102+
assert_eq!(args.message_queue_name, Some("my_pmq".to_string()));
1103+
}
1104+
1105+
#[test]
1106+
fn test_server_with_blocking() {
1107+
let args = Args::parse_from(["ipc-benchmark", "--server", "-m", "tcp", "--blocking"]);
1108+
assert!(args.server);
1109+
assert!(args.blocking);
1110+
}
9961111
}

src/ipc/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,6 +1185,21 @@ pub trait BlockingTransport: Send {
11851185
/// timeout mechanisms if needed (SO_RCVTIMEO on sockets, etc).
11861186
fn receive_blocking(&mut self) -> Result<Message>;
11871187

1188+
/// Receive a message and capture a monotonic timestamp immediately
1189+
/// after the raw bytes are read but before deserialization.
1190+
///
1191+
/// This provides more accurate one-way latency measurement by
1192+
/// excluding deserialization overhead from the receive timestamp.
1193+
///
1194+
/// The default implementation captures the timestamp after
1195+
/// `receive_blocking()` returns (including deserialization).
1196+
/// Transport implementations should override this to place the
1197+
/// timestamp between raw I/O and deserialization.
1198+
fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
1199+
let msg = self.receive_blocking()?;
1200+
Ok((msg, get_monotonic_time_ns()))
1201+
}
1202+
11881203
/// Close the transport and release resources.
11891204
///
11901205
/// This method cleanly shuts down the transport, closing connections

src/ipc/posix_message_queue_blocking.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,44 @@ impl BlockingTransport for BlockingPosixMessageQueue {
471471
Ok(message)
472472
}
473473

474+
fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
475+
trace!("Waiting to receive timed message via blocking POSIX message queue");
476+
477+
let fd = self.recv_fd.as_ref().ok_or_else(|| {
478+
anyhow!(
479+
"Cannot receive: message queue not initialized. \
480+
Call start_server_blocking() or start_client_blocking() first."
481+
)
482+
})?;
483+
484+
let mut buffer = vec![0u8; self.max_msg_size];
485+
486+
let data = loop {
487+
let mut priority = 0u32;
488+
match mq_receive(fd, &mut buffer, &mut priority) {
489+
Ok(size) => {
490+
buffer.truncate(size);
491+
break buffer;
492+
}
493+
Err(Errno::EAGAIN) => {
494+
std::thread::yield_now();
495+
std::thread::sleep(Duration::from_millis(10));
496+
}
497+
Err(e) => {
498+
return Err(anyhow!("Failed to receive message: {}", e));
499+
}
500+
}
501+
};
502+
503+
let receive_time_ns = crate::ipc::get_monotonic_time_ns();
504+
505+
let message: Message =
506+
bincode::deserialize(&data).context("Failed to deserialize message")?;
507+
508+
trace!("Received message ID {}", message.id);
509+
Ok((message, receive_time_ns))
510+
}
511+
474512
fn close_blocking(&mut self) -> Result<()> {
475513
debug!("Closing blocking POSIX message queue transport");
476514
self.cleanup_queues();

src/ipc/shared_memory_blocking.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,42 @@ impl BlockingTransport for BlockingSharedMemory {
756756
Ok(message)
757757
}
758758

759+
fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
760+
self.ensure_peer_ready()?;
761+
762+
let ring_buffer = self.ring_buffer.ok_or_else(|| {
763+
anyhow!(
764+
"Cannot receive: shared memory not initialized. \
765+
Call start_server_blocking() or start_client_blocking() first."
766+
)
767+
})?;
768+
769+
#[cfg(unix)]
770+
let data = unsafe { (*ring_buffer).read_data_blocking()? };
771+
772+
#[cfg(not(unix))]
773+
let data = loop {
774+
match unsafe { (*ring_buffer).read_data() } {
775+
Ok(d) => break d,
776+
Err(_) => {
777+
if unsafe { (*ring_buffer).shutdown.load(Ordering::Acquire) } {
778+
return Err(anyhow!("Connection closed"));
779+
}
780+
thread::yield_now();
781+
thread::sleep(Duration::from_micros(100));
782+
}
783+
}
784+
};
785+
786+
// Capture timestamp after raw read, before deserialization
787+
let receive_time_ns = crate::ipc::get_monotonic_time_ns();
788+
789+
let message: Message =
790+
bincode::deserialize(&data).context("Failed to deserialize message")?;
791+
792+
Ok((message, receive_time_ns))
793+
}
794+
759795
fn close_blocking(&mut self) -> Result<()> {
760796
debug!("Closing blocking shared memory transport");
761797

src/ipc/shared_memory_direct.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,15 @@ impl BlockingTransport for BlockingSharedMemoryDirect {
649649
Ok(message)
650650
}
651651

652+
fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> {
653+
// SHM-direct has no deserialization (direct memcpy), so the
654+
// timestamp is captured immediately after the data read and
655+
// before mutex unlock/signal. This uses the default implementation
656+
// since there's no meaningful deserialization to exclude.
657+
let msg = self.receive_blocking()?;
658+
Ok((msg, crate::ipc::get_monotonic_time_ns()))
659+
}
660+
652661
fn close_blocking(&mut self) -> Result<()> {
653662
debug!("Closing direct memory SHM transport");
654663

0 commit comments

Comments
 (0)