Skip to content

Commit 81b75b3

Browse files
authored
Merge branch 'main' into lukim/rust-1.93
2 parents c09617b + f0154aa, commit 81b75b3

8 files changed

Lines changed: 115 additions & 22 deletions

File tree

.github/workflows/docs.yml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,6 @@ jobs:
4141
name: github-pages
4242
url: ${{ steps.deployment.outputs.page_url }}
4343
steps:
44-
- uses: actions/configure-pages@v5
45-
- uses: actions/deploy-pages@v4
44+
- uses: actions/configure-pages@v6
45+
- uses: actions/deploy-pages@v5
4646
id: deployment

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -4,17 +4,17 @@ resolver = "2"
44
members = ["datafusion-federation"]
55

66
[workspace.package]
7-
version = "0.5.1"
7+
version = "0.5.3"
88
edition = "2021"
99
license = "Apache-2.0"
1010
readme = "README.md"
1111
repository = "https://github.com/datafusion-contrib/datafusion-federation"
1212

1313
[workspace.dependencies]
14-
arrow-json = "57.2"
14+
arrow-json = "58.0"
1515
async-stream = "0.3"
1616
async-trait = "0.1"
17-
datafusion = "52.0"
18-
datafusion-federation = { path = "./datafusion-federation", version = "0.5.1" }
17+
datafusion = "53"
18+
datafusion-federation = { path = "./datafusion-federation", version = "0.5.3" }
1919
futures = "0.3"
2020
tokio = { version = "1.41", features = ["full"] }

datafusion-federation/CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [Unreleased]
99

