Skip to content

Commit 2b87000

Browse files
authored
planner: add task based warning handling and collection (#61077)
ref #60106
1 parent 80fba18 commit 2b87000

File tree

4 files changed

+117
-4
lines changed

4 files changed

+117
-4
lines changed

pkg/planner/core/index_join_path.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,8 +646,9 @@ func getBestIndexJoinInnerTaskByProp(ds *logicalop.DataSource, prop *property.Ph
646646
var innerCopTask base.Task
647647
if prop.IndexJoinProp.TableRangeScan {
648648
innerCopTask = buildDataSource2TableScanByIndexJoinProp(ds, prop)
649+
} else {
650+
innerCopTask = buildDataSource2IndexScanByIndexJoinProp(ds, prop)
649651
}
650-
innerCopTask = buildDataSource2IndexScanByIndexJoinProp(ds, prop)
651652
if innerCopTask.Invalid() {
652653
return base.InvalidTask, 0, nil
653654
}

pkg/planner/core/optimizer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1192,6 +1192,9 @@ func physicalOptimize(logic base.LogicalPlan, planCounter *base.PlanCounterTp) (
11921192
return nil, 0, plannererrors.ErrInternal.GenWithStackByArgs(errMsg)
11931193
}
11941194

1195+
// collect the warnings from task.
1196+
logic.SCtx().GetSessionVars().StmtCtx.AppendWarnings(t.(*RootTask).warnings.GetWarnings())
1197+
11951198
if err = t.Plan().ResolveIndices(); err != nil {
11961199
return nil, 0, err
11971200
}

pkg/planner/core/task_base.go

Lines changed: 105 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
package core
1616

1717
import (
18+
"math"
19+
1820
"github.com/pingcap/tidb/pkg/expression"
1921
"github.com/pingcap/tidb/pkg/kv"
2022
"github.com/pingcap/tidb/pkg/planner/cardinality"
2123
"github.com/pingcap/tidb/pkg/planner/core/base"
2224
"github.com/pingcap/tidb/pkg/planner/core/cost"
2325
"github.com/pingcap/tidb/pkg/planner/property"
2426
"github.com/pingcap/tidb/pkg/statistics"
27+
"github.com/pingcap/tidb/pkg/util/context"
2528
"github.com/pingcap/tidb/pkg/util/logutil"
2629
"github.com/pingcap/tidb/pkg/util/size"
2730
"github.com/pingcap/tipb/go-tipb"
@@ -34,6 +37,72 @@ var (
3437
_ base.Task = &CopTask{}
3538
)
3639

40+
var _ context.WarnGetterAppender = &simpleWarnings{}
41+
42+
type simpleWarnings struct {
43+
warnings []*context.SQLWarn
44+
}
45+
46+
// WarningCount returns the number of warnings.
47+
func (s *simpleWarnings) WarningCount() int {
48+
return len(s.warnings)
49+
}
50+
51+
// Copy implemented the simple warnings copy to avoid use the same warnings slice for different task instance.
52+
func (s *simpleWarnings) Copy(src *simpleWarnings) {
53+
warnings := make([]*context.SQLWarn, 0, len(src.warnings))
54+
warnings = append(warnings, src.warnings...)
55+
s.warnings = warnings
56+
}
57+
58+
// CopyFrom copy the warnings from src to s.
59+
func (s *simpleWarnings) CopyFrom(src ...*simpleWarnings) {
60+
if src == nil {
61+
return
62+
}
63+
length := 0
64+
for _, one := range src {
65+
if one == nil {
66+
continue
67+
}
68+
length += one.WarningCount()
69+
}
70+
s.warnings = make([]*context.SQLWarn, 0, length)
71+
for _, one := range src {
72+
if one == nil {
73+
continue
74+
}
75+
s.warnings = append(s.warnings, one.warnings...)
76+
}
77+
}
78+
79+
// AppendWarning appends a warning to the warnings slice.
80+
func (s *simpleWarnings) AppendWarning(warn error) {
81+
if len(s.warnings) < math.MaxUint16 {
82+
s.warnings = append(s.warnings, &context.SQLWarn{Level: context.WarnLevelWarning, Err: warn})
83+
}
84+
}
85+
86+
// AppendNote appends a note to the warnings slice.
87+
func (s *simpleWarnings) AppendNote(note error) {
88+
if len(s.warnings) < math.MaxUint16 {
89+
s.warnings = append(s.warnings, &context.SQLWarn{Level: context.WarnLevelNote, Err: note})
90+
}
91+
}
92+
93+
// GetWarnings returns the internal all stored warnings.
94+
func (s *simpleWarnings) GetWarnings() []context.SQLWarn {
95+
// we just reuse and reorganize pointer of warning elem across different level's
96+
// task warnings slice to avoid copy them totally leading mem cost.
97+
// when best task is finished and final warnings is determined, we should convert
98+
// pointer to struct to append it to session context.
99+
warnings := make([]context.SQLWarn, 0, len(s.warnings))
100+
for _, w := range s.warnings {
101+
warnings = append(warnings, *w)
102+
}
103+
return warnings
104+
}
105+
37106
// ************************************* RootTask Start ******************************************
38107

39108
// RootTask is the final sink node of a plan graph. It should be a single goroutine on tidb.
@@ -45,6 +114,9 @@ type RootTask struct {
45114
// For copTask and rootTask, when we compose physical tree bottom-up, index join need some special info
46115
// fetched from underlying ds which built index range or table range based on these runtime constant.
47116
IndexJoinInfo *IndexJoinInfo
117+
118+
// warnings passed through different task copy attached with more upper operator specific warnings. (not concurrent safe)
119+
warnings simpleWarnings
48120
}
49121

50122
// GetPlan returns the root task's plan.
@@ -69,16 +141,24 @@ func (t *RootTask) SetEmpty(x bool) {
69141

70142
// Copy implements Task interface.
71143
func (t *RootTask) Copy() base.Task {
72-
return &RootTask{
144+
nt := &RootTask{
73145
p: t.p,
74146

75147
// when copying, just copy it out.
76148
IndexJoinInfo: t.IndexJoinInfo,
77149
}
150+
// since *t will reuse the same warnings slice, we need to copy it out.
151+
// because different task instance should have different warning slice.
152+
nt.warnings.Copy(&t.warnings)
153+
return nt
78154
}
79155

80156
// ConvertToRootTask implements Task interface.
81157
func (t *RootTask) ConvertToRootTask(_ base.PlanContext) base.Task {
158+
// root -> root, only copy another one instance.
159+
// *p: a new pointer to pointer current task's physic plan
160+
// warnings: a new slice to store current task-bound(p-bound) warnings.
161+
// *indexInfo: a new pointer to inherit the index join info upward if necessary.
82162
return t.Copy().(*RootTask)
83163
}
84164

@@ -136,6 +216,9 @@ type MppTask struct {
136216
// So physical plan be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash)
137217
rootTaskConds []expression.Expression
138218
tblColHists *statistics.HistColl
219+
220+
// warnings passed through different task copy attached with more upper operator specific warnings. (not concurrent safe)
221+
warnings simpleWarnings
139222
}
140223

141224
// Count implements Task interface.
@@ -146,6 +229,9 @@ func (t *MppTask) Count() float64 {
146229
// Copy implements Task interface.
147230
func (t *MppTask) Copy() base.Task {
148231
nt := *t
232+
// since *t will reuse the same warnings slice, we need to copy it out.
233+
// cause different task instance should have different warning slice.
234+
nt.warnings.Copy(&t.warnings)
149235
return &nt
150236
}
151237

@@ -178,7 +264,14 @@ func (t *MppTask) MemoryUsage() (sum int64) {
178264
}
179265

180266
// ConvertToRootTaskImpl implements Task interface.
181-
func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) *RootTask {
267+
func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) (rt *RootTask) {
268+
defer func() {
269+
// mppTask should inherit the indexJoinInfo upward.
270+
// because mpp task bottom doesn't form the indexJoin related cop task.
271+
if t.warnings.WarningCount() > 0 {
272+
rt.warnings.CopyFrom(&t.warnings)
273+
}
274+
}()
182275
// In disaggregated-tiflash mode, need to consider generated column.
183276
tryExpandVirtualColumn(t.p)
184277
sender := PhysicalExchangeSender{
@@ -192,7 +285,7 @@ func (t *MppTask) ConvertToRootTaskImpl(ctx base.PlanContext) *RootTask {
192285
}.Init(ctx, t.p.QueryBlockOffset())
193286
p.SetStats(t.p.StatsInfo())
194287
collectPartitionInfosFromMPPPlan(p, t.p)
195-
rt := &RootTask{}
288+
rt = &RootTask{}
196289
rt.SetPlan(p)
197290

198291
if len(t.rootTaskConds) > 0 {
@@ -269,6 +362,9 @@ type CopTask struct {
269362
// For copTask and rootTask, when we compose physical tree bottom-up, index join need some special info
270363
// fetched from underlying ds which built index range or table range based on these runtime constant.
271364
IndexJoinInfo *IndexJoinInfo
365+
366+
// warnings passed through different task copy attached with more upper operator specific warnings. (not concurrent safe)
367+
warnings simpleWarnings
272368
}
273369

274370
// Invalid implements Task interface.
@@ -287,6 +383,9 @@ func (t *CopTask) Count() float64 {
287383
// Copy implements Task interface.
288384
func (t *CopTask) Copy() base.Task {
289385
nt := *t
386+
// since *t will reuse the same warnings slice, we need to copy it out.
387+
// cause different task instance should have different warning slice.
388+
nt.warnings.Copy(&t.warnings)
290389
return &nt
291390
}
292391

@@ -348,6 +447,9 @@ func (t *CopTask) convertToRootTaskImpl(ctx base.PlanContext) (rt *RootTask) {
348447
// return indexJoinInfo upward, when copTask is converted to rootTask.
349448
rt.IndexJoinInfo = t.IndexJoinInfo
350449
}
450+
if t.warnings.WarningCount() > 0 {
451+
rt.warnings.CopyFrom(&t.warnings)
452+
}
351453
}()
352454
// copTasks are run in parallel, to make the estimated cost closer to execution time, we amortize
353455
// the cost to cop iterator workers. According to `CopClient::Send`, the concurrency

pkg/util/context/warn.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ type WarnAppender interface {
8585
AppendNote(msg error)
8686
}
8787

88+
// WarnGetterAppender provides a function to add a warning and get all warnings.
89+
type WarnGetterAppender interface {
90+
WarnAppender
91+
// GetWarnings gets all warnings. The slice is not copied, so it should not be modified.
92+
GetWarnings() []SQLWarn
93+
}
94+
8895
// WarnHandler provides a handler to append and get warnings.
8996
type WarnHandler interface {
9097
WarnAppender

0 commit comments

Comments
 (0)