Skip to content

Commit bd957b0

Browse files
committed
fix: add graceful compaction shutdown and queue tests
1 parent af98d63 commit bd957b0

File tree

3 files changed

+114
-12
lines changed

3 files changed

+114
-12
lines changed

src/compaction/driver.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -686,10 +686,16 @@ where
686686
let mut sleep = runtime.sleep(interval).fuse();
687687
futures::select_biased! {
688688
_ = sleep => Some(CompactionTriggerReason::Periodic),
689-
msg = tick_rx.next() => msg.map(|_| CompactionTriggerReason::Kick),
689+
msg = tick_rx.next() => match msg {
690+
Some(CompactionTrigger::Kick) => Some(CompactionTriggerReason::Kick),
691+
Some(CompactionTrigger::Shutdown) | None => None,
692+
},
690693
}
691694
} else {
692-
tick_rx.next().await.map(|_| CompactionTriggerReason::Kick)
695+
match tick_rx.next().await {
696+
Some(CompactionTrigger::Kick) => Some(CompactionTriggerReason::Kick),
697+
Some(CompactionTrigger::Shutdown) | None => None,
698+
}
693699
}
694700
}
695701

src/compaction/handle.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
use std::marker::PhantomData;
44

55
use fusio::executor::Executor;
6+
#[cfg(test)]
7+
use fusio::executor::JoinHandle;
8+
#[cfg(test)]
9+
use futures::SinkExt;
610
use futures::{channel::mpsc, future::AbortHandle};
711

812
/// Handle to a background compaction worker.
@@ -12,10 +16,12 @@ use futures::{channel::mpsc, future::AbortHandle};
1216
#[derive(Debug)]
1317
pub(crate) enum CompactionTrigger {
1418
Kick,
19+
Shutdown,
1520
}
1621

1722
pub(crate) struct CompactionHandle<E: Executor> {
18-
abort: AbortHandle,
23+
abort: Option<AbortHandle>,
24+
join: Option<E::JoinHandle<()>>,
1925
trigger: Option<mpsc::Sender<CompactionTrigger>>,
2026
_marker: PhantomData<E>,
2127
}
@@ -24,11 +30,12 @@ impl<E: Executor> CompactionHandle<E> {
2430
/// Create a new compaction handle.
2531
pub(crate) fn new(
2632
abort: AbortHandle,
27-
_join: Option<E::JoinHandle<()>>,
33+
join: Option<E::JoinHandle<()>>,
2834
trigger: Option<mpsc::Sender<CompactionTrigger>>,
2935
) -> Self {
3036
Self {
31-
abort,
37+
abort: Some(abort),
38+
join,
3239
trigger,
3340
_marker: PhantomData,
3441
}
@@ -41,10 +48,29 @@ impl<E: Executor> CompactionHandle<E> {
4148
let _ = sender.try_send(CompactionTrigger::Kick);
4249
}
4350
}
51+
52+
/// Gracefully stop the compaction worker and wait for it to exit.
53+
#[cfg(test)]
54+
pub(crate) async fn shutdown(mut self) {
55+
if let Some(mut sender) = self.trigger.take() {
56+
let _ = sender.send(CompactionTrigger::Shutdown).await;
57+
}
58+
if let Some(join) = self.join.take() {
59+
let _ = join.join().await;
60+
}
61+
self.abort.take();
62+
}
4463
}
4564

4665
impl<E: Executor> Drop for CompactionHandle<E> {
4766
fn drop(&mut self) {
48-
self.abort.abort();
67+
if let Some(sender) = &self.trigger {
68+
let mut sender = sender.clone();
69+
let _ = sender.try_send(CompactionTrigger::Shutdown);
70+
}
71+
if let Some(abort) = self.abort.take() {
72+
abort.abort();
73+
}
74+
let _ = self.join.take();
4975
}
5076
}

src/db/tests/core/compaction.rs

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,12 +1431,13 @@ async fn compaction_self_kick_advances_without_periodic_tick()
14311431
let id_allocator = Arc::new(AtomicU64::new(10));
14321432
let executor = LocalCompactionExecutor::with_id_allocator(Arc::clone(&sst_cfg), id_allocator);
14331433
let driver = Arc::new(db.compaction_driver());
1434-
let worker_config = CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::default());
1434+
let worker_config =
1435+
CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::new(0, Duration::from_millis(0)));
14351436
let handle = driver.spawn_worker(Arc::clone(&db.executor), planner, executor, worker_config);
14361437

14371438
handle.kick();
14381439

1439-
let deadline = Instant::now() + Duration::from_secs(2);
1440+
let deadline = Instant::now() + Duration::from_secs(10);
14401441
loop {
14411442
let snapshot = db.manifest.snapshot_latest(db.manifest_table).await?;
14421443
if let Some(version) = snapshot.latest_version.as_ref() {
@@ -1457,6 +1458,7 @@ async fn compaction_self_kick_advances_without_periodic_tick()
14571458
"expected L2 compaction to complete without new writes"
14581459
);
14591460

1461+
handle.shutdown().await;
14601462
fs::remove_dir_all(&db_root)?;
14611463
Ok(())
14621464
}
@@ -2386,7 +2388,7 @@ async fn compaction_periodic_trigger_records_metrics() -> Result<(), Box<dyn std
23862388
wait_for_executions(&executed, 1).await,
23872389
"expected compaction execution"
23882390
);
2389-
drop(handle);
2391+
handle.shutdown().await;
23902392

