From 7c3a05a59e43511802c9bbb9987edd906999fab7 Mon Sep 17 00:00:00 2001
From: nuno-faria
Date: Sun, 15 Feb 2026 12:38:48 +0000
Subject: [PATCH] Pushdown physical filters to SQLExecutor

---
Review notes (text in this section is ignored by `git am`):
- Line breaks, indentation and generic arguments were reconstructed from a collapsed copy
  of this patch in which they had been stripped; run `git apply --check` before applying.
- sql/mod.rs: `handle_child_pushdown_result` now accepts filters only in the `Post` phase
  and no longer clones `child_pushdown_result`. Hunk headers and the diffstat were updated
  to match; the `index` line of that file is stale.
- sql/executor.rs: grammar fix in the `execute` doc comment; its `index` line is stale too.

 datafusion-federation/examples/shared/mod.rs | 16 +++++--
 datafusion-federation/src/schema_cast/mod.rs | 14 +++++-
 datafusion-federation/src/sql/analyzer.rs    |  8 +++-
 datafusion-federation/src/sql/executor.rs    | 16 +++++--
 datafusion-federation/src/sql/mod.rs         | 54 ++++++++++++++++++--
 5 files changed, 97 insertions(+), 11 deletions(-)

diff --git a/datafusion-federation/examples/shared/mod.rs b/datafusion-federation/examples/shared/mod.rs
index 4b19990..6efca86 100644
--- a/datafusion-federation/examples/shared/mod.rs
+++ b/datafusion-federation/examples/shared/mod.rs
@@ -6,7 +6,7 @@ use datafusion::{
     catalog::SchemaProvider,
     error::{DataFusionError, Result},
     execution::context::{SessionContext, SessionState},
-    physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},
+    physical_plan::{stream::RecordBatchStreamAdapter, PhysicalExpr, SendableRecordBatchStream},
     sql::unparser::dialect::{DefaultDialect, Dialect},
 };
 use futures::TryStreamExt;
@@ -50,7 +50,12 @@ impl SQLExecutor for MockSqliteExecutor {
         Some("sqlite_exec".to_string())
     }
 
