Skip to content

Commit cc9ad9e

Browse files
fix operator of partition (#21276)
fix operator of partition Approved by: @ouyuanning, @aunjgr, @badboynt1
1 parent f839524 commit cc9ad9e

File tree

3 files changed

+52
-3
lines changed

3 files changed

+52
-3
lines changed

pkg/sql/colexec/mock_util.go

+14
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,20 @@ func MakeMockBatchs() *batch.Batch {
129129
return bat
130130
}
131131

132+
func MakeMockPartitionBatchs(val int32) *batch.Batch {
133+
bat := batch.New([]string{"a"})
134+
vecs := makeMockPartitionVecs(val)
135+
bat.Vecs = vecs
136+
bat.SetRowCount(vecs[0].Length())
137+
return bat
138+
}
139+
140+
func makeMockPartitionVecs(val int32) []*vector.Vector {
141+
vecs := make([]*vector.Vector, 1)
142+
vecs[0] = testutil.MakeInt32Vector([]int32{val, val, val}, nil)
143+
return vecs
144+
}
145+
132146
func makeMockTimeWinVecs() []*vector.Vector {
133147
vecs := make([]*vector.Vector, 2)
134148
vecs[0] = testutil.MakeDatetimeVector([]string{

pkg/sql/colexec/partition/partition.go

+34-1
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,9 @@ func (ctr *container) pickAndSend(proc *process.Process, result *vm.CallResult)
189189
hasSame := false
190190
var row int64
191191
var cols []*vector.Vector
192+
fromRemoveBatch := false
192193
for {
193-
if wholeLength == 0 {
194+
if wholeLength == 0 || fromRemoveBatch {
194195
choice = ctr.pickFirstRow()
195196
} else {
196197
if choice, hasSame = ctr.pickSameRow(row, cols); !hasSame {
@@ -211,14 +212,22 @@ func (ctr *container) pickAndSend(proc *process.Process, result *vm.CallResult)
211212
wholeLength++
212213

213214
ctr.indexList[choice]++
215+
removeHasSame := true
214216
if ctr.indexList[choice] == int64(ctr.batchList[choice].RowCount()) {
217+
removeHasSame = ctr.hasSameRow(row, cols, choice)
215218
ctr.removeBatch(proc, choice)
219+
fromRemoveBatch = true
220+
} else {
221+
fromRemoveBatch = false
216222
}
217223

218224
if len(ctr.indexList) == 0 {
219225
sendOver = true
220226
break
221227
}
228+
if !removeHasSame {
229+
break
230+
}
222231
}
223232
ctr.buf.SetRowCount(wholeLength)
224233
result.Batch = ctr.buf
@@ -270,6 +279,30 @@ func (ctr *container) pickSameRow(row int64, cols []*vector.Vector) (batIndex in
270279
return j, hasSame
271280
}
272281

282+
func (ctr *container) hasSameRow(row int64, cols []*vector.Vector, choice int) (hasSame bool) {
283+
l := len(ctr.indexList)
284+
285+
for j := 0; j < l; j++ {
286+
if j == choice {
287+
continue
288+
}
289+
hasSame = true
290+
for k := 0; k < len(ctr.compares); k++ {
291+
ctr.compares[k].Set(0, cols[k])
292+
ctr.compares[k].Set(1, ctr.orderCols[j][k])
293+
result := ctr.compares[k].Compare(0, 1, row, ctr.indexList[j])
294+
if result != 0 {
295+
hasSame = false
296+
break
297+
}
298+
}
299+
if hasSame {
300+
break
301+
}
302+
}
303+
return hasSame
304+
}
305+
273306
func (ctr *container) removeBatch(proc *process.Process, index int) {
274307
if ctr.batchList[index] != nil {
275308
ctr.batchList[index].Clean(proc.Mp())

pkg/sql/colexec/partition/partition_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,10 @@ func newExpression(pos int32, typeID types.T) *plan.Expr {
100100
}
101101

102102
func resetChildren(arg *Partition) {
103-
bat := colexec.MakeMockBatchs()
104-
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat})
103+
bat1 := colexec.MakeMockPartitionBatchs(1)
104+
bat2 := colexec.MakeMockPartitionBatchs(2)
105+
bat3 := colexec.MakeMockPartitionBatchs(3)
106+
op := colexec.NewMockOperator().WithBatchs([]*batch.Batch{bat1, bat2, bat3})
105107
arg.Children = nil
106108
arg.AppendChild(op)
107109
}

0 commit comments

Comments
 (0)