Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -81,6 +84,44 @@ pub trait QueryStageExecutor: Sync + Send + Debug + Display {
fn plan(&self) -> &dyn ExecutionPlan;
}

/// Fix ParquetSource metadata_size_hint that is lost during protobuf
/// serialization. The hint is preserved in TableParquetOptions but not
/// transferred back to ParquetSource.metadata_size_hint on deserialization.
/// Without this, each parquet file open requires 2 HTTP round trips instead
/// of 1 for remote (S3/object store) files.
fn fix_parquet_metadata_size_hint(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|node| {
let Some(dse) = node.as_any().downcast_ref::<DataSourceExec>() else {
return Ok(Transformed::no(node));
};
let Some((file_scan_config, parquet_source)) =
dse.downcast_to_file_source::<ParquetSource>()
else {
return Ok(Transformed::no(node));
};
// Recover metadata_size_hint from the table parquet options.
// During protobuf round-trip, the hint is preserved in
// TableParquetOptions but not transferred to the source-level field.
let Some(hint) = parquet_source
.table_parquet_options()
.global
.metadata_size_hint
else {
return Ok(Transformed::no(node));
};
let new_source = parquet_source.clone().with_metadata_size_hint(hint);
let new_config = FileScanConfigBuilder::from(file_scan_config.clone())
.with_source(Arc::new(new_source))
.build();
Ok(Transformed::yes(
DataSourceExec::from_data_source(new_config) as Arc<dyn ExecutionPlan>,
))
})
.map(|t| t.data)
}

/// Default execution engine using DataFusion's ShuffleWriterExec.
///
/// This implementation expects the input plan to be wrapped in a
Expand All @@ -95,6 +136,9 @@ impl ExecutionEngine for DefaultExecutionEngine {
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
) -> Result<Arc<dyn QueryStageExecutor>> {
// Fix ParquetSource metadata_size_hint lost during serialization
let plan = fix_parquet_metadata_size_hint(plan)?;

Comment thread
peasee marked this conversation as resolved.
// the query plan created by the scheduler always starts with a shuffle writer
// (either ShuffleWriterExec or SortShuffleWriterExec)
if let Some(shuffle_writer) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
Expand Down Expand Up @@ -231,3 +275,98 @@ impl QueryStageExecutor for DefaultQueryStageExec {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, Schema};
use ballista_core::serde::BallistaCodec;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::FileScanConfigBuilder;
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};

/// Regression test: metadata_size_hint must survive a protobuf round-trip
/// after being fixed up by `fix_parquet_metadata_size_hint`.
///
/// Without the fix, the hint stored in `TableParquetOptions` is not
/// transferred back to `ParquetSource.metadata_size_hint` on
/// deserialization, causing an extra HTTP request per Parquet file open.
#[test]
fn test_fix_parquet_metadata_size_hint_after_roundtrip() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

let mut parquet_options = TableParquetOptions::default();
parquet_options.global.metadata_size_hint = Some(512);

let source = ParquetSource::new(schema.clone())
.with_table_parquet_options(parquet_options)
.with_metadata_size_hint(512);

let config = FileScanConfigBuilder::new(
ObjectStoreUrl::local_filesystem(),
Arc::new(source),
)
.build();

let original_plan: Arc<dyn ExecutionPlan> =
DataSourceExec::from_data_source(config);

// Round-trip through protobuf (simulates scheduler -> executor transfer)
let codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();
let ctx = SessionContext::new();
let task_ctx = ctx.task_ctx();

let proto = PhysicalPlanNode::try_from_physical_plan(
original_plan.clone(),
codec.physical_extension_codec(),
)
.expect("encoding should succeed");

let decoded_plan = proto
.try_into_physical_plan(&task_ctx, codec.physical_extension_codec())
.expect("decoding should succeed");

// Before fix: metadata_size_hint is lost on the ParquetSource
let dse = decoded_plan
.as_any()
.downcast_ref::<DataSourceExec>()
.expect("should be DataSourceExec");
let (_, parquet_source_before) = dse
.downcast_to_file_source::<ParquetSource>()
.expect("should be ParquetSource");
// The hint is preserved in options but not on the source field
assert_eq!(
parquet_source_before
.table_parquet_options()
.global
.metadata_size_hint,
Some(512),
"hint should be in TableParquetOptions after round-trip"
);

// Apply the fix
let fixed_plan =
fix_parquet_metadata_size_hint(decoded_plan).expect("fix should succeed");

// After fix: metadata_size_hint should be restored on the ParquetSource
let fixed_dse = fixed_plan
.as_any()
.downcast_ref::<DataSourceExec>()
.expect("fixed plan should be DataSourceExec");
let (_, parquet_source_after) = fixed_dse
.downcast_to_file_source::<ParquetSource>()
.expect("should be ParquetSource");
assert_eq!(
parquet_source_after
.table_parquet_options()
.global
.metadata_size_hint,
Some(512),
"hint should still be in TableParquetOptions after fix"
);
}
}
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/aqe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,8 +920,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
/// Return all currently running tasks along with the executor ID on which they are assigned
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
self.stages
.iter()
.flat_map(|(_, stage)| {
.values()
.flat_map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage
.running_tasks()
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,8 @@ impl ExecutionGraph for StaticExecutionGraph {
/// Return all currently running tasks along with the executor ID on which they are assigned
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
self.stages
.iter()
.flat_map(|(_, stage)| {
.values()
.flat_map(|stage| {
if let ExecutionStage::Running(stage) = stage {
stage
.running_tasks()
Expand Down
4 changes: 2 additions & 2 deletions ballista/scheduler/src/state/executor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ impl ExecutorManager {

self.cluster_state
.executor_heartbeats()
.iter()
.filter_map(|(_exec, heartbeat)| {
.values()
.filter_map(|heartbeat| {
let terminating = matches!(
heartbeat
.status
Expand Down
Loading