@@ -19,6 +19,8 @@ use crate::cpu_bound_executor::DedicatedExecutor;
1919use crate :: executor:: Executor ;
2020use crate :: executor_process:: remove_job_dir;
2121use crate :: { as_task_status, TaskExecutionTimes } ;
22+ use backoff:: backoff:: Backoff ;
23+ use backoff:: ExponentialBackoff ;
2224use ballista_core:: error:: BallistaError ;
2325use ballista_core:: extension:: SessionConfigHelperExt ;
2426use ballista_core:: serde:: protobuf:: {
@@ -38,12 +40,15 @@ use std::cell::LazyCell;
3840use std:: convert:: TryInto ;
3941use std:: error:: Error ;
4042use std:: sync:: mpsc:: { Receiver , Sender , TryRecvError } ;
41- use std:: time :: { SystemTime , UNIX_EPOCH } ;
42- use std:: { sync :: Arc , time:: Duration } ;
43+ use std:: sync :: Arc ;
44+ use std:: time:: { Duration , SystemTime , UNIX_EPOCH } ;
4345use tokio:: sync:: oneshot:: Sender as OneShotSender ;
4446use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
4547use tonic:: codegen:: { Body , Bytes , StdError } ;
4648
49+ /// Number of consecutive failures before reducing log level from WARN to DEBUG.
50+ const QUIET_AFTER_FAILURES : u32 = 5 ;
51+
4752pub async fn poll_loop < T : ' static + AsLogicalPlan , U : ' static + AsExecutionPlan , C > (
4853 mut scheduler : SchedulerGrpcClient < C > ,
4954 executor : Arc < Executor > ,
@@ -80,16 +85,21 @@ where
8085 }
8186 } ) ;
8287
88+ // Track consecutive scheduler connection failures for backoff and log suppression
89+ let mut consecutive_failures: u32 = 0 ;
90+ let mut backoff = ExponentialBackoff {
91+ initial_interval : Duration :: from_millis ( 100 ) ,
92+ max_interval : Duration :: from_secs ( 30 ) ,
93+ max_elapsed_time : None , // Never give up
94+ ..ExponentialBackoff :: default ( )
95+ } ;
96+
8397 loop {
8498 // Wait for task slots to be available before asking for new work
8599 let permit = available_task_slots. acquire ( ) . await . unwrap ( ) ;
86100 // Make the slot available again
87101 drop ( permit) ;
88102
89- // Keeps track of whether we received task in last iteration
90- // to avoid going in sleep mode between polling
91- let mut active_job = false ;
92-
93103 let task_status: Vec < TaskStatus > =
94104 sample_tasks_status ( & mut task_status_receiver) . await ;
95105
@@ -104,8 +114,21 @@ where
104114
105115 * report_ready;
106116
117+ // Keeps track of whether we received task in last iteration
118+ // to avoid going in sleep mode between polling
119+ let active_job;
120+
107121 match poll_work_result {
108122 Ok ( result) => {
123+ // Reset backoff state on successful connection
124+ if consecutive_failures > 0 {
125+ info ! (
126+ "Scheduler connection restored after {consecutive_failures} failed attempts"
127+ ) ;
128+ }
129+ consecutive_failures = 0 ;
130+ backoff. reset ( ) ;
131+
109132 let PollWorkResult {
110133 tasks,
111134 jobs_to_clean,
@@ -194,7 +217,24 @@ where
194217 }
195218 }
196219 Err ( error) => {
197- warn ! ( "Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {error}" ) ;
220+ consecutive_failures = consecutive_failures. saturating_add ( 1 ) ;
221+
222+ // Log at WARN level for first few failures, then reduce to DEBUG to avoid log spam
223+ if consecutive_failures <= QUIET_AFTER_FAILURES {
224+ warn ! (
225+ "Executor poll work loop failed (attempt {consecutive_failures}). If this continues, the scheduler might be unavailable. Error: {error}"
226+ ) ;
227+ } else {
228+ debug ! (
229+ "Executor poll work loop failed (attempt {consecutive_failures}). Error: {error}"
230+ ) ;
231+ }
232+
233+ // Apply exponential backoff before retrying
234+ if let Some ( duration) = backoff. next_backoff ( ) {
235+ tokio:: time:: sleep ( duration) . await ;
236+ }
237+ continue ;
198238 }
199239 }
200240
0 commit comments