Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions datafusion-federation/examples/shared/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use datafusion::{
catalog::SchemaProvider,
error::{DataFusionError, Result},
execution::context::{SessionContext, SessionState},
physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},
physical_plan::{stream::RecordBatchStreamAdapter, PhysicalExpr, SendableRecordBatchStream},
sql::unparser::dialect::{DefaultDialect, Dialect},
};
use futures::TryStreamExt;
Expand Down Expand Up @@ -50,7 +50,12 @@ impl SQLExecutor for MockSqliteExecutor {
Some("sqlite_exec".to_string())
}

fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
fn execute(
&self,
sql: &str,
schema: SchemaRef,
_filters: &[Arc<dyn PhysicalExpr>],
) -> Result<SendableRecordBatchStream> {
// Execute it using the remote datafusion session context
let future_stream = _execute(self.session.clone(), sql.to_string());
let stream = futures::stream::once(future_stream).try_flatten();
Expand Down Expand Up @@ -103,7 +108,12 @@ impl SQLExecutor for MockPostgresExecutor {
Some("postgres_exec".to_string())
}

fn execute(&self, sql: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream> {
fn execute(
&self,
sql: &str,
schema: SchemaRef,
_filters: &[Arc<dyn PhysicalExpr>],
) -> Result<SendableRecordBatchStream> {
// Execute it using the remote datafusion session context
let future_stream = _execute(self.session.clone(), sql.to_string());
let stream = futures::stream::once(future_stream).try_flatten();
Expand Down
14 changes: 13 additions & 1 deletion datafusion-federation/src/schema_cast/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use async_stream::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::config::ConfigOptions;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::filter_pushdown::{FilterDescription, FilterPushdownPhase};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr,
PlanProperties,
};
use futures::StreamExt;
use std::any::Any;
Expand Down Expand Up @@ -131,4 +134,13 @@ impl ExecutionPlan for SchemaCastScanExec {
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metrics_set.clone_inner())
}

fn gather_filters_for_pushdown(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&self,
_phase: FilterPushdownPhase,
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &ConfigOptions,
) -> Result<FilterDescription> {
FilterDescription::from_children(parent_filters, &self.children())
}
}
8 changes: 7 additions & 1 deletion datafusion-federation/src/sql/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ mod tests {
use async_trait::async_trait;
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::sql::unparser::dialect::Dialect;
use datafusion::sql::unparser::plan_to_sql;
use datafusion::{
Expand Down Expand Up @@ -604,7 +605,12 @@ mod tests {
unimplemented!()
}

fn execute(&self, _query: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
fn execute(
&self,
_query: &str,
_schema: SchemaRef,
_filters: &[Arc<dyn PhysicalExpr>],
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}

Expand Down
16 changes: 13 additions & 3 deletions datafusion-federation/src/sql/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use datafusion::{
common::Statistics,
error::Result,
logical_expr::LogicalPlan,
physical_plan::{metrics::MetricsSet, SendableRecordBatchStream},
physical_plan::{metrics::MetricsSet, PhysicalExpr, SendableRecordBatchStream},
sql::{sqlparser::ast, unparser::dialect::Dialect},
};
use std::sync::Arc;
Expand Down Expand Up @@ -41,8 +41,18 @@ pub trait SQLExecutor: Sync + Send {
None
}

/// Execute a SQL query
fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;
/// Execute a SQL query.
///
/// `filters` contains physical expressions generated at runtime, such as
/// `DynamicFilterPhysicalExpr`. Their concrete values only become available while the returned
/// `SendableRecordBatchStream` is being executed, so an implementation that wants to apply them
/// must add them to the SQL query itself. Implementations may also safely ignore them.
fn execute(
&self,
query: &str,
schema: SchemaRef,
filters: &[Arc<dyn PhysicalExpr>],
) -> Result<SendableRecordBatchStream>;

/// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should
/// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan`
Expand Down
48 changes: 45 additions & 3 deletions datafusion-federation/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ use datafusion::{
tree_node::{Transformed, TreeNode},
Statistics,
},
config::ConfigOptions,
error::{DataFusionError, Result},
execution::{context::SessionState, TaskContext},
logical_expr::{Extension, LogicalPlan},
optimizer::{optimizer::Optimizer, OptimizerConfig, OptimizerRule},
physical_expr::EquivalenceProperties,
physical_plan::{
execution_plan::{Boundedness, EmissionType},
filter_pushdown::{
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
},
metrics::MetricsSet,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, PlanProperties,
SendableRecordBatchStream,
},
sql::{sqlparser::ast::Statement, unparser::Unparser},
Expand Down Expand Up @@ -162,6 +166,7 @@ pub struct VirtualExecutionPlan {
executor: Arc<dyn SQLExecutor>,
props: PlanProperties,
statistics: Statistics,
filters: Vec<Arc<dyn PhysicalExpr>>,
}

impl VirtualExecutionPlan {
Expand All @@ -178,6 +183,7 @@ impl VirtualExecutionPlan {
executor,
props,
statistics,
filters: Vec::new(),
}
}

Expand Down Expand Up @@ -358,7 +364,8 @@ impl ExecutionPlan for VirtualExecutionPlan {
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
self.executor.execute(&self.final_sql()?, self.schema())
self.executor
.execute(&self.final_sql()?, self.schema(), &self.filters)
}

fn properties(&self) -> &PlanProperties {
Expand All @@ -372,6 +379,36 @@ impl ExecutionPlan for VirtualExecutionPlan {
fn metrics(&self) -> Option<MetricsSet> {
self.executor.metrics()
}

fn handle_child_pushdown_result(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&self,
_phase: FilterPushdownPhase,
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
let parent_filters: Vec<_> = child_pushdown_result
.clone()
.parent_filters
.into_iter()
.map(|f| f.filter)
.collect();

if parent_filters.is_empty() {
return Ok(FilterPushdownPropagation {
filters: vec![],
updated_node: None,
});
}

let filters_pushed_down = vec![PushedDown::Yes; parent_filters.len()];
let mut node = self.clone();
node.filters = parent_filters;

Ok(FilterPushdownPropagation {
filters: filters_pushed_down,
updated_node: Some(Arc::new(node)),
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -416,7 +453,12 @@ mod tests {
Arc::new(unparser::dialect::DefaultDialect {})
}

fn execute(&self, _query: &str, _schema: SchemaRef) -> Result<SendableRecordBatchStream> {
fn execute(
&self,
_query: &str,
_schema: SchemaRef,
_filters: &[Arc<dyn PhysicalExpr>],
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}

Expand Down