Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 62 additions & 9 deletions crates/core/src/delta_datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
//! };
//!
//! let state = state.with_query_planner(Arc::new(merge_planner));
use std::sync::Arc;
use std::sync::{Arc, LazyLock};

use async_trait::async_trait;
use datafusion::logical_expr::LogicalPlan;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_planner::PhysicalPlanner;
use datafusion::{
execution::{context::QueryPlanner, session_state::SessionState},
Expand All @@ -34,28 +34,81 @@ use datafusion::{
};

use crate::delta_datafusion::DataFusionResult;
use crate::operations::delete::DeleteMetricExtensionPlanner;
use crate::operations::merge::MergeMetricExtensionPlanner;
use crate::operations::update::UpdateMetricExtensionPlanner;
use crate::operations::write::metrics::WriteMetricExtensionPlanner;

const DELTA_EXTENSION_PLANNERS: LazyLock<Vec<Arc<dyn ExtensionPlanner + Send + Sync>>> =
LazyLock::new(|| {
vec![
MergeMetricExtensionPlanner::new(),
WriteMetricExtensionPlanner::new(),
DeleteMetricExtensionPlanner::new(),
UpdateMetricExtensionPlanner::new(),
]
});

const DELTA_PLANNER: LazyLock<Arc<DeltaPlanner>> = LazyLock::new(|| Arc::new(DeltaPlanner));

/// Deltaplanner
#[derive(Debug)]
pub struct DeltaPlanner<T: ExtensionPlanner> {
/// custom extension planner
pub extension_planner: T,
pub struct DeltaPlanner;

impl DeltaPlanner {
pub fn new() -> Arc<Self> {
DELTA_PLANNER.clone()
}
}

#[async_trait]
impl<T: ExtensionPlanner + Send + Sync + 'static + Clone + std::fmt::Debug> QueryPlanner
for DeltaPlanner<T>
{
impl QueryPlanner for DeltaPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let planner = Arc::new(Box::new(DefaultPhysicalPlanner::with_extension_planners(
vec![Arc::new(self.extension_planner.clone())],
vec![DeltaExtensionPlanner::new()],
)));
planner
.create_physical_plan(logical_plan, session_state)
.await
}
}

pub struct DeltaExtensionPlanner;

impl DeltaExtensionPlanner {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}

#[async_trait]
impl ExtensionPlanner for DeltaExtensionPlanner {
async fn plan_extension(
&self,
planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
session_state: &SessionState,
) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
for ext_planner in DELTA_EXTENSION_PLANNERS.iter() {
if let Some(plan) = ext_planner
.plan_extension(
planner,
node,
logical_inputs,
physical_inputs,
session_state,
)
.await?
{
return Ok(Some(plan));
}
}
Ok(None)
}
}
14 changes: 9 additions & 5 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ impl DeleteBuilder {
}

#[derive(Clone, Debug)]
struct DeleteMetricExtensionPlanner {}
pub(crate) struct DeleteMetricExtensionPlanner {}

impl DeleteMetricExtensionPlanner {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}

