diff --git a/tests/dedup_compaction_test.rs b/tests/dedup_compaction_test.rs index 9544cfe..f45d82d 100644 --- a/tests/dedup_compaction_test.rs +++ b/tests/dedup_compaction_test.rs @@ -80,6 +80,7 @@ async fn dedup_compaction_collapses_cross_flush_duplicates() -> Result<()> { #[serial] #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn dedup_commits_despite_concurrent_appends() -> Result<()> { + use std::sync::atomic::Ordering::{Acquire, Release}; let cfg = TestConfigBuilder::new("dedup_occ_race").with_buffer_mode(BufferMode::Enabled).build(); let _env = walrus_env_guard(&cfg.core.timefusion_data_dir); let db = Arc::new(Database::with_config(Arc::clone(&cfg)).await?); @@ -95,31 +96,44 @@ async fn dedup_commits_despite_concurrent_appends() -> Result<()> { // Append fire: fresh-timestamp rows (same partition date space, distinct // ids) committing continuously while dedup rewrites the sealed chunk. let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + let committed = Arc::new(std::sync::atomic::AtomicU64::new(0)); let appender = { - let (db, project_id, stop) = (Arc::clone(&db), project_id.clone(), Arc::clone(&stop)); + let (db, project_id, stop, committed) = (Arc::clone(&db), project_id.clone(), Arc::clone(&stop), Arc::clone(&committed)); tokio::spawn(async move { let mut i = 0u64; - while !stop.load(std::sync::atomic::Ordering::Relaxed) { + while !stop.load(Acquire) { let now = chrono::Utc::now().timestamp_micros(); let batch = json_to_batch(vec![test_span_ts(&format!("live_{i}"), "live", &project_id, now)]).unwrap(); db.insert_records_batch(&project_id, "otel_logs_and_spans", vec![batch], true, None).await.unwrap(); i += 1; + committed.store(i, Release); } i }) }; + // Gate dedup on the appender's first committed row so the race is guaranteed, + // not a scheduling artifact: on a loaded CI runner dedup could otherwise finish + // before the spawned task is scheduled, failing the `appended > 0` assertion. + while committed.load(Acquire) == 0 { + tokio::task::yield_now().await; + } + let table_ref = db.unified_tables().read().await.get("otel_logs_and_spans").expect("table created").clone(); let date = chrono::DateTime::::from_timestamp_micros(ts).unwrap().date_naive(); let dropped = db.dedup_partition(&table_ref, "otel_logs_and_spans", &project_id, date).await?; - stop.store(true, std::sync::atomic::Ordering::Relaxed); + stop.store(true, Release); let appended = appender.await?; assert!(appended > 0, "appender must have raced at least one commit"); assert_eq!(dropped, 1, "dedup must collapse the duplicate despite concurrent appends"); let count_sql = format!("SELECT COUNT(*) FROM otel_logs_and_spans WHERE project_id = '{}' AND id = 'dup_id'", project_id); let post = db.query_delta_only(&count_sql).await?; - assert_eq!(post[0].column(0).as_primitive::().value(0), 1); + assert_eq!( + post[0].column(0).as_primitive::().value(0), + 1, + "post-dedup: dup_id row should be collapsed to 1" + ); Ok(()) }