Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions tests/dedup_compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ async fn dedup_compaction_collapses_cross_flush_duplicates() -> Result<()> {
#[serial]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn dedup_commits_despite_concurrent_appends() -> Result<()> {
use std::sync::atomic::Ordering::{Acquire, Release};
let cfg = TestConfigBuilder::new("dedup_occ_race").with_buffer_mode(BufferMode::Enabled).build();
let _env = walrus_env_guard(&cfg.core.timefusion_data_dir);
let db = Arc::new(Database::with_config(Arc::clone(&cfg)).await?);
Expand All @@ -95,31 +96,44 @@ async fn dedup_commits_despite_concurrent_appends() -> Result<()> {
// Append fire: fresh-timestamp rows (same partition date space, distinct
// ids) committing continuously while dedup rewrites the sealed chunk.
let stop = Arc::new(std::sync::atomic::AtomicBool::new(false));
let committed = Arc::new(std::sync::atomic::AtomicU64::new(0));
let appender = {
let (db, project_id, stop) = (Arc::clone(&db), project_id.clone(), Arc::clone(&stop));
let (db, project_id, stop, committed) = (Arc::clone(&db), project_id.clone(), Arc::clone(&stop), Arc::clone(&committed));
tokio::spawn(async move {
let mut i = 0u64;
while !stop.load(std::sync::atomic::Ordering::Relaxed) {
while !stop.load(Acquire) {
let now = chrono::Utc::now().timestamp_micros();
let batch = json_to_batch(vec![test_span_ts(&format!("live_{i}"), "live", &project_id, now)]).unwrap();
db.insert_records_batch(&project_id, "otel_logs_and_spans", vec![batch], true, None).await.unwrap();
i += 1;
committed.store(i, Release);
}
i
})
};

// Gate dedup on the appender's first committed row so the race is guaranteed,
// not a scheduling artifact: on a loaded CI runner dedup could otherwise finish
// before the spawned task is scheduled, failing the `appended > 0` assertion.
while committed.load(Acquire) == 0 {
tokio::task::yield_now().await;
}

let table_ref = db.unified_tables().read().await.get("otel_logs_and_spans").expect("table created").clone();
let date = chrono::DateTime::<chrono::Utc>::from_timestamp_micros(ts).unwrap().date_naive();
let dropped = db.dedup_partition(&table_ref, "otel_logs_and_spans", &project_id, date).await?;
stop.store(true, std::sync::atomic::Ordering::Relaxed);
stop.store(true, Release);
let appended = appender.await?;
assert!(appended > 0, "appender must have raced at least one commit");
assert_eq!(dropped, 1, "dedup must collapse the duplicate despite concurrent appends");

let count_sql = format!("SELECT COUNT(*) FROM otel_logs_and_spans WHERE project_id = '{}' AND id = 'dup_id'", project_id);
let post = db.query_delta_only(&count_sql).await?;
assert_eq!(post[0].column(0).as_primitive::<Int64Type>().value(0), 1);
assert_eq!(
post[0].column(0).as_primitive::<Int64Type>().value(0),
1,
"post-dedup: dup_id row should be collapsed to 1"
);
Ok(())
}

Expand Down
Loading