Skip to content

Commit 61e0d82

Browse files
authored
Pushdown physical filters to SQLExecutor (#160)
1 parent 68d41cf commit 61e0d82

5 files changed

Lines changed: 91 additions & 11 deletions

File tree

datafusion-federation/examples/shared/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use datafusion::{
66
catalog::SchemaProvider,
77
error::{DataFusionError, Result},
88
execution::context::{SessionContext, SessionState},
9-
physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},
9+
physical_plan::{stream::RecordBatchStreamAdapter, PhysicalExpr, SendableRecordBatchStream},
1010
sql::unparser::dialect::{DefaultDialect, Dialect},
1111
};
1212
use futures::TryStreamExt;
@@ -50,7 +50,12 @@ impl SQLExecutor for MockSqliteExecutor {
5050
Some("sqlite_exec".to_string())
5151
}
5252

53-
fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
53+
fn execute(
54+
&self,
55+
sql: &str,
56+
schema: SchemaRef,
57+
_filters: &[Arc<dyn PhysicalExpr>],
58+
) -> Result<SendableRecordBatchStream> {
5459
// Execute it using the remote datafusion session context
5560
let future_stream = _execute(self.session.clone(), sql.to_string());
5661
let stream = futures::stream::once(future_stream).try_flatten();
@@ -103,7 +108,12 @@ impl SQLExecutor for MockPostgresExecutor {
103108
Some("postgres_exec".to_string())
104109
}
105110

106-
fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
111+
fn execute(
112+
&self,
113+
sql: &str,
114+
schema: SchemaRef,
115+
_filters: &[Arc<dyn PhysicalExpr>],
116+
) -> Result<SendableRecordBatchStream> {
107117
// Execute it using the remote datafusion session context
108118
let future_stream = _execute(self.session.clone(), sql.to_string());
109119
let stream = futures::stream::once(future_stream).try_flatten();

datafusion-federation/src/schema_cast/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use async_stream::stream;
22
use datafusion::arrow::datatypes::SchemaRef;
33
use datafusion::common::Statistics;
4+
use datafusion::config::ConfigOptions;
45
use datafusion::error::{DataFusionError, Result};
56
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
7+
use datafusion::physical_plan::filter_pushdown::{FilterDescription, FilterPushdownPhase};
68
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
79
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
810
use datafusion::physical_plan::{
9-
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
11+
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr,
12+
PlanProperties,
1013
};
1114
use futures::StreamExt;
1215
use std::any::Any;
@@ -131,4 +134,13 @@ impl ExecutionPlan for SchemaCastScanExec {
131134
fn metrics(&self) -> Option<MetricsSet> {
132135
Some(self.metrics_set.clone_inner())
133136
}
137+
138+
fn gather_filters_for_pushdown(
139+
&self,
140+
_phase: FilterPushdownPhase,
141+
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
142+
_config: &ConfigOptions,
143+
) -> Result<FilterDescription> {
144+
FilterDescription::from_children(parent_filters, &self.children())
145+
}
134146
}

datafusion-federation/src/sql/analyzer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,7 @@ mod tests {
574574
use async_trait::async_trait;
575575
use datafusion::arrow::datatypes::{Schema, SchemaRef};
576576
use datafusion::execution::SendableRecordBatchStream;
577+
use datafusion::physical_plan::PhysicalExpr;
577578
use datafusion::sql::unparser::dialect::Dialect;
578579
use datafusion::sql::unparser::plan_to_sql;
579580
use datafusion::{
@@ -604,7 +605,12 @@ mod tests {
604605
unimplemented!()
605606
}
606607

607-
fn execute(&self, _query: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
608+
fn execute(
609+
&self,
610+
_query: &str,
611+
_schema: SchemaRef,
612+
_filters: &[Arc<dyn PhysicalExpr>],
613+
) -> Result<SendableRecordBatchStream> {
608614
unimplemented!()
609615
}
610616

datafusion-federation/src/sql/executor.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use datafusion::{
55
common::Statistics,
66
error::Result,
77
logical_expr::LogicalPlan,
8-
physical_plan::{metrics::MetricsSet, SendableRecordBatchStream},
8+
physical_plan::{metrics::MetricsSet, PhysicalExpr, SendableRecordBatchStream},
99
sql::{sqlparser::ast, unparser::dialect::Dialect},
1010
};
1111
use std::sync::Arc;
@@ -41,8 +41,18 @@ pub trait SQLExecutor: Sync + Send {
4141
None
4242
}
4343

44-
/// Execute a SQL query
45-
fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;
44+
/// Execute a SQL query.
45+
///
46+
/// `filters` contain physical expressions generated at runtime, like
47+
/// `DynamicFilterPhysicalExpr`. Since the concrete expression values only become available when
48+
/// the `SendableRecordBatchStream` is executed, they must be manually added to the SQL query,
49+
/// if necessary. However, they can be safely ignored.
50+
fn execute(
51+
&self,
52+
query: &str,
53+
schema: SchemaRef,
54+
filters: &[Arc<dyn PhysicalExpr>],
55+
) -> Result<SendableRecordBatchStream>;
4656

4757
/// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should
4858
/// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan`

datafusion-federation/src/sql/mod.rs

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,19 @@ use datafusion::{
1515
tree_node::{Transformed, TreeNode},
1616
Statistics,
1717
},
18+
config::ConfigOptions,
1819
error::{DataFusionError, Result},
1920
execution::{context::SessionState, TaskContext},
2021
logical_expr::{Extension, LogicalPlan},
2122
optimizer::{optimizer::Optimizer, OptimizerConfig, OptimizerRule},
2223
physical_expr::EquivalenceProperties,
2324
physical_plan::{
2425
execution_plan::{Boundedness, EmissionType},
26+
filter_pushdown::{
27+
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
28+
},
2529
metrics::MetricsSet,
26-
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
30+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties,
2731
SendableRecordBatchStream,
2832
},
2933
sql::{sqlparser::ast::Statement, unparser::Unparser},
@@ -162,6 +166,7 @@ pub struct VirtualExecutionPlan {
162166
executor: Arc<dyn SQLExecutor>,
163167
props: PlanProperties,
164168
statistics: Statistics,
169+
filters: Vec<Arc<dyn PhysicalExpr>>,
165170
}
166171

167172
impl VirtualExecutionPlan {
@@ -178,6 +183,7 @@ impl VirtualExecutionPlan {
178183
executor,
179184
props,
180185
statistics,
186+
filters: Vec::new(),
181187
}
182188
}
183189

@@ -358,7 +364,8 @@ impl ExecutionPlan for VirtualExecutionPlan {
358364
_partition: usize,
359365
_context: Arc<TaskContext>,
360366
) -> Result<SendableRecordBatchStream> {
361-
self.executor.execute(&self.final_sql()?, self.schema())
367+
self.executor
368+
.execute(&self.final_sql()?, self.schema(), &self.filters)
362369
}
363370

364371
fn properties(&self) -> &PlanProperties {
@@ -372,6 +379,36 @@ impl ExecutionPlan for VirtualExecutionPlan {
372379
fn metrics(&self) -> Option<MetricsSet> {
373380
self.executor.metrics()
374381
}
382+
383+
fn handle_child_pushdown_result(
384+
&self,
385+
_phase: FilterPushdownPhase,
386+
child_pushdown_result: ChildPushdownResult,
387+
_config: &ConfigOptions,
388+
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
389+
let parent_filters: Vec<_> = child_pushdown_result
390+
.clone()
391+
.parent_filters
392+
.into_iter()
393+
.map(|f| f.filter)
394+
.collect();
395+
396+
if parent_filters.is_empty() {
397+
return Ok(FilterPushdownPropagation {
398+
filters: vec![],
399+
updated_node: None,
400+
});
401+
}
402+
403+
let filters_pushed_down = vec![PushedDown::Yes; parent_filters.len()];
404+
let mut node = self.clone();
405+
node.filters = parent_filters;
406+
407+
Ok(FilterPushdownPropagation {
408+
filters: filters_pushed_down,
409+
updated_node: Some(Arc::new(node)),
410+
})
411+
}
375412
}
376413

377414
#[cfg(test)]
@@ -416,7 +453,12 @@ mod tests {
416453
Arc::new(unparser::dialect::DefaultDialect {})
417454
}
418455

419-
fn execute(&self, _query: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
456+
fn execute(
457+
&self,
458+
_query: &str,
459+
_schema: SchemaRef,
460+
_filters: &[Arc<dyn PhysicalExpr>],
461+
) -> Result<SendableRecordBatchStream> {
420462
unimplemented!()
421463
}
422464

0 commit comments

Comments
 (0)