diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs index e04b9c8e55..d89513b78b 100644 --- a/crates/core/src/delta_datafusion/planner.rs +++ b/crates/core/src/delta_datafusion/planner.rs @@ -22,10 +22,10 @@ //! }; //! //! let state = state.with_query_planner(Arc::new(merge_planner)); -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use async_trait::async_trait; -use datafusion::logical_expr::LogicalPlan; +use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; use datafusion::physical_planner::PhysicalPlanner; use datafusion::{ execution::{context::QueryPlanner, session_state::SessionState}, @@ -34,28 +34,81 @@ use datafusion::{ }; use crate::delta_datafusion::DataFusionResult; +use crate::operations::delete::DeleteMetricExtensionPlanner; +use crate::operations::merge::MergeMetricExtensionPlanner; +use crate::operations::update::UpdateMetricExtensionPlanner; +use crate::operations::write::metrics::WriteMetricExtensionPlanner; + +const DELTA_EXTENSION_PLANNERS: LazyLock>> = + LazyLock::new(|| { + vec![ + MergeMetricExtensionPlanner::new(), + WriteMetricExtensionPlanner::new(), + DeleteMetricExtensionPlanner::new(), + UpdateMetricExtensionPlanner::new(), + ] + }); + +const DELTA_PLANNER: LazyLock> = LazyLock::new(|| Arc::new(DeltaPlanner)); /// Deltaplanner #[derive(Debug)] -pub struct DeltaPlanner { - /// custom extension planner - pub extension_planner: T, +pub struct DeltaPlanner; + +impl DeltaPlanner { + pub fn new() -> Arc { + DELTA_PLANNER.clone() + } } #[async_trait] -impl QueryPlanner - for DeltaPlanner -{ +impl QueryPlanner for DeltaPlanner { async fn create_physical_plan( &self, logical_plan: &LogicalPlan, session_state: &SessionState, ) -> DataFusionResult> { let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners( - vec![Arc::new(self.extension_planner.clone())], + vec![DeltaExtensionPlanner::new()], ))); planner .create_physical_plan(logical_plan, session_state) .await } } + +pub struct DeltaExtensionPlanner; + +impl DeltaExtensionPlanner { + pub fn new() -> Arc { + Arc::new(Self {}) + } +} + +#[async_trait] +impl ExtensionPlanner for DeltaExtensionPlanner { + async fn plan_extension( + &self, + planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + session_state: &SessionState, + ) -> DataFusionResult>> { + for ext_planner in DELTA_EXTENSION_PLANNERS.iter() { + if let Some(plan) = ext_planner + .plan_extension( + planner, + node, + logical_inputs, + physical_inputs, + session_state, + ) + .await? + { + return Ok(Some(plan)); + } + } + Ok(None) + } +} diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 07123d43de..41cdad2f63 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -158,7 +158,13 @@ impl DeleteBuilder { } #[derive(Clone, Debug)] -struct DeleteMetricExtensionPlanner {} +pub(crate) struct DeleteMetricExtensionPlanner {} + +impl DeleteMetricExtensionPlanner { + pub fn new() -> Arc { + Arc::new(Self {}) + } +} #[async_trait] impl ExtensionPlanner for DeleteMetricExtensionPlanner { @@ -204,12 +210,10 @@ async fn execute_non_empty_expr( let mut actions: Vec = Vec::new(); let table_partition_cols = snapshot.metadata().partition_columns().clone(); - let delete_planner = DeltaPlanner:: { - extension_planner: DeleteMetricExtensionPlanner {}, - }; + let delete_planner = DeltaPlanner::new(); let state = SessionStateBuilder::new_from_existing(state.clone()) - .with_query_planner(Arc::new(delete_planner)) + .with_query_planner(delete_planner) .build(); let scan_config = DeltaScanConfigBuilder::default() diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index f32f39ad6e..a8bf241927 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -36,7 +36,7 @@ use std::time::Instant; use arrow_schema::{DataType, Field, SchemaBuilder}; use async_trait::async_trait; use datafusion::common::tree_node::{Transformed, TreeNode}; -use datafusion::common::{Column, DFSchema, ExprSchema, ScalarValue, TableReference}; +use datafusion::common::{plan_err, Column, DFSchema, ExprSchema, ScalarValue, TableReference}; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::SessionConfig; @@ -631,7 +631,13 @@ pub struct MergeMetrics { pub rewrite_time_ms: u64, } #[derive(Clone, Debug)] -struct MergeMetricExtensionPlanner {} +pub(crate) struct MergeMetricExtensionPlanner {} + +impl MergeMetricExtensionPlanner { + pub fn new() -> Arc { + Arc::new(Self {}) + } +} #[async_trait] impl ExtensionPlanner for MergeMetricExtensionPlanner { @@ -711,6 +717,9 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner { } if let Some(barrier) = node.as_any().downcast_ref::() { + if physical_inputs.len() != 1 { + return plan_err!("MergeBarrierExec expects exactly one input"); + } let schema = barrier.input.schema(); return Ok(Some(Arc::new(MergeBarrierExec::new( physical_inputs.first().unwrap().clone(), @@ -755,12 +764,10 @@ async fn execute( } let current_metadata = snapshot.metadata(); - let merge_planner = DeltaPlanner:: { - extension_planner: MergeMetricExtensionPlanner {}, - }; + let merge_planner = DeltaPlanner::new(); let state = SessionStateBuilder::new_from_existing(state) - .with_query_planner(Arc::new(merge_planner)) + .with_query_planner(merge_planner) .build(); // TODO: Given the join predicate, remove any expression that involve the diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index bf95949ba4..d7ac1ec098 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -201,7 +201,13 @@ impl UpdateBuilder { } #[derive(Clone, Debug)] -struct UpdateMetricExtensionPlanner {} +pub(crate) struct UpdateMetricExtensionPlanner {} + +impl UpdateMetricExtensionPlanner { + pub fn new() -> Arc { + Arc::new(Self {}) + } +} #[async_trait] impl ExtensionPlanner for UpdateMetricExtensionPlanner { @@ -276,12 +282,10 @@ async fn execute( .with_optimizer_rules(rules) .build(); - let update_planner = DeltaPlanner:: { - extension_planner: UpdateMetricExtensionPlanner {}, - }; + let update_planner = DeltaPlanner::new(); let state = SessionStateBuilder::new_from_existing(state) - .with_query_planner(Arc::new(update_planner)) + .with_query_planner(update_planner) .build(); let exec_start = Instant::now(); diff --git a/crates/core/src/operations/write/metrics.rs b/crates/core/src/operations/write/metrics.rs index 1f265234e7..bedd26b47c 100644 --- a/crates/core/src/operations/write/metrics.rs +++ b/crates/core/src/operations/write/metrics.rs @@ -17,6 +17,12 @@ pub(crate) const SOURCE_COUNT_METRIC: &str = "num_source_rows"; #[derive(Clone, Debug)] pub(crate) struct WriteMetricExtensionPlanner {} +impl WriteMetricExtensionPlanner { + pub fn new() -> Arc { + Arc::new(Self {}) + } +} + #[async_trait] impl ExtensionPlanner for WriteMetricExtensionPlanner { async fn plan_extension( diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 52625bed94..910a12ca4d 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -36,7 +36,7 @@ pub use configs::WriterStatsConfig; use datafusion::execution::SessionStateBuilder; use delta_kernel::engine::arrow_conversion::TryIntoKernel as _; use generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns}; -use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; +use metrics::{SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; @@ -425,9 +425,7 @@ impl std::future::IntoFuture for WriteBuilder { let mut metrics = WriteMetrics::default(); let exec_start = Instant::now(); - let write_planner = DeltaPlanner:: { - extension_planner: WriteMetricExtensionPlanner {}, - }; + let write_planner = DeltaPlanner::new(); // Create table actions to initialize table in case it does not yet exist // and should be created @@ -437,12 +435,12 @@ impl std::future::IntoFuture for WriteBuilder { let state = match this.state { Some(state) => SessionStateBuilder::new_from_existing(state.clone()) - .with_query_planner(Arc::new(write_planner)) + .with_query_planner(write_planner.clone()) .build(), None => { let state = SessionStateBuilder::new() .with_default_features() - .with_query_planner(Arc::new(write_planner)) + .with_query_planner(write_planner) .build(); register_store(this.log_store.clone(), state.runtime_env().clone()); state