Skip to content

Commit 3f99d17

Browse files
committed
fix: keep force-flushed current-bucket rows queryable (e2e undercount)
The pressure_flush e2e test failed with an 8-of-150 undercount: when force_flush_current_buckets commits the open bucket's rows to Delta and inserts then repopulate the same bucket_id, the read path's per-bucket exclusion masked the current bucket's entire range from the Delta scan, hiding every force-flushed row (only the un-flushed MemBuffer remainder stayed visible). Fix: exclude only sealed (past) bucket ranges from the Delta scan, never the current open bucket's range. Force-flush removes rows from MemBuffer before committing, so the current bucket's MemBuffer rows and its Delta rows are disjoint — the union can't double-count them. Sealed buckets keep the exclusion (their commit-then-drain flush can briefly hold a row in both stores). Regression test force_flushed_current_bucket_rows_stay_queryable bootstraps the real layer (local MinIO), force-flushes the open bucket, repopulates it, and asserts all rows remain queryable. Fails 2-of-5 without the fix. Made force_flush_current_buckets pub(crate) to drive it deterministically.
1 parent c8c2e3b commit 3f99d17

2 files changed

Lines changed: 68 additions & 1 deletion

File tree

src/buffered_write_layer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ impl BufferedWriteLayer {
570570
/// bucket's rows atomically under the insert lock (no lost-write race) and
571571
/// leaves the bucket in place for ongoing inserts. On commit failure the
572572
/// rows are restored — durability never depended on this (WAL holds them).
573-
async fn force_flush_current_buckets(&self) -> anyhow::Result<()> {
573+
pub(crate) async fn force_flush_current_buckets(&self) -> anyhow::Result<()> {
574574
let _flush_guard = self.flush_lock.lock().await;
575575
let current = MemBuffer::current_bucket_id();
576576
// WAL-ordering safety: `advance_by_counts` consumes entries sequentially

src/database.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4521,11 +4521,27 @@ impl TableProvider for ProjectRoutingTable {
45214521
// backwards and wrongly hid all the Delta rows *above* it. Fixed
45224522
// by listing the actual ranges MemBuffer currently holds and
45234523
// excluding only those.
4524+
//
4525+
// Exception: the current (open) bucket's range is NOT excluded. Under
4526+
// memory pressure `force_flush_current_buckets` commits the open
4527+
// bucket's rows to Delta while inserts keep repopulating the same
4528+
// bucket_id in MemBuffer — so Delta legitimately holds rows inside the
4529+
// current range. Excluding it would hide those force-flushed rows
4530+
// (observed as an 8-of-150 undercount in the pressure_flush e2e test).
4531+
// It's safe to scan both: force-flush removes rows from MemBuffer
4532+
// *before* committing, so the current bucket's MemBuffer rows and its
4533+
// Delta rows are disjoint — the union can't double-count them. (Sealed
4534+
// buckets keep the exclusion: their normal commit-then-drain flush can
4535+
// briefly hold the same row in both stores.)
45244536
let mem_ranges = layer.get_bucket_ranges(&project_id, &self.table_name);
4537+
let current_bucket_start = crate::mem_buffer::MemBuffer::current_bucket_id() * crate::mem_buffer::bucket_duration_micros();
45254538
let mut delta_filters = optimized_filters.clone();
45264539
let ts_col = || Box::new(col("timestamp"));
45274540
let ts_lit = |t: i64| Box::new(lit(ScalarValue::TimestampMicrosecond(Some(t), Some("UTC".into()))));
45284541
for (start, end) in &mem_ranges {
4542+
if *start == current_bucket_start {
4543+
continue;
4544+
}
45294545
// NOT (ts >= start AND ts < end) ≡ (ts < start) OR (ts >= end)
45304546
let below = Expr::BinaryExpr(BinaryExpr {
45314547
left: ts_col(),
@@ -5274,6 +5290,57 @@ mod tests {
52745290
.map_err(|_| anyhow::anyhow!("Test timed out after 30 seconds"))?
52755291
}
52765292

5293+
/// Regression for the pressure_flush e2e undercount (8-of-150): when
5294+
/// `force_flush_current_buckets` commits the open bucket's rows to Delta and
5295+
/// inserts then repopulate the same bucket_id, the query path must still
5296+
/// return the force-flushed rows. The old per-bucket exclusion masked the
5297+
/// current bucket's whole range from the Delta scan, hiding everything that
5298+
/// had been force-flushed. Drives the force-flush directly so it's
5299+
/// deterministic (no need to actually exhaust the memory budget).
5300+
#[serial]
5301+
#[tokio::test(flavor = "multi_thread")]
5302+
async fn force_flushed_current_bucket_rows_stay_queryable() -> Result<()> {
5303+
// SAFETY: walrus reads WALRUS_DATA_DIR from process env; #[serial] protects it.
5304+
let prefix = uuid::Uuid::new_v4().to_string()[..8].to_string();
5305+
let cfg = create_test_config(&prefix);
5306+
unsafe { std::env::set_var("WALRUS_DATA_DIR", cfg.core.wal_dir()) };
5307+
tokio::time::timeout(std::time::Duration::from_secs(50), async {
5308+
// Need the real buffered layer (force_flush path), so bootstrap the
5309+
// full stack rather than the layer-less setup_test_database().
5310+
let b = crate::bootstrap::bootstrap(Arc::clone(&cfg)).await?;
5311+
let project_id = format!("ffq_{}", prefix);
5312+
5313+
// 3 rows into the current (open) bucket, then force-flush them to
5314+
// Delta — leaving the bucket drained but its range still "current".
5315+
// skip_queue=false so the write flows through the buffered layer
5316+
// (WAL → MemBuffer), not straight to Delta.
5317+
for i in 0..3 {
5318+
let batch = json_to_batch(vec![test_span(&format!("flushed_{i}"), "span", &project_id)])?;
5319+
b.db.insert_records_batch(&project_id, "otel_logs_and_spans", vec![batch], false, None).await?;
5320+
}
5321+
b.buffered_layer.force_flush_current_buckets().await?;
5322+
5323+
// 2 more rows repopulate the same current bucket_id in MemBuffer.
5324+
for i in 0..2 {
5325+
let batch = json_to_batch(vec![test_span(&format!("buffered_{i}"), "span", &project_id)])?;
5326+
b.db.insert_records_batch(&project_id, "otel_logs_and_spans", vec![batch], false, None).await?;
5327+
}
5328+
5329+
// All 5 must be visible: 3 from Delta (force-flushed), 2 from MemBuffer.
5330+
// Pre-fix this returned 2 (the current range was excluded from Delta).
5331+
use datafusion::arrow::array::AsArray;
5332+
let sql = format!("SELECT COUNT(*) FROM otel_logs_and_spans WHERE project_id = '{}'", project_id);
5333+
let r = b.session_ctx.sql(&sql).await?.collect().await?;
5334+
let n = r[0].column(0).as_primitive::<arrow::datatypes::Int64Type>().value(0);
5335+
assert_eq!(n, 5, "force-flushed rows must remain queryable alongside repopulated MemBuffer rows");
5336+
5337+
b.shutdown.cancel();
5338+
Ok(())
5339+
})
5340+
.await
5341+
.map_err(|_| anyhow::anyhow!("Test timed out after 50 seconds"))?
5342+
}
5343+
52775344
#[serial]
52785345
#[tokio::test(flavor = "multi_thread")]
52795346
async fn test_multiple_projects() -> Result<()> {

0 commit comments

Comments
 (0)