Skip to content

Commit e6939df

Browse files
committed
planner: narrow rollup grouping position preservation
1 parent d42ec75 commit e6939df

2 files changed

Lines changed: 67 additions & 25 deletions

File tree

pkg/planner/core/issuetest/planner_issue_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,18 @@ ORDER BY field1`).Check(testkit.Rows())
895895
tk.MustQuery("show warnings").Check(testkit.Rows())
896896
tk.MustQuery("select /* issue:65965 */ a, b, a as d, sum(c) from t1 group by 1, 2, 3 with rollup").Sort().Check(expected)
897897
tk.MustQuery("show warnings").Check(testkit.Rows())
898+
899+
mixedExpected := testkit.Rows(
900+
"1 1 3",
901+
"1 <nil> 3",
902+
"4 4 6",
903+
"4 <nil> 6",
904+
"7 7 9",
905+
"7 <nil> 9",
906+
"<nil> <nil> 18",
907+
)
908+
tk.MustQuery("select /* issue:65965 */ a, a as d, sum(c) from t1 group by a, d, a with rollup").Sort().Check(mixedExpected)
909+
tk.MustQuery("show warnings").Check(testkit.Rows())
898910
})
899911

900912
// issue-67802-mutable-user-var-join-cond-should-not-become-inner-side-filter

pkg/planner/core/logical_plan_builder.go

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -147,18 +147,11 @@ func (b *PlanBuilder) buildExpand(p base.LogicalPlan, gbyItems []expression.Expr
147147
b.optFlag |= rule.FlagResolveExpand
148148

149149
// Rollup syntax require expand OP to do the data expansion, different data replica supply the different grouping layout.
150-
expandGbyExprs := gbyItems
151-
expandGbySourceFieldIndices := gbyItemSourceFieldIndices
152-
gbyExprsRefPos := make([]int, 0, len(gbyItems))
153-
keepGbyItemPositions := needPreserveRollupGbyItemPositions(gbyItems, gbyItemSourceFieldIndices)
154-
if !keepGbyItemPositions {
155-
expandGbyExprs, gbyExprsRefPos = expression.DeduplicateGbyExpression(gbyItems)
156-
expandGbySourceFieldIndices = deriveDeduplicatedGbySourceFieldIndices(gbyExprsRefPos, gbyItemSourceFieldIndices, len(expandGbyExprs))
157-
}
150+
expandGbyExprs, expandGbySourceFieldIndices, rollupGbyExprsRefPos := deduplicateRollupGbyItems(gbyItems, gbyItemSourceFieldIndices)
158151

159152
// Build another projection below. When a repeated GROUP BY item is introduced by a SELECT
160-
// alias or ordinal, keep every original GROUP BY item position visible to Expand because
161-
// those positions can have different ROLLUP output nullability.
153+
// alias or ordinal, keep that alias/ordinal-backed position visible to Expand because
154+
// its ROLLUP output nullability can differ from ordinary repeated items.
162155
proj := logicalop.LogicalProjection{Exprs: make([]expression.Expression, 0, p.Schema().Len()+len(expandGbyExprs))}.Init(b.ctx, b.getSelectOffset())
163156
// project: child's output and GbyExprs in advance. (make every group-by item to be a column)
164157
projSchema := p.Schema().Clone()
@@ -195,15 +188,7 @@ func (b *PlanBuilder) buildExpand(p base.LogicalPlan, gbyItems []expression.Expr
195188
proj.SetSchema(projSchema)
196189
proj.SetChildren(p)
197190
proj.Proj4Expand = true
198-
var newGbyItems []expression.Expression
199-
if keepGbyItemPositions {
200-
newGbyItems = make([]expression.Expression, 0, len(distinctGbyCols))
201-
for _, col := range distinctGbyCols {
202-
newGbyItems = append(newGbyItems, col.Clone())
203-
}
204-
} else {
205-
newGbyItems = expression.RestoreGbyExpression(distinctGbyCols, gbyExprsRefPos)
206-
}
191+
newGbyItems := expression.RestoreGbyExpression(distinctGbyCols, rollupGbyExprsRefPos)
207192

208193
// build expand.
209194
rollupGroupingSets := expression.RollupGroupingSets(newGbyItems)
@@ -272,21 +257,59 @@ func (b *PlanBuilder) buildExpand(p base.LogicalPlan, gbyItems []expression.Expr
272257
return expand, newGbyItems, nil
273258
}
274259

275-
func needPreserveRollupGbyItemPositions(gbyItems []expression.Expression, sourceFieldIndices []int) bool {
276-
for i, sourceFieldIndex := range sourceFieldIndices {
277-
if sourceFieldIndex < 0 {
260+
func deduplicateRollupGbyItems(gbyItems []expression.Expression, sourceFieldIndices []int) ([]expression.Expression, []int, []int) {
261+
preservePositions := collectPreservedRollupGbyItemPositions(gbyItems, sourceFieldIndices)
262+
if !slices.Contains(preservePositions, true) {
263+
expandGbyExprs, gbyExprsRefPos := expression.DeduplicateGbyExpression(gbyItems)
264+
expandSourceFieldIndices := deriveDeduplicatedGbySourceFieldIndices(gbyExprsRefPos, sourceFieldIndices, len(expandGbyExprs))
265+
return expandGbyExprs, expandSourceFieldIndices, gbyExprsRefPos
266+
}
267+
268+
expandGbyExprs := make([]expression.Expression, 0, len(gbyItems))
269+
expandSourceFieldIndices := make([]int, 0, len(gbyItems))
270+
rollupRefPos := make([]int, 0, len(gbyItems))
271+
ordinaryRefPos := make(map[string]int, len(gbyItems))
272+
273+
for i, item := range gbyItems {
274+
sourceFieldIndex := sourceFieldIndexAt(sourceFieldIndices, i)
275+
if preservePositions[i] {
276+
refPos := len(expandGbyExprs)
277+
expandGbyExprs = append(expandGbyExprs, item)
278+
expandSourceFieldIndices = append(expandSourceFieldIndices, sourceFieldIndex)
279+
rollupRefPos = append(rollupRefPos, refPos)
280+
continue
281+
}
282+
283+
key := string(item.CanonicalHashCode())
284+
if _, ok := ordinaryRefPos[key]; ok {
285+
continue
286+
}
287+
refPos := len(expandGbyExprs)
288+
ordinaryRefPos[key] = refPos
289+
expandGbyExprs = append(expandGbyExprs, item)
290+
expandSourceFieldIndices = append(expandSourceFieldIndices, sourceFieldIndex)
291+
rollupRefPos = append(rollupRefPos, refPos)
292+
}
293+
return expandGbyExprs, expandSourceFieldIndices, rollupRefPos
294+
}
295+
296+
func collectPreservedRollupGbyItemPositions(gbyItems []expression.Expression, sourceFieldIndices []int) []bool {
297+
preservePositions := make([]bool, len(gbyItems))
298+
for i := range gbyItems {
299+
if sourceFieldIndexAt(sourceFieldIndices, i) < 0 {
278300
continue
279301
}
280302
for j := range gbyItems {
281303
if i == j {
282304
continue
283305
}
284306
if bytes.Equal(gbyItems[i].CanonicalHashCode(), gbyItems[j].CanonicalHashCode()) {
285-
return true
307+
preservePositions[i] = true
308+
break
286309
}
287310
}
288311
}
289-
return false
312+
return preservePositions
290313
}
291314

292315
func deriveDeduplicatedGbySourceFieldIndices(gbyExprsRefPos []int, sourceFieldIndices []int, distinctLen int) []int {
@@ -296,12 +319,19 @@ func deriveDeduplicatedGbySourceFieldIndices(gbyExprsRefPos []int, sourceFieldIn
296319
}
297320
for idx, refPos := range gbyExprsRefPos {
298321
if res[refPos] == -1 {
299-
res[refPos] = sourceFieldIndices[idx]
322+
res[refPos] = sourceFieldIndexAt(sourceFieldIndices, idx)
300323
}
301324
}
302325
return res
303326
}
304327

328+
func sourceFieldIndexAt(sourceFieldIndices []int, idx int) int {
329+
if idx < len(sourceFieldIndices) {
330+
return sourceFieldIndices[idx]
331+
}
332+
return -1
333+
}
334+
305335
func (b *PlanBuilder) buildAggregation(ctx context.Context, p base.LogicalPlan, aggFuncList []*ast.AggregateFuncExpr, gbyItems []expression.Expression,
306336
correlatedAggMap map[*ast.AggregateFuncExpr]int) (base.LogicalPlan, map[int]int, error) {
307337
b.optFlag |= rule.FlagBuildKeyInfo

0 commit comments

Comments
 (0)