diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs index 7beaede7ca..486723eef3 100644 --- a/ballista/executor/src/execution_engine.rs +++ b/ballista/executor/src/execution_engine.rs @@ -26,6 +26,9 @@ use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec; use ballista_core::serde::protobuf::ShuffleWritePartition; use ballista_core::utils; +use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource}; +use datafusion::datasource::source::DataSourceExec; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::context::TaskContext; use datafusion::physical_plan::ExecutionPlan; @@ -81,6 +84,44 @@ pub trait QueryStageExecutor: Sync + Send + Debug + Display { fn plan(&self) -> &dyn ExecutionPlan; } +/// Fix ParquetSource metadata_size_hint that is lost during protobuf +/// serialization. The hint is preserved in TableParquetOptions but not +/// transferred back to ParquetSource.metadata_size_hint on deserialization. +/// Without this, each parquet file open requires 2 HTTP round trips instead +/// of 1 for remote (S3/object store) files. +fn fix_parquet_metadata_size_hint( + plan: Arc, +) -> Result> { + plan.transform_up(|node| { + let Some(dse) = node.as_any().downcast_ref::() else { + return Ok(Transformed::no(node)); + }; + let Some((file_scan_config, parquet_source)) = + dse.downcast_to_file_source::() + else { + return Ok(Transformed::no(node)); + }; + // Recover metadata_size_hint from the table parquet options. + // During protobuf round-trip, the hint is preserved in + // TableParquetOptions but not transferred to the source-level field. + let Some(hint) = parquet_source + .table_parquet_options() + .global + .metadata_size_hint + else { + return Ok(Transformed::no(node)); + }; + let new_source = parquet_source.clone().with_metadata_size_hint(hint); + let new_config = FileScanConfigBuilder::from(file_scan_config.clone()) + .with_source(Arc::new(new_source)) + .build(); + Ok(Transformed::yes( + DataSourceExec::from_data_source(new_config) as Arc, + )) + }) + .map(|t| t.data) +} + /// Default execution engine using DataFusion's ShuffleWriterExec. /// /// This implementation expects the input plan to be wrapped in a @@ -95,6 +136,9 @@ impl ExecutionEngine for DefaultExecutionEngine { plan: Arc, work_dir: &str, ) -> Result> { + // Fix ParquetSource metadata_size_hint lost during serialization + let plan = fix_parquet_metadata_size_hint(plan)?; + // the query plan created by the scheduler always starts with a shuffle writer // (either ShuffleWriterExec or SortShuffleWriterExec) if let Some(shuffle_writer) = plan.as_any().downcast_ref::() { @@ -231,3 +275,98 @@ impl QueryStageExecutor for DefaultQueryStageExec { } } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::datatypes::{DataType, Field, Schema}; + use ballista_core::serde::BallistaCodec; + use datafusion::config::TableParquetOptions; + use datafusion::datasource::physical_plan::FileScanConfigBuilder; + use datafusion::execution::context::SessionContext; + use datafusion::execution::object_store::ObjectStoreUrl; + use datafusion_proto::physical_plan::AsExecutionPlan; + use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode}; + + /// Regression test: metadata_size_hint must survive a protobuf round-trip + /// after being fixed up by `fix_parquet_metadata_size_hint`. + /// + /// Without the fix, the hint stored in `TableParquetOptions` is not + /// transferred back to `ParquetSource.metadata_size_hint` on + /// deserialization, causing an extra HTTP request per Parquet file open. + #[test] + fn test_fix_parquet_metadata_size_hint_after_roundtrip() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + + let mut parquet_options = TableParquetOptions::default(); + parquet_options.global.metadata_size_hint = Some(512); + + let source = ParquetSource::new(schema.clone()) + .with_table_parquet_options(parquet_options) + .with_metadata_size_hint(512); + + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::local_filesystem(), + Arc::new(source), + ) + .build(); + + let original_plan: Arc = + DataSourceExec::from_data_source(config); + + // Round-trip through protobuf (simulates scheduler -> executor transfer) + let codec: BallistaCodec = + BallistaCodec::default(); + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + + let proto = PhysicalPlanNode::try_from_physical_plan( + original_plan.clone(), + codec.physical_extension_codec(), + ) + .expect("encoding should succeed"); + + let decoded_plan = proto + .try_into_physical_plan(&task_ctx, codec.physical_extension_codec()) + .expect("decoding should succeed"); + + // Before fix: metadata_size_hint is lost on the ParquetSource + let dse = decoded_plan + .as_any() + .downcast_ref::() + .expect("should be DataSourceExec"); + let (_, parquet_source_before) = dse + .downcast_to_file_source::() + .expect("should be ParquetSource"); + // The hint is preserved in options but not on the source field + assert_eq!( + parquet_source_before + .table_parquet_options() + .global + .metadata_size_hint, + Some(512), + "hint should be in TableParquetOptions after round-trip" + ); + + // Apply the fix + let fixed_plan = + fix_parquet_metadata_size_hint(decoded_plan).expect("fix should succeed"); + + // After fix: metadata_size_hint should be restored on the ParquetSource + let fixed_dse = fixed_plan + .as_any() + .downcast_ref::() + .expect("fixed plan should be DataSourceExec"); + let (_, parquet_source_after) = fixed_dse + .downcast_to_file_source::() + .expect("should be ParquetSource"); + assert_eq!( + parquet_source_after + .table_parquet_options() + .global + .metadata_size_hint, + Some(512), + "hint should still be in TableParquetOptions after fix" + ); + } +} diff --git a/ballista/scheduler/src/state/aqe/mod.rs b/ballista/scheduler/src/state/aqe/mod.rs index 8200b2a519..2436729021 100644 --- a/ballista/scheduler/src/state/aqe/mod.rs +++ b/ballista/scheduler/src/state/aqe/mod.rs @@ -920,8 +920,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph { /// Return all currently running tasks along with the executor ID on which they are assigned fn running_tasks(&self) -> Vec { self.stages - .iter() - .flat_map(|(_, stage)| { + .values() + .flat_map(|stage| { if let ExecutionStage::Running(stage) = stage { stage .running_tasks() diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 37ba3f828e..9e5b3e00d5 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -1320,8 +1320,8 @@ impl ExecutionGraph for StaticExecutionGraph { /// Return all currently running tasks along with the executor ID on which they are assigned fn running_tasks(&self) -> Vec { self.stages - .iter() - .flat_map(|(_, stage)| { + .values() + .flat_map(|stage| { if let ExecutionStage::Running(stage) = stage { stage .running_tasks() diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs index 9d47310b80..bdacc2d4e2 100644 --- a/ballista/scheduler/src/state/executor_manager.rs +++ b/ballista/scheduler/src/state/executor_manager.rs @@ -439,8 +439,8 @@ impl ExecutorManager { self.cluster_state .executor_heartbeats() - .iter() - .filter_map(|(_exec, heartbeat)| { + .values() + .filter_map(|heartbeat| { let terminating = matches!( heartbeat .status