Skip to content

Commit 998ffd7

Browse files
fix(error): surface buried FetchFailed from wrapped DataFusionError
A BallistaError::FetchFailed bubbled through the DataFusion stream chain on a shared shuffle stream typically arrives as DataFusionError::Shared(Arc(DataFusionError::ArrowError( ArrowError::ExternalError(BallistaError::FetchFailed(...))))) The existing From<DataFusionError> for BallistaError only unwrapped ArrowError, so the inner FetchFailed stayed buried as BallistaError::DataFusionError(...). FailedTask::from then fell through its pattern match to the catch-all arm and emitted FailedReason::ExecutionError (non-retryable), bypassing the schedulers FetchPartitionError recovery path that reruns the offending map stage. Drill through Shared, Context, Diagnostic, Collection, and External so the FetchFailed surfaces and the scheduler can recover. For Shared with multiple strong references where Arc::try_unwrap fails, fall back to a borrowed walk that reconstructs FetchFailed from its cloneable fields.
1 parent 383e165 commit 998ffd7

1 file changed

Lines changed: 163 additions & 0 deletions

File tree

ballista/core/src/error.rs

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::{
2121
error::Error,
2222
fmt::{Display, Formatter},
2323
io, result,
24+
sync::Arc,
2425
};
2526

2627
use crate::serde::protobuf::failed_task::FailedReason;
@@ -113,13 +114,88 @@ impl From<parser::ParserError> for BallistaError {
113114

114115
impl From<DataFusionError> for BallistaError {
115116
fn from(e: DataFusionError) -> Self {
117+
// BallistaError::FetchFailed must reach the top level so FailedTask::from
118+
// routes it to FailedReason::FetchPartitionError (the scheduler's
119+
// map-stage rerun path). When a stream is shared between consumers,
120+
// DataFusion wraps the underlying error as
121+
// DataFusionError::Shared(Arc(DataFusionError::ArrowError(
122+
// ArrowError::ExternalError(BallistaError::FetchFailed(...)))))
123+
// and similar wrappers appear for Context, Diagnostic, Collection, and
124+
// External. Drill through them so the inner BallistaError surfaces.
116125
match e {
117126
DataFusionError::ArrowError(e, _) => Self::from(*e),
127+
DataFusionError::External(e) => match e.downcast::<BallistaError>() {
128+
Ok(b) => *b,
129+
Err(e) => match e.downcast::<DataFusionError>() {
130+
Ok(d) => Self::from(*d),
131+
Err(e) => BallistaError::DataFusionError(Box::new(
132+
DataFusionError::External(e),
133+
)),
134+
},
135+
},
136+
DataFusionError::Context(_, inner) => Self::from(*inner),
137+
DataFusionError::Diagnostic(_, inner) => Self::from(*inner),
138+
DataFusionError::Collection(mut errs) if !errs.is_empty() => {
139+
Self::from(errs.swap_remove(0))
140+
}
141+
DataFusionError::Shared(arc) => match Arc::try_unwrap(arc) {
142+
Ok(inner) => Self::from(inner),
143+
Err(arc) => find_fetch_failed(arc.as_ref()).unwrap_or_else(|| {
144+
BallistaError::DataFusionError(Box::new(DataFusionError::Shared(arc)))
145+
}),
146+
},
118147
_ => BallistaError::DataFusionError(Box::new(e)),
119148
}
120149
}
121150
}
122151

152+
/// Walk a borrowed [`DataFusionError`] chain looking for a buried
153+
/// [`BallistaError::FetchFailed`], reconstructing it from its cloneable fields.
154+
///
155+
/// Used when ownership of the inner error cannot be taken — e.g. for
156+
/// [`DataFusionError::Shared`] with multiple strong references — so the
157+
/// classification path in `From<BallistaError> for FailedTask` still sees the
158+
/// FetchFailed variant.
159+
fn find_fetch_failed(e: &DataFusionError) -> Option<BallistaError> {
160+
match e {
161+
DataFusionError::ArrowError(arrow, _) => find_fetch_failed_in_arrow(arrow),
162+
DataFusionError::External(err) => find_fetch_failed_in_dyn(err.as_ref()),
163+
DataFusionError::Context(_, inner) | DataFusionError::Diagnostic(_, inner) => {
164+
find_fetch_failed(inner)
165+
}
166+
DataFusionError::Collection(errs) => errs.iter().find_map(find_fetch_failed),
167+
DataFusionError::Shared(arc) => find_fetch_failed(arc.as_ref()),
168+
_ => None,
169+
}
170+
}
171+
172+
fn find_fetch_failed_in_arrow(e: &ArrowError) -> Option<BallistaError> {
173+
match e {
174+
ArrowError::ExternalError(err) => find_fetch_failed_in_dyn(err.as_ref()),
175+
_ => None,
176+
}
177+
}
178+
179+
fn find_fetch_failed_in_dyn(err: &(dyn Error + 'static)) -> Option<BallistaError> {
180+
if let Some(BallistaError::FetchFailed(executor_id, map_stage, map_partition, desc)) =
181+
err.downcast_ref::<BallistaError>()
182+
{
183+
return Some(BallistaError::FetchFailed(
184+
executor_id.clone(),
185+
*map_stage,
186+
*map_partition,
187+
desc.clone(),
188+
));
189+
}
190+
if let Some(df) = err.downcast_ref::<DataFusionError>() {
191+
return find_fetch_failed(df);
192+
}
193+
if let Some(arrow) = err.downcast_ref::<ArrowError>() {
194+
return find_fetch_failed_in_arrow(arrow);
195+
}
196+
None
197+
}
198+
123199
impl From<io::Error> for BallistaError {
124200
fn from(e: io::Error) -> Self {
125201
BallistaError::IoError(e)
@@ -256,3 +332,90 @@ impl From<BallistaError> for FailedTask {
256332
}
257333

258334
impl Error for BallistaError {}
335+
336+
#[cfg(test)]
337+
mod tests {
338+
use super::*;
339+
340+
fn fetch_failed() -> BallistaError {
341+
BallistaError::FetchFailed(
342+
"executor-1:50052".to_string(),
343+
1,
344+
74,
345+
"Error connecting to Ballista scheduler or executor".to_string(),
346+
)
347+
}
348+
349+
fn assert_routes_to_fetch_partition_error(err: BallistaError) {
350+
let failed: FailedTask = err.into();
351+
match failed.failed_reason {
352+
Some(FailedReason::FetchPartitionError(fp)) => {
353+
assert_eq!(fp.executor_id, "executor-1:50052");
354+
assert_eq!(fp.map_stage_id, 1);
355+
assert_eq!(fp.map_partition_id, 74);
356+
}
357+
other => panic!(
358+
"expected FetchPartitionError, got {other:?} (error: {})",
359+
failed.error
360+
),
361+
}
362+
}
363+
364+
#[test]
365+
fn fetch_failed_through_arrow_external_routes_to_fetch_partition_error() {
366+
let arrow = ArrowError::ExternalError(Box::new(fetch_failed()));
367+
let df = DataFusionError::ArrowError(Box::new(arrow), Some(String::new()));
368+
assert_routes_to_fetch_partition_error(BallistaError::from(df));
369+
}
370+
371+
#[test]
372+
fn fetch_failed_through_shared_routes_to_fetch_partition_error() {
373+
// Reproduces the production wrapping from a shared shuffle stream:
374+
// DataFusionError::Shared(Arc(ArrowError(ExternalError(FetchFailed))))
375+
let arrow = ArrowError::ExternalError(Box::new(fetch_failed()));
376+
let inner = DataFusionError::ArrowError(Box::new(arrow), Some(String::new()));
377+
let shared = DataFusionError::Shared(Arc::new(inner));
378+
assert_routes_to_fetch_partition_error(BallistaError::from(shared));
379+
}
380+
381+
#[test]
382+
fn fetch_failed_through_shared_with_extra_refs_still_routes() {
383+
// When Arc::try_unwrap fails (extra strong refs alive), the borrow-walk
384+
// fallback must still surface the inner FetchFailed.
385+
let arrow = ArrowError::ExternalError(Box::new(fetch_failed()));
386+
let inner = DataFusionError::ArrowError(Box::new(arrow), Some(String::new()));
387+
let arc = Arc::new(inner);
388+
let extra_ref = Arc::clone(&arc);
389+
let shared = DataFusionError::Shared(arc);
390+
assert_routes_to_fetch_partition_error(BallistaError::from(shared));
391+
drop(extra_ref);
392+
}
393+
394+
#[test]
395+
fn fetch_failed_through_context_routes_to_fetch_partition_error() {
396+
let arrow = ArrowError::ExternalError(Box::new(fetch_failed()));
397+
let inner = DataFusionError::ArrowError(Box::new(arrow), Some(String::new()));
398+
let ctx =
399+
DataFusionError::Context("reading shuffle".to_string(), Box::new(inner));
400+
assert_routes_to_fetch_partition_error(BallistaError::from(ctx));
401+
}
402+
403+
#[test]
404+
fn fetch_failed_through_external_routes_to_fetch_partition_error() {
405+
let external = DataFusionError::External(Box::new(fetch_failed()));
406+
assert_routes_to_fetch_partition_error(BallistaError::from(external));
407+
}
408+
409+
#[test]
410+
fn unrelated_shared_error_is_not_misclassified() {
411+
let inner = DataFusionError::Plan("planning failed".to_string());
412+
let shared = DataFusionError::Shared(Arc::new(inner));
413+
let bal = BallistaError::from(shared);
414+
let failed: FailedTask = bal.into();
415+
assert!(
416+
matches!(failed.failed_reason, Some(FailedReason::ExecutionError(_))),
417+
"expected ExecutionError, got {:?}",
418+
failed.failed_reason
419+
);
420+
}
421+
}

0 commit comments

Comments
 (0)