From b9b273d45111438116a0496480c425e94f2e9ed8 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Tue, 13 Jan 2026 02:01:09 +0000 Subject: [PATCH] Add exponential backoff and log suppression for scheduler disconnection When the scheduler disconnects or becomes unavailable, the executor poll loop previously logged a WARN message every ~100ms, spamming logs during graceful shutdown scenarios. Changes: - Add exponential backoff (100ms to 30s) when scheduler connection fails - Reduce log level from WARN to DEBUG after 5 consecutive failures - Log restoration message when connection is re-established - Prevents log spam when scheduler shuts down before executor --- Cargo.lock | 85 ++++++++++++++++++++----- Cargo.toml | 1 + ballista/executor/Cargo.toml | 1 + ballista/executor/src/execution_loop.rs | 54 ++++++++++++++-- 4 files changed, 117 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 167efc6a7e..d71d034c3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,7 +180,7 @@ dependencies = [ "miniz_oxide", "num-bigint", "quad-rand", - "rand", + "rand 0.9.2", "regex-lite", "serde", "serde_bytes", @@ -961,6 +961,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "getrandom 0.2.16", + "instant", + "rand 0.8.5", +] + [[package]] name = "ballista" version = "50.0.0" @@ -992,7 +1003,7 @@ dependencies = [ "env_logger", "futures", "mimalloc", - "rand", + "rand 0.9.2", "serde", "serde_json", "snmalloc-rs", @@ -1037,7 +1048,7 @@ dependencies = [ "parking_lot", "prost", "prost-types", - "rand", + "rand 0.9.2", "rustc_version", "serde", "tempfile", @@ -1075,6 +1086,7 @@ dependencies = [ "arrow", "arrow-flight", "async-trait", + "backoff", "ballista-core", "clap 4.5.50", "dashmap", @@ -1118,7 +1130,7 @@ dependencies = [ "prometheus", "prost", "prost-types", - "rand", 
+ "rand 0.9.2", "rstest", "serde", "tokio", @@ -1256,7 +1268,7 @@ dependencies = [ "log", "num", "pin-project-lite", - "rand", + "rand 0.9.2", "rustls", "rustls-native-certs", "rustls-pemfile", @@ -1805,7 +1817,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.9.2", "regex", "sqlparser", "tempfile", @@ -1952,7 +1964,7 @@ dependencies = [ "log", "object_store", "parquet", - "rand", + "rand 0.9.2", "tempfile", "tokio", "tokio-util", @@ -2062,7 +2074,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.9.2", "tokio", ] @@ -2086,7 +2098,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", - "rand", + "rand 0.9.2", "tempfile", "url", ] @@ -2167,7 +2179,7 @@ dependencies = [ "itertools 0.14.0", "log", "md-5", - "rand", + "rand 0.9.2", "regex", "sha2", "unicode-segmentation", @@ -2939,7 +2951,7 @@ dependencies = [ "into-attr-derive", "pest", "pest_derive", - "rand", + "rand 0.9.2", "tempfile", ] @@ -3394,6 +3406,15 @@ dependencies = [ "rustversion", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -3933,7 +3954,7 @@ dependencies = [ "parking_lot", "percent-encoding", "quick-xml", - "rand", + "rand 0.9.2", "reqwest", "ring", "rustls-pemfile", @@ -4565,7 +4586,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand", + "rand 0.9.2", "ring", "rustc-hash", "rustls", @@ -4616,14 +4637,35 @@ dependencies = [ "nibble_vec", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.2" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -4633,7 +4675,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.3", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.16", ] [[package]] @@ -5994,7 +6045,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "470dbf6591da1b39d43c14523b2b469c86879a53e8b758c8e090a470fe7b1fbe" dependencies = [ - "rand", + "rand 0.9.2", "web-time", ] diff --git a/Cargo.toml b/Cargo.toml index 7eda09171a..375c8171ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ dashmap = { version = "6.1" } async-trait = { version = "0.1" } serde = { version = "1.0" } tokio-stream = { version = "0.1" } +backoff = { version = "0.4" } url = { version = "2.5" } [patch.crates-io] diff --git a/ballista/executor/Cargo.toml b/ballista/executor/Cargo.toml index 732036d20d..07b1e9da66 100644 --- a/ballista/executor/Cargo.toml +++ b/ballista/executor/Cargo.toml @@ -40,6 +40,7 @@ default = ["build-binary", "mimalloc"] arrow = { workspace = true } arrow-flight = { workspace = true } async-trait = { workspace = true } +backoff = { workspace = true } ballista-core = { path = "../core", version = "50.0.0" } clap = { workspace 
= true, optional = true } dashmap = { workspace = true } diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 00f6919ef2..91c877b263 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -19,6 +19,8 @@ use crate::cpu_bound_executor::DedicatedExecutor; use crate::executor::Executor; use crate::executor_process::remove_job_dir; use crate::{as_task_status, TaskExecutionTimes}; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoffBuilder; use ballista_core::error::BallistaError; use ballista_core::extension::SessionConfigHelperExt; use ballista_core::serde::protobuf::{ @@ -41,12 +43,15 @@ use std::convert::TryInto; use std::error::Error; use std::ops::Deref; use std::sync::mpsc::{Receiver, Sender, TryRecvError}; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::oneshot::Sender as OneShotSender; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tonic::codegen::{Body, Bytes, StdError}; +/// Number of consecutive failures before reducing log level from WARN to DEBUG. 
+const QUIET_AFTER_FAILURES: u32 = 5; + pub async fn poll_loop( mut scheduler: SchedulerGrpcClient, executor: Arc, @@ -83,16 +88,21 @@ where } }); + // Track consecutive scheduler connection failures for backoff and log suppression + let mut consecutive_failures: u32 = 0; + // Build via the builder: `..default()` would leave `current_interval` at the 500ms default + let mut backoff = ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(100)) + .with_max_interval(Duration::from_secs(30)) + .with_max_elapsed_time(None) // Never give up + .build(); + loop { // Wait for task slots to be available before asking for new work let permit = available_task_slots.acquire().await.unwrap(); // Make the slot available again drop(permit); - // Keeps track of whether we received task in last iteration - // to avoid going in sleep mode between polling - let mut active_job = false; - let task_status: Vec = sample_tasks_status(&mut task_status_receiver).await; @@ -107,8 +117,21 @@ where *report_ready; + // Keeps track of whether we received task in last iteration + // to avoid going in sleep mode between polling + let active_job; + match poll_work_result { Ok(result) => { + // Reset backoff state on successful connection + if consecutive_failures > 0 { + info!( + "Scheduler connection restored after {consecutive_failures} failed attempts" + ); + } + consecutive_failures = 0; + backoff.reset(); + let PollWorkResult { tasks, jobs_to_clean, @@ -197,7 +220,24 @@ where } } Err(error) => { - warn!("Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {error}"); + consecutive_failures = consecutive_failures.saturating_add(1); + + // Log at WARN level for first few failures, then reduce to DEBUG to avoid log spam + if consecutive_failures <= QUIET_AFTER_FAILURES { + warn!( + "Executor poll work loop failed (attempt {consecutive_failures}). If this continues, the scheduler might be unavailable. Error: {error}" + ); + } else { + debug!( + "Executor poll work loop failed (attempt {consecutive_failures}). 
Error: {error}" + ); + } + + // Apply exponential backoff before retrying + if let Some(duration) = backoff.next_backoff() { + tokio::time::sleep(duration).await; + } + continue; } }