Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/executor/windows/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,21 @@ go_test(
"window_sql_test.go",
],
flaky = True,
shard_count = 7,
shard_count = 8,
deps = [
":windows",
"//pkg/executor/internal/exec",
"//pkg/executor/internal/testutil",
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/parser/ast",
"//pkg/parser/mysql",
"//pkg/planner/core/base",
"//pkg/planner/core/operator/logicalop",
"//pkg/planner/core/operator/physicalop",
"//pkg/planner/property",
"//pkg/testkit",
"//pkg/types",
"@com_github_stretchr_testify//require",
],
)
15 changes: 15 additions & 0 deletions pkg/executor/windows/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package windows

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/executor/aggfuncs"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/vecgroupchecker"
Expand All @@ -26,6 +27,20 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
)

// BuildOrdered constructs the executor for a window plan whose child already
// provides the required partition/order property.
func BuildOrdered(sctx sessionctx.Context, v *physicalop.PhysicalWindow, childExec exec.Executor) (exec.Executor, error) {
windowExec, err := Build(sctx, v, childExec, true)
if err != nil {
return nil, err
}
pipelinedExec, ok := windowExec.(*PipelinedWindowExec)
if !ok {
return nil, errors.New("ordered window must be built with pipelined window executor")
}
return &OrderedWindowExec{PipelinedWindowExec: pipelinedExec}, nil
}

// Build constructs the concrete executor for a window physical plan.
func Build(sctx sessionctx.Context, v *physicalop.PhysicalWindow, childExec exec.Executor, forcePipelined bool) (exec.Executor, error) {
base := exec.NewBaseExecutor(sctx, v.Schema(), v.ID(), childExec)
Expand Down
16 changes: 15 additions & 1 deletion pkg/executor/windows/pipelined_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,32 @@ type PipelinedWindowExec struct {
initializedSlidingWindow bool
}

// OrderedWindowExec executes windows whose input already satisfies the required
// partition/order property.
type OrderedWindowExec struct {
*PipelinedWindowExec
}

// Close implements the Executor Close interface.
func (e *PipelinedWindowExec) Close() error {
return errors.Trace(e.BaseExecutor.Close())
}

// Open implements the Executor Open interface
func (e *PipelinedWindowExec) Open(ctx context.Context) (err error) {
if err := e.BaseExecutor.Open(ctx); err != nil {
return err
}
return e.OpenSelf()
}

// OpenSelf initializes the executor state without opening children.
func (e *PipelinedWindowExec) OpenSelf() error {
e.done, e.newPartition, e.whole, e.initializedSlidingWindow = false, false, false, false
e.dataIdx, e.curRowIdx, e.dropped, e.rowToConsume, e.accumulated = 0, 0, 0, 0, 0
e.lastStartRow, e.lastEndRow, e.stagedStartRow, e.stagedEndRow, e.rowStart, e.rowCnt = 0, 0, 0, 0, 0, 0
e.rows, e.data = make([]chunk.Row, 0), make([]dataInfo, 0)
return e.BaseExecutor.Open(ctx)
return nil
}

func (e *PipelinedWindowExec) firstResultChunkNotReady() bool {
Expand Down
86 changes: 86 additions & 0 deletions pkg/executor/windows/window_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,24 @@
package windows_test

import (
"context"
"fmt"
"testing"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/testutil"
windowexec "github.com/pingcap/tidb/pkg/executor/windows"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
)

func TestWindowExecutorsBasic(t *testing.T) {
Expand All @@ -43,6 +56,79 @@ func TestWindowExecutorsBasic(t *testing.T) {
}
}

func TestBuildOrderedWindowExec(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_enable_pipelined_window_function = 0")
sctx := tk.Session()

colA := &expression.Column{Index: 0, UniqueID: 1, RetType: types.NewFieldType(mysql.TypeLonglong)}
colB := &expression.Column{Index: 1, UniqueID: 2, RetType: types.NewFieldType(mysql.TypeLonglong)}
childSchema := expression.NewSchema(colA, colB)
childExec := testutil.BuildMockDataSource(testutil.MockDataSourceParameters{
Ctx: sctx,
DataSchema: childSchema,
Ndvs: []int{-2, -2},
Datums: [][]any{
{int64(1), int64(1), int64(2), int64(2)},
{int64(1), int64(2), int64(1), int64(2)},
},
Rows: 4,
})
childExec.PrepareChunks()

windowFunc, err := aggregation.NewWindowFuncDesc(sctx.GetExprCtx(), ast.WindowFuncRowNumber, nil, false)
require.NoError(t, err)
windowSchema := childSchema.Clone()
windowSchema.Append(&expression.Column{Index: 2, UniqueID: 3, RetType: types.NewFieldType(mysql.TypeLonglong)})
windowPlan := physicalWindowForTest(sctx.GetPlanCtx(), windowSchema, colA, colB, windowFunc)

orderedExec, err := windowexec.BuildOrdered(sctx, windowPlan, childExec)
require.NoError(t, err)
require.IsType(t, &windowexec.OrderedWindowExec{}, orderedExec)

ctx := context.Background()
require.NoError(t, orderedExec.Open(ctx))
defer func() {
require.NoError(t, orderedExec.Close())
}()

chk := exec.NewFirstChunk(orderedExec)
rows := make([]string, 0, 4)
for {
require.NoError(t, orderedExec.Next(ctx, chk))
if chk.NumRows() == 0 {
break
}
for i := range chk.NumRows() {
row := chk.GetRow(i)
rows = append(rows, fmt.Sprintf("%d %d %d", row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)))
}
}
require.Equal(t, []string{"1 1 1", "1 2 2", "2 1 1", "2 2 2"}, rows)
}

func physicalWindowForTest(
sctx base.PlanContext,
schema *expression.Schema,
partitionCol *expression.Column,
orderCol *expression.Column,
windowFunc *aggregation.WindowFuncDesc,
) *physicalop.PhysicalWindow {
windowPlan := physicalop.PhysicalWindow{
WindowFuncDescs: []*aggregation.WindowFuncDesc{windowFunc},
PartitionBy: []property.SortItem{{Col: partitionCol}},
OrderBy: []property.SortItem{{Col: orderCol}},
Frame: &logicalop.WindowFrame{
Type: ast.Rows,
Start: &logicalop.FrameBound{Type: ast.CurrentRow},
End: &logicalop.FrameBound{Type: ast.CurrentRow},
},
}.Init(sctx, nil, 0)
windowPlan.SetSchema(schema)
return windowPlan
}

func TestWindowReturnColumnNullableAttribute(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
Expand Down
Loading