Skip to content

Commit 914aec1

Browse files
authored
[otap-df-quiver] WAL: Shrink payload buffer after large bundle spikes (open-telemetry#1554)
Follow up for [an issue noted during the main WAL PR](open-telemetry#1537 (comment)). Add memory reclamation for the WAL writer's payload buffer. Previously, after serializing a large RecordBundle, the buffer's capacity would remain elevated indefinitely since `Vec::clear()` preserves capacity. Introduce a high-water mark with configurable decay to track typical usage. After each append, the mark decays by a configurable fraction (default 15/16 ≈ 6% per append). When capacity exceeds 2× the decayed high-water mark and is above 64 KB, shrink_to() reclaims excess memory. - Add `buffer_decay_rate` option to `WalWriterOptions` with validation - Add `InvalidConfig` error variant for configuration validation - Add tests for decay rate validation and shrinking behavior - Add (ignored) test to observe RSS behavior
1 parent af45394 commit 914aec1

3 files changed

Lines changed: 307 additions & 2 deletions

File tree

rust/otap-dataflow/crates/quiver/src/wal/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ pub enum WalError {
189189
/// Writer cannot proceed because configured capacity limits were reached.
190190
#[error("wal at capacity: {0}")]
191191
WalAtCapacity(&'static str),
192+
/// Configuration parameter is invalid.
193+
#[error("invalid wal configuration: {0}")]
194+
InvalidConfig(&'static str),
192195
/// Test-only failure that simulates a crash at a specific point.
193196
#[error("wal crash injected: {0}")]
194197
InjectedCrash(&'static str),

rust/otap-dataflow/crates/quiver/src/wal/tests.rs

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2464,3 +2464,192 @@ fn wal_recovery_scan_benchmark() {
24642464
);
24652465
println!("\n===============================\n");
24662466
}
2467+
2468+
/// Manual test to observe RSS behavior after a large bundle spike.
2469+
///
2470+
/// Run with: cargo test wal_memory_after_large_bundle -- --ignored --nocapture
2471+
#[test]
2472+
#[ignore]
2473+
fn wal_memory_after_large_bundle_spike() {
2474+
fn get_rss_kb() -> Option<u64> {
2475+
// Read RSS from /proc/self/statm (Linux-specific)
2476+
// statm format: size resident shared text lib data dt (all in pages)
2477+
let statm = std::fs::read_to_string("/proc/self/statm").ok()?;
2478+
let resident_pages: u64 = statm.split_whitespace().nth(1)?.parse().ok()?;
2479+
let page_size = 4096u64; // Standard page size on Linux
2480+
Some(resident_pages * page_size / 1024)
2481+
}
2482+
2483+
fn print_rss(label: &str) {
2484+
if let Some(rss) = get_rss_kb() {
2485+
println!(
2486+
" RSS {}: {} KB ({:.1} MB)",
2487+
label,
2488+
rss,
2489+
rss as f64 / 1024.0
2490+
);
2491+
}
2492+
}
2493+
2494+
let (_dir, wal_path) = temp_wal("memory_spike.wal");
2495+
let descriptor = BundleDescriptor::new(vec![slot_descriptor(0, "Data")]);
2496+
2497+
let mut writer = WalWriter::open(WalWriterOptions::new(
2498+
wal_path.clone(),
2499+
[0xEE; 16],
2500+
FlushPolicy::Immediate,
2501+
))
2502+
.expect("writer");
2503+
2504+
println!("=== Memory Spike Test ===");
2505+
println!();
2506+
2507+
// Phase 1: Write a few small bundles as baseline
2508+
println!("Phase 1: Writing 100 small bundles (baseline)...");
2509+
for _ in 0..100 {
2510+
let small_slot = FixtureSlot::new(SlotId::new(0), 0x01, &[1, 2, 3]);
2511+
let small_bundle = FixtureBundle::new(descriptor.clone(), vec![small_slot]);
2512+
let _ = writer.append_bundle(&small_bundle).expect("append small");
2513+
}
2514+
print_rss("after baseline");
2515+
2516+
// Phase 2: Write a very large bundle (~1000 MB)
2517+
println!("Phase 2: Writing 1 large bundle (~1000 MB)...");
2518+
{
2519+
let large_slot = FixtureSlot::with_batch(
2520+
SlotId::new(0),
2521+
0x02,
2522+
build_complex_batch(1_000_000, "large", 1024), // ~1000 MB
2523+
);
2524+
let large_bundle = FixtureBundle::new(descriptor.clone(), vec![large_slot]);
2525+
let _ = writer.append_bundle(&large_bundle).expect("append large");
2526+
// print_rss("after large bundle (before drop)");
2527+
}
2528+
// large_slot and large_bundle are now dropped
2529+
print_rss("after large bundle dropped");
2530+
2531+
// Phase 3: Write many small bundles
2532+
println!(
2533+
"Phase 3: Writing 100 small bundles...should observe RSS shrinking back toward baseline."
2534+
);
2535+
for i in 0..100 {
2536+
let small_slot = FixtureSlot::new(SlotId::new(0), 0x03, &[4, 5, 6]);
2537+
let small_bundle = FixtureBundle::new(descriptor.clone(), vec![small_slot]);
2538+
let _ = writer.append_bundle(&small_bundle).expect("append small");
2539+
drop(small_bundle);
2540+
if (i + 1) % 10 == 0 {
2541+
print_rss(&format!("after {} small bundles", i + 1));
2542+
}
2543+
}
2544+
}
2545+
2546+
#[test]
2547+
fn wal_buffer_decay_rate_rejects_zero_denominator() {
2548+
let (_dir, wal_path) = temp_wal("decay_rate_zero_denom.wal");
2549+
let options = WalWriterOptions::new(wal_path, [0xAA; 16], FlushPolicy::Immediate)
2550+
.with_buffer_decay_rate(15, 0); // Invalid: denominator is zero
2551+
2552+
let result = WalWriter::open(options);
2553+
let err = result.expect_err("should reject zero denominator");
2554+
assert!(
2555+
matches!(err, WalError::InvalidConfig(msg) if msg.contains("denominator")),
2556+
"unexpected error: {:?}",
2557+
err
2558+
);
2559+
}
2560+
2561+
#[test]
2562+
fn wal_buffer_decay_rate_rejects_numerator_gte_denominator() {
2563+
let (_dir, wal_path) = temp_wal("decay_rate_bad_ratio.wal");
2564+
let options = WalWriterOptions::new(wal_path, [0xAA; 16], FlushPolicy::Immediate)
2565+
.with_buffer_decay_rate(16, 16); // Invalid: numerator >= denominator (no decay)
2566+
2567+
let result = WalWriter::open(options);
2568+
let err = result.expect_err("should reject numerator >= denominator");
2569+
assert!(
2570+
matches!(err, WalError::InvalidConfig(msg) if msg.contains("numerator")),
2571+
"unexpected error: {:?}",
2572+
err
2573+
);
2574+
}
2575+
2576+
#[test]
2577+
fn wal_buffer_decay_rate_accepts_valid_values() {
2578+
let (_dir, wal_path) = temp_wal("decay_rate_valid.wal");
2579+
// These should all succeed - validation happens at open() time
2580+
let _ = WalWriter::open(
2581+
WalWriterOptions::new(wal_path.clone(), [0xAA; 16], FlushPolicy::Immediate)
2582+
.with_buffer_decay_rate(0, 1), // Aggressive: decay to zero immediately
2583+
)
2584+
.expect("(0, 1) should be valid");
2585+
2586+
let _ = WalWriter::open(
2587+
WalWriterOptions::new(wal_path.clone(), [0xAA; 16], FlushPolicy::Immediate)
2588+
.with_buffer_decay_rate(1, 2), // 50% decay per append
2589+
)
2590+
.expect("(1, 2) should be valid");
2591+
2592+
let _ = WalWriter::open(
2593+
WalWriterOptions::new(wal_path.clone(), [0xAA; 16], FlushPolicy::Immediate)
2594+
.with_buffer_decay_rate(31, 32), // ~3% decay per append (conservative)
2595+
)
2596+
.expect("(31, 32) should be valid");
2597+
2598+
let _ = WalWriter::open(
2599+
WalWriterOptions::new(wal_path, [0xAA; 16], FlushPolicy::Immediate)
2600+
.with_buffer_decay_rate(999, 1000), // ~0.1% decay per append (very conservative)
2601+
)
2602+
.expect("(999, 1000) should be valid");
2603+
}
2604+
2605+
#[test]
2606+
fn wal_buffer_decay_rate_affects_shrinking_behavior() {
2607+
// Test that a faster decay rate causes faster shrinking.
2608+
// We use a small threshold and aggressive decay to observe the effect.
2609+
use super::writer::test_support::get_payload_buffer_capacity;
2610+
2611+
let (_dir, wal_path) = temp_wal("decay_rate_behavior.wal");
2612+
let descriptor = BundleDescriptor::new(vec![slot_descriptor(0, "Data")]);
2613+
2614+
// Use aggressive decay (1/2 = 50% per append) to see faster shrinking
2615+
let mut writer = WalWriter::open(
2616+
WalWriterOptions::new(wal_path, [0xEE; 16], FlushPolicy::Immediate)
2617+
.with_buffer_decay_rate(1, 2),
2618+
)
2619+
.expect("writer");
2620+
2621+
// Write a moderately large bundle to grow the buffer
2622+
let large_slot = FixtureSlot::with_batch(
2623+
SlotId::new(0),
2624+
0x01,
2625+
build_complex_batch(1000, "medium", 256), // ~256 KB payload
2626+
);
2627+
let bundle = FixtureBundle::new(descriptor.clone(), vec![large_slot]);
2628+
let _ = writer.append_bundle(&bundle).expect("append");
2629+
drop(bundle);
2630+
2631+
let capacity_after_large = get_payload_buffer_capacity(&writer);
2632+
assert!(
2633+
capacity_after_large >= 256 * 1024,
2634+
"buffer should have grown: {}",
2635+
capacity_after_large
2636+
);
2637+
2638+
// Write small bundles; with 50% decay the high-water mark drops fast
2639+
// After ~10 appends: high_water ≈ initial * (1/2)^10 ≈ 0.1% of initial
2640+
for _ in 0..20 {
2641+
let small_slot = FixtureSlot::new(SlotId::new(0), 0x02, &[1, 2, 3]);
2642+
let small_bundle = FixtureBundle::new(descriptor.clone(), vec![small_slot]);
2643+
let _ = writer.append_bundle(&small_bundle).expect("append");
2644+
}
2645+
2646+
let capacity_after_small = get_payload_buffer_capacity(&writer);
2647+
// With aggressive decay, buffer should have shrunk significantly
2648+
// (exact threshold depends on SHRINK_THRESHOLD constant, but it should be smaller)
2649+
assert!(
2650+
capacity_after_small < capacity_after_large,
2651+
"buffer should have shrunk: before={}, after={}",
2652+
capacity_after_large,
2653+
capacity_after_small
2654+
);
2655+
}

0 commit comments

Comments
 (0)