Skip to content

Commit 01d951e

Browse files
committed
executor: use statement context for pre-analyze stats flush
1 parent fdfb657 commit 01d951e

10 files changed

Lines changed: 58 additions & 24 deletions

pkg/executor/adapter.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
419419
}
420420

421421
if executor == nil {
422-
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
422+
b := newExecutorBuilder(ctx, a.Ctx, a.InfoSchema, a.Ti)
423423
executor = b.build(a.Plan)
424424
if b.err != nil {
425425
return nil, b.err
@@ -678,7 +678,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
678678
execStartTime = time.Now()
679679
}
680680

681-
e, err := a.buildExecutor()
681+
e, err := a.buildExecutor(ctx)
682682
if err != nil {
683683
return nil, err
684684
}
@@ -1371,7 +1371,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
13711371
a.resetPhaseDurations()
13721372

13731373
a.inheritContextFromExecuteStmt()
1374-
e, err := a.buildExecutor()
1374+
e, err := a.buildExecutor(ctx)
13751375
if err != nil {
13761376
return nil, err
13771377
}
@@ -1397,20 +1397,20 @@ type pessimisticTxn interface {
13971397
}
13981398

13991399
// buildExecutor build an executor from plan, prepared statement may need additional procedure.
1400-
func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
1400+
func (a *ExecStmt) buildExecutor(ctx context.Context) (exec.Executor, error) {
14011401
defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now())
1402-
ctx := a.Ctx
1403-
stmtCtx := ctx.GetSessionVars().StmtCtx
1402+
sctx := a.Ctx
1403+
stmtCtx := sctx.GetSessionVars().StmtCtx
14041404
if _, ok := a.Plan.(*plannercore.Execute); !ok {
14051405
if stmtCtx.Priority == mysql.NoPriority && a.LowerPriority {
14061406
stmtCtx.Priority = kv.PriorityLow
14071407
}
14081408
}
1409-
if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
1410-
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
1409+
if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
1410+
sctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
14111411
}
14121412

1413-
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
1413+
b := newExecutorBuilder(ctx, sctx, a.InfoSchema, a.Ti)
14141414
e := b.build(a.Plan)
14151415
if b.err != nil {
14161416
return nil, errors.Trace(b.err)
@@ -1428,7 +1428,7 @@ func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
14281428
return nil, err
14291429
}
14301430
if executorExec.lowerPriority {
1431-
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
1431+
sctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
14321432
}
14331433
e = executorExec.stmtExec
14341434
}

pkg/executor/analyze.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ func flushStatsDeltaForAnalyze(ctx context.Context, sctx sessionctx.Context, pla
9797
if len(flushObjects) == 0 {
9898
return nil
9999
}
100+
if err := ctx.Err(); err != nil {
101+
return err
102+
}
100103

101104
// HACK: Some tests register in-process TiDB domains but do not start TiDB RPC
102105
// endpoints. Broadcasting FLUSH STATS_DELTA CLUSTER to those mock endpoints can

pkg/executor/analyze_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"time"
2424

2525
"github.com/pingcap/tidb/pkg/domain"
26+
"github.com/pingcap/tidb/pkg/executor"
2627
"github.com/pingcap/tidb/pkg/infoschema"
2728
"github.com/pingcap/tidb/pkg/parser/ast"
2829
"github.com/pingcap/tidb/pkg/session"
@@ -214,6 +215,25 @@ func TestAnalyzeSaveResultErrorDoesNotHang(t *testing.T) {
214215
}
215216
}
216217

