Skip to content

Commit aeb500b

Browse files
sberg-rhclaude
andcommitted
feat: Support send_delay and include_first_message in standalone client
- Add --send-delay support: inserts a configurable pause after each message send (blocking uses thread::sleep, async uses tokio::sleep) - Add --include-first-message support: when false (default), sends a canary message before measurement to warm up the connection, matching the existing BenchmarkRunner behavior - Applied to both one-way and round-trip tests in both blocking and async client paths Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 93bfac7 commit aeb500b

1 file changed

Lines changed: 54 additions & 0 deletions

File tree

src/main.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1435,6 +1435,12 @@ fn run_standalone_client_blocking(
14351435
if config.one_way {
14361436
let mut metrics = MetricsCollector::new(None, config.percentiles.clone())?;
14371437

1438+
// Send canary to warm up the connection if first message excluded
1439+
if !config.include_first_message {
1440+
let canary = Message::new(u64::MAX, payload.clone(), MessageType::OneWay);
1441+
let _ = transport.send_blocking(&canary);
1442+
}
1443+
14381444
let start = std::time::Instant::now();
14391445
let count = if let Some(test_duration) = config.duration {
14401446
info!(
@@ -1446,6 +1452,9 @@ fn run_standalone_client_blocking(
14461452
let msg = Message::new(c, payload.clone(), MessageType::OneWay);
14471453
transport.send_blocking(&msg)?;
14481454
metrics.record_message(config.message_size, None)?;
1455+
if let Some(delay) = config.send_delay {
1456+
std::thread::sleep(delay);
1457+
}
14491458
c += 1;
14501459
}
14511460
c
@@ -1458,6 +1467,9 @@ fn run_standalone_client_blocking(
14581467
let msg = Message::new(i as u64, payload.clone(), MessageType::OneWay);
14591468
transport.send_blocking(&msg)?;
14601469
metrics.record_message(config.message_size, None)?;
1470+
if let Some(delay) = config.send_delay {
1471+
std::thread::sleep(delay);
1472+
}
14611473
}
14621474
msg_count as u64
14631475
};
@@ -1478,6 +1490,14 @@ fn run_standalone_client_blocking(
14781490
let mut metrics =
14791491
MetricsCollector::new(Some(LatencyType::RoundTrip), config.percentiles.clone())?;
14801492

1493+
// Send canary to warm up the connection if first message excluded
1494+
if !config.include_first_message {
1495+
let canary = Message::new(u64::MAX, payload.clone(), MessageType::Request);
1496+
if transport.send_blocking(&canary).is_ok() {
1497+
let _ = transport.receive_blocking();
1498+
}
1499+
}
1500+
14811501
if let Some(test_duration) = config.duration {
14821502
info!(
14831503
"Running round-trip latency test (duration={:.2?})...",
@@ -1502,6 +1522,9 @@ fn run_standalone_client_blocking(
15021522
);
15031523
let _ = results_manager.stream_latency_record(&record);
15041524

1525+
if let Some(delay) = config.send_delay {
1526+
std::thread::sleep(delay);
1527+
}
15051528
i += 1;
15061529
}
15071530
} else {
@@ -1525,6 +1548,10 @@ fn run_standalone_client_blocking(
15251548
latency,
15261549
);
15271550
let _ = results_manager.stream_latency_record(&record);
1551+
1552+
if let Some(delay) = config.send_delay {
1553+
std::thread::sleep(delay);
1554+
}
15281555
}
15291556
}
15301557

@@ -1620,6 +1647,12 @@ async fn run_standalone_client_async(
16201647
if config.one_way {
16211648
let mut metrics = MetricsCollector::new(None, config.percentiles.clone())?;
16221649

1650+
// Send canary to warm up the connection if first message excluded
1651+
if !config.include_first_message {
1652+
let canary = Message::new(u64::MAX, payload.clone(), MessageType::OneWay);
1653+
let _ = transport.send(&canary).await;
1654+
}
1655+
16231656
let start = std::time::Instant::now();
16241657
let count = if let Some(test_duration) = config.duration {
16251658
info!(
@@ -1631,6 +1664,9 @@ async fn run_standalone_client_async(
16311664
let msg = Message::new(c, payload.clone(), MessageType::OneWay);
16321665
transport.send(&msg).await?;
16331666
metrics.record_message(config.message_size, None)?;
1667+
if let Some(delay) = config.send_delay {
1668+
tokio::time::sleep(delay).await;
1669+
}
16341670
c += 1;
16351671
}
16361672
c
@@ -1643,6 +1679,9 @@ async fn run_standalone_client_async(
16431679
let msg = Message::new(i as u64, payload.clone(), MessageType::OneWay);
16441680
transport.send(&msg).await?;
16451681
metrics.record_message(config.message_size, None)?;
1682+
if let Some(delay) = config.send_delay {
1683+
tokio::time::sleep(delay).await;
1684+
}
16461685
}
16471686
msg_count as u64
16481687
};
@@ -1663,6 +1702,14 @@ async fn run_standalone_client_async(
16631702
let mut metrics =
16641703
MetricsCollector::new(Some(LatencyType::RoundTrip), config.percentiles.clone())?;
16651704

1705+
// Send canary to warm up the connection if first message excluded
1706+
if !config.include_first_message {
1707+
let canary = Message::new(u64::MAX, payload.clone(), MessageType::Request);
1708+
if transport.send(&canary).await.is_ok() {
1709+
let _ = transport.receive().await;
1710+
}
1711+
}
1712+
16661713
if let Some(test_duration) = config.duration {
16671714
info!(
16681715
"Running round-trip latency test (duration={:.2?})...",
@@ -1687,6 +1734,9 @@ async fn run_standalone_client_async(
16871734
);
16881735
let _ = results_manager.stream_latency_record(&record);
16891736

1737+
if let Some(delay) = config.send_delay {
1738+
tokio::time::sleep(delay).await;
1739+
}
16901740
i += 1;
16911741
}
16921742
} else {
@@ -1710,6 +1760,10 @@ async fn run_standalone_client_async(
17101760
latency,
17111761
);
17121762
let _ = results_manager.stream_latency_record(&record);
1763+
1764+
if let Some(delay) = config.send_delay {
1765+
tokio::time::sleep(delay).await;
1766+
}
17131767
}
17141768
}
17151769

0 commit comments

Comments
 (0)