Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ dashmap = { version = "6.1" }
async-trait = { version = "0.1" }
serde = { version = "1.0" }
tokio-stream = { version = "0.1" }
backoff = { version = "0.4" }
url = { version = "2.5" }

[patch.crates-io]
Expand Down
1 change: 1 addition & 0 deletions ballista/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ default = ["build-binary", "mimalloc"]
arrow = { workspace = true }
arrow-flight = { workspace = true }
async-trait = { workspace = true }
backoff = { workspace = true }
ballista-core = { path = "../core", version = "50.0.0" }
clap = { workspace = true, optional = true }
dashmap = { workspace = true }
Expand Down
54 changes: 47 additions & 7 deletions ballista/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::cpu_bound_executor::DedicatedExecutor;
use crate::executor::Executor;
use crate::executor_process::remove_job_dir;
use crate::{as_task_status, TaskExecutionTimes};
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use ballista_core::error::BallistaError;
use ballista_core::extension::SessionConfigHelperExt;
use ballista_core::serde::protobuf::{
Expand All @@ -41,12 +43,15 @@ use std::convert::TryInto;
use std::error::Error;
use std::ops::Deref;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::oneshot::Sender as OneShotSender;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tonic::codegen::{Body, Bytes, StdError};

/// Number of consecutive poll failures that are logged at WARN level.
///
/// The error branch of the poll loop logs at WARN while the consecutive
/// failure count is `<= QUIET_AFTER_FAILURES`; every further consecutive
/// failure is logged at DEBUG instead, to avoid log spam. The failure count
/// is reset to zero after a successful poll, which restores WARN-level logging.
const QUIET_AFTER_FAILURES: u32 = 5;

pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan, C>(
mut scheduler: SchedulerGrpcClient<C>,
executor: Arc<Executor>,
Expand Down Expand Up @@ -83,16 +88,21 @@ where
}
});

// Track consecutive scheduler connection failures for backoff and log suppression
let mut consecutive_failures: u32 = 0;
let mut backoff = ExponentialBackoff {
initial_interval: Duration::from_millis(100),
max_interval: Duration::from_secs(30),
max_elapsed_time: None, // Never give up
..ExponentialBackoff::default()
};

loop {
// Wait for task slots to be available before asking for new work
let permit = available_task_slots.acquire().await.unwrap();
// Make the slot available again
drop(permit);

// Keeps track of whether we received task in last iteration
// to avoid going in sleep mode between polling
let mut active_job = false;

let task_status: Vec<TaskStatus> =
sample_tasks_status(&mut task_status_receiver).await;

Expand All @@ -107,8 +117,21 @@ where

*report_ready;

// Keeps track of whether we received task in last iteration
// to avoid going in sleep mode between polling
let active_job;

match poll_work_result {
Ok(result) => {
// Reset backoff state on successful connection
if consecutive_failures > 0 {
info!(
"Scheduler connection restored after {consecutive_failures} failed attempts"
);
}
consecutive_failures = 0;
backoff.reset();

let PollWorkResult {
tasks,
jobs_to_clean,
Expand Down Expand Up @@ -197,7 +220,24 @@ where
}
}
Err(error) => {
warn!("Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {error}");
consecutive_failures = consecutive_failures.saturating_add(1);

// Log at WARN level for first few failures, then reduce to DEBUG to avoid log spam
if consecutive_failures <= QUIET_AFTER_FAILURES {
warn!(
"Executor poll work loop failed (attempt {consecutive_failures}). If this continues, the scheduler might be unavailable. Error: {error}"
);
} else {
debug!(
"Executor poll work loop failed (attempt {consecutive_failures}). Error: {error}"
);
}

// Apply exponential backoff before retrying
if let Some(duration) = backoff.next_backoff() {
tokio::time::sleep(duration).await;
}
continue;
}
}

Expand Down
Loading