Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ rust-version = "1.70"
authors = ["IPC Benchmark Contributors"]
description = "A comprehensive interprocess communication benchmark suite"
license = "Apache-2.0"
repository = "https://github.com/your-org/ipc-benchmark"
documentation = "https://docs.rs/ipc-benchmark"
repository = "https://github.com/redhat-performance/rusty-comms"
readme = "README.md"
keywords = ["ipc", "benchmark", "performance", "concurrency"]
categories = ["development-tools::profiling", "concurrency"]
Expand All @@ -23,18 +22,14 @@ tokio = { version = "1.38.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
clap = { version = ">=4.4.18, <4.5.0", features = ["derive"] }
crossbeam = "0.8"
shared_memory = "0.12"
libc = "0.2"
nix = { version = "0.29", features = ["time"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# MSRV: uuid 1.21+ requires Rust 1.85+
uuid = { version = ">=1.17, <1.21", features = ["v4"] }
chrono = { version = "0.4", features = ["serde"] }
num_cpus = "1.17"
rand = "0.8"
statistics = "0.4"
hdrhistogram = "7.5"
bincode = "1.3"
async-trait = "0.1"
Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ This benchmark suite uses **high-precision monotonic clocks** to measure true IP

#### Clock Source

- **Unix/Linux**: Uses `CLOCK_MONOTONIC` via the nix crate
- **Unix/Linux**: Uses `CLOCK_MONOTONIC` via direct `libc::clock_gettime` call
- **Windows**: Falls back to system time (less precise)
- **Characteristics**: Monotonic clocks measure time from system boot and are unaffected by NTP adjustments, daylight saving time, or manual clock changes

Expand Down Expand Up @@ -481,7 +481,7 @@ ipc-benchmark -m all --continue-on-error

# Run only round-trip tests (one-way and round-trip run
# sequentially by default; use these flags to select one)
ipc-benchmark --round-trip --no-one-way
ipc-benchmark --round-trip

# Custom percentiles for latency analysis
ipc-benchmark --percentiles 50 90 95 99 99.9 99.99
Expand All @@ -501,7 +501,7 @@ ipc-benchmark -m shm --buffer-size 16384
This benchmark runs the server as a separate child process for each test to ensure strong isolation and realistic IPC behavior.

- The parent process spawns the same binary in a special "server-only" mode and waits for a readiness byte via a pipe connected to the child's stdout.
- On Unix, the readiness signal is a single byte `0x01` written to stdout. On Windows, tests use a simple `echo` to emit a single character (e.g., `R`).
- The readiness signal is a single byte `0x01` written to stdout on all platforms.
- The child process is terminated at the end of each test; resources are cleaned up by the transport implementation.

Binary resolution strategy used by the spawner:
Expand Down Expand Up @@ -548,6 +548,8 @@ If you need to analyze the raw performance data, including the first-message spi
```bash
# Include the first message in the final results
ipc-benchmark --include-first-message
```

### Understanding Test Types: Throughput vs. Latency

This benchmark suite can be used to measure two primary aspects of IPC performance: **throughput** and **latency**. The configuration you choose will determine which of these you are primarily testing.
Expand Down
8 changes: 7 additions & 1 deletion docs/archive/blocking-mode-implementation-plan.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Implementation Plan: Blocking/Synchronous Mode for IPC Benchmark

> **HISTORICAL DOCUMENT — DO NOT FOLLOW FOR ACTIVE WORK**
>
> This plan is fully implemented (all 9 stages complete). It is preserved
> for reference only. Standalone client/server mode (`--server`/`--client`)
> was added as follow-on work outside this plan's scope.

**Project Goal:** Add synchronous/blocking execution mode alongside existing async mode
**Date:** 2025-10-16
**Target Repository:** rusty-comms
**Status:** PLANNING
**Status:** COMPLETE (archived)

---

Expand Down
33 changes: 15 additions & 18 deletions src/benchmark_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ impl BlockingBenchmarkRunner {
/// - No async/await - all operations block
/// - No Tokio runtime
/// - Uses BlockingTransport instead of Transport
/// - Streaming output not yet implemented (Stage 5)
/// - Streaming output supported via `ResultsManagerBlocking`
///
/// ## Returns
/// - `Ok(BenchmarkResults)`: Complete test results with metrics
Expand Down Expand Up @@ -1309,24 +1309,21 @@ impl BlockingBenchmarkRunner {

let latency = send_time.elapsed();

// Record latency for all measured messages
if true {
// Stream latency if enabled
if let Some(ref mut manager) = results_manager {
let record = crate::results::MessageLatencyRecord::new(
i as u64,
self.mechanism,
self.config.message_size,
crate::metrics::LatencyType::RoundTrip,
latency,
send_timestamp_ns,
);
let _ = manager.stream_latency_record(&record);
}

// Record in metrics collector
metrics_collector.record_message(self.config.message_size, Some(latency))?;
// Stream latency if enabled
if let Some(ref mut manager) = results_manager {
let record = crate::results::MessageLatencyRecord::new(
i as u64,
self.mechanism,
self.config.message_size,
crate::metrics::LatencyType::RoundTrip,
latency,
send_timestamp_ns,
);
let _ = manager.stream_latency_record(&record);
}

// Record in metrics collector
metrics_collector.record_message(self.config.message_size, Some(latency))?;
}
}

Expand Down
7 changes: 4 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,10 +845,11 @@ pub fn parse_duration_micros(s: &str) -> Result<Duration, String> {
return Err("Duration cannot be negative".to_string());
}

// Convert to Duration based on the unit
// Convert to Duration based on the unit, using float-based
// conversion to preserve fractional inputs like "1.5ms"
let duration = match unit {
"us" => Duration::from_micros(num as u64),
"ms" => Duration::from_millis(num as u64),
"us" => Duration::from_secs_f64(num / 1_000_000.0),
"ms" => Duration::from_secs_f64(num / 1_000.0),
"s" => Duration::from_secs_f64(num),
"m" => Duration::from_secs_f64(num * 60.0),
"h" => Duration::from_secs_f64(num * 3600.0),
Expand Down
96 changes: 39 additions & 57 deletions src/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,34 +363,15 @@ impl Message {
}
}

/// Create a new message for blocking mode with monotonic timestamp
/// Create a new message for blocking mode with monotonic timestamp.
///
/// This method creates a message and captures the timestamp using a
/// monotonic clock (CLOCK_MONOTONIC on Linux). The monotonic clock is
/// not affected by NTP adjustments or system time changes.
///
/// ## Parameters
/// - `id`: Unique identifier for the message
/// - `payload`: Message content as byte vector
/// - `message_type`: Type classification for the message
///
/// ## Returns
/// Message with monotonic timestamp captured at creation time
///
/// ## Use Case
///
/// This method should be used by blocking transport implementations
/// to capture timestamps right before serialization, providing accurate
/// IPC latency measurements that exclude serialization overhead from
/// the measurement.
/// Semantic alias for [`Message::new()`]. Both constructors capture
/// a monotonic timestamp at creation time. This variant exists to
/// make call sites self-documenting when used in blocking transport
/// code paths.
#[inline]
pub fn new_for_blocking(id: u64, payload: Vec<u8>, message_type: MessageType) -> Self {
Self {
id,
timestamp: get_monotonic_time_ns(),
payload,
message_type,
receive_time_ns: 0,
}
Self::new(id, payload, message_type)
}

/// Update the message timestamp to current monotonic time
Expand Down Expand Up @@ -1522,18 +1503,26 @@ mod tests {

// ===== Existing Tests =====

/// Test message size analysis
/// Verify serialized message size is reasonable relative to payload
#[test]
fn test_message_size_analysis() {
// Test with 100 bytes payload (typical benchmark size)
let payload = vec![0u8; 100];
let msg = Message::new(1, payload, MessageType::OneWay);

let serialized = bincode::serialize(&msg).unwrap();

println!("\n=== Message Size Analysis ===");
println!("Payload size: 100 bytes");
println!("Bincode serialized size: {} bytes", serialized.len());
// Overhead should be modest: id(8) + timestamp(8) + len_prefix(8)
// + payload(100) + message_type(4) + receive_time(0, skipped)
assert!(
serialized.len() < 150,
"Serialized size {} is unexpectedly large for 100-byte payload",
serialized.len()
);
assert!(
serialized.len() > 100,
"Serialized size {} should be larger than payload alone",
serialized.len()
);
}

/// Test message creation and basic functionality
Expand Down Expand Up @@ -1662,36 +1651,29 @@ mod tests {
t2 - t1
);
}
}

#[test]
fn test_timestamp_offset_update_in_serialized_buffer() {
use super::*;

// Create a message with timestamp 0
let mut msg = Message::new(42, vec![1, 2, 3, 4], MessageType::OneWay);
msg.timestamp = 0;
/// Verify that the timestamp can be updated in-place within a
/// serialized buffer at the expected offset (bytes 8..16).
#[test]
fn test_timestamp_offset_update_in_serialized_buffer() {
let mut msg = Message::new(42, vec![1, 2, 3, 4], MessageType::OneWay);
msg.timestamp = 0;

// Serialize it
let mut serialized = bincode::serialize(&msg).unwrap();
let mut serialized = bincode::serialize(&msg).unwrap();

// Verify timestamp is 0 in bytes 8-15
let extracted_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap());
assert_eq!(extracted_ts, 0, "Timestamp should be 0 initially");
let extracted_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap());
assert_eq!(extracted_ts, 0, "Timestamp should be 0 initially");

// Now update the timestamp bytes
let new_ts: u64 = 123456789;
let ts_bytes = new_ts.to_le_bytes();
serialized[8..16].copy_from_slice(&ts_bytes);
let new_ts: u64 = 123_456_789;
serialized[8..16].copy_from_slice(&new_ts.to_le_bytes());

// Verify it was updated
let updated_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap());
assert_eq!(updated_ts, new_ts, "Timestamp should be updated");
let updated_ts = u64::from_le_bytes(serialized[8..16].try_into().unwrap());
assert_eq!(updated_ts, new_ts, "Timestamp should be updated");

// Deserialize and verify
let deserialized: Message = bincode::deserialize(&serialized).unwrap();
assert_eq!(
deserialized.timestamp, new_ts,
"Deserialized timestamp should match"
);
let deserialized: Message = bincode::deserialize(&serialized).unwrap();
assert_eq!(
deserialized.timestamp, new_ts,
"Deserialized timestamp should match"
);
}
}
4 changes: 3 additions & 1 deletion src/ipc/posix_message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,9 @@ mod tests {
Ok(bp_detected) => {
if bp_detected {
// This is expected for the first few full-queue sends.
println!("Regular backpressure detected, continuing to force a timeout.");
tracing::trace!(
"Regular backpressure detected, continuing to force a timeout."
);
}
}
Err(e) => {
Expand Down
4 changes: 3 additions & 1 deletion src/ipc/shared_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,7 +911,9 @@ mod tests {
// The first few sends might succeed without backpressure.
// Some might even succeed with backpressure if the timing is just right.
if backpressure_detected {
println!("Regular backpressure detected, continuing to force a timeout.");
tracing::trace!(
"Regular backpressure detected, continuing to force a timeout."
);
}
}
Err(e) => {
Expand Down
6 changes: 3 additions & 3 deletions src/ipc/shared_memory_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,11 @@ impl SharedMemoryRingBuffer {
}
let data_len = u32::from_le_bytes(len_bytes) as usize;

// Validate data length
if data_len > capacity {
// Validate data length (reject zero or oversized frames)
if data_len == 0 || data_len > capacity {
libc::pthread_mutex_unlock(&self.mutex as *const _ as *mut _);
return Err(anyhow!(
"Invalid data length: {} exceeds capacity {}",
"Invalid data length: {} (capacity: {}, min: 1)",
Comment on lines +426 to +430

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick win

Validate against published bytes, not just capacity.

This still trusts any corrupted data_len that is <= capacity. If data_len + 4 exceeds the bytes currently published in the ring, the copy at Lines 448-461 will read stale data and then advance read_pos past write_pos, permanently desynchronizing the buffer. read_data() already has the missing incomplete-frame check; read_data_blocking() needs the same guard before copying.

Patch
         if data_len == 0 || data_len > capacity {
             libc::pthread_mutex_unlock(&self.mutex as *const _ as *mut _);
             return Err(anyhow!(
                 "Invalid data length: {} (capacity: {}, min: 1)",
                 data_len,
                 capacity
             ));
         }
+        if self.available_read_data() < data_len + 4 {
+            libc::pthread_mutex_unlock(&self.mutex as *const _ as *mut _);
+            return Err(anyhow!(
+                "Incomplete message: length {} exceeds published bytes",
+                data_len
+            ));
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Validate data length (reject zero or oversized frames)
if data_len == 0 || data_len > capacity {
libc::pthread_mutex_unlock(&self.mutex as *const _ as *mut _);
return Err(anyhow!(
"Invalid data length: {} exceeds capacity {}",
"Invalid data length: {} (capacity: {}, min: 1)",
// Validate data length (reject zero or oversized frames)
if data_len == 0 || data_len > capacity {
libc::pthread_mutex_unlock(&self.mutex as *const _ as *mut _);
return Err(anyhow!(
"Invalid data length: {} (capacity: {}, min: 1)",
data_len,
capacity
));
}
if self.available_read_data() < data_len + 4 {
libc::pthread_mutex_unlock(&self.mutex as *const _ as *mut _);
return Err(anyhow!(
"Incomplete message: length {} exceeds published bytes",
data_len
));
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/ipc/shared_memory_blocking.rs` around lines 426 - 430, The read path in
read_data_blocking() only checks data_len against capacity, but it must also
verify the full frame is currently published before copying. Add the same
incomplete-frame guard used in read_data() in shared_memory_blocking.rs, right
before the memcpy/copy loop, so data_len + header size does not exceed the
available published bytes and read_pos is not advanced past write_pos on
corrupted frames.

data_len,
capacity
));
Expand Down
Loading
Loading