Skip to content

Commit c9e7857

Browse files
authored
Merge pull request #29 from spiceai/peasee/260421-parquet-metadata-size-hint
fix: preserve parquet metadata size hint
2 parents 729428c + ee54710 commit c9e7857

4 files changed

Lines changed: 145 additions & 6 deletions

File tree

ballista/executor/src/execution_engine.rs

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ use ballista_core::execution_plans::ShuffleWriterExec;
2626
use ballista_core::execution_plans::sort_shuffle::SortShuffleWriterExec;
2727
use ballista_core::serde::protobuf::ShuffleWritePartition;
2828
use ballista_core::utils;
29+
use datafusion::common::tree_node::{Transformed, TreeNode};
30+
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
31+
use datafusion::datasource::source::DataSourceExec;
2932
use datafusion::error::{DataFusionError, Result};
3033
use datafusion::execution::context::TaskContext;
3134
use datafusion::physical_plan::ExecutionPlan;
@@ -81,6 +84,44 @@ pub trait QueryStageExecutor: Sync + Send + Debug + Display {
8184
fn plan(&self) -> &dyn ExecutionPlan;
8285
}
8386

87+
/// Fix ParquetSource metadata_size_hint that is lost during protobuf
88+
/// serialization. The hint is preserved in TableParquetOptions but not
89+
/// transferred back to ParquetSource.metadata_size_hint on deserialization.
90+
/// Without this, each parquet file open requires 2 HTTP round trips instead
91+
/// of 1 for remote (S3/object store) files.
92+
fn fix_parquet_metadata_size_hint(
93+
plan: Arc<dyn ExecutionPlan>,
94+
) -> Result<Arc<dyn ExecutionPlan>> {
95+
plan.transform_up(|node| {
96+
let Some(dse) = node.as_any().downcast_ref::<DataSourceExec>() else {
97+
return Ok(Transformed::no(node));
98+
};
99+
let Some((file_scan_config, parquet_source)) =
100+
dse.downcast_to_file_source::<ParquetSource>()
101+
else {
102+
return Ok(Transformed::no(node));
103+
};
104+
// Recover metadata_size_hint from the table parquet options.
105+
// During protobuf round-trip, the hint is preserved in
106+
// TableParquetOptions but not transferred to the source-level field.
107+
let Some(hint) = parquet_source
108+
.table_parquet_options()
109+
.global
110+
.metadata_size_hint
111+
else {
112+
return Ok(Transformed::no(node));
113+
};
114+
let new_source = parquet_source.clone().with_metadata_size_hint(hint);
115+
let new_config = FileScanConfigBuilder::from(file_scan_config.clone())
116+
.with_source(Arc::new(new_source))
117+
.build();
118+
Ok(Transformed::yes(
119+
DataSourceExec::from_data_source(new_config) as Arc<dyn ExecutionPlan>,
120+
))
121+
})
122+
.map(|t| t.data)
123+
}
124+
84125
/// Default execution engine using DataFusion's ShuffleWriterExec.
85126
///
86127
/// This implementation expects the input plan to be wrapped in a
@@ -95,6 +136,9 @@ impl ExecutionEngine for DefaultExecutionEngine {
95136
plan: Arc<dyn ExecutionPlan>,
96137
work_dir: &str,
97138
) -> Result<Arc<dyn QueryStageExecutor>> {
139+
// Fix ParquetSource metadata_size_hint lost during serialization
140+
let plan = fix_parquet_metadata_size_hint(plan)?;
141+
98142
// the query plan created by the scheduler always starts with a shuffle writer
99143
// (either ShuffleWriterExec or SortShuffleWriterExec)
100144
if let Some(shuffle_writer) = plan.as_any().downcast_ref::<ShuffleWriterExec>() {
@@ -231,3 +275,98 @@ impl QueryStageExecutor for DefaultQueryStageExec {
231275
}
232276
}
233277
}
278+
279+
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};
    use ballista_core::serde::BallistaCodec;
    use datafusion::config::TableParquetOptions;
    use datafusion::datasource::physical_plan::FileScanConfigBuilder;
    use datafusion::execution::context::SessionContext;
    use datafusion::execution::object_store::ObjectStoreUrl;
    use datafusion_proto::physical_plan::AsExecutionPlan;
    use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};

    /// Reads `metadata_size_hint` out of the `TableParquetOptions` of a plan
    /// whose root is a parquet-backed `DataSourceExec`.
    ///
    /// Panics with `not_dse_msg` when the root is not a `DataSourceExec`, and
    /// with a fixed message when its file source is not a `ParquetSource`.
    fn options_level_hint(
        plan: &Arc<dyn ExecutionPlan>,
        not_dse_msg: &str,
    ) -> Option<usize> {
        let exec = plan
            .as_any()
            .downcast_ref::<DataSourceExec>()
            .expect(not_dse_msg);
        let (_, source) = exec
            .downcast_to_file_source::<ParquetSource>()
            .expect("should be ParquetSource");
        source.table_parquet_options().global.metadata_size_hint
    }

    /// Regression test for `fix_parquet_metadata_size_hint`: a parquet scan
    /// is pushed through a protobuf encode/decode cycle and then through the
    /// fix-up, and the hint stored in `TableParquetOptions` is checked at
    /// both stages.
    ///
    /// NOTE(review): both assertions read the options-level hint. The
    /// source-level `ParquetSource.metadata_size_hint` is not inspected here,
    /// so this verifies that the fix-up runs and keeps the options intact,
    /// not that the source field was restored. Confirm whether an accessor
    /// for the source-level field is available and assert on it if so.
    #[test]
    fn test_fix_parquet_metadata_size_hint_after_roundtrip() {
        // Build a single-column parquet scan that carries the hint both in
        // its table options and on the source itself.
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        let mut table_options = TableParquetOptions::default();
        table_options.global.metadata_size_hint = Some(512);

        let parquet_source = ParquetSource::new(schema)
            .with_table_parquet_options(table_options)
            .with_metadata_size_hint(512);

        let scan_config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            Arc::new(parquet_source),
        )
        .build();

        let original_plan: Arc<dyn ExecutionPlan> =
            DataSourceExec::from_data_source(scan_config);

        // Encode and decode through protobuf, simulating the
        // scheduler -> executor transfer.
        let codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
            BallistaCodec::default();
        let ctx = SessionContext::new();
        let task_ctx = ctx.task_ctx();

        let encoded = PhysicalPlanNode::try_from_physical_plan(
            original_plan,
            codec.physical_extension_codec(),
        )
        .expect("encoding should succeed");

        let decoded_plan = encoded
            .try_into_physical_plan(&task_ctx, codec.physical_extension_codec())
            .expect("decoding should succeed");

        // After the round-trip the options-level hint must still be present;
        // it is the value the fix-up reads.
        assert_eq!(
            options_level_hint(&decoded_plan, "should be DataSourceExec"),
            Some(512),
            "hint should be in TableParquetOptions after round-trip"
        );

        // Run the fix-up on the decoded plan.
        let fixed_plan =
            fix_parquet_metadata_size_hint(decoded_plan).expect("fix should succeed");

        // The rebuilt plan must still be a parquet DataSourceExec with the
        // same options-level hint.
        assert_eq!(
            options_level_hint(&fixed_plan, "fixed plan should be DataSourceExec"),
            Some(512),
            "hint should still be in TableParquetOptions after fix"
        );
    }
}

