diff --git a/datafusion-federation/src/schema_cast/mod.rs b/datafusion-federation/src/schema_cast/mod.rs index 3ee892c..c237132 100644 --- a/datafusion-federation/src/schema_cast/mod.rs +++ b/datafusion-federation/src/schema_cast/mod.rs @@ -1,5 +1,6 @@ use async_stream::stream; use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::Statistics; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -113,4 +114,8 @@ impl ExecutionPlan for SchemaCastScanExec { }, ))) } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition) + } } diff --git a/datafusion-federation/src/sql/executor.rs b/datafusion-federation/src/sql/executor.rs index e45c6f6..79af1c1 100644 --- a/datafusion-federation/src/sql/executor.rs +++ b/datafusion-federation/src/sql/executor.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use core::fmt; use datafusion::{ arrow::datatypes::SchemaRef, + common::Statistics, error::Result, logical_expr::LogicalPlan, physical_plan::SendableRecordBatchStream, @@ -43,6 +44,13 @@ pub trait SQLExecutor: Sync + Send { /// Execute a SQL query fn execute(&self, query: &str, schema: SchemaRef) -> Result; + /// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should + /// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan` + /// trait. + async fn statistics(&self, plan: &LogicalPlan) -> Result { + Ok(Statistics::new_unknown(plan.schema().as_arrow())) + } + /// Returns the tables provided by the remote async fn table_names(&self) -> Result>; diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index 08da182..b783cbf 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -12,6 +12,7 @@ use async_trait::async_trait; use datafusion::{ arrow::datatypes::{Schema, SchemaRef}, common::tree_node::{Transformed, TreeNode}, + common::Statistics, error::{DataFusionError, Result}, execution::{context::SessionState, TaskContext}, logical_expr::{Extension, LogicalPlan}, @@ -140,9 +141,12 @@ impl FederationPlanner for SQLFederationPlanner { _session_state: &SessionState, ) -> Result> { let schema = Arc::new(node.plan().schema().as_arrow().clone()); + let plan = node.plan().clone(); + let statistics = self.executor.statistics(&plan).await?; let input = Arc::new(VirtualExecutionPlan::new( - node.plan().clone(), + plan, Arc::clone(&self.executor), + statistics, )); let schema_cast_exec = schema_cast::SchemaCastScanExec::new(input, schema); Ok(Arc::new(schema_cast_exec)) @@ -154,10 +158,11 @@ struct VirtualExecutionPlan { plan: LogicalPlan, executor: Arc, props: PlanProperties, + statistics: Statistics, } impl VirtualExecutionPlan { - pub fn new(plan: LogicalPlan, executor: Arc) -> Self { + pub fn new(plan: LogicalPlan, executor: Arc, statistics: Statistics) -> Self { let schema: Schema = plan.schema().as_ref().into(); let props = PlanProperties::new( EquivalenceProperties::new(Arc::new(schema)), @@ -169,6 +174,7 @@ impl VirtualExecutionPlan { plan, executor, props, + statistics, } } @@ -343,6 +349,10 @@ impl ExecutionPlan for VirtualExecutionPlan { fn properties(&self) -> &PlanProperties { &self.props } + + fn partition_statistics(&self, _partition: Option) -> Result { + Ok(self.statistics.clone()) + } } #[cfg(test)]