Skip to content

Commit 5a5d3e8

Browse files
committed
executor: use statement context for pre-analyze stats flush (pingcap#68146)
Backport pingcap#68146 to release-8.5.
1 parent dd0e7c8 commit 5a5d3e8

14 files changed

Lines changed: 342 additions & 318 deletions

pkg/executor/adapter.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
378378
}
379379

380380
if executor == nil {
381-
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
381+
b := newExecutorBuilder(ctx, a.Ctx, a.InfoSchema, a.Ti)
382382
executor = b.build(a.Plan)
383383
if b.err != nil {
384384
return nil, b.err
@@ -608,7 +608,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
608608
}
609609
ctx = a.observeStmtBeginForTopSQL(ctx)
610610

611-
e, err := a.buildExecutor()
611+
e, err := a.buildExecutor(ctx)
612612
if err != nil {
613613
return nil, err
614614
}
@@ -1295,7 +1295,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, lockErr error
12951295
a.resetPhaseDurations()
12961296

12971297
a.inheritContextFromExecuteStmt()
1298-
e, err := a.buildExecutor()
1298+
e, err := a.buildExecutor(ctx)
12991299
if err != nil {
13001300
return nil, err
13011301
}
@@ -1321,28 +1321,28 @@ type pessimisticTxn interface {
13211321
}
13221322

13231323
// buildExecutor build an executor from plan, prepared statement may need additional procedure.
1324-
func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
1324+
func (a *ExecStmt) buildExecutor(ctx context.Context) (exec.Executor, error) {
13251325
defer func(start time.Time) { a.phaseBuildDurations[0] += time.Since(start) }(time.Now())
1326-
ctx := a.Ctx
1327-
stmtCtx := ctx.GetSessionVars().StmtCtx
1326+
sctx := a.Ctx
1327+
stmtCtx := sctx.GetSessionVars().StmtCtx
13281328
if _, ok := a.Plan.(*plannercore.Execute); !ok {
13291329
if stmtCtx.Priority == mysql.NoPriority && a.LowerPriority {
13301330
stmtCtx.Priority = kv.PriorityLow
13311331
}
13321332
}
1333-
if _, ok := a.Plan.(*plannercore.Analyze); ok && ctx.GetSessionVars().InRestrictedSQL {
1334-
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
1333+
if _, ok := a.Plan.(*plannercore.Analyze); ok && sctx.GetSessionVars().InRestrictedSQL {
1334+
sctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
13351335
}
13361336

1337-
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
1337+
b := newExecutorBuilder(ctx, sctx, a.InfoSchema, a.Ti)
13381338
e := b.build(a.Plan)
13391339
if b.err != nil {
13401340
return nil, errors.Trace(b.err)
13411341
}
13421342

13431343
failpoint.Inject("assertTxnManagerAfterBuildExecutor", func() {
13441344
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterBuildExecutor", true)
1345-
sessiontxn.AssertTxnManagerInfoSchema(b.ctx, b.is)
1345+
sessiontxn.AssertTxnManagerInfoSchema(b.sctx, b.is)
13461346
})
13471347

13481348
// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
@@ -1352,7 +1352,7 @@ func (a *ExecStmt) buildExecutor() (exec.Executor, error) {
13521352
return nil, err
13531353
}
13541354
if executorExec.lowerPriority {
1355-
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
1355+
sctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
13561356
}
13571357
e = executorExec.stmtExec
13581358
}

pkg/executor/analyze_col_v2.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ func (e *AnalyzeColumnsExecV2) handleNDVForSpecialIndexes(indexInfos []*model.In
445445
}
446446
}
447447
}()
448-
tasks := e.buildSubIndexJobForSpecialIndex(indexInfos)
448+
tasks := e.buildSubIndexJobForSpecialIndex(context.Background(), indexInfos)
449449
taskCh := make(chan *analyzeTask, len(tasks))
450450
for _, task := range tasks {
451451
AddNewAnalyzeJob(e.ctx, task.job)
@@ -526,11 +526,11 @@ func (e *AnalyzeColumnsExecV2) subIndexWorkerForNDV(taskCh chan *analyzeTask, re
526526

527527
// buildSubIndexJobForSpecialIndex builds sub index pushed down task to calculate the NDV information for indexes containing virtual column.
528528
// This is because we cannot push the calculation of the virtual column down to the tikv side.
529-
func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(indexInfos []*model.IndexInfo) []*analyzeTask {
529+
func (e *AnalyzeColumnsExecV2) buildSubIndexJobForSpecialIndex(ctx context.Context, indexInfos []*model.IndexInfo) []*analyzeTask {
530530
_, offset := timeutil.Zone(e.ctx.GetSessionVars().Location())
531531
tasks := make([]*analyzeTask, 0, len(indexInfos))
532532
sc := e.ctx.GetSessionVars().StmtCtx
533-
concurrency := adaptiveAnlayzeDistSQLConcurrency(context.Background(), e.ctx)
533+
concurrency := adaptiveAnlayzeDistSQLConcurrency(ctx, e.ctx)
534534
for _, indexInfo := range indexInfos {
535535
base := baseAnalyzeExec{
536536
ctx: e.ctx,

pkg/executor/analyze_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"testing"
2222

2323
"github.com/pingcap/tidb/pkg/domain"
24+
"github.com/pingcap/tidb/pkg/executor"
2425
"github.com/pingcap/tidb/pkg/infoschema"
2526
"github.com/pingcap/tidb/pkg/parser/model"
2627
"github.com/pingcap/tidb/pkg/session"
@@ -183,3 +184,22 @@ func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
183184
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
184185
}
185186
}
187+
188+
func TestBuildAnalyzePreFlushUsesStatementContext(t *testing.T) {
189+
store := testkit.CreateMockStore(t)
190+
tk := testkit.NewTestKit(t, store)
191+
tk.MustExec("use test")
192+
tk.MustExec("create table t_pre_analyze_flush_ctx(a int, b int, key idx_b(b))")
193+
tk.MustExec("insert into t_pre_analyze_flush_ctx values (1, 1), (2, 2)")
194+
195+
stmtNodes, err := tk.Session().Parse(context.Background(), "analyze table t_pre_analyze_flush_ctx all columns")
196+
require.NoError(t, err)
197+
require.Len(t, stmtNodes, 1)
198+
stmt, err := (&executor.Compiler{Ctx: tk.Session()}).Compile(context.Background(), stmtNodes[0])
199+
require.NoError(t, err)
200+
201+
ctx, cancel := context.WithCancel(context.Background())
202+
cancel()
203+
err = executor.BuildExecutorForTest(ctx, stmt)
204+
require.ErrorIs(t, err, context.Canceled)
205+
}

pkg/executor/analyze_utils_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ func TestCollectStatsDeltaFlushObjectsForAnalyzeDottedNames(t *testing.T) {
5454
}, targets)
5555
}
5656

57-
func TestCanBroadcastToTiDBRPCForTestRejectsInvalidEndpoints(t *testing.T) {
58-
// Regression for next-gen realcluster tests: in-process domains can register
59-
// multiple server infos with an empty IP/default :10080 but no TiDB RPC
60-
// listener. Such targets must not take the broadcast path.
61-
require.False(t, canBroadcastToTiDBRPCForTest(context.Background(), []string{"", ""}))
57+
// BuildExecutorForTest builds stmt's executor tree. It is exported only for
58+
// external package tests that need to assert executor-build behavior.
59+
func BuildExecutorForTest(ctx context.Context, stmt *ExecStmt) error {
60+
_, err := stmt.buildExecutor(ctx)
61+
return err
6262
}

pkg/executor/benchmark_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src exec.Executor, schema *exp
6868
plan.SetSchema(schema)
6969
plan.Init(ctx.GetPlanCtx(), nil, 0)
7070
plan.SetChildren(nil)
71-
b := newExecutorBuilder(ctx, nil, nil)
71+
b := newExecutorBuilder(context.Background(), ctx, nil, nil)
7272
exec := b.build(plan)
7373
hashAgg := exec.(*aggregate.HashAggExec)
7474
hashAgg.SetChildren(0, src)
@@ -120,7 +120,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec exec.Executor, schem
120120
plan = sg
121121
}
122122

123-
b := newExecutorBuilder(ctx, nil, nil)
123+
b := newExecutorBuilder(context.Background(), ctx, nil, nil)
124124
return b.build(plan)
125125
}
126126

@@ -353,7 +353,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
353353
plan = win
354354
}
355355

