Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ tempfile = "3"
thiserror = "2.0.17"
tracing = { version = ">=0.1.40", default-features = false }
tracing-subscriber = { version = "0.3", default-features = false }
tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process"] }
tokio = { version = "1.48.0", features = ["rt", "time", "net", "io-util", "sync", "macros", "rt-multi-thread", "fs", "io-std", "process", "signal"] }
tokio-stream = "0.1.17"
tokio-util = { version = "0.7.17" }
tonic = { version = "0.14", default-features = false, features = [
Expand Down
146 changes: 130 additions & 16 deletions rust/otap-dataflow/crates/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use otap_df_engine::ReceivedAtNode;
use otap_df_engine::Unwindable;
use otap_df_engine::context::{ControllerContext, PipelineContext};
use otap_df_engine::control::{
PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel,
PipelineAdminSender, PipelineCtrlMsgReceiver, PipelineCtrlMsgSender, pipeline_ctrl_msg_channel,
};
use otap_df_engine::entity_context::{
node_entity_key, pipeline_entity_key, set_pipeline_entity_key,
Expand Down Expand Up @@ -1256,21 +1256,19 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
// Drop the original metrics sender so only pipeline threads hold references
drop(metrics_reporter);

// Convert the concrete senders to trait objects once, shared by both the
// admin HTTP server and the OS-signal handler for graceful shutdown.
let admin_senders: Vec<Arc<dyn PipelineAdminSender>> = ctrl_msg_senders
.into_iter()
.map(|sender| Arc::new(sender) as Arc<dyn PipelineAdminSender>)
.collect();
let signal_senders = admin_senders.clone();

// Start the admin HTTP server
let admin_server_handle = spawn_thread_local_task(
"http-admin",
admin_tracing_setup,
move |cancellation_token| {
// Convert the concrete senders to trait objects for the admin crate
let admin_senders: Vec<Arc<dyn otap_df_engine::control::PipelineAdminSender>> =
ctrl_msg_senders
.into_iter()
.map(|sender| {
Arc::new(sender)
as Arc<dyn otap_df_engine::control::PipelineAdminSender>
})
.collect();

otap_df_admin::run(
admin_settings,
obs_state_handle,
Expand All @@ -1281,6 +1279,15 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
},
)?;

// Start the OS-signal listener on a dedicated background thread so it
// runs concurrently with the pipeline join loop below. When the first
// SIGINT/SIGTERM arrives it sends shutdown messages to all pipelines,
// which causes them to drain and their threads to exit, unblocking the
// join loop.
if run_mode == RunMode::ParkMainThread {
Self::spawn_shutdown_signal_listener(signal_senders);
}

// Wait for all pipeline threads to finish and collect their results
let mut results: Vec<Result<(), Error>> = Vec::with_capacity(threads.len());
for (thread_name, thread_id, pipeline_key, handle) in threads {
Expand Down Expand Up @@ -1325,11 +1332,6 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
return Err(err);
}

// In standard engine mode we keep the main thread parked after startup.
if run_mode == RunMode::ParkMainThread {
thread::park();
}

// All pipelines have finished; shut down the admin HTTP server and metric aggregator gracefully.
engine_metrics_handle.shutdown_and_join()?;
admin_server_handle.shutdown_and_join()?;
Expand All @@ -1343,6 +1345,118 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
Ok(())
}

/// Spawns a dedicated background thread that listens for OS termination
/// signals (SIGINT / SIGTERM on Unix, Ctrl-C on all platforms) and sends
/// graceful shutdown messages to every pipeline.
///
/// Follows the same double-signal convention used by the OpenTelemetry
/// Collector (Go):
/// - **First signal** → initiates graceful shutdown (drain pipelines).
/// - **Second signal** → forces immediate process exit (`std::process::exit(1)`).
///
/// This allows Kubernetes (or any process manager that sends SIGTERM) to
/// trigger an orderly drain of in-flight telemetry data before the pod is
/// killed. If the graceful drain hangs, a second signal provides an escape
/// hatch.
///
/// The spawned thread is intentionally detached — it will be cleaned up
/// when the process exits normally or is force-killed by the second signal.
fn spawn_shutdown_signal_listener(senders: Vec<Arc<dyn PipelineAdminSender>>) {
// The JoinHandle is intentionally dropped — the thread is detached and
// will be cleaned up when the process exits.
drop(
thread::Builder::new()
.name("signal-handler".into())
.spawn(move || {
// Build a lightweight single-threaded runtime for signal handling.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create signal-handler runtime");

rt.block_on(async {
// ── First signal: graceful shutdown ─────────────────
let signal_name = Self::recv_termination_signal().await;

otel_info!(
"shutdown.signal_received",
signal = signal_name,
message = "OS termination signal received, initiating graceful \
shutdown. Send the signal again to force immediate exit."
);

// Give pipelines a generous deadline to drain (60 s by default —
// matches the default Kubernetes terminationGracePeriodSeconds).
let deadline =
std::time::Instant::now() + std::time::Duration::from_secs(60);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: named constant for 60 secs?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional nit: should be configurable


let mut errors = Vec::new();
for sender in &senders {
if let Err(e) = sender.try_send_shutdown(
deadline,
Copy link
Copy Markdown
Member

@lalitb lalitb Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One correctness issue here: try_send_shutdown() can drop the shutdown request if the pipeline control channel is full at signal time. That makes graceful shutdown best-effort under backpressure.

For this PR, I think the smallest fix could be a bounded retry inside the existing rt.block_on(async { ... }) block, e.g. retry try_send_shutdown() a few times with a short tokio::time::sleep(...).await between attempts before giving up and logging the error. That avoids trait changes and closes the immediate gap.

As a follow-up, we can either add a proper async shutdown send on the trait or move shutdown onto a dedicated out-of-band signal such as a watch channel.

Longer term, a dedicated out-of-band shutdown signal (e.g. watch channel per pipeline) also gives us a clean reusable ShutdownHandle that supervisor and OpAMP can call directly - same shutdown path regardless of trigger source.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, separate channel for control signals makes sense.

format!("Shutdown requested via OS signal ({signal_name})."),
) {
errors.push(e.to_string());
}
}

if errors.is_empty() {
otel_info!(
"shutdown.signal_dispatched",
pipeline_count = senders.len(),
message = "Shutdown message sent to all pipelines"
);
} else {
otel_error!(
"shutdown.signal_dispatch_failed",
error_count = errors.len(),
message = "Some pipelines could not be notified of shutdown"
);
}

// ── Second signal: force exit ───────────────────────
let signal_name = Self::recv_termination_signal().await;
otel_error!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it seems like it is sync, but if this ever uses a bufferred writer (which I doubt it will) this message may not get printed before exit. Maybe an eprintln! right under this for good measure?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. Will swap to eprintln.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the purpose of raw_error!
I would prefer to use it for consistency.

"shutdown.force_exit",
signal = signal_name,
message = "Second termination signal received, forcing immediate exit"
);
std::process::exit(1);
});
})
.expect("failed to spawn signal-handler thread"),
);
}

/// Awaits the first OS termination signal and returns its name.
///
/// On Unix this listens for both SIGINT and SIGTERM.
/// On other platforms only Ctrl-C (SIGINT equivalent) is supported.
async fn recv_termination_signal() -> &'static str {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};

let mut sigterm =
signal(SignalKind::terminate()).expect("failed to register SIGTERM handler");
let mut sigint =
signal(SignalKind::interrupt()).expect("failed to register SIGINT handler");

tokio::select! {
_ = sigterm.recv() => "SIGTERM",
_ = sigint.recv() => "SIGINT",
}
}

#[cfg(not(unix))]
{
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the windows equivalent that handles both ctrl_c, ctrl_break.
#[cfg(windows)]
{
use tokio::signal::windows::{ctrl_c, ctrl_break};

let mut sigint = ctrl_c()
    .expect("failed to register Ctrl-C handler");
let mut sigterm = ctrl_break()
    .expect("failed to register Ctrl-Break handler");

tokio::select! {
    _ = sigterm.recv() => "CTRL_BREAK (SIGTERM-equivalent)",
    _ = sigint.recv()  => "CTRL_C (SIGINT-equivalent)",
}

}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: On windows platform Ctrl+C can't be reliably sent to a process without console handle and it will be ignored. Safest option is to use CTRL_BREAK.

tokio::signal::ctrl_c()
.await
.expect("failed to listen for Ctrl-C");
"Ctrl-C"
}
}

/// Selects which CPU cores to use based on the given allocation.
fn select_cores_for_allocation(
mut available_core_ids: Vec<CoreId>,
Expand Down
Loading