Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion-federation/src/schema_cast/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_stream::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -113,4 +114,8 @@ impl ExecutionPlan for SchemaCastScanExec {
},
)))
}

/// Returns statistics for this node by forwarding the request, unchanged, to the wrapped
/// `input` plan.
///
/// `partition` is passed straight through to the input; nothing is recomputed or adjusted
/// here.
// NOTE(review): the forwarded statistics come from the input plan as-is; if they carry
// typed per-column values, confirm they are still valid under the cast schema.
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
self.input.partition_statistics(partition)
}
}
8 changes: 8 additions & 0 deletions datafusion-federation/src/sql/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use async_trait::async_trait;
use core::fmt;
use datafusion::{
arrow::datatypes::SchemaRef,
common::Statistics,
error::Result,
logical_expr::LogicalPlan,
physical_plan::SendableRecordBatchStream,
Expand Down Expand Up @@ -43,6 +44,13 @@ pub trait SQLExecutor: Sync + Send {
/// Execute a SQL query
fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;

/// Returns statistics for the given logical `plan` on this `SQLExecutor`.
///
/// If statistics are not available, implementations should return
/// [`Statistics::new_unknown`] rather than an error. The default implementation does exactly
/// that, building the unknown statistics from the plan's Arrow schema. See the
/// `ExecutionPlan` trait.
async fn statistics(&self, plan: &LogicalPlan) -> Result<Statistics> {
Ok(Statistics::new_unknown(plan.schema().as_arrow()))
}

/// Returns the tables provided by the remote
async fn table_names(&self) -> Result<Vec<String>>;

Expand Down
14 changes: 12 additions & 2 deletions datafusion-federation/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_trait::async_trait;
use datafusion::{
arrow::datatypes::{Schema, SchemaRef},
common::tree_node::{Transformed, TreeNode},
common::Statistics,
error::{DataFusionError, Result},
execution::{context::SessionState, TaskContext},
logical_expr::{Extension, LogicalPlan},
Expand Down Expand Up @@ -140,9 +141,12 @@ impl FederationPlanner for SQLFederationPlanner {
_session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = Arc::new(node.plan().schema().as_arrow().clone());
let plan = node.plan().clone();
let statistics = self.executor.statistics(&plan).await?;
let input = Arc::new(VirtualExecutionPlan::new(
node.plan().clone(),
plan,
Arc::clone(&self.executor),
statistics,
));
let schema_cast_exec = schema_cast::SchemaCastScanExec::new(input, schema);
Ok(Arc::new(schema_cast_exec))
Expand All @@ -154,10 +158,11 @@ struct VirtualExecutionPlan {
plan: LogicalPlan,
executor: Arc<dyn SQLExecutor>,
props: PlanProperties,
statistics: Statistics,
}

impl VirtualExecutionPlan {
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>) -> Self {
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>, statistics: Statistics) -> Self {
let schema: Schema = plan.schema().as_ref().into();
let props = PlanProperties::new(
EquivalenceProperties::new(Arc::new(schema)),
Expand All @@ -169,6 +174,7 @@ impl VirtualExecutionPlan {
plan,
executor,
props,
statistics,
}
}

Expand Down Expand Up @@ -343,6 +349,10 @@ impl ExecutionPlan for VirtualExecutionPlan {
fn properties(&self) -> &PlanProperties {
&self.props
}

/// Returns a clone of the statistics that were supplied when this plan was constructed.
///
/// The `_partition` argument is not consulted: the same stored statistics are returned
/// regardless of the value passed.
fn partition_statistics(&self, _partition: Option<usize>) -> Result<Statistics> {
Ok(self.statistics.clone())
}
}

#[cfg(test)]
Expand Down