Skip to content

Commit 42ae9be

Browse files
authored
fix(tests): increase timeouts for CI stability in durable buffer tests (#2793)
# Change Summary This change increases some of the timeouts for the durable_buffer_processor tests to improve stability on slow CI systems where pipeline initialization takes longer than expected. ## What issue does this PR close? * Addresses a flaky test reported in #2720. Includes fixes for similar tests with the same potential timing issue. ## How are these changes tested? Was able to manually reproduce the failure reported in #2720. Validated that tests pass on local runs. ## Are there any user-facing changes? No. This is a test-only change.
1 parent 524b78d commit 42ae9be

1 file changed

Lines changed: 48 additions & 14 deletions

File tree

rust/otap-dataflow/crates/otap/tests/durable_buffer_processor_tests.rs

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,27 @@ fn init_test_tracing() {
5858
});
5959
}
6060

61+
// ─────────────────────────────────────────────────────────────────────────────
62+
// CI Timing Constants
63+
// ─────────────────────────────────────────────────────────────────────────────
64+
65+
/// Generous timeout for `wait_for_condition` in flip threads.
66+
///
67+
/// These threads start before the pipeline is running. On slow CI (e.g.,
68+
/// overloaded Windows runners), pipeline startup -- Tokio runtime creation,
69+
/// Quiver engine init, segment finalization -- can consume several seconds
70+
/// before the first NACK reaches the exporter. This timeout must absorb all
71+
/// startup latency. It does not affect test duration on healthy machines
72+
/// because `wait_for_condition` returns immediately when the condition is met.
73+
const FLIP_CONDITION_TIMEOUT: Duration = Duration::from_secs(30);
74+
75+
/// Pipeline max_duration ceiling for tests with flip threads.
76+
///
77+
/// Must exceed the sum of all flip thread phase timeouts plus delivery time.
78+
/// On healthy machines the shutdown condition fires in ~2s; this is a safety
79+
/// ceiling for extremely slow CI.
80+
const PIPELINE_MAX_DURATION: Duration = Duration::from_secs(60);
81+
6182
// ─────────────────────────────────────────────────────────────────────────────
6283
// Test Configuration Builder
6384
// ─────────────────────────────────────────────────────────────────────────────
@@ -952,10 +973,14 @@ fn test_durable_buffer_retries_on_nack() {
952973
// Spawn a thread to flip the exporter after NACKs are observed
953974
let flip_test_id = test_id.to_owned();
954975
let flip_handle = std::thread::spawn(move || {
955-
// Wait for at least 5 NACKs (condition-based, not fixed timeout)
976+
// Wait for at least 5 NACKs (condition-based, not fixed timeout).
977+
// Use a generous timeout because this thread starts before the pipeline
978+
// is running. On slow CI, pipeline startup (runtime init, engine init,
979+
// segment finalization) can consume several seconds before the first
980+
// NACK arrives.
956981
let nacks_observed = wait_for_condition(
957982
|| flaky_exporter::nack_count_by_id(&flip_test_id) >= 5,
958-
Duration::from_secs(5), // generous timeout for CI
983+
FLIP_CONDITION_TIMEOUT,
959984
Duration::from_millis(10),
960985
);
961986
assert!(nacks_observed, "Expected at least 5 NACKs within timeout");
@@ -975,7 +1000,7 @@ fn test_durable_buffer_retries_on_nack() {
9751000
config,
9761001
&pipeline_group_id,
9771002
&pipeline_id,
978-
Duration::from_secs(10), // generous max timeout for CI
1003+
PIPELINE_MAX_DURATION,
9791004
Duration::from_secs(1),
9801005
Some(move || delivered_counter.load(Ordering::Relaxed) > 0),
9811006
true, // wait for telemetry cycle before shutdown so metrics are populated
@@ -2004,10 +2029,13 @@ fn test_durable_buffer_permanent_nack_rejects_without_retry() {
20042029
// tighter timeout in this thread caused failures on slow CI (see #2354).
20052030
let flip_test_id = test_id.to_owned();
20062031
let flip_handle = std::thread::spawn(move || {
2007-
// Wait for at least 3 permanent NACKs
2032+
// Wait for at least 3 permanent NACKs.
2033+
// Use a generous timeout because this thread starts before the pipeline
2034+
// is running. On slow CI, pipeline startup can consume several seconds
2035+
// before the first NACK arrives.
20082036
let permanent_nacks_observed = wait_for_condition(
20092037
|| flaky_exporter::permanent_nack_count_by_id(&flip_test_id) >= 3,
2010-
Duration::from_secs(5),
2038+
FLIP_CONDITION_TIMEOUT,
20112039
Duration::from_millis(10),
20122040
);
20132041
assert!(
@@ -2021,7 +2049,7 @@ fn test_durable_buffer_permanent_nack_rejects_without_retry() {
20212049

20222050
// Switch to ACK mode - new data should be delivered.
20232051
// The pipeline shutdown condition gates on delivered_counter > 0, so delivery
2024-
// is verified there (with a 15 s ceiling) rather than here.
2052+
// is verified there (with PIPELINE_MAX_DURATION ceiling) rather than here.
20252053
flaky_exporter::set_should_ack_by_id(&flip_test_id, true);
20262054

20272055
(permanent_nacks_before, transient_nacks_before)
@@ -2033,7 +2061,7 @@ fn test_durable_buffer_permanent_nack_rejects_without_retry() {
20332061
config,
20342062
&pipeline_group_id,
20352063
&pipeline_id,
2036-
Duration::from_secs(15),
2064+
PIPELINE_MAX_DURATION,
20372065
Duration::from_secs(1),
20382066
Some(move || delivered_counter.load(Ordering::Relaxed) > 0),
20392067
true, // wait for telemetry cycle before shutdown so metrics are populated
@@ -2224,10 +2252,15 @@ fn test_durable_buffer_mixed_transient_and_permanent_nacks() {
22242252

22252253
let flip_test_id = test_id.to_owned();
22262254
let flip_handle = std::thread::spawn(move || {
2227-
// Phase 1: Wait for transient NACKs
2255+
// Phase 1: Wait for transient NACKs.
2256+
// Use a generous timeout because this thread starts before the pipeline
2257+
// is running. On slow CI (e.g., overloaded Windows runners), pipeline
2258+
// startup (Tokio runtime creation, Quiver engine init, segment
2259+
// finalization) can consume several seconds before the first NACK
2260+
// arrives at the exporter.
22282261
let transient_observed = wait_for_condition(
22292262
|| flaky_exporter::nack_count_by_id(&flip_test_id) >= 3,
2230-
Duration::from_secs(5),
2263+
FLIP_CONDITION_TIMEOUT,
22312264
Duration::from_millis(10),
22322265
);
22332266
assert!(
@@ -2242,7 +2275,7 @@ fn test_durable_buffer_mixed_transient_and_permanent_nacks() {
22422275

22432276
let permanent_observed = wait_for_condition(
22442277
|| flaky_exporter::permanent_nack_count_by_id(&flip_test_id) >= 3,
2245-
Duration::from_secs(5),
2278+
FLIP_CONDITION_TIMEOUT,
22462279
Duration::from_millis(10),
22472280
);
22482281
assert!(
@@ -2253,9 +2286,10 @@ fn test_durable_buffer_mixed_transient_and_permanent_nacks() {
22532286
let permanent_nacks_phase2 = flaky_exporter::permanent_nack_count_by_id(&flip_test_id);
22542287

22552288
// Phase 3: Switch to ACK mode.
2256-
// The pipeline shutdown condition gates on delivered_counter > 0 with a 15 s
2257-
// ceiling, so delivery is verified there rather than here. Waiting
2258-
// here with a tighter timeout caused failures on slow CI (see #2354).
2289+
// The pipeline shutdown condition gates on delivered_counter > 0 with
2290+
// PIPELINE_MAX_DURATION ceiling, so delivery is verified there rather
2291+
// than here. Waiting here with a tighter timeout caused failures on
2292+
// slow CI (see #2354).
22592293
flaky_exporter::set_should_ack_by_id(&flip_test_id, true);
22602294

22612295
(transient_nacks_phase1, permanent_nacks_phase2)
@@ -2266,7 +2300,7 @@ fn test_durable_buffer_mixed_transient_and_permanent_nacks() {
22662300
config,
22672301
&pipeline_group_id,
22682302
&pipeline_id,
2269-
Duration::from_secs(15),
2303+
PIPELINE_MAX_DURATION,
22702304
Duration::from_secs(1),
22712305
Some(move || delivered_counter.load(Ordering::Relaxed) > 0),
22722306
true, // wait for telemetry cycle before shutdown so metrics are populated

0 commit comments

Comments
 (0)