Skip to content

Commit c14e3e7

Browse files
fix(ballista): recover from transient shuffle-fetch connection failures (#36)
* fix(error): surface buried FetchFailed from wrapped DataFusionError. A BallistaError::FetchFailed bubbled through the DataFusion stream chain on a shared shuffle stream typically arrives as DataFusionError::Shared(Arc(DataFusionError::ArrowError(ArrowError::ExternalError(BallistaError::FetchFailed(...))))). The existing From<DataFusionError> for BallistaError only unwrapped ArrowError, so the inner FetchFailed stayed buried as BallistaError::DataFusionError(...). FailedTask::from then fell through its pattern match to the catch-all arm and emitted FailedReason::ExecutionError (non-retryable), bypassing the scheduler's FetchPartitionError recovery path that reruns the offending map stage. Drill through Shared, Context, Diagnostic, Collection, and External so the FetchFailed surfaces and the scheduler can recover. For Shared with multiple strong references where Arc::try_unwrap fails, fall back to a borrowed walk that reconstructs FetchFailed from its cloneable fields. * fix(client): retry transient shuffle-fetch connects and apply gRPC transport defaults. BallistaClient::try_new previously called endpoint.connect() once with no client-side connect_timeout, tcp_keepalive, or http2 keepalive (a None GrpcClientConfig was passed to create_grpc_client_endpoint), and a single transport-level failure propagated straight to FetchFailed without any retry. The IO_RETRIES_TIMES loop in execute_do_get / execute_do_action only applies *after* the channel has been established, so connect-time failures got no second chance — a single TCP RST during a shuffle connection storm against the same executor was enough to fail a task. Match the scheduler flight proxy: pass a real GrpcClientConfig::default to create_grpc_client_endpoint, and wrap endpoint.connect() in the same 3-attempt / 3s-backoff retry budget used by the post-connect retry loops. After exhausting the budget the error message reports the attempt count so it stays distinguishable from a permanent failure.
1 parent 383e165 commit c14e3e7

2 files changed

Lines changed: 208 additions & 7 deletions

File tree

ballista/core/src/client.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use datafusion::error::Result;
4747
use crate::extension::BallistaConfigGrpcEndpoint;
4848
use crate::serde::protobuf;
4949

50-
use crate::utils::create_grpc_client_endpoint;
50+
use crate::utils::{GrpcClientConfig, create_grpc_client_endpoint};
5151

5252
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
5353
use futures::{Stream, StreamExt};
@@ -80,7 +80,14 @@ impl BallistaClient {
8080
let addr = format!("{scheme}://{host}:{port}");
8181
debug!("BallistaClient connecting to {addr}");
8282

83-
let mut endpoint = create_grpc_client_endpoint(addr.clone(), None)
83+
// Apply the same transport defaults (connect_timeout, tcp_keepalive,
84+
// http2 keepalive) the scheduler's flight proxy uses for its own
85+
// executor connections. Without this the shuffle client gets a bare
86+
// tonic Endpoint with no timeouts and no keepalive — a single bad peer
87+
// can hang the fetch indefinitely, and a half-open connection isn't
88+
// detected.
89+
let grpc_config = GrpcClientConfig::default();
90+
let mut endpoint = create_grpc_client_endpoint(addr.clone(), Some(&grpc_config))
8491
.map_err(|e| {
8592
BallistaError::GrpcConnectionError(format!(
8693
"Error creating endpoint to Ballista scheduler or executor at {addr}: {e:?}"
@@ -97,11 +104,42 @@ impl BallistaClient {
97104
})?;
98105
}
99106

100-
let connection = endpoint.connect().await.map_err(|e| {
101-
BallistaError::GrpcConnectionError(format!(
102-
"Error connecting to Ballista scheduler or executor at {addr}: {e:?}"
103-
))
104-
})?;
107+
// Retry transient connect failures with the same budget the do_get /
108+
// do_action paths below use. A single TCP RST during connection setup
109+
// (typically when many shuffle clients dial the same executor at once)
110+
// would otherwise propagate straight to FetchFailed without any retry,
111+
// since the IO_RETRIES_TIMES loop in execute_do_get only applies after
112+
// the channel is established.
113+
let mut last_err: Option<tonic::transport::Error> = None;
114+
let mut connection = None;
115+
for attempt in 0..IO_RETRIES_TIMES {
116+
if attempt > 0 {
117+
warn!(
118+
"Connection to {addr} failed (attempt {attempt}/{IO_RETRIES_TIMES}), sleeping {IO_RETRY_WAIT_TIME_MS} ms before retry"
119+
);
120+
tokio::time::sleep(std::time::Duration::from_millis(
121+
IO_RETRY_WAIT_TIME_MS,
122+
))
123+
.await;
124+
}
125+
match endpoint.connect().await {
126+
Ok(c) => {
127+
connection = Some(c);
128+
break;
129+
}
130+
Err(e) => last_err = Some(e),
131+
}
132+
}
133+
let connection = match connection {
134+
Some(c) => c,
135+
None => {
136+
return Err(BallistaError::GrpcConnectionError(format!(
137+
"Error connecting to Ballista scheduler or executor at {addr} after {IO_RETRIES_TIMES} attempts: {:?}",
138+
last_err
139+
.expect("at least one attempt failed when connection is None")
140+
)));
141+
}
142+
};
105143

106144
let flight_client = FlightServiceClient::new(connection)
107145
.max_decoding_message_size(max_message_size)

ballista/core/src/error.rs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::{
2121
error::Error,
2222
fmt::{Display, Formatter},
2323
io, result,
24+
sync::Arc,
2425
};
2526

2627
use crate::serde::protobuf::failed_task::FailedReason;
@@ -113,13 +114,88 @@ impl From<parser::ParserError> for BallistaError {
113114

114115
impl From<DataFusionError> for BallistaError {
115116
fn from(e: DataFusionError) -> Self {
117+
// BallistaError::FetchFailed must reach the top level so FailedTask::from
118+
// routes it to FailedReason::FetchPartitionError (the scheduler's
119+
// map-stage rerun path). When a stream is shared between consumers,
120+
// DataFusion wraps the underlying error as
121+
// DataFusionError::Shared(Arc(DataFusionError::ArrowError(
122+
// ArrowError::ExternalError(BallistaError::FetchFailed(...)))))
123+
// and similar wrappers appear for Context, Diagnostic, Collection, and
124+
// External. Drill through them so the inner BallistaError surfaces.
116125
match e {
117126
DataFusionError::ArrowError(e, _) => Self::from(*e),
127+
DataFusionError::External(e) => match e.downcast::<BallistaError>() {
128+
Ok(b) => *b,
129+
Err(e) => match e.downcast::<DataFusionError>() {
130+
Ok(d) => Self::from(*d),
131+
Err(e) => BallistaError::DataFusionError(Box::new(
132+
DataFusionError::External(e),
133+
)),
134+
},
135+
},
136+
DataFusionError::Context(_, inner) => Self::from(*inner),
137+
DataFusionError::Diagnostic(_, inner) => Self::from(*inner),
138+
DataFusionError::Collection(mut errs) if !errs.is_empty() => {
139+
Self::from(errs.swap_remove(0))
140+
}
141+
DataFusionError::Shared(arc) => match Arc::try_unwrap(arc) {
142+
Ok(inner) => Self::from(inner),
143+
Err(arc) => find_fetch_failed(arc.as_ref()).unwrap_or_else(|| {
144+
BallistaError::DataFusionError(Box::new(DataFusionError::Shared(arc)))
145+
}),
146+
},
118147
_ => BallistaError::DataFusionError(Box::new(e)),
119148
}
120149
}
121150
}
122151

152+
/// Walk a borrowed [`DataFusionError`] chain looking for a buried
153+
/// [`BallistaError::FetchFailed`], reconstructing it from its cloneable fields.
154+
///
155+
/// Used when ownership of the inner error cannot be taken — e.g. for
156+
/// [`DataFusionError::Shared`] with multiple strong references — so the
157+
/// classification path in `From<BallistaError> for FailedTask` still sees the
158+
/// FetchFailed variant.
159+
fn find_fetch_failed(e: &DataFusionError) -> Option<BallistaError> {
160+
match e {
161+
DataFusionError::ArrowError(arrow, _) => find_fetch_failed_in_arrow(arrow),
162+
DataFusionError::External(err) => find_fetch_failed_in_dyn(err.as_ref()),
163+
DataFusionError::Context(_, inner) | DataFusionError::Diagnostic(_, inner) => {
164+
find_fetch_failed(inner)
165+
}
166+
DataFusionError::Collection(errs) => errs.iter().find_map(find_fetch_failed),
167+
DataFusionError::Shared(arc) => find_fetch_failed(arc.as_ref()),
168+
_ => None,
169+
}
170+
}
171+
172+
fn find_fetch_failed_in_arrow(e: &ArrowError) -> Option<BallistaError> {
173+
match e {
174+
ArrowError::ExternalError(err) => find_fetch_failed_in_dyn(err.as_ref()),
175+
_ => None,
176+
}
177+
}
178+
179+
fn find_fetch_failed_in_dyn(err: &(dyn Error + 'static)) -> Option<BallistaError> {
180+
if let Some(BallistaError::FetchFailed(executor_id, map_stage, map_partition, desc)) =
181+
err.downcast_ref::<BallistaError>()
182+
{
183+
return Some(BallistaError::FetchFailed(
184+
executor_id.clone(),
185+
*map_stage,
186+
*map_partition,
187+
desc.clone(),
188+
));
189+
}
190+
if let Some(df) = err.downcast_ref::<DataFusionError>() {
191+
return find_fetch_failed(df);
192+
}
193+
if let Some(arrow) = err.downcast_ref::<ArrowError>() {
194+
return find_fetch_failed_in_arrow(arrow);
195+
}
196+
None
197+
}
198+
123199
impl From<io::Error> for BallistaError {
124200
fn from(e: io::Error) -> Self {
125201
BallistaError::IoError(e)
@@ -256,3 +332,90 @@ impl From<BallistaError> for FailedTask {
256332
}
257333

258334
impl Error for BallistaError {}
335+
336+
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the FetchFailed fixture every test in this module buries.
    fn fetch_failed() -> BallistaError {
        let executor_id = "executor-1:50052".to_string();
        let desc = "Error connecting to Ballista scheduler or executor".to_string();
        BallistaError::FetchFailed(executor_id, 1, 74, desc)
    }

    /// Wraps the fixture as `ArrowError(ExternalError(FetchFailed))`, the
    /// innermost layers shared by several of the wrapping scenarios below.
    fn arrow_wrapped_fetch_failed() -> DataFusionError {
        let arrow = ArrowError::ExternalError(Box::new(fetch_failed()));
        DataFusionError::ArrowError(Box::new(arrow), Some(String::new()))
    }

    /// Asserts that `err` is classified as a FetchPartitionError carrying the
    /// fixture's executor id, map stage and map partition.
    fn assert_routes_to_fetch_partition_error(err: BallistaError) {
        let failed = FailedTask::from(err);
        match failed.failed_reason {
            Some(FailedReason::FetchPartitionError(fp)) => {
                assert_eq!(fp.executor_id, "executor-1:50052");
                assert_eq!(fp.map_stage_id, 1);
                assert_eq!(fp.map_partition_id, 74);
            }
            other => panic!(
                "expected FetchPartitionError, got {other:?} (error: {})",
                failed.error
            ),
        }
    }

    #[test]
    fn fetch_failed_through_arrow_external_routes_to_fetch_partition_error() {
        let converted = BallistaError::from(arrow_wrapped_fetch_failed());
        assert_routes_to_fetch_partition_error(converted);
    }

    #[test]
    fn fetch_failed_through_shared_routes_to_fetch_partition_error() {
        // Mirrors the wrapping seen on a shared shuffle stream:
        //   DataFusionError::Shared(Arc(ArrowError(ExternalError(FetchFailed))))
        let shared = DataFusionError::Shared(Arc::new(arrow_wrapped_fetch_failed()));
        assert_routes_to_fetch_partition_error(BallistaError::from(shared));
    }

    #[test]
    fn fetch_failed_through_shared_with_extra_refs_still_routes() {
        // A second strong reference makes Arc::try_unwrap fail, which forces
        // the conversion onto the borrow-walk fallback; it must still surface
        // the inner FetchFailed.
        let first = Arc::new(arrow_wrapped_fetch_failed());
        let second = Arc::clone(&first);
        let converted = BallistaError::from(DataFusionError::Shared(first));
        assert_routes_to_fetch_partition_error(converted);
        drop(second);
    }

    #[test]
    fn fetch_failed_through_context_routes_to_fetch_partition_error() {
        let with_context = DataFusionError::Context(
            "reading shuffle".to_string(),
            Box::new(arrow_wrapped_fetch_failed()),
        );
        assert_routes_to_fetch_partition_error(BallistaError::from(with_context));
    }

    #[test]
    fn fetch_failed_through_external_routes_to_fetch_partition_error() {
        let boxed = Box::new(fetch_failed());
        let converted = BallistaError::from(DataFusionError::External(boxed));
        assert_routes_to_fetch_partition_error(converted);
    }

    #[test]
    fn unrelated_shared_error_is_not_misclassified() {
        let plan_err = DataFusionError::Plan("planning failed".to_string());
        let converted = BallistaError::from(DataFusionError::Shared(Arc::new(plan_err)));
        let failed = FailedTask::from(converted);
        let is_execution_error =
            matches!(failed.failed_reason, Some(FailedReason::ExecutionError(_)));
        assert!(
            is_execution_error,
            "expected ExecutionError, got {:?}",
            failed.failed_reason
        );
    }
}

0 commit comments

Comments
 (0)