Skip to content

Commit d080b8c

Browse files
Remove test-side contention from DDL sync ordering checks
1 parent 2e91054 commit d080b8c

1 file changed

Lines changed: 18 additions & 24 deletions

File tree

crates/contextdb-engine/tests/sql_surface_tests.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use contextdb_core::{MemoryAccountant, Value};
22
use contextdb_engine::Database;
33
use std::collections::HashMap;
44
use std::sync::atomic::{AtomicBool, Ordering};
5-
use std::sync::{Arc, Barrier, Mutex};
5+
use std::sync::{Arc, Barrier};
66
use std::thread;
77
use std::time::{Duration, Instant};
88
use tempfile::TempDir;
@@ -2470,8 +2470,10 @@ fn sql_15_ddl_dml_lsn_causal_ordering() {
24702470
let iterations = 40;
24712471
let barrier = Arc::new(Barrier::new(workers + 1));
24722472
let done = Arc::new(AtomicBool::new(false));
2473-
let expected_tables = Arc::new(Mutex::new(Vec::<String>::new()));
2474-
let poller_expected = expected_tables.clone();
2473+
let expected_tables: Vec<String> = (0..workers)
2474+
.flat_map(|worker| (0..iterations).map(move |i| format!("lsn_race_{worker}_{i}")))
2475+
.collect();
2476+
let poller_expected = Arc::new(expected_tables.clone());
24752477
let poller_db = db.clone();
24762478
let poller_done = done.clone();
24772479
let poller_barrier = barrier.clone();
@@ -2484,7 +2486,7 @@ fn sql_15_ddl_dml_lsn_causal_ordering() {
24842486
poller_barrier.wait();
24852487

24862488
loop {
2487-
let expected_len = poller_expected.lock().unwrap().len();
2489+
let expected_len = poller_expected.len();
24882490
let changes = poller_db.changes_since(watermark);
24892491
let before_seen = seen_creates.len();
24902492
if !changes.ddl.is_empty() || !changes.rows.is_empty() {
@@ -2525,15 +2527,14 @@ fn sql_15_ddl_dml_lsn_causal_ordering() {
25252527
}
25262528
}
25272529

2528-
let expected = poller_expected.lock().unwrap().clone();
2530+
let expected = poller_expected.as_ref().clone();
25292531
(row_before_create, seen_creates, expected)
25302532
});
25312533

25322534
let mut handles = Vec::new();
25332535
for worker in 0..workers {
25342536
let db = db.clone();
25352537
let barrier = barrier.clone();
2536-
let expected = expected_tables.clone();
25372538
handles.push(thread::spawn(move || {
25382539
barrier.wait();
25392540
for i in 0..iterations {
@@ -2543,7 +2544,6 @@ fn sql_15_ddl_dml_lsn_causal_ordering() {
25432544
&empty(),
25442545
)
25452546
.unwrap();
2546-
expected.lock().unwrap().push(table.clone());
25472547
db.execute(
25482548
&format!("INSERT INTO {table} (id, val) VALUES ($id, 'data')"),
25492549
&params(vec![("id", Value::Uuid(Uuid::new_v4()))]),
@@ -2590,8 +2590,10 @@ fn sql_16_ddl_lsn_no_duplicates_under_contention() {
25902590
let iterations = 25;
25912591
let barrier = Arc::new(Barrier::new(workers + 1));
25922592
let done = Arc::new(AtomicBool::new(false));
2593-
let expected_columns = Arc::new(Mutex::new(Vec::<String>::new()));
2594-
let poller_expected = expected_columns.clone();
2593+
let expected_columns: Vec<String> = (0..workers)
2594+
.flat_map(|worker| (0..iterations).map(move |i| format!("c_{worker}_{i}")))
2595+
.collect();
2596+
let poller_expected = Arc::new(expected_columns.clone());
25952597
let poller_db = db.clone();
25962598
let poller_done = done.clone();
25972599
let poller_barrier = barrier.clone();
@@ -2603,7 +2605,7 @@ fn sql_16_ddl_lsn_no_duplicates_under_contention() {
26032605
poller_barrier.wait();
26042606

26052607
loop {
2606-
let expected_len = poller_expected.lock().unwrap().len();
2608+
let expected_len = poller_expected.len();
26072609
let changes = poller_db.changes_since(watermark);
26082610
let before_seen = seen_columns.len();
26092611
if !changes.ddl.is_empty() || !changes.rows.is_empty() {
@@ -2644,19 +2646,14 @@ fn sql_16_ddl_lsn_no_duplicates_under_contention() {
26442646
}
26452647
}
26462648

2647-
let expected = poller_expected.lock().unwrap().len();
2648-
(
2649-
seen_columns,
2650-
poller_expected.lock().unwrap().clone(),
2651-
expected,
2652-
)
2649+
let expected = poller_expected.len();
2650+
(seen_columns, poller_expected.as_ref().clone(), expected)
26532651
});
26542652

26552653
let mut handles = Vec::new();
26562654
for worker in 0..workers {
26572655
let db = db.clone();
26582656
let barrier = barrier.clone();
2659-
let expected = expected_columns.clone();
26602657
handles.push(thread::spawn(move || {
26612658
barrier.wait();
26622659
for i in 0..iterations {
@@ -2671,7 +2668,6 @@ fn sql_16_ddl_lsn_no_duplicates_under_contention() {
26712668
&empty(),
26722669
)
26732670
.unwrap();
2674-
expected.lock().unwrap().push(col);
26752671
}
26762672
}));
26772673
}
@@ -2702,11 +2698,11 @@ fn sql_17_sync_watermark_does_not_skip_ddl() {
27022698

27032699
let barrier = Arc::new(Barrier::new(3));
27042700
let done = Arc::new(AtomicBool::new(false));
2705-
let expected_tables = Arc::new(Mutex::new(Vec::<String>::new()));
2701+
let expected_tables: Vec<String> = (0..120).map(|i| format!("watermark_race_{i}")).collect();
27062702

27072703
let poller_db = db.clone();
27082704
let poller_done = done.clone();
2709-
let poller_expected = expected_tables.clone();
2705+
let poller_expected = Arc::new(expected_tables.clone());
27102706
let poller_barrier = barrier.clone();
27112707
let poller = thread::spawn(move || {
27122708
let mut watermark = 0_u64;
@@ -2715,7 +2711,7 @@ fn sql_17_sync_watermark_does_not_skip_ddl() {
27152711
poller_barrier.wait();
27162712

27172713
loop {
2718-
let expected_len = poller_expected.lock().unwrap().len();
2714+
let expected_len = poller_expected.len();
27192715
let changes = poller_db.changes_since(watermark);
27202716
let before_seen = seen_tables.len();
27212717
if !changes.ddl.is_empty() || !changes.rows.is_empty() {
@@ -2751,12 +2747,11 @@ fn sql_17_sync_watermark_does_not_skip_ddl() {
27512747
}
27522748
}
27532749

2754-
let expected = poller_expected.lock().unwrap().clone();
2750+
let expected = poller_expected.as_ref().clone();
27552751
(seen_tables, expected)
27562752
});
27572753

27582754
let ddl_db = db.clone();
2759-
let ddl_expected = expected_tables.clone();
27602755
let ddl_barrier = barrier.clone();
27612756
let ddl_thread = thread::spawn(move || {
27622757
ddl_barrier.wait();
@@ -2768,7 +2763,6 @@ fn sql_17_sync_watermark_does_not_skip_ddl() {
27682763
&empty(),
27692764
)
27702765
.unwrap();
2771-
ddl_expected.lock().unwrap().push(table);
27722766
}
27732767
});
27742768

0 commit comments

Comments
 (0)