Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
90d112b
*: refactor traceevent to make some concept clear
tiancaiamao Jan 13, 2026
57952b2
rename traceevent.NewTrace to traceevent.NewTraceBuf
tiancaiamao Jan 13, 2026
16f1cad
make lint happy
tiancaiamao Jan 13, 2026
8fe28b1
fix CI & make bazel_prepare
tiancaiamao Jan 14, 2026
ae45bed
address comment
tiancaiamao Jan 15, 2026
2af9e30
make lint happy
tiancaiamao Jan 15, 2026
87b8e5e
fix test build
tiancaiamao Jan 15, 2026
b5dec50
make lint happy
tiancaiamao Jan 16, 2026
fd19e63
fix CI
tiancaiamao Jan 19, 2026
a999b83
make lint happy
tiancaiamao Jan 19, 2026
575eb19
make lint happy
tiancaiamao Jan 19, 2026
1271435
Merge branch 'master' into trace-buf
tiancaiamao Jan 19, 2026
e3f5024
address comment
tiancaiamao Jan 21, 2026
da43548
Update pkg/ddl/executor.go
tiancaiamao Jan 21, 2026
eb57a8a
fix build
tiancaiamao Jan 21, 2026
441167f
fix CI
tiancaiamao Jan 21, 2026
bf563f1
fix build
tiancaiamao Jan 21, 2026
fc8a822
stablize test
tiancaiamao Jan 21, 2026
d601503
Update pkg/util/traceevent/flightrecorder.go
tiancaiamao Jan 23, 2026
922c245
address comment
tiancaiamao Jan 23, 2026
98635b6
make bazel_prepare
tiancaiamao Jan 23, 2026
d4c6c36
Merge branch 'master' into trace-buf
tiancaiamao Feb 3, 2026
3a26d8a
fix CI
tiancaiamao Feb 3, 2026
773b92f
Merge remote-tracking branch 'origin/master' into trace-buf
tiancaiamao Mar 2, 2026
0c9b064
session, util/tracing, util/traceevent: fix trace id propagation for …
tiancaiamao Mar 2, 2026
d7833b0
Merge master into trace-buf
tiancaiamao Apr 3, 2026
98d7c39
Address review comments from PR #65562
tiancaiamao Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6801,16 +6801,22 @@ func (e *executor) doDDLJob2(ctx sessionctx.Context, job *model.Job, args model.
// When fast create is enabled, we might merge multiple jobs into one, so do not
// depend on job.ID, use JobID from jobSubmitResult.
func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (resErr error) {
var traceID []byte
if traceCtx := ctx.GetTraceCtx(); traceCtx != nil {
r := tracing.StartRegion(traceCtx, "ddl.DoDDLJobWrapper")
defer r.End()
if v := tracing.GetTraceBuf(traceCtx); v != nil {
if traceBuf, ok := v.(*traceevent.TraceBuf); ok && traceBuf != nil {
traceID = traceBuf.GetTraceID()
}
}
}

job := jobW.Job
job.TraceInfo = &tracing.TraceInfo{
ConnectionID: ctx.GetSessionVars().ConnectionID,
SessionAlias: ctx.GetSessionVars().SessionAlias,
TraceID: traceevent.TraceIDFromContext(ctx.GetTraceCtx()),
TraceID: traceID,
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ func (s *jobScheduler) schedule() error {
defer ticker.Stop()
s.mustReloadSchemas()

trace := traceevent.NewTrace()
ctx := tracing.WithFlightRecorder(s.schCtx, trace)
trace := traceevent.NewTraceBuf()
ctx := traceevent.WithTraceBuf(s.schCtx, trace)

for {
if err := s.schCtx.Err(); err != nil {
Expand Down Expand Up @@ -525,7 +525,7 @@ func (s *jobScheduler) deliveryJob(ctx context.Context, wk *worker, pool *worker
pool.put(wk)
}()

trace := traceevent.NewTrace()
trace := traceevent.NewTraceBuf()
jobCtx := s.getJobRunCtx(trace, jobW.ID, jobW.TraceInfo)
defer trace.DiscardOrFlush(jobCtx.ctx)

Expand Down Expand Up @@ -565,11 +565,11 @@ func (s *jobScheduler) deliveryJob(ctx context.Context, wk *worker, pool *worker
})
}

func (s *jobScheduler) getJobRunCtx(trace *traceevent.Trace, jobID int64, traceInfo *tracing.TraceInfo) *jobContext {
func (s *jobScheduler) getJobRunCtx(trace *traceevent.TraceBuf, jobID int64, traceInfo *tracing.TraceInfo) *jobContext {
ch, _ := s.ddlJobDoneChMap.Load(jobID)
newCtx := tracing.WithFlightRecorder(s.schCtx, trace)
newCtx := traceevent.WithTraceBuf(s.schCtx, trace)
if len(traceInfo.TraceID) > 0 {
newCtx = traceevent.ContextWithTraceID(newCtx, traceInfo.TraceID)
trace.SetTraceID(traceInfo.TraceID)
}

Copilot AI Jan 21, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The refactoring is incomplete here. While the traceID is being set in the TraceBuf using trace.SetTraceID(traceInfo.TraceID), the context is not being updated for client-go integration.

In session.go (lines 2826-2829), both operations are performed:

  1. traceBuf.SetTraceID(traceID) - stores in TraceBuf
  2. ctx = trace.ContextWithTraceID(ctx, traceID) - stores in context for client-go

However, here only the first operation is done. The second operation is missing, which means the trace ID won't be properly propagated to client-go/TiKV requests made in this context.

You need to add the missing import and context update:

import "github.com/tikv/client-go/v2/trace"

And update the code to:

if len(traceInfo.TraceID) > 0 {
    trace.SetTraceID(traceInfo.TraceID)
    newCtx = trace.ContextWithTraceID(newCtx, traceInfo.TraceID)
}

Copilot uses AI. Check for mistakes.
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return &jobContext{
ctx: newCtx,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/notifier/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ func (n *DDLNotifier) start() {

ctx := kv.WithInternalSourceType(n.ctx, kv.InternalDDLNotifier)
ctx = logutil.WithCategory(ctx, "ddl-notifier")
trace := traceevent.NewTrace()
ctx = tracing.WithFlightRecorder(ctx, trace)
trace := traceevent.NewTraceBuf()
ctx = traceevent.WithTraceBuf(ctx, trace)
ticker := time.NewTicker(n.pollInterval)
defer ticker.Stop()
for {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dxf/framework/scheduler/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (nm *NodeManager) maintainLiveNodes(ctx context.Context, taskMgr TaskManage
func (nm *NodeManager) refreshNodesLoop(ctx context.Context, taskMgr TaskManager, slotMgr *SlotManager) {
ticker := time.NewTicker(nodesCheckInterval)
defer ticker.Stop()
trace := traceevent.NewTrace()
ctx = tracing.WithFlightRecorder(ctx, trace)
trace := traceevent.NewTraceBuf()
ctx = tracing.WithTraceBuf(ctx, trace)
for {
select {
case <-ctx.Done():
Expand Down
8 changes: 4 additions & 4 deletions pkg/dxf/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ func (sm *Manager) scheduleTaskLoop() {
sm.logger.Info("schedule task loop start")
ticker := time.NewTicker(CheckTaskRunningInterval)
defer ticker.Stop()
trace := traceevent.NewTrace()
ctx := tracing.WithFlightRecorder(sm.ctx, trace)
trace := traceevent.NewTraceBuf()
ctx := traceevent.WithTraceBuf(sm.ctx, trace)
for {
select {
case <-sm.ctx.Done():
Expand Down Expand Up @@ -491,8 +491,8 @@ func (sm *Manager) collectLoop() {
defer func() {
metrics.Unregister(sm.metricCollector)
}()
trace := traceevent.NewTrace()
ctx := tracing.WithFlightRecorder(sm.ctx, trace)
trace := traceevent.NewTraceBuf()
ctx := traceevent.WithTraceBuf(sm.ctx, trace)
for {
select {
case <-sm.ctx.Done():
Expand Down
6 changes: 3 additions & 3 deletions pkg/dxf/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Manager struct {
logger *zap.Logger
slotManager *slotManager
nodeResource *proto.NodeResource
trace *traceevent.Trace
trace *traceevent.TraceBuf
}

// NewManager creates a new task executor Manager.
Expand All @@ -89,10 +89,10 @@ func NewManager(ctx context.Context, store kv.Storage, id string, taskTable Task
logger: logger,
slotManager: newSlotManager(resource.TotalCPU),
nodeResource: resource,
trace: traceevent.NewTrace(),
trace: traceevent.NewTraceBuf(),
}

ctx = tracing.WithFlightRecorder(ctx, m.trace)
ctx = traceevent.WithTraceBuf(ctx, m.trace)
m.ctx, m.cancel = context.WithCancel(ctx)
m.mu.taskExecutors = make(map[int64]TaskExecutor)

Expand Down
5 changes: 2 additions & 3 deletions pkg/dxf/framework/taskexecutor/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/traceevent"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -273,8 +272,8 @@ func (e *BaseTaskExecutor) Run() {
metering.UnregisterRecorder(e.GetTaskBase().ID)
}()

trace := traceevent.NewTrace()
ctx := tracing.WithFlightRecorder(e.ctx, trace)
trace := traceevent.NewTraceBuf()
ctx := traceevent.WithTraceBuf(e.ctx, trace)
// task executor occupies resources, if there's no subtask to run for 10s,
// we release the resources so that other tasks can use them.
// 300ms + 600ms + 1.2s + 2s * 4 = 10.1s
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,8 +1080,8 @@ func (cc *clientConn) Run(ctx context.Context) {
cc.addConnMetrics()

var traceInfo *tracing.TraceInfo
trace := traceevent.NewTrace()
ctx = tracing.WithFlightRecorder(ctx, trace)
traceBuf := traceevent.NewTraceBuf()
ctx = traceevent.WithTraceBuf(ctx, traceBuf)

// Usually, client connection status changes between [dispatching] <=> [reading].
// When some event happens, server may notify this client connection by setting
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func (cc *clientConn) Run(ctx context.Context) {
err = cc.dispatch(ctx, data)
cc.ctx.GetSessionVars().ClearAlloc(&cc.chunkAlloc, err != nil)
cc.chunkAlloc.Reset()
trace.DiscardOrFlush(ctx)
traceBuf.DiscardOrFlush(ctx)

if err != nil {
cc.audit(context.Background(), plugin.Error) // tell the plugin API there was a dispatch error
Expand Down
11 changes: 7 additions & 4 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1849,9 +1849,9 @@ func (s *session) getOomAlarmVariablesInfo() sessmgr.OOMAlarmVariablesInfo {
}

func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...any) (rs sqlexec.RecordSet, err error) {
if sink := tracing.GetSink(ctx); sink == nil {
trace := traceevent.NewTrace()
ctx = tracing.WithFlightRecorder(ctx, trace)
if sink := tracing.GetTraceBuf(ctx); sink == nil {
trace := traceevent.NewTraceBuf()
ctx = traceevent.WithTraceBuf(ctx, trace)
defer trace.DiscardOrFlush(ctx)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Use typed TraceBuf lookup for the fallback guard.

Line 1835 checks raw context presence, not typed *traceevent.TraceBuf. If a mismatched value is present, fallback creation is skipped and events can be dropped.

🔧 Proposed fix
 func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...any) (rs sqlexec.RecordSet, err error) {
-	if sink := tracing.GetTraceBuf(ctx); sink == nil {
+	if traceevent.GetTraceBuf(ctx) == nil {
 		trace := traceevent.NewTraceBuf()
 		ctx = traceevent.WithTraceBuf(ctx, trace)
 		defer trace.DiscardOrFlush(ctx)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if sink := tracing.GetTraceBuf(ctx); sink == nil {
trace := traceevent.NewTraceBuf()
ctx = traceevent.WithTraceBuf(ctx, trace)
defer trace.DiscardOrFlush(ctx)
if traceevent.GetTraceBuf(ctx) == nil {
trace := traceevent.NewTraceBuf()
ctx = traceevent.WithTraceBuf(ctx, trace)
defer trace.DiscardOrFlush(ctx)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/session/session.go` around lines 1835 - 1838, The current guard uses
tracing.GetTraceBuf(ctx) to decide whether to create a fallback trace buffer,
but that can be fooled by a mismatched context value; change the check to ensure
the value is the typed *traceevent.TraceBuf. Specifically, replace the nil check
with a typed lookup/assertion (e.g. tb, ok :=
traceevent.GetTraceBuf(ctx).(*traceevent.TraceBuf); if !ok || tb == nil { ... })
so that NewTraceBuf/WithTraceBuf/DiscardOrFlush only run when no real
*traceevent.TraceBuf is present.


// A developer debugging event so we can see what trace is missing!
Expand Down Expand Up @@ -2822,7 +2822,10 @@ func resetStmtTraceID(ctx context.Context, se *session) (context.Context, []byte
// The trace ID is generated from transaction start_ts and statement count
startTS := se.sessionVars.TxnCtx.StartTS
stmtCount := uint64(se.sessionVars.TxnCtx.StatementCount)
traceID := traceevent.GenerateTraceID(ctx, startTS, stmtCount)
traceID := traceevent.GenerateTraceID(startTS, stmtCount)
if traceBuf := traceevent.GetTraceBuf(ctx); traceBuf != nil {
traceBuf.SetTraceID(traceID)
}
ctx = trace.ContextWithTraceID(ctx, traceID)
se.currentCtx = ctx
// Store trace ID for next statement
Expand Down
11 changes: 3 additions & 8 deletions pkg/util/traceevent/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func handleTraceControlExtractor(ctx context.Context) trace.TraceControlFlags {
flags := trace.TraceControlFlags(0)

// Map TiDB categories to TiKV categories regardless of whether a Trace sink is present.
enabledCategories := tracing.GetEnabledCategories()
enabledCategories := GetEnabledCategories()
if enabledCategories&tracing.TiKVRequest != 0 {
flags = flags.With(trace.FlagTiKVCategoryRequest)
}
Expand All @@ -68,13 +68,8 @@ func handleTraceControlExtractor(ctx context.Context) trace.TraceControlFlags {
flags = flags.With(trace.FlagTiKVCategoryReadDetails)
}
Comment thread
tiancaiamao marked this conversation as resolved.
Outdated

// Extract Trace object from context
sink := tracing.GetSink(ctx)
if sink == nil {
return flags
}
t, ok := sink.(*Trace)
if !ok {
t := GetTraceBuf(ctx)
if t == nil {
return flags
}

Expand Down
76 changes: 38 additions & 38 deletions pkg/util/traceevent/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ func TestTraceControlExtractor(t *testing.T) {

// Test with nil context (no sink)
t.Run("NoSink", func(t *testing.T) {
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
tracing.SetCategories(tracing.TiKVRequest)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)
fr.SetCategories(tracing.TiKVRequest)

ctx := context.Background()
flags := handleTraceControlExtractor(ctx)
Expand All @@ -45,15 +45,15 @@ func TestTraceControlExtractor(t *testing.T) {

// Test with keep=false
t.Run("KeepFalse", func(t *testing.T) {
tr := NewTrace()
ctx := tracing.WithFlightRecorder(context.Background(), tr)
tr := NewTraceBuf()
ctx := tracing.WithTraceBuf(context.Background(), tr)

// Save old categories and restore after test
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)

// Enable only TiKVRequest
tracing.SetCategories(tracing.TiKVRequest)
fr.SetCategories(tracing.TiKVRequest)

flags := handleTraceControlExtractor(ctx)
require.False(t, flags.Has(trace.FlagImmediateLog), "immediate log should not be set when keep=false")
Expand All @@ -62,17 +62,17 @@ func TestTraceControlExtractor(t *testing.T) {

// Test with keep=true
t.Run("KeepTrue", func(t *testing.T) {
tr := NewTrace()
tr := NewTraceBuf()
// This sets keep=true
tr.bits = GetFlightRecorder().truthTable[0]
ctx := tracing.WithFlightRecorder(context.Background(), tr)
ctx := tracing.WithTraceBuf(context.Background(), tr)

// Save old categories and restore after test
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)

// Enable only TiKVRequest
tracing.SetCategories(tracing.TiKVRequest)
fr.SetCategories(tracing.TiKVRequest)

flags := handleTraceControlExtractor(ctx)
require.True(t, flags.Has(trace.FlagImmediateLog), "immediate log should be set when keep=true")
Expand All @@ -81,14 +81,14 @@ func TestTraceControlExtractor(t *testing.T) {

// Test category mapping: TiKVRequest
t.Run("CategoryTiKVRequest", func(t *testing.T) {
tr := NewTrace()
ctx := tracing.WithFlightRecorder(context.Background(), tr)
tr := NewTraceBuf()
ctx := tracing.WithTraceBuf(context.Background(), tr)

// Save old categories and restore after test
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)

tracing.SetCategories(tracing.TiKVRequest)
fr.SetCategories(tracing.TiKVRequest)

flags := handleTraceControlExtractor(ctx)
require.True(t, flags.Has(trace.FlagTiKVCategoryRequest))
Expand All @@ -98,14 +98,14 @@ func TestTraceControlExtractor(t *testing.T) {

// Test category mapping: TiKVWriteDetails
t.Run("CategoryTiKVWriteDetails", func(t *testing.T) {
tr := NewTrace()
ctx := tracing.WithFlightRecorder(context.Background(), tr)
tr := NewTraceBuf()
ctx := tracing.WithTraceBuf(context.Background(), tr)

// Save old categories and restore after test
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)

tracing.SetCategories(tracing.TiKVWriteDetails)
fr.SetCategories(tracing.TiKVWriteDetails)

flags := handleTraceControlExtractor(ctx)
require.False(t, flags.Has(trace.FlagTiKVCategoryRequest))
Expand All @@ -115,14 +115,14 @@ func TestTraceControlExtractor(t *testing.T) {

// Test category mapping: TiKVReadDetails
t.Run("CategoryTiKVReadDetails", func(t *testing.T) {
tr := NewTrace()
ctx := tracing.WithFlightRecorder(context.Background(), tr)
tr := NewTraceBuf()
ctx := tracing.WithTraceBuf(context.Background(), tr)

// Save old categories and restore after test
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)

tracing.SetCategories(tracing.TiKVReadDetails)
fr.SetCategories(tracing.TiKVReadDetails)

flags := handleTraceControlExtractor(ctx)
require.False(t, flags.Has(trace.FlagTiKVCategoryRequest))
Expand All @@ -132,17 +132,17 @@ func TestTraceControlExtractor(t *testing.T) {

// Test multiple categories
t.Run("MultipleCategoriesAndKeep", func(t *testing.T) {
tr := NewTrace()
tr := NewTraceBuf()
// Set keep=true
tr.bits = GetFlightRecorder().truthTable[0]
ctx := tracing.WithFlightRecorder(context.Background(), tr)
ctx := tracing.WithTraceBuf(context.Background(), tr)

// Save old categories and restore after test
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)

// Enable all three TiKV categories
tracing.SetCategories(tracing.TiKVRequest | tracing.TiKVWriteDetails | tracing.TiKVReadDetails)
fr.SetCategories(tracing.TiKVRequest | tracing.TiKVWriteDetails | tracing.TiKVReadDetails)

flags := handleTraceControlExtractor(ctx)
require.True(t, flags.Has(trace.FlagImmediateLog), "immediate log should be set")
Expand All @@ -153,14 +153,14 @@ func TestTraceControlExtractor(t *testing.T) {

// Test concurrent access (should not race)
t.Run("ConcurrentAccess", func(t *testing.T) {
tr := NewTrace()
ctx := tracing.WithFlightRecorder(context.Background(), tr)
tr := NewTraceBuf()
ctx := tracing.WithTraceBuf(context.Background(), tr)

// Save old categories and restore after test
oldCategories := tracing.GetEnabledCategories()
defer tracing.SetCategories(oldCategories)
oldCategories := GetEnabledCategories()
defer fr.SetCategories(oldCategories)

tracing.SetCategories(tracing.TiKVRequest)
fr.SetCategories(tracing.TiKVRequest)

var wg sync.WaitGroup
// Run multiple concurrent extractors
Expand Down
Loading