356-
b := newExecutorBuilder(ctx, nil, nil)
356+
b := newExecutorBuilder(context.Background(), ctx, nil, nil)
357357
exec := b.build(plan)
358358
return exec
359359
}
@@ -1254,7 +1254,7 @@ func prepare4IndexInnerHashJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDat
12541254
keyOff2IdxOff[i] = i
12551255
}
12561256

1257-
readerBuilder, err := newExecutorBuilder(tc.Ctx, nil, nil).
1257+
readerBuilder, err := newExecutorBuilder(context.Background(), tc.Ctx, nil, nil).
12581258
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
12591259
if err != nil {
12601260
return nil, err
@@ -1328,7 +1328,7 @@ func prepare4IndexMergeJoin(tc *IndexJoinTestCase, outerDS *testutil.MockDataSou
13281328
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
13291329
}
13301330

1331-
readerBuilder, err := newExecutorBuilder(tc.Ctx, nil, nil).
1331+
readerBuilder, err := newExecutorBuilder(context.Background(), tc.Ctx, nil, nil).
13321332
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
13331333
if err != nil {
13341334
return nil, err

pkg/executor/brie.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ func (bq *brieQueue) clearTask(sc *stmtctx.StatementContext) {
235235
}
236236

237237
func (b *executorBuilder) parseTSString(ts string) (uint64, error) {
238-
sc := stmtctx.NewStmtCtxWithTimeZone(b.ctx.GetSessionVars().Location())
238+
sc := stmtctx.NewStmtCtxWithTimeZone(b.sctx.GetSessionVars().Location())
239239
t, err := types.ParseTime(sc.TypeCtx(), ts, mysql.TypeTimestamp, types.MaxFsp)
240240
if err != nil {
241241
return 0, err
@@ -250,27 +250,27 @@ func (b *executorBuilder) parseTSString(ts string) (uint64, error) {
250250
func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema) exec.Executor {
251251
if s.Kind == ast.BRIEKindShowBackupMeta {
252252
return execOnce(&showMetaExec{
253-
BaseExecutor: exec.NewBaseExecutor(b.ctx, schema, 0),
253+
BaseExecutor: exec.NewBaseExecutor(b.sctx, schema, 0),
254254
showConfig: buildShowMetadataConfigFrom(s),
255255
})
256256
}
257257

258258
if s.Kind == ast.BRIEKindShowQuery {
259259
return execOnce(&showQueryExec{
260-
BaseExecutor: exec.NewBaseExecutor(b.ctx, schema, 0),
260+
BaseExecutor: exec.NewBaseExecutor(b.sctx, schema, 0),
261261
targetID: uint64(s.JobID),
262262
})
263263
}
264264

265265
if s.Kind == ast.BRIEKindCancelJob {
266266
return &cancelJobExec{
267-
BaseExecutor: exec.NewBaseExecutor(b.ctx, schema, 0),
267+
BaseExecutor: exec.NewBaseExecutor(b.sctx, schema, 0),
268268
targetID: uint64(s.JobID),
269269
}
270270
}
271271

272272
e := &BRIEExec{
273-
BaseExecutor: exec.NewBaseExecutor(b.ctx, schema, 0),
273+
BaseExecutor: exec.NewBaseExecutor(b.sctx, schema, 0),
274274
info: &brieTaskInfo{
275275
kind: s.Kind,
276276
},

0 commit comments

Comments
 (0)