#[async_trait]
impl ExtensionPlanner for DeleteMetricExtensionPlanner {
Expand Down Expand Up @@ -204,12 +210,10 @@ async fn execute_non_empty_expr(
let mut actions: Vec<Action> = Vec::new();
let table_partition_cols = snapshot.metadata().partition_columns().clone();

let delete_planner = DeltaPlanner::<DeleteMetricExtensionPlanner> {
extension_planner: DeleteMetricExtensionPlanner {},
};
let delete_planner = DeltaPlanner::new();

let state = SessionStateBuilder::new_from_existing(state.clone())
.with_query_planner(Arc::new(delete_planner))
.with_query_planner(delete_planner)
.build();

let scan_config = DeltaScanConfigBuilder::default()
Expand Down
19 changes: 13 additions & 6 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::time::Instant;
use arrow_schema::{DataType, Field, SchemaBuilder};
use async_trait::async_trait;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{Column, DFSchema, ExprSchema, ScalarValue, TableReference};
use datafusion::common::{plan_err, Column, DFSchema, ExprSchema, ScalarValue, TableReference};
use datafusion::datasource::provider_as_source;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::SessionConfig;
Expand Down Expand Up @@ -631,7 +631,13 @@ pub struct MergeMetrics {
pub rewrite_time_ms: u64,
}
#[derive(Clone, Debug)]
struct MergeMetricExtensionPlanner {}
pub(crate) struct MergeMetricExtensionPlanner {}

impl MergeMetricExtensionPlanner {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}

#[async_trait]
impl ExtensionPlanner for MergeMetricExtensionPlanner {
Expand Down Expand Up @@ -711,6 +717,9 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {
}

if let Some(barrier) = node.as_any().downcast_ref::<MergeBarrier>() {
if physical_inputs.len() != 1 {
return plan_err!("MergeBarrierExec expects exactly one input");
}
let schema = barrier.input.schema();
return Ok(Some(Arc::new(MergeBarrierExec::new(
physical_inputs.first().unwrap().clone(),
Expand Down Expand Up @@ -755,12 +764,10 @@ async fn execute(
}

let current_metadata = snapshot.metadata();
let merge_planner = DeltaPlanner::<MergeMetricExtensionPlanner> {
extension_planner: MergeMetricExtensionPlanner {},
};
let merge_planner = DeltaPlanner::new();

let state = SessionStateBuilder::new_from_existing(state)
.with_query_planner(Arc::new(merge_planner))
.with_query_planner(merge_planner)
.build();

// TODO: Given the join predicate, remove any expression that involve the
Expand Down
14 changes: 9 additions & 5 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ impl UpdateBuilder {
}

#[derive(Clone, Debug)]
struct UpdateMetricExtensionPlanner {}
pub(crate) struct UpdateMetricExtensionPlanner {}

impl UpdateMetricExtensionPlanner {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}

#[async_trait]
impl ExtensionPlanner for UpdateMetricExtensionPlanner {
Expand Down Expand Up @@ -276,12 +282,10 @@ async fn execute(
.with_optimizer_rules(rules)
.build();

let update_planner = DeltaPlanner::<UpdateMetricExtensionPlanner> {
extension_planner: UpdateMetricExtensionPlanner {},
};
let update_planner = DeltaPlanner::new();

let state = SessionStateBuilder::new_from_existing(state)
.with_query_planner(Arc::new(update_planner))
.with_query_planner(update_planner)
.build();

let exec_start = Instant::now();
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/operations/write/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ pub(crate) const SOURCE_COUNT_METRIC: &str = "num_source_rows";
#[derive(Clone, Debug)]
pub(crate) struct WriteMetricExtensionPlanner {}

impl WriteMetricExtensionPlanner {
pub fn new() -> Arc<Self> {
Arc::new(Self {})
}
}

#[async_trait]
impl ExtensionPlanner for WriteMetricExtensionPlanner {
async fn plan_extension(
Expand Down
10 changes: 4 additions & 6 deletions crates/core/src/operations/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use configs::WriterStatsConfig;
use datafusion::execution::SessionStateBuilder;
use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
use generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns};
use metrics::{WriteMetricExtensionPlanner, SOURCE_COUNT_ID, SOURCE_COUNT_METRIC};
use metrics::{SOURCE_COUNT_ID, SOURCE_COUNT_METRIC};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
Expand Down Expand Up @@ -425,9 +425,7 @@ impl std::future::IntoFuture for WriteBuilder {
let mut metrics = WriteMetrics::default();
let exec_start = Instant::now();

let write_planner = DeltaPlanner::<WriteMetricExtensionPlanner> {
extension_planner: WriteMetricExtensionPlanner {},
};
let write_planner = DeltaPlanner::new();

// Create table actions to initialize table in case it does not yet exist
// and should be created
Expand All @@ -437,12 +435,12 @@ impl std::future::IntoFuture for WriteBuilder {

let state = match this.state {
Some(state) => SessionStateBuilder::new_from_existing(state.clone())
.with_query_planner(Arc::new(write_planner))
.with_query_planner(write_planner.clone())
.build(),
None => {
let state = SessionStateBuilder::new()
.with_default_features()
.with_query_planner(Arc::new(write_planner))
.with_query_planner(write_planner)
.build();
register_store(this.log_store.clone(), state.runtime_env().clone());
state
Expand Down
Loading