-    fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
+    fn execute(
+        &self,
+        sql: &str,
+        schema: SchemaRef,
+        _filters: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<SendableRecordBatchStream> {
         // Execute it using the remote datafusion session context
         let future_stream = _execute(self.session.clone(), sql.to_string());
         let stream = futures::stream::once(future_stream).try_flatten();
@@ -103,7 +108,12 @@ impl SQLExecutor for MockPostgresExecutor {
         Some("postgres_exec".to_string())
     }
 
-    fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
+    fn execute(
+        &self,
+        sql: &str,
+        schema: SchemaRef,
+        _filters: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<SendableRecordBatchStream> {
         // Execute it using the remote datafusion session context
         let future_stream = _execute(self.session.clone(), sql.to_string());
         let stream = futures::stream::once(future_stream).try_flatten();
diff --git a/datafusion-federation/src/schema_cast/mod.rs b/datafusion-federation/src/schema_cast/mod.rs
index 4ead0ea..526886c 100644
--- a/datafusion-federation/src/schema_cast/mod.rs
+++ b/datafusion-federation/src/schema_cast/mod.rs
@@ -1,12 +1,15 @@
 use async_stream::stream;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::common::Statistics;
+use datafusion::config::ConfigOptions;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::filter_pushdown::{FilterDescription, FilterPushdownPhase};
 use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr,
+    PlanProperties,
 };
 use futures::StreamExt;
 use std::any::Any;
@@ -131,4 +134,13 @@ impl ExecutionPlan for SchemaCastScanExec {
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metrics_set.clone_inner())
     }
+
+    fn gather_filters_for_pushdown(
+        &self,
+        _phase: FilterPushdownPhase,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        FilterDescription::from_children(parent_filters, &self.children())
+    }
 }
diff --git a/datafusion-federation/src/sql/analyzer.rs b/datafusion-federation/src/sql/analyzer.rs
index 3303cce..625af38 100644
--- a/datafusion-federation/src/sql/analyzer.rs
+++ b/datafusion-federation/src/sql/analyzer.rs
@@ -574,6 +574,7 @@ mod tests {
     use async_trait::async_trait;
     use datafusion::arrow::datatypes::{Schema, SchemaRef};
     use datafusion::execution::SendableRecordBatchStream;
+    use datafusion::physical_plan::PhysicalExpr;
     use datafusion::sql::unparser::dialect::Dialect;
     use datafusion::sql::unparser::plan_to_sql;
     use datafusion::{
@@ -604,7 +605,12 @@ mod tests {
             unimplemented!()
         }
 
-        fn execute(&self, _query: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
+        fn execute(
+            &self,
+            _query: &str,
+            _schema: SchemaRef,
+            _filters: &[Arc<dyn PhysicalExpr>],
+        ) -> Result<SendableRecordBatchStream> {
             unimplemented!()
         }
 
diff --git a/datafusion-federation/src/sql/executor.rs b/datafusion-federation/src/sql/executor.rs
index 781a9cd..f09474f 100644
--- a/datafusion-federation/src/sql/executor.rs
+++ b/datafusion-federation/src/sql/executor.rs
@@ -5,7 +5,7 @@ use datafusion::{
     common::Statistics,
     error::Result,
     logical_expr::LogicalPlan,
-    physical_plan::{metrics::MetricsSet, SendableRecordBatchStream},
+    physical_plan::{metrics::MetricsSet, PhysicalExpr, SendableRecordBatchStream},
     sql::{sqlparser::ast, unparser::dialect::Dialect},
 };
 use std::sync::Arc;
@@ -41,8 +41,18 @@ pub trait SQLExecutor: Sync + Send {
         None
     }
 
-    /// Execute a SQL query
-    fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;
+    /// Execute a SQL query.
+    ///
+    /// `filters` contains physical expressions generated at runtime, like
+    /// `DynamicFilterPhysicalExpr`. Since the concrete expression values only become available when
+    /// the `SendableRecordBatchStream` is executed, they must be manually added to the SQL query,
+    /// if necessary. However, they can be safely ignored.
+    fn execute(
+        &self,
+        query: &str,
+        schema: SchemaRef,
+        filters: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<SendableRecordBatchStream>;
 
     /// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should
     /// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan`
diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs
index f0f1ce6..11ddada 100644
--- a/datafusion-federation/src/sql/mod.rs
+++ b/datafusion-federation/src/sql/mod.rs
@@ -15,6 +15,7 @@ use datafusion::{
         tree_node::{Transformed, TreeNode},
         Statistics,
     },
+    config::ConfigOptions,
     error::{DataFusionError, Result},
     execution::{context::SessionState, TaskContext},
     logical_expr::{Extension, LogicalPlan},
@@ -22,8 +23,11 @@ use datafusion::{
     physical_expr::EquivalenceProperties,
     physical_plan::{
         execution_plan::{Boundedness, EmissionType},
+        filter_pushdown::{
+            ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
+        },
         metrics::MetricsSet,
-        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
+        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties,
         SendableRecordBatchStream,
     },
     sql::{sqlparser::ast::Statement, unparser::Unparser},
@@ -162,6 +166,7 @@ pub struct VirtualExecutionPlan {
     executor: Arc<dyn SQLExecutor>,
     props: PlanProperties,
     statistics: Statistics,
+    filters: Vec<Arc<dyn PhysicalExpr>>,
 }
 
 impl VirtualExecutionPlan {
@@ -178,6 +183,7 @@ impl VirtualExecutionPlan {
             executor,
             props,
             statistics,
+            filters: Vec::new(),
         }
     }
 
@@ -358,7 +364,8 @@ impl ExecutionPlan for VirtualExecutionPlan {
         _partition: usize,
         _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        self.executor.execute(&self.final_sql()?, self.schema())
+        self.executor
+            .execute(&self.final_sql()?, self.schema(), &self.filters)
     }
 
     fn properties(&self) -> &PlanProperties {
@@ -372,6 +379,42 @@ impl ExecutionPlan for VirtualExecutionPlan {
     fn metrics(&self) -> Option<MetricsSet> {
         self.executor.metrics()
     }
+
+    /// Accepts dynamic filters offered by parent operators and stores them on a copy of this
+    /// node so `execute` can forward them to the [`SQLExecutor`].
+    ///
+    /// Only filters offered in [`FilterPushdownPhase::Post`] are accepted: executors are allowed
+    /// to ignore them, so static predicates from the `Pre` phase must stay in the plan.
+    fn handle_child_pushdown_result(
+        &self,
+        phase: FilterPushdownPhase,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        let parent_filters: Vec<_> = child_pushdown_result
+            .parent_filters
+            .into_iter()
+            .map(|f| f.filter)
+            .collect();
+
+        // Reporting `Yes` lets the parent drop its own filter, which is only safe for
+        // post-phase (dynamic) filters; everything else is declined.
+        if parent_filters.is_empty() || !matches!(phase, FilterPushdownPhase::Post) {
+            return Ok(FilterPushdownPropagation {
+                filters: vec![PushedDown::No; parent_filters.len()],
+                updated_node: None,
+            });
+        }
+
+        let filters_pushed_down = vec![PushedDown::Yes; parent_filters.len()];
+        let mut node = self.clone();
+        node.filters = parent_filters;
+
+        Ok(FilterPushdownPropagation {
+            filters: filters_pushed_down,
+            updated_node: Some(Arc::new(node)),
+        })
+    }
 }
 
 #[cfg(test)]
@@ -416,7 +459,12 @@ mod tests {
             Arc::new(unparser::dialect::DefaultDialect {})
         }
 
-        fn execute(&self, _query: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
+        fn execute(
+            &self,
+            _query: &str,
+            _schema: SchemaRef,
+            _filters: &[Arc<dyn PhysicalExpr>],
+        ) -> Result<SendableRecordBatchStream> {
             unimplemented!()
         }
 