23912393
let snapshot = metrics.snapshot();
23922394
assert!(snapshot.trigger_periodic >= 1);
@@ -2473,7 +2475,7 @@ async fn compaction_kick_triggers_without_periodic_tick() -> Result<(), Box<dyn
24732475
wait_for_executions(&executed, 1).await,
24742476
"expected compaction work after kick"
24752477
);
2476-
drop(handle);
2478+
handle.shutdown().await;
24772479

24782480
let snapshot = metrics.snapshot();
24792481
assert_eq!(snapshot.trigger_kick, 1);
@@ -2482,6 +2484,74 @@ async fn compaction_kick_triggers_without_periodic_tick() -> Result<(), Box<dyn
24822484
Ok(())
24832485
}
24842486

2487+
#[tokio::test(flavor = "current_thread")]
2488+
async fn cascade_scheduling_queue_size_one() -> Result<(), Box<dyn std::error::Error>> {
2489+
let schema = Arc::new(Schema::new(vec![
2490+
Field::new("id", DataType::Utf8, false),
2491+
Field::new("v", DataType::Int32, false),
2492+
]));
2493+
let mode_cfg = SchemaBuilder::from_schema(Arc::clone(&schema))
2494+
.primary_key("id")
2495+
.build()
2496+
.expect("schema builder");
2497+
let executor = Arc::new(TokioExecutor::default());
2498+
let mut inner: DbInner<InMemoryFs, TokioExecutor> =
2499+
DB::new(mode_cfg, Arc::clone(&executor)).await?.into_inner();
2500+
let metrics = Arc::new(CompactionMetrics::new());
2501+
inner.compaction_metrics = Some(Arc::clone(&metrics));
2502+
2503+
let entry = SstEntry::new(
2504+
SsTableId::new(1),
2505+
None,
2506+
None,
2507+
Path::from("L0/001.parquet"),
2508+
None,
2509+
);
2510+
inner
2511+
.manifest
2512+
.apply_version_edits(
2513+
inner.manifest_table,
2514+
&[VersionEdit::AddSsts {
2515+
level: 0,
2516+
entries: vec![entry],
2517+
}],
2518+
)
2519+
.await?;
2520+
2521+
let executed = Arc::new(StdMutex::new(Vec::new()));
2522+
let planner = CascadePlanner;
2523+
let executor = CountingExecutor::new(Arc::clone(&executed));
2524+
let driver = Arc::new(inner.compaction_driver());
2525+
let worker_config =
2526+
CompactionWorkerConfig::new(None, 1, 1, CascadeConfig::new(1, Duration::from_millis(0)));
2527+
let handle = driver.spawn_worker(
2528+
Arc::clone(&inner.executor),
2529+
planner,
2530+
executor,
2531+
worker_config,
2532+
);
2533+
2534+
handle.kick();
2535+
assert!(
2536+
wait_for_executions(&executed, 2).await,
2537+
"expected L0->L1 and L1->L2 executions with queue size 1"
2538+
);
2539+
handle.shutdown().await;
2540+
2541+
let tasks = executed.lock().expect("executed lock").clone();
2542+
let l0_to_l1 = tasks
2543+
.iter()
2544+
.filter(|(source, target)| *source == 0 && *target == 1)
2545+
.count();
2546+
let l1_to_l2 = tasks
2547+
.iter()
2548+
.filter(|(source, target)| *source == 1 && *target == 2)
2549+
.count();
2550+
assert_eq!(l0_to_l1, 1);
2551+
assert_eq!(l1_to_l2, 1);
2552+
Ok(())
2553+
}
2554+
24852555
#[tokio::test(flavor = "current_thread")]
24862556
async fn cascade_scheduling_respects_budget() -> Result<(), Box<dyn std::error::Error>> {
24872557
let schema = Arc::new(Schema::new(vec![
@@ -2534,7 +2604,7 @@ async fn cascade_scheduling_respects_budget() -> Result<(), Box<dyn std::error::
25342604
wait_for_executions(&executed, 1).await,
25352605
"expected initial compaction execution"
25362606
);
2537-
drop(handle);
2607+
handle.shutdown().await;
25382608

25392609
let tasks = executed.lock().expect("executed lock").clone();
25402610
let l0_to_l1 = tasks
@@ -2617,7 +2687,7 @@ async fn cascade_scheduling_respects_cooldown() -> Result<(), Box<dyn std::error
26172687
"expected two L0->L1 executions and one cascade"
26182688
);
26192689

2620-
drop(handle);
2690+
handle.shutdown().await;
26212691

26222692
let tasks = executed.lock().expect("executed lock").clone();
26232693
let l0_to_l1 = tasks

0 commit comments

Comments
 (0)