Skip to content

Commit 2092bbf

Browse files
recompute aggregate schema + fix tests
1 parent cdc9e17 commit 2092bbf

2 files changed

Lines changed: 156 additions & 118 deletions

File tree

datafusion-federation/src/sql/analyzer/rewrite_table_scan.rs

Lines changed: 114 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use datafusion::{
99
logical_expr::{
1010
self,
1111
expr::{Alias, Exists, InSubquery},
12-
Expr, LogicalPlan, LogicalPlanBuilder, Projection, Subquery,
12+
Aggregate, Expr, LogicalPlan, LogicalPlanBuilder, Projection, Subquery,
1313
},
1414
sql::TableReference,
1515
};
@@ -65,117 +65,137 @@ impl RewriteTableScanAnalyzer {
6565
};
6666

6767
// In f_up, rewrite the column names in the expressions.
68-
let rewrite_column_names_in_expressions = |plan: LogicalPlan| {
69-
let plan = match plan {
70-
LogicalPlan::Unnest(unnest) => rewrite_unnest_plan(unnest, known_rewrites)?,
71-
_ => plan,
72-
};
68+
let rewrite_column_names_in_expressions =
69+
|plan: LogicalPlan| -> Result<Transformed<LogicalPlan>> {
70+
let plan = match plan {
71+
LogicalPlan::Unnest(unnest) => rewrite_unnest_plan(unnest, known_rewrites)?,
72+
_ => plan,
73+
};
7374

74-
plan.map_expressions(|expr| {
75-
expr.transform_up(|expr| {
76-
#[expect(deprecated)]
77-
match expr {
78-
Expr::Column(col) => {
79-
rewrite_column(col, known_rewrites).map(|t| t.update_data(Expr::Column))
80-
}
81-
Expr::Alias(alias) => match &alias.relation {
82-
Some(relation) => {
83-
let Some(rewrite) = known_rewrites.get(relation).and_then(
84-
|rewrite| match rewrite {
75+
let plan = plan.map_expressions(|expr| {
76+
expr.transform_up(|expr| {
77+
#[expect(deprecated)]
78+
match expr {
79+
Expr::Column(col) => rewrite_column(col, known_rewrites)
80+
.map(|t| t.update_data(Expr::Column)),
81+
Expr::Alias(alias) => match &alias.relation {
82+
Some(relation) => {
83+
let Some(rewrite) =
84+
known_rewrites.get(relation).and_then(|rewrite| {
85+
match rewrite {
86+
MultiPartTableReference::TableReference(
87+
rewrite,
88+
) => Some(rewrite),
89+
_ => None,
90+
}
91+
})
92+
else {
93+
return Ok(Transformed::no(Expr::Alias(alias)));
94+
};
95+
96+
Ok(Transformed::yes(Expr::Alias(Alias::new(
97+
*alias.expr,
98+
Some(rewrite.clone()),
99+
alias.name,
100+
))))
101+
}
102+
None => Ok(Transformed::no(Expr::Alias(alias))),
103+
},
104+
Expr::Wildcard { qualifier, options } => {
105+
if let Some(rewrite) = qualifier
106+
.as_ref()
107+
.and_then(|q| known_rewrites.get(q))
108+
.and_then(|rewrite| match rewrite {
85109
MultiPartTableReference::TableReference(rewrite) => {
86110
Some(rewrite)
87111
}
88112
_ => None,
89-
},
90-
) else {
91-
return Ok(Transformed::no(Expr::Alias(alias)));
92-
};
93-
94-
Ok(Transformed::yes(Expr::Alias(Alias::new(
95-
*alias.expr,
96-
Some(rewrite.clone()),
97-
alias.name,
98-
))))
99-
}
100-
None => Ok(Transformed::no(Expr::Alias(alias))),
101-
},
102-
Expr::Wildcard { qualifier, options } => {
103-
if let Some(rewrite) = qualifier
104-
.as_ref()
105-
.and_then(|q| known_rewrites.get(q))
106-
.and_then(|rewrite| match rewrite {
107-
MultiPartTableReference::TableReference(rewrite) => {
108-
Some(rewrite)
109-
}
110-
_ => None,
111-
})
112-
{
113-
Ok(Transformed::yes(Expr::Wildcard {
114-
qualifier: Some(rewrite.clone()),
115-
options,
116-
}))
117-
} else {
118-
Ok(Transformed::no(Expr::Wildcard { qualifier, options }))
113+
})
114+
{
115+
Ok(Transformed::yes(Expr::Wildcard {
116+
qualifier: Some(rewrite.clone()),
117+
options,
118+
}))
119+
} else {
120+
Ok(Transformed::no(Expr::Wildcard { qualifier, options }))
121+
}
119122
}
120-
}
121-
// We can't match directly on the outer ref columns until https://github.com/apache/datafusion/issues/16147 is fixed.
122-
Expr::ScalarSubquery(Subquery {
123-
outer_ref_columns,
124-
subquery,
125-
}) => {
126-
let outer_ref_columns =
127-
rewrite_outer_reference_columns(outer_ref_columns, known_rewrites)?;
128-
129-
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
123+
// We can't match directly on the outer ref columns until https://github.com/apache/datafusion/issues/16147 is fixed.
124+
Expr::ScalarSubquery(Subquery {
130125
outer_ref_columns,
131126
subquery,
132-
})))
133-
}
134-
Expr::Exists(Exists {
135-
subquery:
136-
Subquery {
137-
subquery,
127+
}) => {
128+
let outer_ref_columns = rewrite_outer_reference_columns(
138129
outer_ref_columns,
139-
},
140-
negated,
141-
}) => {
142-
let outer_ref_columns =
143-
rewrite_outer_reference_columns(outer_ref_columns, known_rewrites)?;
144-
145-
Ok(Transformed::yes(Expr::Exists(Exists {
146-
subquery: Subquery {
130+
known_rewrites,
131+
)?;
132+
133+
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery {
147134
outer_ref_columns,
148135
subquery,
149-
},
136+
})))
137+
}
138+
Expr::Exists(Exists {
139+
subquery:
140+
Subquery {
141+
subquery,
142+
outer_ref_columns,
143+
},
150144
negated,
151-
})))
152-
}
153-
Expr::InSubquery(InSubquery {
154-
subquery:
155-
Subquery {
145+
}) => {
146+
let outer_ref_columns = rewrite_outer_reference_columns(
156147
outer_ref_columns,
157-
subquery,
158-
},
159-
expr,
160-
negated,
161-
}) => {
162-
let outer_ref_columns =
163-
rewrite_outer_reference_columns(outer_ref_columns, known_rewrites)?;
164-
165-
Ok(Transformed::yes(Expr::InSubquery(InSubquery {
148+
known_rewrites,
149+
)?;
150+
151+
Ok(Transformed::yes(Expr::Exists(Exists {
152+
subquery: Subquery {
153+
outer_ref_columns,
154+
subquery,
155+
},
156+
negated,
157+
})))
158+
}
159+
Expr::InSubquery(InSubquery {
160+
subquery:
161+
Subquery {
162+
outer_ref_columns,
163+
subquery,
164+
},
166165
expr,
167-
subquery: Subquery {
168-
outer_ref_columns,
169-
subquery,
170-
},
171166
negated,
172-
})))
167+
}) => {
168+
let outer_ref_columns = rewrite_outer_reference_columns(
169+
outer_ref_columns,
170+
known_rewrites,
171+
)?;
172+
173+
Ok(Transformed::yes(Expr::InSubquery(InSubquery {
174+
expr,
175+
subquery: Subquery {
176+
outer_ref_columns,
177+
subquery,
178+
},
179+
negated,
180+
})))
181+
}
182+
_ => Ok(Transformed::no(expr)),
173183
}
174-
_ => Ok(Transformed::no(expr)),
184+
})
185+
})?;
186+
187+
plan.map_data(|plan| match plan {
188+
LogicalPlan::Aggregate(aggr) => {
189+
// Recalculate the aggregate schema now that all of the inner expressions have been rewritten.
190+
Ok(LogicalPlan::Aggregate(Aggregate::try_new(
191+
aggr.input,
192+
aggr.group_expr,
193+
aggr.aggr_expr,
194+
)?))
175195
}
196+
plan => Ok(plan),
176197
})
177-
})
178-
};
198+
};
179199

180200
plan.transform_down_up_with_subqueries(
181201
rewrite_table_scans,

0 commit comments

Comments
 (0)