From ff92d0c729018cdd7d5d3b879e70c6e41eb25fff Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Thu, 3 Jul 2025 18:11:46 +0100 Subject: [PATCH 1/3] Add ability to set statistics in the SQLExecutor --- datafusion-federation/src/schema_cast/mod.rs | 5 +++++ datafusion-federation/src/sql/executor.rs | 13 +++++++++++++ datafusion-federation/src/sql/mod.rs | 6 ++++++ 3 files changed, 24 insertions(+) diff --git a/datafusion-federation/src/schema_cast/mod.rs b/datafusion-federation/src/schema_cast/mod.rs index 3ee892c..c237132 100644 --- a/datafusion-federation/src/schema_cast/mod.rs +++ b/datafusion-federation/src/schema_cast/mod.rs @@ -1,5 +1,6 @@ use async_stream::stream; use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::Statistics; use datafusion::error::{DataFusionError, Result}; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -113,4 +114,8 @@ impl ExecutionPlan for SchemaCastScanExec { }, ))) } + + fn partition_statistics(&self, partition: Option) -> Result { + self.input.partition_statistics(partition) + } } diff --git a/datafusion-federation/src/sql/executor.rs b/datafusion-federation/src/sql/executor.rs index e45c6f6..69a1ba4 100644 --- a/datafusion-federation/src/sql/executor.rs +++ b/datafusion-federation/src/sql/executor.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use core::fmt; use datafusion::{ arrow::datatypes::SchemaRef, + common::Statistics, error::Result, logical_expr::LogicalPlan, physical_plan::SendableRecordBatchStream, @@ -43,6 +44,18 @@ pub trait SQLExecutor: Sync + Send { /// Execute a SQL query fn execute(&self, query: &str, schema: SchemaRef) -> Result; + /// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should + /// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan` + /// trait. + fn partition_statistics( + &self, + _partition: Option, + _query: &str, + schema: SchemaRef, + ) -> Result { + Ok(Statistics::new_unknown(&schema)) + } + /// Returns the tables provided by the remote async fn table_names(&self) -> Result>; diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index 08da182..8907df8 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -12,6 +12,7 @@ use async_trait::async_trait; use datafusion::{ arrow::datatypes::{Schema, SchemaRef}, common::tree_node::{Transformed, TreeNode}, + common::Statistics, error::{DataFusionError, Result}, execution::{context::SessionState, TaskContext}, logical_expr::{Extension, LogicalPlan}, @@ -343,6 +344,11 @@ impl ExecutionPlan for VirtualExecutionPlan { fn properties(&self) -> &PlanProperties { &self.props } + + fn partition_statistics(&self, partition: Option) -> Result { + self.executor + .partition_statistics(partition, &self.final_sql()?, self.schema()) + } } #[cfg(test)] From a000be7a762b9634de9ef54bfcf807704cd27939 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Sat, 5 Jul 2025 15:35:35 +0100 Subject: [PATCH 2/3] Remove partition from statistics --- datafusion-federation/src/sql/executor.rs | 7 +------ datafusion-federation/src/sql/mod.rs | 5 ++--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/datafusion-federation/src/sql/executor.rs b/datafusion-federation/src/sql/executor.rs index 69a1ba4..0ee2b82 100644 --- a/datafusion-federation/src/sql/executor.rs +++ b/datafusion-federation/src/sql/executor.rs @@ -47,12 +47,7 @@ pub trait SQLExecutor: Sync + Send { /// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should /// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan` /// trait. - fn partition_statistics( - &self, - _partition: Option, - _query: &str, - schema: SchemaRef, - ) -> Result { + fn statistics(&self, _query: &str, schema: SchemaRef) -> Result { Ok(Statistics::new_unknown(&schema)) } diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index 8907df8..1bc04ec 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -345,9 +345,8 @@ impl ExecutionPlan for VirtualExecutionPlan { &self.props } - fn partition_statistics(&self, partition: Option) -> Result { - self.executor - .partition_statistics(partition, &self.final_sql()?, self.schema()) + fn partition_statistics(&self, _partition: Option) -> Result { + self.executor.statistics(&self.final_sql()?, self.schema()) } } From 35f8c9009b2e5c70980a780a8b67c9d63e7e07c7 Mon Sep 17 00:00:00 2001 From: nuno-faria Date: Wed, 9 Jul 2025 13:38:39 +0100 Subject: [PATCH 3/3] Collect remote statistics at plan time --- datafusion-federation/src/sql/executor.rs | 4 ++-- datafusion-federation/src/sql/mod.rs | 11 ++++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/datafusion-federation/src/sql/executor.rs b/datafusion-federation/src/sql/executor.rs index 0ee2b82..79af1c1 100644 --- a/datafusion-federation/src/sql/executor.rs +++ b/datafusion-federation/src/sql/executor.rs @@ -47,8 +47,8 @@ pub trait SQLExecutor: Sync + Send { /// Returns statistics for this `SQLExecutor` node. If statistics are not available, it should /// return [`Statistics::new_unknown`] (the default), not an error. See the `ExecutionPlan` /// trait. - fn statistics(&self, _query: &str, schema: SchemaRef) -> Result { - Ok(Statistics::new_unknown(&schema)) + async fn statistics(&self, plan: &LogicalPlan) -> Result { + Ok(Statistics::new_unknown(plan.schema().as_arrow())) } /// Returns the tables provided by the remote diff --git a/datafusion-federation/src/sql/mod.rs b/datafusion-federation/src/sql/mod.rs index 1bc04ec..b783cbf 100644 --- a/datafusion-federation/src/sql/mod.rs +++ b/datafusion-federation/src/sql/mod.rs @@ -141,9 +141,12 @@ impl FederationPlanner for SQLFederationPlanner { _session_state: &SessionState, ) -> Result> { let schema = Arc::new(node.plan().schema().as_arrow().clone()); + let plan = node.plan().clone(); + let statistics = self.executor.statistics(&plan).await?; let input = Arc::new(VirtualExecutionPlan::new( - node.plan().clone(), + plan, Arc::clone(&self.executor), + statistics, )); let schema_cast_exec = schema_cast::SchemaCastScanExec::new(input, schema); Ok(Arc::new(schema_cast_exec)) @@ -155,10 +158,11 @@ struct VirtualExecutionPlan { plan: LogicalPlan, executor: Arc, props: PlanProperties, + statistics: Statistics, } impl VirtualExecutionPlan { - pub fn new(plan: LogicalPlan, executor: Arc) -> Self { + pub fn new(plan: LogicalPlan, executor: Arc, statistics: Statistics) -> Self { let schema: Schema = plan.schema().as_ref().into(); let props = PlanProperties::new( EquivalenceProperties::new(Arc::new(schema)), @@ -170,6 +174,7 @@ impl VirtualExecutionPlan { plan, executor, props, + statistics, } } @@ -346,7 +351,7 @@ impl ExecutionPlan for VirtualExecutionPlan { } fn partition_statistics(&self, _partition: Option) -> Result { - self.executor.statistics(&self.final_sql()?, self.schema()) + Ok(self.statistics.clone()) } }