Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions ballista/executor/src/execution_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec;
use ballista_core::serde::protobuf::ShuffleWritePartition;
use ballista_core::utils;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -81,6 +84,44 @@ pub trait QueryStageExecutor: Sync + Send + Debug + Display {
fn plan(&self) -> &dyn ExecutionPlan;
}

/// Fix ParquetSource metadata_size_hint that is lost during protobuf
/// serialization. The hint is preserved in TableParquetOptions but not
/// transferred back to ParquetSource.metadata_size_hint on deserialization.
/// Without this, each parquet file open requires 2 HTTP round trips instead
/// of 1 for remote (S3/object store) files.
fn fix_parquet_metadata_size_hint(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(|node| {
let Some(dse) = node.as_any().downcast_ref::<DataSourceExec>() else {
return Ok(Transformed::no(node));
};
let Some((file_scan_config, parquet_source)) =
dse.downcast_to_file_source::<ParquetSource>()
else {
return Ok(Transformed::no(node));
};
// Recover metadata_size_hint from the table parquet options.
// During protobuf round-trip, the hint is preserved in
// TableParquetOptions but not transferred to the source-level field.
let Some(hint) = parquet_source
.table_parquet_options()
.global
.metadata_size_hint
else {
return Ok(Transformed::no(node));
};
let new_source = parquet_source.clone().with_metadata_size_hint(hint);
let new_config = FileScanConfigBuilder::from(file_scan_config.clone())
.with_source(Arc::new(new_source))
.build();
Ok(Transformed::yes(
DataSourceExec::from_data_source(new_config) as Arc<dyn ExecutionPlan>,
))
})
.map(|t| t.data)
}

/// Default execution engine using DataFusion's ShuffleWriterExec.
///
/// This implementation expects the input plan to be wrapped in a
Expand All @@ -95,6 +136,9 @@ impl ExecutionEngine for DefaultExecutionEngine {
plan: Arc<dyn ExecutionPlan>,
work_dir: &str,
) -> Result<Arc<dyn QueryStageExecutor>> {
// Fix ParquetSource metadata_size_hint lost during serialization
let plan = fix_parquet_metadata_size_hint(plan)?;

Comment thread
peasee marked this conversation as resolved.
// the query plan created by the scheduler always starts with a shuffle writer
// (either ShuffleWriterExec or SortShuffleWriterExec)
if let Some(shuffle_writer) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
Expand Down
Loading