diff --git a/Cargo.lock b/Cargo.lock index d9e65402..1d22677f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,19 +361,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "crossbeam" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" -dependencies = [ - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -402,15 +389,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -591,7 +569,6 @@ dependencies = [ "colored", "core_affinity", "criterion", - "crossbeam", "ctrlc", "half", "hdrhistogram", @@ -604,14 +581,12 @@ dependencies = [ "parking_lot", "parking_lot_core", "quote", - "rand", "rayon", "rayon-core", "serde", "serde_json", "shared_memory", "socket2 0.5.10", - "statistics", "syn", "tempfile", "thiserror", @@ -1203,12 +1178,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "statistics" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f4628c807954468b720f27d8863a5447d328378cbb874d633f6c4e952073acf" - [[package]] name = "strsim" version = "0.10.0" diff --git a/Cargo.toml b/Cargo.toml index 69018a43..bf1eeabb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,8 +7,7 @@ rust-version = "1.70" authors = ["IPC Benchmark Contributors"] description = "A comprehensive interprocess communication benchmark suite" license = "Apache-2.0" -repository = "https://github.com/your-org/ipc-benchmark" -documentation = "https://docs.rs/ipc-benchmark" +repository = "https://github.com/redhat-performance/rusty-comms" readme = "README.md" keywords = ["ipc", "benchmark", "performance", "concurrency"] categories = ["development-tools::profiling", "concurrency"] @@ -23,18 +22,14 @@ tokio = { version = "1.38.0", features = ["full"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" clap = { version = ">=4.4.18, <4.5.0", features = ["derive"] } -crossbeam = "0.8" shared_memory = "0.12" libc = "0.2" -nix = { version = "0.29", features = ["time"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } # MSRV: uuid 1.21+ requires Rust 1.85+ uuid = { version = ">=1.17, <1.21", features = ["v4"] } chrono = { version = "0.4", features = ["serde"] } num_cpus = "1.17" -rand = "0.8" -statistics = "0.4" hdrhistogram = "7.5" bincode = "1.3" async-trait = "0.1" diff --git a/README.md b/README.md index 047c07a2..9fca51f8 100644 --- a/README.md +++ b/README.md @@ -170,7 +170,7 @@ This benchmark suite uses **high-precision monotonic clocks** to measure true IP #### Clock Source -- **Unix/Linux**: Uses `CLOCK_MONOTONIC` via the nix crate +- **Unix/Linux**: Uses `CLOCK_MONOTONIC` via direct `libc::clock_gettime` call - **Windows**: Falls back to system time (less precise) - **Characteristics**: Monotonic clocks measure time from system boot and are unaffected by NTP adjustments, daylight saving time, or manual clock changes @@ -481,7 +481,7 @@ ipc-benchmark -m all --continue-on-error # Run only round-trip tests (one-way and round-trip run # sequentially by default; use these flags to select one) -ipc-benchmark --round-trip --no-one-way +ipc-benchmark --round-trip # Custom percentiles for latency analysis ipc-benchmark --percentiles 50 90 95 99 99.9 99.99 @@ -501,7 +501,7 @@ ipc-benchmark -m shm --buffer-size 16384 This benchmark runs the server as a separate child process for each test to ensure strong isolation and realistic IPC behavior. - The parent process spawns the same binary in a special "server-only" mode and waits for a readiness byte via a pipe connected to the child's stdout. -- On Unix, the readiness signal is a single byte `0x01` written to stdout. On Windows, tests use a simple `echo` to emit a single character (e.g., `R`). +- The readiness signal is a single byte `0x01` written to stdout on all platforms. - The child process is terminated at the end of each test; resources are cleaned up by the transport implementation. Binary resolution strategy used by the spawner: @@ -548,6 +548,8 @@ If you need to analyze the raw performance data, including the first-message spi ```bash # Include the first message in the final results ipc-benchmark --include-first-message +``` + ### Understanding Test Types: Throughput vs. Latency This benchmark suite can be used to measure two primary aspects of IPC performance: **throughput** and **latency**. The configuration you choose will determine which of these you are primarily testing. diff --git a/docs/archive/blocking-mode-implementation-plan.md b/docs/archive/blocking-mode-implementation-plan.md index 53aa240b..4f164df0 100644 --- a/docs/archive/blocking-mode-implementation-plan.md +++ b/docs/archive/blocking-mode-implementation-plan.md @@ -1,9 +1,15 @@ # Implementation Plan: Blocking/Synchronous Mode for IPC Benchmark +> **HISTORICAL DOCUMENT — DO NOT FOLLOW FOR ACTIVE WORK** +> +> This plan is fully implemented (all 9 stages complete). It is preserved +> for reference only. Standalone client/server mode (`--server`/`--client`) +> was added as follow-on work outside this plan's scope. + **Project Goal:** Add synchronous/blocking execution mode alongside existing async mode **Date:** 2025-10-16 **Target Repository:** rusty-comms -**Status:** PLANNING +**Status:** COMPLETE (archived) --- diff --git a/src/benchmark_blocking.rs b/src/benchmark_blocking.rs index 8ecfbf4f..18a22068 100644 --- a/src/benchmark_blocking.rs +++ b/src/benchmark_blocking.rs @@ -716,7 +716,7 @@ impl BlockingBenchmarkRunner { /// - No async/await - all operations block /// - No Tokio runtime /// - Uses BlockingTransport instead of Transport - /// - Streaming output not yet implemented (Stage 5) + /// - Streaming output supported via `ResultsManagerBlocking` /// /// ## Returns /// - `Ok(BenchmarkResults)`: Complete test results with metrics @@ -1309,24 +1309,21 @@ impl BlockingBenchmarkRunner { let latency = send_time.elapsed(); - // Record latency for all measured messages - if true { - // Stream latency if enabled - if let Some(ref mut manager) = results_manager { - let record = crate::results::MessageLatencyRecord::new( - i as u64, - self.mechanism, - self.config.message_size, - crate::metrics::LatencyType::RoundTrip, - latency, - send_timestamp_ns, - ); - let _ = manager.stream_latency_record(&record); - } - - // Record in metrics collector - metrics_collector.record_message(self.config.message_size, Some(latency))?; + // Stream latency if enabled + if let Some(ref mut manager) = results_manager { + let record = crate::results::MessageLatencyRecord::new( + i as u64, + self.mechanism, + self.config.message_size, + crate::metrics::LatencyType::RoundTrip, + latency, + send_timestamp_ns, + ); + let _ = manager.stream_latency_record(&record); } + + // Record in metrics collector + metrics_collector.record_message(self.config.message_size, Some(latency))?; } } diff --git a/src/cli.rs b/src/cli.rs index 1a05d58f..74a428df 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -845,10 +845,11 @@ pub fn parse_duration_micros(s: &str) -> Result { return Err("Duration cannot be negative".to_string()); } - // Convert to Duration based on the unit + // Convert to Duration based on the unit, using float-based + // conversion to preserve fractional inputs like "1.5ms" let duration = match unit { - "us" => Duration::from_micros(num as u64), - "ms" => Duration::from_millis(num as u64), + "us" => Duration::from_secs_f64(num / 1_000_000.0), + "ms" => Duration::from_secs_f64(num / 1_000.0), "s" => Duration::from_secs_f64(num), "m" => Duration::from_secs_f64(num * 60.0), "h" => Duration::from_secs_f64(num * 3600.0), diff --git a/src/ipc/mod.rs b/src/ipc/mod.rs index d1127a88..deaa68c5 100644 --- a/src/ipc/mod.rs +++ b/src/ipc/mod.rs @@ -363,34 +363,15 @@ impl Message { } } - /// Create a new message for blocking mode with monotonic timestamp + /// Create a new message for blocking mode with monotonic timestamp. /// - /// This method creates a message and captures the timestamp using a - /// monotonic clock (CLOCK_MONOTONIC on Linux). The monotonic clock is - /// not affected by NTP adjustments or system time changes. - /// - /// ## Parameters - /// - `id`: Unique identifier for the message - /// - `payload`: Message content as byte vector - /// - `message_type`: Type classification for the message - /// - /// ## Returns - /// Message with monotonic timestamp captured at creation time - /// - /// ## Use Case - /// - /// This method should be used by blocking transport implementations - /// to capture timestamps right before serialization, providing accurate - /// IPC latency measurements that exclude serialization overhead from - /// the measurement. + /// Semantic alias for [`Message::new()`]. Both constructors capture + /// a monotonic timestamp at creation time. This variant exists to + /// make call sites self-documenting when used in blocking transport + /// code paths. + #[inline] pub fn new_for_blocking(id: u64, payload: Vec, message_type: MessageType) -> Self { - Self { - id, - timestamp: get_monotonic_time_ns(), - payload, - message_type, - receive_time_ns: 0, - } + Self::new(id, payload, message_type) } /// Update the message timestamp to current monotonic time @@ -1522,18 +1503,26 @@ mod tests { // ===== Existing Tests ===== - /// Test message size analysis + /// Verify serialized message size is reasonable relative to payload #[test] fn test_message_size_analysis() { - // Test with 100 bytes payload (typical benchmark size) let payload = vec![0u8; 100]; let msg = Message::new(1, payload, MessageType::OneWay); let serialized = bincode::serialize(&msg).unwrap(); - println!("\n=== Message Size Analysis ==="); - println!("Payload size: 100 bytes"); - println!("Bincode serialized size: {} bytes", serialized.len()); + // Overhead should be modest: id(8) + timestamp(8) + len_prefix(8) + // + payload(100) + message_type(4) + receive_time(0, skipped) + assert!( + serialized.len() < 150, + "Serialized size {} is unexpectedly large for 100-byte payload", + serialized.len() + ); + assert!( + serialized.len() > 100, + "Serialized size {} should be larger than payload alone", + serialized.len() + ); } /// Test message creation and basic functionality @@ -1662,36 +1651,29 @@ mod tests { t2 - t1 ); } -} - -#[test] -fn test_timestamp_offset_update_in_serialized_buffer() { - use super::*; - // Create a message with timestamp 0 - let mut msg = Message::new(42, vec![1, 2, 3, 4], MessageType::OneWay); - msg.timestamp = 0; + /// Verify that the timestamp can be updated in-place within a + /// serialized buffer at the expected offset (bytes 8..16). + #[test] + fn test_timestamp_offset_update_in_serialized_buffer() { + let mut msg = Message::new(42, vec![1, 2, 3, 4], MessageType::OneWay); + msg.timestamp = 0; - // Serialize it - let mut serialized = bincode::serialize(&msg).unwrap(); + let mut serialized = bincode::serialize(&msg).unwrap(); - // Verify timestamp is 0 in bytes 8-15 - let extracted_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap()); - assert_eq!(extracted_ts, 0, "Timestamp should be 0 initially"); + let extracted_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap()); + assert_eq!(extracted_ts, 0, "Timestamp should be 0 initially"); - // Now update the timestamp bytes - let new_ts: u64 = 123456789; - let ts_bytes = new_ts.to_le_bytes(); - serialized[8..16].copy_from_slice(&ts_bytes); + let new_ts: u64 = 123_456_789; + serialized[8..16].copy_from_slice(&new_ts.to_le_bytes()); - // Verify it was updated - let updated_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap()); - assert_eq!(updated_ts, new_ts, "Timestamp should be updated"); + let updated_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap()); + assert_eq!(updated_ts, new_ts, "Timestamp should be updated"); - // Deserialize and verify - let deserialized: Message = bincode::deserialize(&serialized).unwrap(); - assert_eq!( - deserialized.timestamp, new_ts, - "Deserialized timestamp should match" - ); + let deserialized: Message = bincode::deserialize(&serialized).unwrap(); + assert_eq!( + deserialized.timestamp, new_ts, + "Deserialized timestamp should match" + ); + } } diff --git a/src/ipc/posix_message_queue.rs b/src/ipc/posix_message_queue.rs index 6a3d56f1..b6db8f99 100644 --- a/src/ipc/posix_message_queue.rs +++ b/src/ipc/posix_message_queue.rs @@ -1116,7 +1116,9 @@ mod tests { Ok(bp_detected) => { if bp_detected { // This is expected for the first few full-queue sends. - println!("Regular backpressure detected, continuing to force a timeout."); + tracing::trace!( + "Regular backpressure detected, continuing to force a timeout." + ); } } Err(e) => { diff --git a/src/ipc/shared_memory.rs b/src/ipc/shared_memory.rs index 52249a8e..4ba2cd17 100644 --- a/src/ipc/shared_memory.rs +++ b/src/ipc/shared_memory.rs @@ -911,7 +911,9 @@ mod tests { // The first few sends might succeed without backpressure. // Some might even succeed with backpressure if the timing is just right. if backpressure_detected { - println!("Regular backpressure detected, continuing to force a timeout."); + tracing::trace!( + "Regular backpressure detected, continuing to force a timeout." + ); } } Err(e) => { diff --git a/src/ipc/shared_memory_blocking.rs b/src/ipc/shared_memory_blocking.rs index 2d717bff..c92b819b 100644 --- a/src/ipc/shared_memory_blocking.rs +++ b/src/ipc/shared_memory_blocking.rs @@ -423,11 +423,11 @@ impl SharedMemoryRingBuffer { } let data_len = u32::from_le_bytes(len_bytes) as usize; - // Validate data length - if data_len > capacity { + // Validate data length (reject zero or oversized frames) + if data_len == 0 || data_len > capacity { libc::pthread_mutex_unlock(&self.mutex as *const _ as *mut _); return Err(anyhow!( - "Invalid data length: {} exceeds capacity {}", + "Invalid data length: {} (capacity: {}, min: 1)", data_len, capacity )); diff --git a/src/ipc/shared_memory_direct.rs b/src/ipc/shared_memory_direct.rs index 3cc66aa4..a23e34ef 100644 --- a/src/ipc/shared_memory_direct.rs +++ b/src/ipc/shared_memory_direct.rs @@ -53,7 +53,7 @@ const MAX_PAYLOAD_SIZE: usize = 8192; // 8 KB /// Raw message structure stored directly in shared memory. /// -/// This struct is designed for minimal overhead IPC. It uses `#[repr(C, packed)]` +/// This struct is designed for minimal overhead IPC. It uses `#[repr(C)]` /// to ensure predictable memory layout across process boundaries. /// /// # Memory Layout @@ -120,9 +120,9 @@ struct RawSharedMessage { /// Fixed-size payload buffer. /// - /// Maximum 1MB. Only the first `payload_len` bytes are valid. - /// If the source payload is smaller, only those bytes are copied. - /// If larger, it's truncated to MAX_PAYLOAD_SIZE. + /// Maximum 8 KB (MAX_PAYLOAD_SIZE). Only the first `payload_len` + /// bytes are valid. If the source payload is smaller, only those + /// bytes are copied. If larger, it's truncated to MAX_PAYLOAD_SIZE. payload: [u8; MAX_PAYLOAD_SIZE], /// Message type (converted from MessageType enum). @@ -674,14 +674,22 @@ impl BlockingTransport for BlockingSharedMemoryDirect { let message_type = >::from(message_type_u32); let payload_len = (*ptr).payload_len; - // PERF: Allocate payload without zero-filling. The original - // code used vec![0u8; payload_len] which calls memset to zero - // every byte, then immediately overwrites them all with - // copy_nonoverlapping. Vec::with_capacity allocates the same - // memory but skips the redundant zeroing. set_len() tells Rust - // the buffer is valid after the copy. This eliminates one - // memset per received message, which is significant for large - // payloads and reduces tail-latency spikes from page faults. + // Validate payload_len to prevent OOB reads from corrupted or + // malicious shared memory data. + if payload_len > MAX_PAYLOAD_SIZE { + (*ptr).ready = 0; + libc::pthread_cond_signal(&mut (*ptr).cond); + libc::pthread_mutex_unlock(&mut (*ptr).mutex); + return Err(anyhow!( + "Corrupt payload_len {} exceeds MAX_PAYLOAD_SIZE {}", + payload_len, + MAX_PAYLOAD_SIZE + )); + } + + // PERF: Allocate payload without zero-filling. Vec::with_capacity + // skips the redundant zeroing that vec![0u8; N] would do, since + // copy_nonoverlapping immediately overwrites the buffer. let mut payload = Vec::with_capacity(payload_len); std::ptr::copy_nonoverlapping( (*ptr).payload.as_ptr(), @@ -724,12 +732,13 @@ impl BlockingTransport for BlockingSharedMemoryDirect { } fn receive_blocking_timed(&mut self) -> Result<(Message, u64)> { - // SHM-direct has no deserialization (direct memcpy), so the - // timestamp is captured immediately after the data read and - // before mutex unlock/signal. This uses the default implementation - // since there's no meaningful deserialization to exclude. + // SHM-direct captures receive_time_ns inside the mutex, immediately + // after the condvar wake-up and before unlock. Use that in-message + // timestamp for accurate latency measurement rather than capturing + // a new one here (which would include unlock + return overhead). let msg = self.receive_blocking()?; - Ok((msg, crate::ipc::get_monotonic_time_ns())) + let ts = msg.receive_time_ns; + Ok((msg, ts)) } fn close_blocking(&mut self) -> Result<()> { diff --git a/src/ipc/tcp_socket.rs b/src/ipc/tcp_socket.rs index 65bed105..45e76cce 100644 --- a/src/ipc/tcp_socket.rs +++ b/src/ipc/tcp_socket.rs @@ -413,25 +413,33 @@ impl IpcTransport for TcpSocketTransport { ); // Configure socket options for low latency. - // Failure here means the socket would run - // without TCP_NODELAY, which skews benchmark - // results, so we drop the connection instead. - if let Ok(std_stream) = stream.into_std() { - let socket = - socket2::Socket::from(std_stream.try_clone().unwrap_or_else(|e| { - panic!( - "Failed to clone TCP stream for \ - socket tuning on connection {}: {}", - connection_id, e - ); - })); - socket.set_nodelay(true).unwrap_or_else(|e| { - warn!("set_nodelay failed on connection {}: {}", connection_id, e); - }); - let _ = socket.set_recv_buffer_size(buffer_size); - let _ = socket.set_send_buffer_size(buffer_size); - - if let Ok(tokio_stream) = TcpStream::from_std(std_stream) { + // On failure we skip this connection rather than + // panicking, since the server should remain stable. + let std_stream = match stream.into_std() { + Ok(s) => s, + Err(e) => { + warn!("into_std() failed on connection {}: {}", connection_id, e); + continue; + } + }; + + let cloned = match std_stream.try_clone() { + Ok(c) => c, + Err(e) => { + warn!("try_clone() failed on connection {}: {}", connection_id, e); + continue; + } + }; + + let socket = socket2::Socket::from(cloned); + if let Err(e) = socket.set_nodelay(true) { + warn!("set_nodelay failed on connection {}: {}", connection_id, e); + } + let _ = socket.set_recv_buffer_size(buffer_size); + let _ = socket.set_send_buffer_size(buffer_size); + + match TcpStream::from_std(std_stream) { + Ok(tokio_stream) => { let handler_sender = message_sender.clone(); let handler_connections = connections.clone(); @@ -442,6 +450,9 @@ impl IpcTransport for TcpSocketTransport { handler_connections, )); } + Err(e) => { + warn!("from_std() failed on connection {}: {}", connection_id, e); + } } } Err(e) => { @@ -588,7 +599,9 @@ mod tests { match client.send(&message).await { Ok(backpressure_detected) => { if backpressure_detected { - println!("Regular backpressure detected, continuing to force a timeout."); + tracing::trace!( + "Regular backpressure detected, continuing to force a timeout." + ); } } Err(e) => { diff --git a/src/ipc/unix_domain_socket.rs b/src/ipc/unix_domain_socket.rs index 707d5f4f..7d8041fb 100644 --- a/src/ipc/unix_domain_socket.rs +++ b/src/ipc/unix_domain_socket.rs @@ -578,7 +578,9 @@ mod tests { match client.send(&message).await { Ok(backpressure_detected) => { if backpressure_detected { - println!("Regular backpressure detected, continuing to force a timeout."); + tracing::trace!( + "Regular backpressure detected, continuing to force a timeout." + ); } } Err(e) => { diff --git a/src/ipc/unix_domain_socket_blocking.rs b/src/ipc/unix_domain_socket_blocking.rs index d8bdce50..b2aab19b 100644 --- a/src/ipc/unix_domain_socket_blocking.rs +++ b/src/ipc/unix_domain_socket_blocking.rs @@ -280,21 +280,40 @@ impl BlockingTransport for BlockingUnixDomainSocket { let total_len = 4 + serialized.len(); let mut written = 0usize; - // writev may not write everything in one call, so loop until complete + // writev may not write everything in one call, so loop until + // complete. On partial writes we fall back to write_all on the + // combined [len_prefix || serialized] buffer at the correct offset. while written < total_len { let result = unsafe { libc::writev(fd, iov.as_ptr(), 2) }; if result < 0 { return Err(std::io::Error::last_os_error()) .context("Failed to write message via writev"); } + if result == 0 { + return Err(anyhow::anyhow!( + "writev returned 0 bytes written (peer closed?)" + )); + } written += result as usize; if written < total_len { - // Partial write - fall back to regular write for remainder - // This is rare for small messages on UDS - let remaining = &serialized[written.saturating_sub(4)..]; - stream - .write_all(remaining) - .context("Failed to write remaining data")?; + // Partial write — fall back to write_all for the + // unsent portion. Build the remaining slice correctly + // by accounting for which part of [len_bytes || + // serialized] was already written. + if written < 4 { + // Still within the length prefix + stream + .write_all(&len_bytes[written..]) + .context("Failed to write remaining length prefix")?; + stream + .write_all(&serialized) + .context("Failed to write serialized data")?; + } else { + // Length prefix fully sent; write remaining payload + stream + .write_all(&serialized[written - 4..]) + .context("Failed to write remaining data")?; + } break; } } diff --git a/src/lib.rs b/src/lib.rs index 7961a233..7ff85147 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,9 +66,9 @@ //! //! The library is designed for high-performance benchmarking with: //! -//! - **Zero-copy serialization** where possible using bincode +//! - **Compact binary serialization** via bincode (zero-copy direct memcpy for SHM-direct) //! - **HDR histograms** for accurate latency measurement without coordination omission -//! - **Async I/O** throughout using Tokio for scalable concurrent operations +//! - **Dual execution modes**: async I/O (Tokio) and blocking I/O (std) for comparison //! - **Configurable buffer sizes** and queue depths for optimal performance tuning //! - **Comprehensive metrics** including percentiles, throughput, and error rates @@ -251,8 +251,8 @@ pub mod defaults { /// Default warmup iterations /// /// 1,000 warmup iterations help stabilize performance by: - /// - Allowing JIT compilation to optimize hot paths - /// - Filling CPU caches with relevant data + /// - Filling CPU caches and TLBs with relevant data + /// - Warming OS page caches and kernel buffers /// - Establishing network connections and OS buffers /// - Reducing measurement variance from cold-start effects pub const WARMUP_ITERATIONS: usize = 1000; diff --git a/src/logging.rs b/src/logging.rs index 8ced30eb..0bd70596 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -1,38 +1,13 @@ //! # Logging Configuration Module //! -//! This module provides centralized logging setup and management for the -//! benchmark suite. It configures the `tracing` framework to provide -//! flexible, structured, and performant logging to both the console and -//! optional log files. +//! This module provides a custom colorized log formatter for the +//! benchmark suite's console output. It is used by `main.rs` when +//! constructing the `tracing_subscriber` layer stack. //! -//! ## Key Features +//! ## Exported Types //! -//! - **Dual Output**: Supports simultaneous logging to both the console -//! (stdout) and a dedicated log file. -//! - **Level Control**: Allows independent configuration of log levels for -//! console and file outputs. -//! - **Dynamic Filtering**: Uses `tracing_subscriber` to allow log levels -//! to be set via environment variables (e.g., `RUST_LOG`). -//! - **Human-Readable Format**: Configures a clean, readable format for -//! console output to improve developer experience. -//! -//! ## Usage -//! -//! The primary function, `init_logging`, should be called once at the -//! beginning of the application's `main` function to set up the global -//! logger. -//! -//! ```rust,ignore -//! // In main.rs -//! use ipc_benchmark::logging; -//! -//! fn main() -> anyhow::Result<()> { -//! let log_file = Some("benchmark.log".to_string()); -//! logging::init_logging(log_level, &log_file)?; -//! // ... rest of the application -//! Ok(()) -//! } -//! ``` +//! - [`ColorizedFormatter`]: A `FormatEvent` implementation that renders +//! log events with ANSI colors and a compact single-line layout. use colored::*; use std::cell::RefCell; diff --git a/src/main.rs b/src/main.rs index 9c1b630f..29773e15 100644 --- a/src/main.rs +++ b/src/main.rs @@ -118,8 +118,6 @@ fn main() -> Result<()> { /// * `Err(anyhow::Error)` - Benchmark failed with error #[tokio::main] async fn run_async_mode(args: Args) -> Result<()> { - // === ALL EXISTING MAIN() LOGIC STARTS HERE === - // Configure logging level based on verbosity flags. // This level applies to both the log file and stdout. // - default: INFO @@ -343,7 +341,7 @@ async fn run_async_mode(args: Args) -> Result<()> { /// ## Key Differences from Async Mode /// /// - Uses `BlockingBenchmarkRunner` instead of `BenchmarkRunner` -/// - No streaming output support (will be added in Stage 5) +/// - Streaming output supported via `ResultsManagerBlocking` /// - All operations block the calling thread /// - Uses standard library I/O instead of Tokio /// @@ -594,7 +592,7 @@ fn run_blocking_benchmark_for_mechanism( // Pass results_manager for streaming latency records let results = runner .run(Some(results_manager)) - .context(format!("Benchmark failed for {}", mechanism))?; + .with_context(|| format!("Benchmark failed for {}", mechanism))?; Ok(results) } diff --git a/src/results.rs b/src/results.rs index edf0096e..6a6a9d5d 100644 --- a/src/results.rs +++ b/src/results.rs @@ -427,7 +427,7 @@ pub struct SystemInfo { /// Available system memory in gigabytes pub memory_gb: f64, - /// Rust compiler version used to build the benchmark + /// Minimum Supported Rust Version (MSRV) from Cargo.toml pub rust_version: String, /// Benchmark suite version @@ -1242,30 +1242,34 @@ impl ResultsManager { /// ## Returns /// Estimated available memory in gigabytes /// - /// ## Implementation Note - /// - /// This is a placeholder implementation that returns reasonable - /// defaults. A production implementation would use platform-specific - /// system information APIs to get accurate memory information. + /// Reads total physical memory from `/proc/meminfo` on Linux, + /// falling back to 0.0 on other platforms or read failure. fn get_memory_gb() -> f64 { - // This is a simplified implementation - // In a real implementation, you'd want to use a system info crate - 16.0 // Default assumption + #[cfg(target_os = "linux")] + { + if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") { + for line in contents.lines() { + if let Some(rest) = line.strip_prefix("MemTotal:") { + let kb_str = rest.trim().trim_end_matches(" kB").trim(); + if let Ok(kb) = kb_str.parse::() { + return kb as f64 / 1_048_576.0; + } + } + } + } + 0.0 + } + #[cfg(not(target_os = "linux"))] + { + 0.0 + } } - /// Get Rust version - /// - /// Retrieves the Rust compiler version used to build the benchmark. - /// This information is important for correlating performance with - /// compiler optimizations and language features. - /// - /// ## Returns - /// Rust version string from build-time metadata - /// - /// ## Build-time Detection + /// Get the Minimum Supported Rust Version (MSRV) from Cargo.toml. /// - /// The version is captured at build time from Cargo metadata, - /// ensuring it reflects the actual compiler used for the benchmark. + /// Returns the `rust-version` field from Cargo.toml, captured at + /// build time. This is the MSRV, not necessarily the compiler + /// version used to build this binary. fn get_rust_version() -> String { env!("CARGO_PKG_RUST_VERSION").to_string() } @@ -1882,8 +1886,8 @@ impl Default for SystemInfo { os: std::env::consts::OS.to_string(), architecture: std::env::consts::ARCH.to_string(), cpu_cores: num_cpus::get(), - memory_gb: 16.0, - rust_version: "1.75.0".to_string(), + memory_gb: ResultsManager::get_memory_gb(), + rust_version: env!("CARGO_PKG_RUST_VERSION").to_string(), benchmark_version: crate::VERSION.to_string(), } } diff --git a/src/results_blocking.rs b/src/results_blocking.rs index 1aaa2990..cd30dc76 100644 --- a/src/results_blocking.rs +++ b/src/results_blocking.rs @@ -1114,30 +1114,34 @@ impl BlockingResultsManager { /// ## Returns /// Estimated available memory in gigabytes /// - /// ## Implementation Note - /// - /// This is a placeholder implementation that returns reasonable - /// defaults. A production implementation would use platform-specific - /// system information APIs to get accurate memory information. + /// Reads total physical memory from `/proc/meminfo` on Linux, + /// falling back to 0.0 on other platforms or read failure. fn get_memory_gb() -> f64 { - // This is a simplified implementation - // In a real implementation, you'd want to use a system info crate - 16.0 // Default assumption + #[cfg(target_os = "linux")] + { + if let Ok(contents) = std::fs::read_to_string("/proc/meminfo") { + for line in contents.lines() { + if let Some(rest) = line.strip_prefix("MemTotal:") { + let kb_str = rest.trim().trim_end_matches(" kB").trim(); + if let Ok(kb) = kb_str.parse::() { + return kb as f64 / 1_048_576.0; + } + } + } + } + 0.0 + } + #[cfg(not(target_os = "linux"))] + { + 0.0 + } } - /// Get Rust version - /// - /// Retrieves the Rust compiler version used to build the benchmark. - /// This information is important for correlating performance with - /// compiler optimizations and language features. - /// - /// ## Returns - /// Rust version string from build-time metadata - /// - /// ## Build-time Detection + /// Get the Minimum Supported Rust Version (MSRV) from Cargo.toml. /// - /// The version is captured at build time from Cargo metadata, - /// ensuring it reflects the actual compiler used for the benchmark. + /// Returns the `rust-version` field from Cargo.toml, captured at + /// build time. This is the MSRV, not necessarily the compiler + /// version used to build this binary. fn get_rust_version() -> String { env!("CARGO_PKG_RUST_VERSION").to_string() } diff --git a/src/utils.rs b/src/utils.rs index 252d3550..33b70a62 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -22,7 +22,6 @@ //! - **Extensibility**: Easy to add new formatters and validators use std::path::PathBuf; -use std::time::{SystemTime, UNIX_EPOCH}; /// Returns the system-appropriate temporary directory for IPC files. /// @@ -169,39 +168,6 @@ where .map_err(|e| anyhow::anyhow!("Thread communication error: {}", e))? } -/// Get current timestamp as nanoseconds since Unix epoch -/// -/// Provides high-precision timing information for performance measurement -/// and result correlation. The nanosecond precision enables accurate timing -/// even for very fast operations. -/// -/// ## Returns -/// Number of nanoseconds since January 1, 1970, 00:00:00 UTC -/// -/// ## Precision -/// -/// The returned value has nanosecond precision on systems that support it, -/// though the actual resolution depends on system capabilities. Most modern -/// systems provide microsecond or better resolution. -/// -/// ## Error Handling -/// -/// If the system time is before the Unix epoch (very rare), returns 0 -/// to provide a safe fallback rather than panicking. -/// -/// ## Usage -/// -/// - Timestamping measurement data -/// - Calculating time differences -/// - Correlating events across different system components -/// - High-precision performance measurement -pub fn current_timestamp_ns() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as u64 -} - /// Sets the CPU affinity for the current thread to the specified core. /// /// This function takes a core ID as input and attempts to pin the current diff --git a/utils/dashboard/README.md b/utils/dashboard/README.md index 26ea3aac..f21e7b57 100644 --- a/utils/dashboard/README.md +++ b/utils/dashboard/README.md @@ -30,17 +30,21 @@ To ensure full dashboard functionality, run ipc-benchmark with both output param ```bash # Minimum command for dashboard compatibility -./ipc-benchmark --mechanism SharedMemory --message-size 1024 \ - -o ./dashboard_data/ \ - --streaming-output-json \ - --duration 30s +mkdir -p ./dashboard_data +./target/release/ipc-benchmark -m shm -s 1024 -d 30s \ + -o ./dashboard_data/shm_1024_summary.json \ + --streaming-output-json ./dashboard_data/shm_1024_streaming.json ``` +> **Note:** `-o` is a file path (not a directory). You must name each output +> file explicitly. The dashboard discovers files by JSON structure, not by +> naming convention. + #### **Expected Output Structure** ``` dashboard_data/ -├── sharedmemory_1024_summary.json # Summary statistics -└── sharedmemory_1024_streaming.json # Per-message latency data +├── shm_1024_summary.json # Final results (user-chosen name) +└── shm_1024_streaming.json # Per-message latency data (user-chosen name) ``` ### **Troubleshooting Data Issues**