218+
func TestBuildAnalyzePreFlushUsesStatementContext(t *testing.T) {
219+
store := testkit.CreateMockStore(t)
220+
tk := testkit.NewTestKit(t, store)
221+
tk.MustExec("use test")
222+
tk.MustExec("create table t_pre_analyze_flush_ctx(a int, b int, key idx_b(b))")
223+
tk.MustExec("insert into t_pre_analyze_flush_ctx values (1, 1), (2, 2)")
224+
225+
stmtNodes, err := tk.Session().Parse(context.Background(), "analyze table t_pre_analyze_flush_ctx all columns")
226+
require.NoError(t, err)
227+
require.Len(t, stmtNodes, 1)
228+
stmt, err := (&executor.Compiler{Ctx: tk.Session()}).Compile(context.Background(), stmtNodes[0])
229+
require.NoError(t, err)
230+
231+
ctx, cancel := context.WithCancel(context.Background())
232+
cancel()
233+
err = executor.BuildExecutorForTest(ctx, stmt)
234+
require.ErrorIs(t, err, context.Canceled)
235+
}
236+
217237
func TestAnalyzeKillDuringSaveDoesNotHang(t *testing.T) {
218238
store := testkit.CreateMockStore(t)
219239
tk := testkit.NewTestKit(t, store)

pkg/executor/analyze_utils_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,3 +60,10 @@ func TestCanBroadcastToTiDBRPCForTestRejectsInvalidEndpoints(t *testing.T) {
6060
// listener. Such targets must not take the broadcast path.
6161
require.False(t, canBroadcastToTiDBRPCForTest(context.Background(), []string{"", ""}))
6262
}
63+
64+
// BuildExecutorForTest builds stmt's executor tree. It is exported only for
65+
// external package tests that need to assert executor-build behavior.
66+
func BuildExecutorForTest(ctx context.Context, stmt *ExecStmt) error {
67+
_, err := stmt.buildExecutor(ctx)
68+
return err
69+
}

pkg/executor/benchmark_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src exec.Executor, schema *exp
6767
plan.SetSchema(schema)
6868
plan.Init(ctx.GetPlanCtx(), nil, 0)
6969
plan.SetChildren(nil)
70-
b := newExecutorBuilder(ctx, nil, nil)
70+
b := newExecutorBuilder(context.Background(), ctx, nil, nil)
7171
exec := b.build(plan)
7272
hashAgg := exec.(*aggregate.HashAggExec)
7373
hashAgg.SetChildren(0, src)
@@ -119,7 +119,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
119119
plan = sg
120120
}
121121

122-
b := newExecutorBuilder(ctx, nil, nil)
122+
b := newExecutorBuilder(context.Background(), ctx, nil, nil)
123123
return b.build(plan)
124124
}
125125

@@ -352,7 +352,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
352352
plan = win
353353
}
354354

355-
b := newExecutorBuilder(ctx, nil, nil)
355+
b := newExecutorBuilder(context.Background(), ctx, nil, nil)
356356
exec := b.build(plan)
357357
return exec
358358
}
@@ -1253,7 +1253,7 @@ func prepare4IndexInnerHashJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDat
12531253
keyOff2IdxOff[i] = i
12541254
}
12551255

1256-
readerBuilder, err := newExecutorBuilder(tc.Ctx, nil, nil).
1256+
readerBuilder, err := newExecutorBuilder(context.Background(), tc.Ctx, nil, nil).
12571257
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
12581258
if err != nil {
12591259
return nil, err
@@ -1328,7 +1328,7 @@ func prepare4IndexMergeJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDataSou
13281328
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
13291329
}
13301330

