Skip to content

Commit 76edd15

Browse files
Recalculate schemas for all applicable nodes
1 parent 7b2147e commit 76edd15

1 file changed

Lines changed: 39 additions & 16 deletions

File tree

datafusion-federation/src/sql/analyzer/rewrite_table_scan.rs

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use datafusion::{
77
},
88
error::DataFusionError,
99
logical_expr::{
10-
self,
10+
self, build_join_schema,
1111
expr::{Alias, Exists, InSubquery},
12-
Aggregate, Expr, LogicalPlan, LogicalPlanBuilder, Projection, Subquery,
12+
Aggregate, Expr, Join, LogicalPlan, LogicalPlanBuilder, Projection, Subquery,
13+
SubqueryAlias, Union, Window,
1314
},
1415
sql::TableReference,
1516
};
@@ -184,22 +185,44 @@ impl RewriteTableScanAnalyzer {
184185
})
185186
})?;
186187

188+
// Recalculate the schemas now that all of the inner expressions have been rewritten.
187189
plan.map_data(|plan| match plan {
188-
LogicalPlan::Aggregate(aggr) => {
189-
// Recalculate the aggregate schema now that all of the inner expressions have been rewritten.
190-
Ok(LogicalPlan::Aggregate(Aggregate::try_new(
191-
aggr.input,
192-
aggr.group_expr,
193-
aggr.aggr_expr,
194-
)?))
195-
}
196-
LogicalPlan::Projection(projection) => {
197-
// Recalculate the projection schema now that all of the inner expressions have been rewritten.
198-
Ok(LogicalPlan::Projection(Projection::try_new(
199-
projection.expr,
200-
projection.input,
201-
)?))
190+
LogicalPlan::Aggregate(aggr) => Ok(LogicalPlan::Aggregate(Aggregate::try_new(
191+
aggr.input,
192+
aggr.group_expr,
193+
aggr.aggr_expr,
194+
)?)),
195+
LogicalPlan::Window(window) => Ok(LogicalPlan::Window(Window::try_new(
196+
window.window_expr,
197+
window.input,
198+
)?)),
199+
LogicalPlan::Projection(projection) => Ok(LogicalPlan::Projection(
200+
Projection::try_new(projection.expr, projection.input)?,
201+
)),
202+
LogicalPlan::Join(join) => {
203+
let join_schema = build_join_schema(
204+
join.left.schema(),
205+
join.right.schema(),
206+
&join.join_type,
207+
)?;
208+
209+
Ok(LogicalPlan::Join(Join {
210+
left: join.left,
211+
right: join.right,
212+
on: join.on,
213+
filter: join.filter,
214+
join_type: join.join_type,
215+
join_constraint: join.join_constraint,
216+
schema: Arc::new(join_schema),
217+
null_equals_null: join.null_equals_null,
218+
}))
202219
}
220+
LogicalPlan::SubqueryAlias(subquery_alias) => Ok(LogicalPlan::SubqueryAlias(
221+
SubqueryAlias::try_new(subquery_alias.input, subquery_alias.alias)?,
222+
)),
223+
LogicalPlan::Union(union) => Ok(LogicalPlan::Union(
224+
Union::try_new_with_loose_types(union.inputs)?,
225+
)),
203226
plan => Ok(plan),
204227
})
205228
};

0 commit comments

Comments
 (0)