Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) {
}
buildStatsConcurrency = min(len(tasks), buildStatsConcurrency)

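// Resolve once on the main goroutine before workers fan out: the workers
// share one SessionVars, and looking the value up from each of them would
// race on its unsynchronised `systems` map (concurrent map read and write).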
samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.Ctx())
if err != nil {
return err
}
for _, task := range tasks {
if task.colExec != nil {
task.colExec.samplingStatsConcurrency = samplingStatsConcurrency
}
}

// Start workers with channel to collect results.
taskCh := make(chan *analyzeTask, buildStatsConcurrency)
resultsCh := make(chan *statistics.AnalyzeResults, 1)
Expand Down
4 changes: 4 additions & 0 deletions pkg/executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type AnalyzeColumnsExec struct {
baseCount int64
baseModifyCnt int64

// Resolved once on the main goroutine and read-only afterwards. Partition
// workers share one SessionVars, so resolving it from a worker would race
// on the unsynchronised SessionVars.systems map.
samplingStatsConcurrency int

memTracker *memory.Tracker
}

Expand Down
9 changes: 2 additions & 7 deletions pkg/executor/analyze_col_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,9 @@ func (e *AnalyzeColumnsExec) analyzeColumnsPushDown(ctx context.Context, gp *gp.
specialIndexes = append(specialIndexes, idx)
}
}
samplingStatsConcurrency, err := getBuildSamplingStatsConcurrency(e.ctx)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
}
idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1)
e.handleNDVForSpecialIndexes(ctx, specialIndexes, idxNDVPushDownCh, samplingStatsConcurrency)
count, hists, topNs, fmSketches, err := e.buildSamplingStats(ctx, gp, ranges, specialIndexesOffsets, idxNDVPushDownCh, samplingStatsConcurrency)
e.handleNDVForSpecialIndexes(ctx, specialIndexes, idxNDVPushDownCh, e.samplingStatsConcurrency)
count, hists, topNs, fmSketches, err := e.buildSamplingStats(ctx, gp, ranges, specialIndexesOffsets, idxNDVPushDownCh, e.samplingStatsConcurrency)
if err != nil {
e.memTracker.Release(e.memTracker.BytesConsumed())
return &statistics.AnalyzeResults{Err: err, Job: e.job}
Expand Down
58 changes: 58 additions & 0 deletions pkg/executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ package executor_test
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -337,3 +340,58 @@ func TestAnalyzeV2ReleaseColumnCollectorMemoryImmediately(t *testing.T) {
require.Equal(t, beforeCollectorMem.Load(), afterCollectorMem.Load())
require.Equal(t, beforeCollectorMem.Load(), beforeBytes.Load()-afterBytes.Load())
}

// TestAnalyzeSamplingConcurrencyResolvedOffWorker guards the structural fix
// for a "concurrent map read and map write" runtime fatal in
// SessionVars.GetSessionOrGlobalSystemVar: getBuildSamplingStatsConcurrency
// must never be called from an analyzeWorker goroutine, since workers share
// one SessionVars and would race on its unsynchronised `systems` map. The
// value is resolved on the main goroutine in AnalyzeExec.Next before
// workers fan out and threaded through AnalyzeColumnsExec.samplingStatsConcurrency.
func TestAnalyzeSamplingConcurrencyResolvedOffWorker(t *testing.T) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change is pretty straightforward. I think this test is overkill. I guess the only thing we need to do is add an intest.Assert() in analyzeColumnsPushDown to make sure samplingStatsConcurrency is initialized.

// Session setup: analyze version 2 on an 8-way hash-partitioned table.
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("set @@tidb_analyze_version = 2")
// Wide outer pool so the partition tasks run in parallel.
tk.MustExec("set @@tidb_build_stats_concurrency = 8")
tk.MustExec(`create table t (
id int primary key,
v int
) partition by hash(id) partitions 8`)

// Observations recorded by the failpoint callback below. Every access,
// both in the callback and in the assertions, happens with mu held.
var (
mu sync.Mutex
totalCalls int
workerCallSeen bool
workerStack string
)
// The callback is attached to the failpoint that
// getBuildSamplingStatsConcurrency triggers on entry, so it runs once per
// call to that function.
testfailpoint.EnableCall(t,
"github.com/pingcap/tidb/pkg/executor/getBuildSamplingStatsConcurrencyCalled",
func() {
// Capture only the calling goroutine's stack (all=false).
// NOTE(review): runtime.Stack truncates its output to len(buf), so a
// stack deeper than 8192 bytes could lose the analyzeWorker frame and
// hide a worker call from the check below — confirm the size suffices.
buf := make([]byte, 8192)
n := runtime.Stack(buf, false)
stack := string(buf[:n])
mu.Lock()
defer mu.Unlock()
totalCalls++
// A worker-originated call is recognised by the analyzeWorker method
// name appearing in the captured stack text.
if strings.Contains(stack, "(*AnalyzeExec).analyzeWorker") {
workerCallSeen = true
// Keep only the first offending stack for the failure message.
if workerStack == "" {
workerStack = stack
}
}
})

tk.MustExec("analyze table t")

// Hold mu while reading the recorded state for the assertions.
mu.Lock()
defer mu.Unlock()
require.Equal(t, 1, totalCalls,
"sampling concurrency must be resolved exactly once per ANALYZE statement")
require.Falsef(t, workerCallSeen,
"getBuildSamplingStatsConcurrency was called from an analyzeWorker goroutine; "+
"workers must read AnalyzeColumnsExec.samplingStatsConcurrency instead, "+
"because SessionVars.systems is not safe for concurrent lookup.\nstack:\n%s",
workerStack)
}
2 changes: 2 additions & 0 deletions pkg/executor/analyze_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
Expand Down Expand Up @@ -84,6 +85,7 @@ func getBuildStatsConcurrency(ctx sessionctx.Context) (int, error) {
}

// getBuildSamplingStatsConcurrency reads the integer value of the
// vardef.TiDBBuildSamplingStatsConcurrency variable for ctx through
// getIntFromSessionVars and returns it together with any error from that read.
//
// On entry it triggers the "getBuildSamplingStatsConcurrencyCalled" failpoint
// hook, which lets tests observe each call to this function.
func getBuildSamplingStatsConcurrency(ctx sessionctx.Context) (int, error) {
	// Keep the hook ahead of the lookup so a call is reported even when the
	// lookup fails.
	failpoint.InjectCall("getBuildSamplingStatsConcurrencyCalled")

	concurrency, err := getIntFromSessionVars(ctx, vardef.TiDBBuildSamplingStatsConcurrency)
	return concurrency, err
}

Expand Down
Loading