10+
## [0.5.3](https://github.com/datafusion-contrib/datafusion-federation/compare/v0.5.2...v0.5.3) - 2026-03-30
11+
12+
### Other
13+
14+
- Fix `Explain Analyze` ([#168](https://github.com/datafusion-contrib/datafusion-federation/pull/168))
15+
16+
## [0.5.2](https://github.com/datafusion-contrib/datafusion-federation/compare/v0.5.1...v0.5.2) - 2026-03-24
17+
18+
### Other
19+
20+
- Upgrade to DataFusion 53 ([#164](https://github.com/datafusion-contrib/datafusion-federation/pull/164))
21+
1022
## [0.5.1](https://github.com/datafusion-contrib/datafusion-federation/compare/v0.5.0...v0.5.1) - 2026-03-04
1123

1224
### Added

datafusion-federation/src/optimizer/mod.rs

Lines changed: 15 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -191,14 +191,22 @@ impl FederationOptimizerRule {
191191
return Ok((None, ScanResult::Distinct(provider)));
192192
}
193193

194-
let Some(optimizer) = provider.optimizer() else {
195-
// No optimizer provided
196-
return Ok((None, ScanResult::None));
197-
};
194+
// Analyze plans (EXPLAIN ANALYZE) cannot be converted to SQL by
195+
// the Unparser, so they must not be federated as a whole. Only the
196+
// inner query should be federated; DataFusion's AnalyzeExec will
197+
// handle executing it and collecting metrics.
198+
if matches!(plan, LogicalPlan::Analyze(_)) {
199+
// Fall through to federate children instead.
200+
} else {
201+
let Some(optimizer) = provider.optimizer() else {
202+
// No optimizer provided
203+
return Ok((None, ScanResult::None));
204+
};
198205

199-
// If this is the root plan node; federate the entire plan
200-
let optimized = optimizer.optimize(plan.clone(), _config, |_, _| {})?;
201-
return Ok((Some(optimized), ScanResult::None));
206+
// If this is the root plan node; federate the entire plan
207+
let optimized = optimizer.optimize(plan.clone(), _config, |_, _| {})?;
208+
return Ok((Some(optimized), ScanResult::None));
209+
}
202210
}
203211

204212
// The plan is ambiguous; any input that is not yet optimized and has a

datafusion-federation/src/schema_cast/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ mod struct_cast;
2727
pub struct SchemaCastScanExec {
2828
input: Arc<dyn ExecutionPlan>,
2929
schema: SchemaRef,
30-
properties: PlanProperties,
30+
properties: Arc<PlanProperties>,
3131
metrics_set: ExecutionPlanMetricsSet,
3232
}
3333

@@ -36,12 +36,12 @@ impl SchemaCastScanExec {
3636
let eq_properties = input.equivalence_properties().clone();
3737
let emission_type = input.pipeline_behavior();
3838
let boundedness = input.boundedness();
39-
let properties = PlanProperties::new(
39+
let properties = Arc::new(PlanProperties::new(
4040
eq_properties,
4141
input.output_partitioning().clone(),
4242
emission_type,
4343
boundedness,
44-
);
44+
));
4545
Self {
4646
input,
4747
schema,
@@ -66,7 +66,7 @@ impl ExecutionPlan for SchemaCastScanExec {
6666
self
6767
}
6868

69-
fn properties(&self) -> &PlanProperties {
69+
fn properties(&self) -> &Arc<PlanProperties> {
7070
&self.properties
7171
}
7272

datafusion-federation/src/sql/analyzer.rs

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ use datafusion::{
55
logical_expr::{
66
expr::{
77
AggregateFunction, AggregateFunctionParams, Alias, Exists, InList, InSubquery,
8-
PlannedReplaceSelectItem, ScalarFunction, Sort, Unnest, WildcardOptions,
8+
PlannedReplaceSelectItem, ScalarFunction, SetComparison, Sort, Unnest, WildcardOptions,
99
WindowFunction, WindowFunctionParams,
1010
},
1111
Between, BinaryExpr, Case, Cast, Expr, GroupingSet, Like, Limit, LogicalPlan, Subquery,
@@ -563,6 +563,27 @@ fn rewrite_table_scans_in_expr(
563563
Ok(Expr::Unnest(Unnest::new(expr)))
564564
}
565565
Expr::ScalarVariable(_, _) | Expr::Literal(_, _) | Expr::Placeholder(_) => Ok(expr),
566+
Expr::SetComparison(sc) => {
567+
let expr = rewrite_table_scans_in_expr(*sc.expr, known_rewrites)?;
568+
let subquery_plan = rewrite_table_scans(&sc.subquery.subquery, known_rewrites)?;
569+
let outer_ref_columns = sc
570+
.subquery
571+
.outer_ref_columns
572+
.into_iter()
573+
.map(|e| rewrite_table_scans_in_expr(e, known_rewrites))
574+
.collect::<Result<Vec<Expr>>>()?;
575+
let subquery = Subquery {
576+
subquery: Arc::new(subquery_plan),
577+
outer_ref_columns,
578+
spans: Spans::new(),
579+
};
580+
Ok(Expr::SetComparison(SetComparison::new(
581+
Box::new(expr),
582+
subquery,
583+
sc.op,
584+
sc.quantifier,
585+
)))
586+
}
566587
}
567588
}
568589

datafusion-federation/src/sql/ast_analyzer.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -97,6 +97,7 @@ impl VisitorMut for TableArgReplace {
9797
*args = Some(arg.clone());
9898
if alias.is_none() {
9999
*alias = Some(TableAlias {
100+
explicit: true,
100101
name: Ident::new(table.table()),
101102
columns: vec![],
102103
})

datafusion-federation/src/sql/mod.rs

Lines changed: 55 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -164,20 +164,20 @@ impl FederationPlanner for SQLFederationPlanner {
164164
pub struct VirtualExecutionPlan {
165165
plan: LogicalPlan,
166166
executor: Arc<dyn SQLExecutor>,
167-
props: PlanProperties,
167+
props: Arc<PlanProperties>,
168168
statistics: Statistics,
169169
filters: Vec<Arc<dyn PhysicalExpr>>,
170170
}
171171

172172
impl VirtualExecutionPlan {
173173
pub fn new(plan: LogicalPlan, executor: Arc<dyn SQLExecutor>, statistics: Statistics) -> Self {
174174
let schema: Schema = plan.schema().as_arrow().clone();
175-
let props = PlanProperties::new(
175+
let props = Arc::new(PlanProperties::new(
176176
EquivalenceProperties::new(Arc::new(schema)),
177177
Partitioning::UnknownPartitioning(1),
178178
EmissionType::Incremental,
179179
Boundedness::Bounded,
180-
);
180+
));
181181
Self {
182182
plan,
183183
executor,
@@ -401,7 +401,7 @@ impl ExecutionPlan for VirtualExecutionPlan {
401401
.execute(&self.final_sql()?, self.schema(), &self.filters)
402402
}
403403

404-
fn properties(&self) -> &PlanProperties {
404+
fn properties(&self) -> &Arc<PlanProperties> {
405405
&self.props
406406
}
407407

@@ -766,6 +766,57 @@ mod tests {
766766
Ok(())
767767
}
768768

769+
/// EXPLAIN ANALYZE must not federate the Analyze wrapper — only the inner
770+
/// query should be federated. Otherwise the SQL Unparser fails because it
771+
/// cannot convert Analyze to SQL.
772+
#[tokio::test]
773+
async fn explain_analyze_not_federated() -> Result<(), DataFusionError> {
774+
let executor = TestExecutor {
775+
compute_context: "a".into(),
776+
};
777+
778+
let table_ref = "test_table".to_string();
779+
let table = get_test_table_provider(table_ref.clone(), executor);
780+
781+
let state = crate::default_session_state();
782+
let ctx = SessionContext::new_with_state(state);
783+
ctx.register_table(table_ref, table).unwrap();
784+
785+
let plan = ctx
786+
.sql("EXPLAIN ANALYZE SELECT * FROM test_table")
787+
.await?
788+
.into_optimized_plan()?;
789+
790+
// The top-level node must be Analyze, not Federated.
791+
assert!(
792+
matches!(plan, LogicalPlan::Analyze(_)),
793+
"Expected Analyze at root, got: {}",
794+
plan.display_indent()
795+
);
796+
797+
// The inner plan should contain a Federated extension node.
798+
let mut found_federated = false;
799+
plan.apply(|node| {
800+
if let LogicalPlan::Extension(ext) = node {
801+
if ext.node.name() == "Federated" {
802+
found_federated = true;
803+
return Ok(TreeNodeRecursion::Stop);
804+
}
805+
}
806+
Ok(TreeNodeRecursion::Continue)
807+
})?;
808+
assert!(
809+
found_federated,
810+
"Expected a Federated node inside the Analyze plan"
811+
);
812+
813+
// Physical planning should succeed (this is where it used to fail).
814+
let physical_plan = ctx.state().create_physical_plan(&plan).await?;
815+
assert_eq!(physical_plan.name(), "AnalyzeExec");
816+
817+
Ok(())
818+
}
819+
769820
#[tokio::test]
770821
async fn sql_query_rewriter_hook_invoked_and_rewrites_sql() -> Result<(), DataFusionError> {
771822
let executor = TestExecutor {

0 commit comments

Comments
 (0)