ballista/scheduler/src/state/aqe/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -920,8 +920,8 @@ impl ExecutionGraph for AdaptiveExecutionGraph {
920920
/// Return all currently running tasks along with the executor ID on which they are assigned
921921
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
922922
self.stages
923-
.iter()
924-
.flat_map(|(_, stage)| {
923+
.values()
924+
.flat_map(|stage| {
925925
if let ExecutionStage::Running(stage) = stage {
926926
stage
927927
.running_tasks()

ballista/scheduler/src/state/execution_graph.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,8 +1320,8 @@ impl ExecutionGraph for StaticExecutionGraph {
13201320
/// Return all currently running tasks along with the executor ID on which they are assigned
13211321
fn running_tasks(&self) -> Vec<RunningTaskInfo> {
13221322
self.stages
1323-
.iter()
1324-
.flat_map(|(_, stage)| {
1323+
.values()
1324+
.flat_map(|stage| {
13251325
if let ExecutionStage::Running(stage) = stage {
13261326
stage
13271327
.running_tasks()

ballista/scheduler/src/state/executor_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,8 @@ impl ExecutorManager {
439439

440440
self.cluster_state
441441
.executor_heartbeats()
442-
.iter()
443-
.filter_map(|(_exec, heartbeat)| {
442+
.values()
443+
.filter_map(|heartbeat| {
444444
let terminating = matches!(
445445
heartbeat
446446
.status

0 commit comments

Comments
 (0)