Skip to content

Commit f3e5292

Browse files
fix: Properly shutdown quickwit-serve when subcomponents panic or otherwise error.
Before this change, the `if let Err` block silently swallows the error and logs it. The code continues on to the `shutdown_handle.await` call. In the case where the `tokio::try_join!` returns an error (such as when any of the three components for the three `JoinHandle` arguments panic), the `shutdown_handle` is not guaranteed to have completed, so the program sits there waiting for a SIGTERM, even though some components aren’t running.
1 parent b69ddfe commit f3e5292

File tree

1 file changed

+11
-3
lines changed
  • quickwit/quickwit-serve/src

1 file changed

+11
-3
lines changed

quickwit/quickwit-serve/src/lib.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ use anyhow::{Context, bail};
5454
use bytesize::ByteSize;
5555
pub(crate) use decompression::Body;
5656
pub use format::BodyFormat;
57+
use futures::stream::Abortable;
58+
use futures::stream::AbortHandle;
5759
use futures::StreamExt;
5860
use itertools::Itertools;
5961
use once_cell::sync::Lazy;
@@ -396,14 +398,14 @@ fn start_shard_positions_service(
396398
/// Usually called when receiving a SIGTERM signal, e.g. k8s trying to
397399
/// decomission a pod.
398400
async fn shutdown_signal_handler(
399-
shutdown_signal: BoxFutureInfaillible<()>,
401+
shutdown_signal: Abortable<BoxFutureInfaillible<()>>,
400402
universe: Universe,
401403
ingester_opt: Option<Ingester>,
402404
grpc_shutdown_trigger_tx: oneshot::Sender<()>,
403405
rest_shutdown_trigger_tx: oneshot::Sender<()>,
404406
cluster: Cluster,
405407
) -> HashMap<String, ActorExitStatus> {
406-
shutdown_signal.await;
408+
let _ = shutdown_signal.await;
407409
// We must decommission the ingester first before terminating the indexing pipelines that
408410
// may consume from it. We also need to keep the gRPC server running while doing so.
409411
if let Some(ingester) = ingester_opt
@@ -825,8 +827,10 @@ pub async fn serve_quickwit(
825827
"node_readiness_reporting",
826828
);
827829

830+
let (shutdown_signal_abort_handle, shutdown_signal_abort_reg) = AbortHandle::new_pair();
831+
let shutdown_signal_abortable = Abortable::new(shutdown_signal, shutdown_signal_abort_reg);
828832
let shutdown_handle = tokio::spawn(shutdown_signal_handler(
829-
shutdown_signal,
833+
shutdown_signal_abortable,
830834
universe,
831835
ingester_opt,
832836
grpc_shutdown_trigger_tx,
@@ -851,8 +855,12 @@ pub async fn serve_quickwit(
851855

852856
if let Err(err) = tokio::try_join!(grpc_join_handle, rest_join_handle, chitchat_server_handle) {
853857
error!("server failed: {err:?}");
858+
859+
// Trigger a shutdown by completing the shutdown_signal handle.
860+
shutdown_signal_abort_handle.abort();
854861
}
855862

863+
info!("waiting for services to shutdown");
856864
let actor_exit_statuses = shutdown_handle
857865
.await
858866
.context("failed to gracefully shutdown services")?;

0 commit comments

Comments
 (0)