Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 139 additions & 14 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]

use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::datasource::file_format::file_type_to_format;
Expand Down Expand Up @@ -84,7 +84,7 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::utils::{expr_to_columns, split_conjunction};
use datafusion_expr::{
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
Expand Down Expand Up @@ -757,7 +757,7 @@ impl DefaultPhysicalPlanner {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
let filters = extract_dml_filters(input)?;
let filters = extract_dml_filters(input, table_name)?;
provider
.table_provider
.delete_from(session_state, filters)
Expand All @@ -783,7 +783,7 @@ impl DefaultPhysicalPlanner {
{
// For UPDATE, the assignments are encoded in the projection of input
// We pass the filters and let the provider handle the projection
let filters = extract_dml_filters(input)?;
let filters = extract_dml_filters(input, table_name)?;
// Extract assignments from the projection in input plan
let assignments = extract_update_assignments(input)?;
provider
Expand Down Expand Up @@ -2067,24 +2067,149 @@ fn get_physical_expr_pair(
}

/// Extract filter predicates from a DML input plan (DELETE/UPDATE).
/// Walks the logical plan tree and collects Filter predicates,
/// splitting AND conjunctions into individual expressions.
/// Column qualifiers are stripped so expressions can be evaluated against
/// the TableProvider's schema.
///
fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
/// Walks the logical plan tree and collects Filter predicates and any filters
/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
///
/// For UPDATE...FROM queries involving multiple tables, this function only extracts predicates
/// that reference the target table. Filters from source table scans are excluded to prevent
/// incorrect filter semantics.
///
/// Column qualifiers are stripped so expressions can be evaluated against the TableProvider's
/// schema. Deduplication is performed because filters may appear in both Filter nodes and
/// TableScan.filters when the optimizer performs partial (Inexact) filter pushdown.
///
/// # Parameters
/// - `input`: The logical plan tree to extract filters from (typically a DELETE or UPDATE plan)
/// - `target`: The target table reference to scope filter extraction (prevents multi-table filter leakage)
///
/// # Returns
/// A vector of unqualified filter expressions that can be passed to the TableProvider for execution.
/// Returns an empty vector if no applicable filters are found.
///
fn extract_dml_filters(
input: &Arc<LogicalPlan>,
target: &TableReference,
) -> Result<Vec<Expr>> {
let mut filters = Vec::new();
let mut allowed_refs = vec![target.clone()];

// First pass: collect any alias references to the target table
input.apply(|node| {
if let LogicalPlan::SubqueryAlias(alias) = node
// Check if this alias points to the target table
&& let LogicalPlan::TableScan(scan) = alias.input.as_ref()
&& scan.table_name.resolved_eq(target)
{
allowed_refs.push(TableReference::bare(alias.alias.to_string()));
}
Ok(TreeNodeRecursion::Continue)
})?;

input.apply(|node| {
if let LogicalPlan::Filter(filter) = node {
// Split AND predicates into individual expressions
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
match node {
LogicalPlan::Filter(filter) => {
// Split AND predicates into individual expressions
for predicate in split_conjunction(&filter.predicate) {
if predicate_is_on_target_multi(predicate, &allowed_refs)? {
filters.push(predicate.clone());
}
}
}
LogicalPlan::TableScan(TableScan {
table_name,
filters: scan_filters,
..
}) => {
// Only extract filters from the target table scan.
// This prevents incorrect filter extraction in UPDATE...FROM scenarios
// where multiple table scans may have filters.
if table_name.resolved_eq(target) {
for filter in scan_filters {
filters.extend(split_conjunction(filter).into_iter().cloned());
}
}
}
// Plans without filter information
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Values(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_) => {
// No filters to extract from leaf/meta plans
}
// Plans with inputs (may contain filters in children)
LogicalPlan::Projection(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::Sort(_)
| LogicalPlan::Union(_)
| LogicalPlan::Join(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Window(_)
| LogicalPlan::Subquery(_) => {
// Filter information may appear in child nodes; continue traversal
// to extract filters from Filter/TableScan nodes deeper in the plan
}
}
Ok(TreeNodeRecursion::Continue)
})?;

// Strip table qualifiers from column references
filters.into_iter().map(strip_column_qualifiers).collect()
// Strip qualifiers and deduplicate. This ensures:
// 1. Only target-table predicates are retained from Filter nodes
// 2. Qualifiers stripped for TableProvider compatibility
// 3. Duplicates removed (from Filter nodes + TableScan.filters)
//
// Deduplication is necessary because filters may appear in both Filter nodes
// and TableScan.filters when the optimizer performs partial (Inexact) pushdown.
let mut seen_filters = HashSet::new();
filters
.into_iter()
.try_fold(Vec::new(), |mut deduped, filter| {
let unqualified = strip_column_qualifiers(filter).map_err(|e| {
e.context(format!(
"Failed to strip column qualifiers for DML filter on table '{target}'"
))
})?;
if seen_filters.insert(unqualified.clone()) {
deduped.push(unqualified);
}
Ok(deduped)
})
}

/// Determine whether a predicate references only columns from the target table
/// or its aliases.
///
/// Columns may be qualified with the target table name or any of its aliases.
/// Unqualified columns are also accepted as they implicitly belong to the target table.
fn predicate_is_on_target_multi(
expr: &Expr,
allowed_refs: &[TableReference],
) -> Result<bool> {
let mut columns = HashSet::new();
expr_to_columns(expr, &mut columns)?;

// Short-circuit on first mismatch: returns false if any column references a table not in allowed_refs.
// Columns are accepted if:
// 1. They are unqualified (no relation specified), OR
// 2. Their relation matches one of the allowed table references using resolved equality
Ok(!columns.iter().any(|column| {
column.relation.as_ref().is_some_and(|relation| {
!allowed_refs
.iter()
.any(|allowed| relation.resolved_eq(allowed))
})
}))
}

/// Strip table qualifiers from column references in an expression.
Expand Down
Loading
Loading