1331-
readerBuilder, err := newExecutorBuilder(tc.Ctx, nil, nil).
1331+
readerBuilder, err := newExecutorBuilder(context.Background(), tc.Ctx, nil, nil).
13321332
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
13331333
if err != nil {
13341334
return nil, err

pkg/executor/builder.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ import (
104104
// executorBuilder builds an Executor from a Plan.
105105
// The InfoSchema must not change during execution.
106106
type executorBuilder struct {
107+
// stmtCtx is the statement-scoped context.Context for executor build steps that
108+
// can block before Executor.Open/Next receives the execution context.
109+
stmtCtx context.Context
107110
sctx sessionctx.Context
108111
is infoschema.InfoSchema
109112
err error // err is set when there is error happened during Executor building process.
@@ -146,9 +149,10 @@ type CTEStorages struct {
146149
initErr error
147150
}
148151

149-
func newExecutorBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
152+
func newExecutorBuilder(stmtCtx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
150153
txnManager := sessiontxn.GetTxnManager(sctx)
151154
return &executorBuilder{
155+
stmtCtx: stmtCtx,
152156
sctx: sctx,
153157
is: is,
154158
Ti: ti,
@@ -177,7 +181,7 @@ type MockExecutorBuilder struct {
177181
// NewMockExecutorBuilderForTest is ONLY used in test.
178182
func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *MockExecutorBuilder {
179183
return &MockExecutorBuilder{
180-
executorBuilder: newExecutorBuilder(ctx, is, ti),
184+
executorBuilder: newExecutorBuilder(context.Background(), ctx, is, ti),
181185
}
182186
}
183187

@@ -3315,8 +3319,8 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
33153319
// buildAnalyzeSamplingPushdown reads base count / modify_count from mysql.stats_meta
33163320
// while constructing column analyze tasks. Flush pending deltas first so the base
33173321
// values include pre-analyze changes and later delta dumps cannot double count them.
3318-
// TODO: Determine whether context.Background is appropriate here; if not, use the proper statement context.
3319-
if err := flushStatsDeltaForAnalyze(kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats), b.sctx, v); err != nil {
3322+
intest.Assert(b.stmtCtx != nil, "missing statement context for analyze")
3323+
if err := flushStatsDeltaForAnalyze(kv.WithInternalSourceType(b.stmtCtx, kv.InternalTxnStats), b.sctx, v); err != nil {
33203324
b.err = err
33213325
return nil
33223326
}

pkg/executor/builder_index_join_cleanup_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestBuildExecutorForIndexJoinHashJoinErrorCleansChildren(t *testing.T) {
8181
hashJoinPlan.SetSchema(expression.MergeSchema(lookupSchema, otherSchema))
8282
hashJoinPlan.SetChildren(lookupMockPlan, otherMockPlan)
8383

84-
execBuilder := newExecutorBuilder(ctx, nil, nil)
84+
execBuilder := newExecutorBuilder(context.Background(), ctx, nil, nil)
8585
execBuilder.forDataReaderBuilder = true
8686
execBuilder.dataReaderTS = 1
8787
readerBuilder, err := execBuilder.newDataReaderBuilder(hashJoinPlan)
@@ -106,7 +106,7 @@ func TestBuildExecutorForIndexJoinHashJoinErrorCleansChildren(t *testing.T) {
106106
func TestBuildCTEStorageProducerCleansStoragesOnRecursiveBuildError(t *testing.T) {
107107
ctx := mock.NewContext()
108108
ctx.GetSessionVars().StmtCtx.CTEStorageMap = map[int]*CTEStorages{}
109-
builder := newExecutorBuilder(ctx, nil, nil)
109+
builder := newExecutorBuilder(context.Background(), ctx, nil, nil)
110110

111111
stats := &property.StatsInfo{RowCount: 1}
112112
schema := expression.NewSchema(&expression.Column{

pkg/executor/coprocessor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(ctx context.Context, req *copro
199199
}
200200
plan = core.InjectExtraProjection(plan)
201201
// Build executor.
202-
b := newExecutorBuilder(h.sctx, is, nil)
202+
b := newExecutorBuilder(ctx, h.sctx, is, nil)
203203
return b.build(plan), nil
204204
}
205205

pkg/executor/executor_required_rows_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -741,7 +741,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType base.JoinType, innerSrc
741741
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(ctx.GetExprCtx(), j.LeftJoinKeys[i], j.RightJoinKeys[i]))
742742
}
743743

744-
b := newExecutorBuilder(ctx, nil, nil)
744+
b := newExecutorBuilder(context.Background(), ctx, nil, nil)
745745
return b.build(j)
746746
}
747747

pkg/executor/select.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ func init() {
616616
return nil, err
617617
}
618618

619-
e := newExecutorBuilder(sctx, is, nil)
619+
e := newExecutorBuilder(ctx, sctx, is, nil)
620620
executor := e.build(p)
621621
if e.err != nil {
622622
return nil, e.err

0 commit comments

Comments
 (0)