Skip to content

Commit e18eab0

Browse files
Add exponential backoff and log suppression for scheduler disconnection
When the scheduler disconnects or becomes unavailable, the executor poll loop previously logged a WARN message every ~100ms, spamming logs during graceful shutdown scenarios.

Changes:
- Add exponential backoff (100ms to 30s) when the scheduler connection fails
- Reduce the log level from WARN to DEBUG after 5 consecutive failures
- Log a restoration message when the connection is re-established
- Prevent log spam when the scheduler shuts down before the executor
1 parent 9c17685 commit e18eab0

1 file changed

Lines changed: 41 additions & 5 deletions

File tree

ballista/executor/src/execution_loop.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ use std::ops::Deref;
4343
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
4444
use std::time::{SystemTime, UNIX_EPOCH};
4545
use std::{sync::Arc, time::Duration};
46+
47+
/// Initial backoff delay when scheduler connection fails (100ms).
48+
const INITIAL_BACKOFF_MS: u64 = 100;
49+
/// Maximum backoff delay when scheduler connection fails (30s).
50+
const MAX_BACKOFF_MS: u64 = 30_000;
51+
/// Number of consecutive failures that are logged at WARN; any further consecutive failures are logged at DEBUG.
52+
const QUIET_AFTER_FAILURES: u32 = 5;
4653
use tokio::sync::oneshot::Sender as OneShotSender;
4754
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
4855
use tonic::codegen::{Body, Bytes, StdError};
@@ -83,16 +90,16 @@ where
8390
}
8491
});
8592

93+
// Track consecutive scheduler connection failures for backoff and log suppression
94+
let mut consecutive_failures: u32 = 0;
95+
let mut current_backoff_ms: u64 = INITIAL_BACKOFF_MS;
96+
8697
loop {
8798
// Wait for task slots to be available before asking for new work
8899
let permit = available_task_slots.acquire().await.unwrap();
89100
// Make the slot available again
90101
drop(permit);
91102

92-
// Keeps track of whether we received task in last iteration
93-
// to avoid going in sleep mode between polling
94-
let mut active_job = false;
95-
96103
let task_status: Vec<TaskStatus> =
97104
sample_tasks_status(&mut task_status_receiver).await;
98105

@@ -107,8 +114,21 @@ where
107114

108115
*report_ready;
109116

117+
// Keeps track of whether we received task in last iteration
118+
// to avoid going in sleep mode between polling
119+
let active_job;
120+
110121
match poll_work_result {
111122
Ok(result) => {
123+
// Reset backoff state on successful connection
124+
if consecutive_failures > 0 {
125+
info!(
126+
"Scheduler connection restored after {consecutive_failures} failed attempts"
127+
);
128+
}
129+
consecutive_failures = 0;
130+
current_backoff_ms = INITIAL_BACKOFF_MS;
131+
112132
let PollWorkResult {
113133
tasks,
114134
jobs_to_clean,
@@ -197,7 +217,23 @@ where
197217
}
198218
}
199219
Err(error) => {
200-
warn!("Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {error}");
220+
consecutive_failures = consecutive_failures.saturating_add(1);
221+
222+
// Log at WARN level for first few failures, then reduce to DEBUG to avoid log spam
223+
if consecutive_failures <= QUIET_AFTER_FAILURES {
224+
warn!(
225+
"Executor poll work loop failed (attempt {consecutive_failures}). If this continues, the Scheduler might be unavailable. Error: {error}"
226+
);
227+
} else {
228+
debug!(
229+
"Executor poll work loop failed (attempt {consecutive_failures}). Error: {error}"
230+
);
231+
}
232+
233+
// Apply exponential backoff before retrying
234+
tokio::time::sleep(Duration::from_millis(current_backoff_ms)).await;
235+
current_backoff_ms = (current_backoff_ms * 2).min(MAX_BACKOFF_MS);
236+
continue;
201237
}
202238
}
203239

0 commit comments

Comments
 (0)