Skip to content

Commit bb13a4e

Browse files
authored
[ENH] Gracefully shutdown GC system (#4246)
## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - We would like to wait for existing GC jobs to finish on shutdown. Thus we will call `system.stop()` to gracefully shut down all components when SIGINT/SIGTERM is received. Note that a system component will always be shut down once the existing message has been handled, so we don't need to worry about partial progress in the GC job. - New functionality - N/A ## Test plan *How are these changes tested?* - [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes *Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the [docs repository](https://github.com/chroma-core/docs)?*
1 parent f78c675 commit bb13a4e

File tree

2 files changed

+29
-21
lines changed

2 files changed

+29
-21
lines changed

Diff for: rust/garbage_collector/src/garbage_collector_component.rs

+5
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,11 @@ impl Component for GarbageCollector {
178178
|| Some(span!(parent: None, tracing::Level::INFO, "Scheduled garbage collection")),
179179
);
180180
}
181+
182+
fn on_stop_timeout(&self) -> Duration {
183+
// NOTE: Increased timeout for remaining jobs to finish
184+
Duration::from_secs(60)
185+
}
181186
}
182187

183188
#[derive(Debug)]

Diff for: rust/garbage_collector/src/lib.rs

+24-21
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use chroma_tracing::{
77
use config::GarbageCollectorConfig;
88
use garbage_collector_component::GarbageCollector;
99
use tokio::signal::unix::{signal, SignalKind};
10-
use tokio::time::{sleep, Duration};
1110
use tracing::{debug, error, info};
1211

1312
mod config;
@@ -56,7 +55,7 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
5655
})?;
5756

5857
let system = System::new();
59-
let dispatcher_handle = system.start_component(dispatcher);
58+
let mut dispatcher_handle = system.start_component(dispatcher);
6059

6160
// Start a background task to periodically check for garbage.
6261
// Garbage collector is a component that gets notified every
@@ -68,36 +67,40 @@ pub async fn garbage_collector_service_entrypoint() -> Result<(), Box<dyn std::e
6867
e
6968
})?;
7069

71-
garbage_collector_component.set_dispatcher(dispatcher_handle);
70+
garbage_collector_component.set_dispatcher(dispatcher_handle.clone());
7271
garbage_collector_component.set_system(system.clone());
7372

74-
let _ = system.start_component(garbage_collector_component);
73+
let mut garbage_collector_handle = system.start_component(garbage_collector_component);
7574

7675
// Keep the service running and handle shutdown signals
7776
let mut sigterm = signal(SignalKind::terminate())?;
7877
let mut sigint = signal(SignalKind::interrupt())?;
7978

8079
info!("Service running, waiting for signals");
81-
loop {
82-
tokio::select! {
83-
_ = sigterm.recv() => {
84-
info!("Received SIGTERM signal");
85-
break;
86-
}
87-
_ = sigint.recv() => {
88-
info!("Received SIGINT signal");
89-
break;
90-
}
91-
_ = sleep(Duration::from_secs(1)) => {
92-
// Keep the service running
93-
continue;
94-
}
80+
tokio::select! {
81+
_ = sigterm.recv() => {
82+
info!("Received SIGTERM signal");
83+
}
84+
_ = sigint.recv() => {
85+
info!("Received SIGINT signal");
9586
}
9687
}
97-
98-
// Give some time for any in-progress garbage collection to complete
9988
info!("Starting graceful shutdown, waiting for in-progress tasks");
100-
sleep(Duration::from_secs(5)).await;
89+
// NOTE: We should first stop the garbage collector. The garbage collector will finish the remaining jobs before shutdown.
90+
// We cannot directly shutdown the dispatcher and system because that will fail remaining jobs.
91+
garbage_collector_handle.stop();
92+
garbage_collector_handle
93+
.join()
94+
.await
95+
.expect("Garbage collector should be stoppable");
96+
dispatcher_handle.stop();
97+
dispatcher_handle
98+
.join()
99+
.await
100+
.expect("Dispatcher should be stoppable");
101+
system.stop().await;
102+
system.join().await;
103+
101104
info!("Shutting down garbage collector service");
102105
Ok(())
103106
}

0 commit comments

Comments
 (0)