Skip to content

Commit 824265a

Browse files
authored
feat(ipc): Implement Backpressure Detection with Unit Tests (#68)
* feat(ipc): Implement backpressure detection and warnings This commit introduces a backpressure detection mechanism across all IPC transport types to provide better diagnostics during long-running or high-throughput benchmarks. When a transport's underlying buffer becomes full, send operations can block or slow down, impacting performance metrics. This feature detects such scenarios and issues a one-time warning to the user, helping them understand if their benchmark is measuring pure transport speed or a backpressure-limited scenario. Implementation Details: - The IpcTransport::send method signature has been changed to return a Result<bool>, where true indicates that backpressure was detected during the send operation. - For Shared Memory, backpressure is detected when the ring buffer is full, causing the send operation to retry. - For TCP and Unix Domain Sockets, backpressure is detected heuristically by timing the send call. An unusually long duration suggests the OS send buffer is full and the call was blocked. - For POSIX Message Queues, backpressure is detected when a send operation returns an EAGAIN error, indicating a full queue. - The benchmark runner has been updated to handle the new send signature. AI-assisted-by: Gemini 2.5 Pro feat(ipc): Add unit tests for backpressure detection This commit introduces unit tests for the backpressure detection feature across all IPC transport types. These tests validate that backpressure is correctly identified when buffers or queues become full. Key changes: - Added `test_shared_memory_backpressure` to verify backpressure detection in the shared memory transport. - Added `test_tcp_socket_backpressure` to test the heuristic-based backpressure detection in TCP sockets. - Added `test_uds_backpressure` for Unix Domain Sockets, which uses a similar heuristic to TCP. - Added `test_pmq_backpressure` to ensure backpressure is detected when the POSIX Message Queue is full. Additionally, the core logic for stream-based transports (TCP and UDS) has been improved to use timeouts on send operations. This prevents tests from hanging and provides a more robust backpressure signal. A custom `IpcError::BackpressureTimeout` is now returned when a send operation times out due to a full buffer, making the behavior consistent across all transport types. AI-assisted-by: Gemini 2.5 Pro docs(ipc): Document backpressure detection feature This commit updates the documentation to reflect the new backpressure detection and warning mechanism. Key changes include: - Updated doc comments for the \IpcTransport::send\ method and the new \IpcError::BackpressureTimeout\ variant to explain the behavior. - Added doc comments to each transport's \send\ implementation detailing its specific backpressure detection strategy. - Added a "Backpressure Warnings" section to the \README.md\ to help users interpret benchmark results correctly. - Updated \CONTRIBUTING.md\ to require that new transport implementations include backpressure detection logic. AI-assisted-by: Gemini 2.5 Pro * remove errant added AGENTS.md file
1 parent a3fa17d commit 824265a

10 files changed

Lines changed: 634 additions & 60 deletions

CONTRIBUTING.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ Use descriptive branch names:
120120
- `docs/improve-readme` - Documentation
121121
- `perf/optimize-serialization` - Performance improvements
122122

123+
### Implementing a New Transport
124+
125+
When adding a new IPC transport mechanism, it must implement the `IpcTransport` trait defined in `src/ipc/mod.rs`. Key requirements include:
126+
127+
- **Asynchronous Operations**: All I/O methods must be `async`.
128+
- **Bidirectional Support**: The transport should ideally support sending and receiving.
129+
- **Backpressure Detection**: The `send` method is required to detect when the transport's buffer is full and the operation is blocked. It must return `Ok(true)` when backpressure is detected. See the existing implementations for examples.
130+
- **Resource Cleanup**: The `close` method and the `Drop` trait must be implemented to ensure all system resources (files, memory segments, etc.) are properly cleaned up.
131+
123132
## Code Style
124133

125134
### Rust Style Guidelines

Cargo.lock

Lines changed: 23 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ socket2 = "0.5"
4040
tracing-appender = "0.2.3"
4141
time = ">=0.3.34, <0.3.36" # Pinned to a compatible range for MSRV 1.70
4242
parking_lot = "0.12.4"
43+
thiserror = "2.0.16"
4344

4445
[dev-dependencies]
4546
criterion = "0.5"

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,20 @@ taskset -c 0-3 ipc-benchmark --concurrency 4
346346
- **Noise**: Run tests on idle systems for best accuracy
347347
- **Repetition**: Run multiple test executions and average results
348348

349+
### Backpressure Warnings
350+
351+
During high-throughput tests, it's possible for the sender to produce data faster than the receiver can consume it. When the underlying OS buffers or message queues become full, the send operation will slow down or block. This condition is known as **backpressure**.
352+
353+
This benchmark tool automatically detects backpressure and will issue a warning message the first time it occurs for a given transport, for example:
354+
355+
```
356+
WARN rusty_comms::ipc::shared_memory: Shared memory buffer is full; backpressure is occurring.
357+
This may impact latency and throughput measurements.
358+
Consider increasing the buffer size if this is not the desired scenario.
359+
```
360+
361+
When you see this warning, it means your benchmark may be measuring a backpressure-limited scenario rather than the pure, unconstrained performance of the IPC mechanism. This is a valid and important scenario to test, but it's crucial for interpreting the results correctly. If your goal is to measure maximum throughput, you may need to increase the buffer sizes (`--buffer-size`) or investigate the receiver's performance.
362+
349363
## Troubleshooting
350364

351365
### Common Issues

src/benchmark.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ impl BenchmarkRunner {
431431
let payload = vec![0u8; self.config.message_size];
432432
for i in 0..self.config.warmup_iterations {
433433
let message = Message::new(i as u64, payload.clone(), MessageType::OneWay);
434-
client_transport.send(&message).await?;
434+
let _ = client_transport.send(&message).await?;
435435
}
436436

437437
// Clean up resources
@@ -682,7 +682,7 @@ impl BenchmarkRunner {
682682
)
683683
.await
684684
{
685-
Ok(Ok(())) => {
685+
Ok(Ok(_)) => {
686686
let latency = send_time.elapsed();
687687
metrics_collector
688688
.record_message(self.config.message_size, Some(latency))?;
@@ -716,7 +716,7 @@ impl BenchmarkRunner {
716716
for i in 0..msg_count {
717717
let send_time = Instant::now();
718718
let message = Message::new(i as u64, payload.clone(), MessageType::OneWay);
719-
client_transport.send(&message).await?;
719+
let _ = client_transport.send(&message).await?;
720720

721721
let latency = send_time.elapsed();
722722
metrics_collector.record_message(self.config.message_size, Some(latency))?;
@@ -859,7 +859,7 @@ impl BenchmarkRunner {
859859
)
860860
.await
861861
{
862-
Ok(Ok(())) => {
862+
Ok(Ok(_)) => {
863863
// Send succeeded - increment counter immediately to ensure unique message IDs
864864
i += 1;
865865

@@ -911,7 +911,7 @@ impl BenchmarkRunner {
911911
for i in 0..msg_count {
912912
let send_time = Instant::now();
913913
let message = Message::new(i as u64, payload.clone(), MessageType::Request);
914-
client_transport.send(&message).await?;
914+
let _ = client_transport.send(&message).await?;
915915

916916
let _ = client_transport.receive().await?;
917917
let latency = send_time.elapsed();
@@ -1231,7 +1231,7 @@ impl BenchmarkRunner {
12311231
)
12321232
.await
12331233
{
1234-
Ok(Ok(())) => {
1234+
Ok(Ok(_)) => {
12351235
let one_way_latency = send_start.elapsed();
12361236

12371237
// Try to receive response and measure round-trip latency
@@ -1292,7 +1292,7 @@ impl BenchmarkRunner {
12921292
let send_start = Instant::now();
12931293
let message = Message::new(i as u64, payload.clone(), MessageType::Request);
12941294

1295-
client_transport.send(&message).await?;
1295+
let _ = client_transport.send(&message).await?;
12961296
let one_way_latency = send_start.elapsed();
12971297

12981298
let _ = client_transport.receive().await?;

src/ipc/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
use anyhow::Result;
5050
use async_trait::async_trait;
5151
use serde::{Deserialize, Serialize};
52+
use thiserror::Error;
5253
use time::OffsetDateTime;
5354
use tokio::sync::mpsc;
5455

@@ -68,6 +69,21 @@ pub use tcp_socket::TcpSocketTransport;
6869
#[cfg(unix)]
6970
pub use unix_domain_socket::UnixDomainSocketTransport;
7071

72+
/// Custom error types for IPC operations.
73+
#[derive(Error, Debug)]
74+
pub enum IpcError {
75+
/// Error indicating a timeout that occurred due to backpressure.
76+
///
77+
/// This error is returned when a send operation cannot complete within a
78+
/// reasonable time because the underlying buffer or queue is full,
79+
/// indicating that the receiver is not processing messages fast enough.
80+
#[error("Timeout sending message due to backpressure")]
81+
BackpressureTimeout,
82+
/// A generic IPC error.
83+
#[error("IPC error: {0}")]
84+
Generic(#[from] anyhow::Error),
85+
}
86+
7187
/// Connection identifier for tracking multiple client connections
7288
///
7389
/// This type alias provides a clear identifier for individual connections
@@ -471,15 +487,15 @@ pub trait IpcTransport: Send + Sync {
471487
/// - `message`: Message to transmit
472488
///
473489
/// ## Returns
474-
/// - `Ok(())`: Message sent successfully
490+
/// - `Ok(bool)`: `true` if backpressure was detected, `false` otherwise
475491
/// - `Err(anyhow::Error)`: Transmission failed
476492
///
477493
/// ## Performance Considerations
478494
///
479495
/// This method should be optimized for low latency and high throughput
480496
/// as it's called frequently during benchmarking. Implementations
481497
/// should minimize copies and allocations where possible.
482-
async fn send(&mut self, message: &Message) -> Result<()>;
498+
async fn send(&mut self, message: &Message) -> Result<bool>;
483499

484500
/// Receive a message (legacy single-connection interface)
485501
///
@@ -644,7 +660,8 @@ pub trait IpcTransport: Send + Sync {
644660
message: &Message,
645661
) -> Result<()> {
646662
// Default implementation ignores connection_id and uses legacy send
647-
self.send(message).await
663+
self.send(message).await?;
664+
Ok(())
648665
}
649666

650667
/// Get list of active connection IDs

0 commit comments

Comments
 (0)