Skip to content

planner: add task based warning handling and collection #61077

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/planner/core/index_join_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,9 @@ func getBestIndexJoinInnerTaskByProp(ds *logicalop.DataSource, prop *property.Ph
var innerCopTask base.Task
if prop.IndexJoinProp.TableRangeScan {
innerCopTask = buildDataSource2TableScanByIndexJoinProp(ds, prop)
} else {
innerCopTask = buildDataSource2IndexScanByIndexJoinProp(ds, prop)
}
innerCopTask = buildDataSource2IndexScanByIndexJoinProp(ds, prop)
if innerCopTask.Invalid() {
return base.InvalidTask, 0, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,9 @@ func physicalOptimize(logic base.LogicalPlan, planCounter *base.PlanCounterTp) (
return nil, 0, plannererrors.ErrInternal.GenWithStackByArgs(errMsg)
}

// Collect the warnings accumulated on the chosen task into the statement context.
logic.SCtx().GetSessionVars().StmtCtx.AppendWarnings(t.(*RootTask).warnings.GetWarnings())

if err = t.Plan().ResolveIndices(); err != nil {
return nil, 0, err
}
Expand Down
108 changes: 105 additions & 3 deletions pkg/planner/core/task_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
package core

import (
"math"

"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/planner/cardinality"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/cost"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -34,6 +37,72 @@ var (
_ base.Task = &CopTask{}
)

var _ context.WarnGetterAppender = &simpleWarnings{}

// simpleWarnings is a lightweight warning collector attached to each task so
// that warnings produced while enumerating physical plans can travel bottom-up
// with the task and be flushed to the session statement context only once the
// best task is finally chosen. It is not concurrent safe.
type simpleWarnings struct {
	// warnings stores pointers so that cloning tasks only copies slice
	// headers, not whole SQLWarn values; GetWarnings converts the pointers
	// back to values at the end.
	warnings []*context.SQLWarn
}

// WarningCount reports how many warnings are currently buffered in s.
func (s *simpleWarnings) WarningCount() int {
	return len(s.warnings)
}

// Copy replaces s's warnings with a freshly allocated duplicate of src's,
// so two task instances never share the same underlying warnings slice.
func (s *simpleWarnings) Copy(src *simpleWarnings) {
	dup := make([]*context.SQLWarn, len(src.warnings))
	copy(dup, src.warnings)
	s.warnings = dup
}

// CopyFrom rebuilds s.warnings as the concatenation of the warnings of every
// non-nil source, pre-sizing the destination to avoid re-allocation.
// When called with no sources at all, s is left untouched.
func (s *simpleWarnings) CopyFrom(src ...*simpleWarnings) {
	// Check len(src) rather than src == nil: a variadic call expanded from
	// an empty (but non-nil) slice would slip past a nil check and wrongly
	// reset s.warnings to an empty slice.
	if len(src) == 0 {
		return
	}
	total := 0
	for _, one := range src {
		if one != nil {
			total += one.WarningCount()
		}
	}
	s.warnings = make([]*context.SQLWarn, 0, total)
	for _, one := range src {
		if one != nil {
			s.warnings = append(s.warnings, one.warnings...)
		}
	}
}

// AppendWarning records warn at warning level. Once the buffer holds
// math.MaxUint16 entries, further warnings are silently dropped.
func (s *simpleWarnings) AppendWarning(warn error) {
	if len(s.warnings) >= math.MaxUint16 {
		return
	}
	s.warnings = append(s.warnings, &context.SQLWarn{Level: context.WarnLevelWarning, Err: warn})
}

// AppendNote records note at note level. Once the buffer holds
// math.MaxUint16 entries, further notes are silently dropped.
func (s *simpleWarnings) AppendNote(note error) {
	if len(s.warnings) >= math.MaxUint16 {
		return
	}
	s.warnings = append(s.warnings, &context.SQLWarn{Level: context.WarnLevelNote, Err: note})
}

// GetWarnings materializes all stored warning pointers into a value slice.
//
// While tasks are being cloned, the per-task slices only share *SQLWarn
// pointers to keep copying cheap; the conversion from pointer to value
// happens here, once the best task is settled and its warnings are handed
// over to the session context.
func (s *simpleWarnings) GetWarnings() []context.SQLWarn {
	out := make([]context.SQLWarn, len(s.warnings))
	for i, w := range s.warnings {
		out[i] = *w
	}
	return out
}

// ************************************* RootTask Start ******************************************

// RootTask is the final sink node of a plan graph. It should be a single goroutine on tidb.
Expand All @@ -45,6 +114,9 @@ type RootTask struct {
// For copTask and rootTask, when we compose physical tree bottom-up, index join need some special info
// fetched from underlying ds which built index range or table range based on these runtime constant.
IndexJoinInfo *IndexJoinInfo

// warnings passed through different task copy attached with more upper operator specific warnings. (not concurrent safe)
warnings simpleWarnings
}

// GetPlan returns the root task's plan.
Expand All @@ -69,16 +141,24 @@ func (t *RootTask) SetEmpty(x bool) {

// Copy implements Task interface.
func (t *RootTask) Copy() base.Task {
return &RootTask{
nt := &RootTask{
p: t.p,

// when copying, just copy it out.
IndexJoinInfo: t.IndexJoinInfo,
}
// since *t reuses the same warnings slice, we need to copy it out,
// because different task instances should have different warning slices.
nt.warnings.Copy(&t.warnings)
return nt
}

// ConvertToRootTask implements the Task interface.
//
// Converting a root task to a root task is simply cloning it: Copy already
// carries over the plan pointer, the task-bound warnings, and the index
// join info, so no extra work is needed here.
func (t *RootTask) ConvertToRootTask(_ base.PlanContext) base.Task {
	clone := t.Copy()
	return clone.(*RootTask)
}

Expand Down Expand Up @@ -136,6 +216,9 @@ type MppTask struct {
// So physical plan be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash)
rootTaskConds []expression.Expression
tblColHists *statistics.HistColl

// warnings passed through different task copy attached with more upper operator specific warnings. (not concurrent safe)
warnings simpleWarnings
}

// Count implements Task interface.
Expand All @@ -146,6 +229,9 @@ func (t *MppTask) Count() float64 {
// Copy implements the Task interface.
func (t *MppTask) Copy() base.Task {
	clone := *t
	// Detach the warning buffer: the shallow struct copy above would
	// otherwise leave both task instances appending into a shared slice.
	clone.warnings.Copy(&t.warnings)
	return &clone
}

Expand Down Expand Up @@ -178,7 +264,14 @@ func (t *MppTask) MemoryUsage() (sum int64) {
}

// ConvertToRootTaskImpl implements Task interface.
func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) *RootTask {
func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) (rt *RootTask) {
defer func() {
// Propagate any warnings gathered on the mpp task to the resulting
// root task once it has been constructed.
if t.warnings.WarningCount() > 0 {
rt.warnings.CopyFrom(&t.warnings)
}
}()
// In disaggregated-tiflash mode, need to consider generated column.
tryExpandVirtualColumn(t.p)
sender := PhysicalExchangeSender{
Expand All @@ -192,7 +285,7 @@ func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) *RootTask {
}.Init(ctx, t.p.QueryBlockOffset())
p.SetStats(t.p.StatsInfo())
collectPartitionInfosFromMPPPlan(p, t.p)
rt := &RootTask{}
rt = &RootTask{}
rt.SetPlan(p)

if len(t.rootTaskConds) > 0 {
Expand Down Expand Up @@ -269,6 +362,9 @@ type CopTask struct {
// For copTask and rootTask, when we compose physical tree bottom-up, index join need some special info
// fetched from underlying ds which built index range or table range based on these runtime constant.
IndexJoinInfo *IndexJoinInfo

// warnings passed through different task copy attached with more upper operator specific warnings. (not concurrent safe)
warnings simpleWarnings
}

// Invalid implements Task interface.
Expand All @@ -287,6 +383,9 @@ func (t *CopTask) Count() float64 {
// Copy implements the Task interface.
func (t *CopTask) Copy() base.Task {
	clone := *t
	// Detach the warning buffer: the shallow struct copy above would
	// otherwise leave both task instances appending into a shared slice.
	clone.warnings.Copy(&t.warnings)
	return &clone
}

Expand Down Expand Up @@ -348,6 +447,9 @@ func (t *CopTask) convertToRootTaskImpl(ctx base.PlanContext) (rt *RootTask) {
// return indexJoinInfo upward, when copTask is converted to rootTask.
rt.IndexJoinInfo = t.IndexJoinInfo
}
if t.warnings.WarningCount() > 0 {
rt.warnings.CopyFrom(&t.warnings)
}
}()
// copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize
// the cost to cop iterator workers. According to `CopClient::Send`, the concurrency
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/context/warn.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ type WarnAppender interface {
AppendNote(msg error)
}

// WarnGetterAppender provides functions to add warnings and notes, and to
// retrieve every warning added so far.
type WarnGetterAppender interface {
	WarnAppender
	// GetWarnings gets all warnings. The slice is not copied, so it should not be modified.
	GetWarnings() []SQLWarn
}

// WarnHandler provides a handler to append and get warnings.
type WarnHandler interface {
WarnAppender
Expand Down