From 573aa7d6b8090ca75bf9a87d70bce2f7cee4b42e Mon Sep 17 00:00:00 2001 From: cijothomas Date: Fri, 13 Mar 2026 17:29:44 -0700 Subject: [PATCH] Handle OS signals (SIGTERM/SIGINT) for graceful pipeline shutdown --- rust/otap-dataflow/Cargo.toml | 2 +- .../crates/controller/src/lib.rs | 146 ++++++++++++++++-- 2 files changed, 131 insertions(+), 17 deletions(-) diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml index 84e8826d11..5c2fdcaa0c 100644 --- a/rust/otap-dataflow/Cargo.toml +++ b/rust/otap-dataflow/Cargo.toml @@ -165,7 +165,7 @@ tempfile = "3" thiserror = "2.0.17" tracing = { version = ">=0.1.40", default-features = false } tracing-subscriber = { version = "0.3", default-features = false } -tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] } +tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process", "signal"] } tokio-stream = "0.1.17" tokio-util = { version = "0.7.17" } tonic = { version = "0.14", default-features = false, features = [ diff --git a/rust/otap-dataflow/crates/controller/src/lib.rs b/rust/otap-dataflow/crates/controller/src/lib.rs index 2b4570618b..fea72988f9 100644 --- a/rust/otap-dataflow/crates/controller/src/lib.rs +++ b/rust/otap-dataflow/crates/controller/src/lib.rs @@ -64,7 +64,7 @@ use otap_df_engine::ReceivedAtNode; use otap_df_engine::Unwindable; use otap_df_engine::context::{ControllerContext, PipelineContext}; use otap_df_engine::control::{ - PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel, + PipelineAdminSender, PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel, }; use otap_df_engine::entity_context::{ node_entity_key, pipeline_entity_key, set_pipeline_entity_key, @@ -1256,21 +1256,19 @@ impl> = ctrl_msg_senders + .into_iter() + .map(|sender| Arc::new(sender) as Arc) + .collect(); + let signal_senders = admin_senders.clone(); + // Start the admin HTTP server let admin_server_handle = spawn_thread_local_task( "http-admin", admin_tracing_setup, move |cancellation_token| { - // Convert the concrete senders to trait objects for the admin crate - let admin_senders: Vec> = - ctrl_msg_senders - .into_iter() - .map(|sender| { - Arc::new(sender) - as Arc - }) - .collect(); - otap_df_admin::run( admin_settings, obs_state_handle, @@ -1281,6 +1279,15 @@ impl> = Vec::with_capacity(threads.len()); for (thread_name, thread_id, pipeline_key, handle) in threads { @@ -1325,11 +1332,6 @@ impl>) { + // The JoinHandle is intentionally dropped — the thread is detached and + // will be cleaned up when the process exits. + drop( + thread::Builder::new() + .name("signal-handler".into()) + .spawn(move || { + // Build a lightweight single-threaded runtime for signal handling. + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to create signal-handler runtime"); + + rt.block_on(async { + // ── First signal: graceful shutdown ───────────────── + let signal_name = Self::recv_termination_signal().await; + + otel_info!( + "shutdown.signal_received", + signal = signal_name, + message = "OS termination signal received, initiating graceful \ + shutdown. Send the signal again to force immediate exit." + ); + + // Give pipelines a generous deadline to drain (60 s by default — + // matches the default Kubernetes terminationGracePeriodSeconds). + let deadline = + std::time::Instant::now() + std::time::Duration::from_secs(60); + + let mut errors = Vec::new(); + for sender in &senders { + if let Err(e) = sender.try_send_shutdown( + deadline, + format!("Shutdown requested via OS signal ({signal_name})."), + ) { + errors.push(e.to_string()); + } + } + + if errors.is_empty() { + otel_info!( + "shutdown.signal_dispatched", + pipeline_count = senders.len(), + message = "Shutdown message sent to all pipelines" + ); + } else { + otel_error!( + "shutdown.signal_dispatch_failed", + error_count = errors.len(), + message = "Some pipelines could not be notified of shutdown" + ); + } + + // ── Second signal: force exit ─────────────────────── + let signal_name = Self::recv_termination_signal().await; + otel_error!( + "shutdown.force_exit", + signal = signal_name, + message = "Second termination signal received, forcing immediate exit" + ); + std::process::exit(1); + }); + }) + .expect("failed to spawn signal-handler thread"), + ); + } + + /// Awaits the first OS termination signal and returns its name. + /// + /// On Unix this listens for both SIGINT and SIGTERM. + /// On other platforms only Ctrl-C (SIGINT equivalent) is supported. + async fn recv_termination_signal() -> &'static str { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + + let mut sigterm = + signal(SignalKind::terminate()).expect("failed to register SIGTERM handler"); + let mut sigint = + signal(SignalKind::interrupt()).expect("failed to register SIGINT handler"); + + tokio::select! { + _ = sigterm.recv() => "SIGTERM", + _ = sigint.recv() => "SIGINT", + } + } + + #[cfg(not(unix))] + { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for Ctrl-C"); + "Ctrl-C" + } + } + /// Selects which CPU cores to use based on the given allocation. fn select_cores_for_allocation( mut available_core_ids: Vec,