Skip to content

Commit 6a50d22

Browse files
committed
executor: estimate analyze NDV from singleton sketches
1 parent 7277bae commit 6a50d22

4 files changed

Lines changed: 180 additions & 16 deletions

File tree

pkg/executor/analyze_col_sampling.go

Lines changed: 142 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/pingcap/tidb/pkg/tablecodec"
3636
"github.com/pingcap/tidb/pkg/types"
3737
"github.com/pingcap/tidb/pkg/util"
38-
"github.com/pingcap/tidb/pkg/util/channel"
3938
"github.com/pingcap/tidb/pkg/util/chunk"
4039
"github.com/pingcap/tidb/pkg/util/codec"
4140
"github.com/pingcap/tidb/pkg/util/collate"
@@ -222,6 +221,17 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
222221
for range l {
223222
rootRowCollector.Base().FMSketches = append(rootRowCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize))
224223
}
224+
needNDVSampling := statistics.NeedNDVSampling(e.analyzePB.ColReq.GetNdvRate())
225+
var nodeNDVSketches [][]*statistics.FMSketch
226+
var nodeSingletonSketches [][]*statistics.FMSketch
227+
var nodeSampleSizes []int
228+
var nodeSketchSampleCounts []int
229+
if needNDVSampling {
230+
nodeNDVSketches = make([][]*statistics.FMSketch, 0, samplingStatsConcurrency)
231+
nodeSingletonSketches = make([][]*statistics.FMSketch, 0, samplingStatsConcurrency)
232+
nodeSampleSizes = make([]int, 0, samplingStatsConcurrency)
233+
nodeSketchSampleCounts = make([]int, 0, samplingStatsConcurrency)
234+
}
225235

226236
sc := e.ctx.GetSessionVars().StmtCtx
227237

@@ -281,6 +291,12 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
281291
printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount,
282292
mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(),
283293
"merge subMergeWorker in AnalyzeColumnsExec", -1)
294+
if needNDVSampling {
295+
nodeNDVSketches = append(nodeNDVSketches, copySketches(mergeResult.collector.Base().FMSketches))
296+
nodeSingletonSketches = append(nodeSingletonSketches, copySketches(mergeResult.collector.Base().SingletonSketches()))
297+
nodeSampleSizes = append(nodeSampleSizes, mergeResult.collector.Base().Samples.Len())
298+
nodeSketchSampleCounts = append(nodeSketchSampleCounts, int(mergeResult.collector.Base().SketchSampleCount))
299+
}
284300
e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize)
285301
mergeResult.collector.DestroyAndPutToPool()
286302
}
@@ -304,6 +320,27 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
304320
return 0, nil, nil, nil, err
305321
}
306322

323+
colLen := len(e.colsInfo)
324+
indexPushedDownResult := <-idxNDVPushDownCh
325+
if indexPushedDownResult.err != nil {
326+
return 0, nil, nil, nil, indexPushedDownResult.err
327+
}
328+
for _, offset := range indexesWithVirtualColOffsets {
329+
ret := indexPushedDownResult.results[e.indexes[offset].ID]
330+
rootRowCollector.Base().NullCount[colLen+offset] = ret.Count
331+
rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
332+
}
333+
isSpecialIndex := make([]bool, len(e.indexes))
334+
for _, offset := range indexesWithVirtualColOffsets {
335+
if offset >= 0 && offset < len(isSpecialIndex) {
336+
isSpecialIndex[offset] = true
337+
}
338+
}
339+
estimateNDVs := make([]int64, l)
340+
if needNDVSampling {
341+
estimateNDVs = estimateNDVsBySketch(rootRowCollector, nodeNDVSketches, nodeSingletonSketches, nodeSampleSizes, nodeSketchSampleCounts, colLen, isSpecialIndex)
342+
}
343+
307344
// Decode the data from sample collectors.
308345
virtualColIdx := buildVirtualColumnIndex(e.schemaForVirtualColEval, e.colsInfo)
309346
// Filling virtual columns is necessary here because these samples are used to build statistics for indexes that constructed by virtual columns.
@@ -335,7 +372,6 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
335372
return 0, nil, nil, nil, err
336373
}
337374
}
338-
colLen := len(e.colsInfo)
339375
// The order of the samples are broken when merging samples from sub-collectors.
340376
// So now we need to sort the samples according to the handle in order to calculate correlation.
341377
slices.SortFunc(rootRowCollector.Base().Samples, func(i, j *statistics.ReservoirRowSampleItem) int {
@@ -358,7 +394,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
358394
// Start workers to build stats.
359395
for range samplingStatsConcurrency {
360396
e.samplingBuilderWg.Run(func() {
361-
e.subBuildWorker(ctx, buildResultChan, buildTaskChan, hists, topns, exitCh)
397+
e.subBuildWorker(ctx, buildResultChan, buildTaskChan, hists, topns, estimateNDVs, exitCh)
362398
})
363399
}
364400
// Generate tasks for building stats.
@@ -373,18 +409,6 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
373409
fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[i])
374410
}
375411

376-
indexPushedDownResult := <-idxNDVPushDownCh
377-
if indexPushedDownResult.err != nil {
378-
close(exitCh)
379-
channel.Clear(buildResultChan)
380-
return 0, nil, nil, nil, indexPushedDownResult.err
381-
}
382-
for _, offset := range indexesWithVirtualColOffsets {
383-
ret := indexPushedDownResult.results[e.indexes[offset].ID]
384-
rootRowCollector.Base().NullCount[colLen+offset] = ret.Count
385-
rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
386-
}
387-
388412
// Generate tasks for building stats for indexes.
389413
for i, idx := range e.indexes {
390414
buildTaskChan <- &samplingBuildTask{
@@ -713,7 +737,7 @@ func drainPendingSamplingMergeTasks(taskCh <-chan []byte, memTracker *memory.Tra
713737
}
714738
}
715739

716-
func (e *AnalyzeColumnsExec) subBuildWorker(ctx context.Context, resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, exitCh chan struct{}) {
740+
func (e *AnalyzeColumnsExec) subBuildWorker(ctx context.Context, resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, estimateNDVs []int64, exitCh chan struct{}) {
717741
defer func() {
718742
if r := recover(); r != nil {
719743
logutil.BgLogger().Warn("analyze subBuildWorker panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -887,6 +911,9 @@ workLoop:
887911
releaseCollectorMemory()
888912
continue
889913
}
914+
if hist != nil && task.slicePos < len(estimateNDVs) && estimateNDVs[task.slicePos] > hist.NDV {
915+
hist.NDV = estimateNDVs[task.slicePos]
916+
}
890917
finalMemSize := hist.MemoryUsage() + topn.MemoryUsage()
891918
e.memTracker.Consume(finalMemSize)
892919
hists[task.slicePos] = hist
@@ -920,6 +947,105 @@ type samplingBuildTask struct {
920947
slicePos int
921948
}
922949

950+
func copySketches(src []*statistics.FMSketch) []*statistics.FMSketch {
951+
if len(src) == 0 {
952+
return nil
953+
}
954+
dst := make([]*statistics.FMSketch, len(src))
955+
for i, sketch := range src {
956+
if sketch != nil {
957+
dst[i] = sketch.Copy()
958+
}
959+
}
960+
return dst
961+
}
962+
963+
func estimateNDVsBySketch(
964+
rootCollector statistics.RowSampleCollector,
965+
nodeNDVSketches [][]*statistics.FMSketch,
966+
nodeSingletonSketches [][]*statistics.FMSketch,
967+
nodeSampleSizes []int,
968+
nodeSketchSampleCounts []int,
969+
colLen int,
970+
isSpecialIndex []bool,
971+
) []int64 {
972+
totalLen := len(rootCollector.Base().FMSketches)
973+
estimateNDVs := make([]int64, totalLen)
974+
if totalLen == 0 || len(nodeNDVSketches) == 0 || len(nodeNDVSketches) != len(nodeSingletonSketches) {
975+
return estimateNDVs
976+
}
977+
var sampleSize uint64
978+
for _, size := range nodeSketchSampleCounts {
979+
sampleSize += uint64(size)
980+
}
981+
if sampleSize == 0 {
982+
for _, size := range nodeSampleSizes {
983+
sampleSize += uint64(size)
984+
}
985+
}
986+
if sampleSize == 0 {
987+
return estimateNDVs
988+
}
989+
for i := range totalLen {
990+
if i >= colLen {
991+
idxOffset := i - colLen
992+
if idxOffset >= 0 && idxOffset < len(isSpecialIndex) && isSpecialIndex[idxOffset] {
993+
continue
994+
}
995+
}
996+
sketch := rootCollector.Base().FMSketches[i]
997+
if sketch == nil {
998+
continue
999+
}
1000+
sampleNDV := uint64(sketch.NDV())
1001+
if sampleNDV == 0 {
1002+
continue
1003+
}
1004+
rowCount := uint64(0)
1005+
if i < len(rootCollector.Base().NullCount) {
1006+
count := rootCollector.Base().Count - rootCollector.Base().NullCount[i]
1007+
if count > 0 {
1008+
rowCount = uint64(count)
1009+
}
1010+
} else if rootCollector.Base().Count > 0 {
1011+
rowCount = uint64(rootCollector.Base().Count)
1012+
}
1013+
if rowCount == 0 {
1014+
continue
1015+
}
1016+
if sampleNDV > rowCount {
1017+
sampleNDV = rowCount
1018+
}
1019+
ndvSketches := make([]*statistics.FMSketch, 0, len(nodeNDVSketches))
1020+
singletonSketches := make([]*statistics.FMSketch, 0, len(nodeSingletonSketches))
1021+
for nodeIdx := range nodeNDVSketches {
1022+
if i >= len(nodeNDVSketches[nodeIdx]) || i >= len(nodeSingletonSketches[nodeIdx]) {
1023+
continue
1024+
}
1025+
ndvSketch := nodeNDVSketches[nodeIdx][i]
1026+
singletonSketch := nodeSingletonSketches[nodeIdx][i]
1027+
if ndvSketch == nil || singletonSketch == nil {
1028+
continue
1029+
}
1030+
ndvSketches = append(ndvSketches, ndvSketch)
1031+
singletonSketches = append(singletonSketches, singletonSketch)
1032+
}
1033+
if len(ndvSketches) == 0 {
1034+
continue
1035+
}
1036+
singletonItems := statistics.EstimateGlobalSingletonBySketches(ndvSketches, singletonSketches)
1037+
logutil.BgLogger().Debug("analyze sampling ndv estimation",
1038+
zap.Int("item_index", i),
1039+
zap.Uint64("sample_ndv", sampleNDV),
1040+
zap.Uint64("singleton_items", singletonItems),
1041+
zap.Uint64("sample_size", sampleSize),
1042+
zap.Uint64("row_count", rowCount),
1043+
)
1044+
estimateNDVs[i] = int64(statistics.EstimateNDVByGEE(sampleNDV, singletonItems, sampleSize, rowCount))
1045+
}
1046+
return estimateNDVs
1047+
}
1048+
9231049
func readDataAndSendTask(ctx context.Context, sctx sessionctx.Context, handler *tableResultHandler, mergeTaskCh chan []byte, memTracker *memory.Tracker) error {
9241050
// After all tasks are sent, close the mergeTaskCh to notify the mergeWorker that all tasks have been sent.
9251051
defer close(mergeTaskCh)

pkg/executor/analyze_utils_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import (
2020
"testing"
2121

2222
"github.com/pingcap/tidb/pkg/planner/core"
23+
"github.com/pingcap/tidb/pkg/statistics"
24+
"github.com/pingcap/tidb/pkg/types"
2325
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
26+
"github.com/pingcap/tidb/pkg/util/mock"
2427
"github.com/stretchr/testify/require"
2528
)
2629

@@ -67,3 +70,35 @@ func BuildExecutorForTest(ctx context.Context, stmt *ExecStmt) error {
6770
_, err := stmt.buildExecutor(ctx)
6871
return err
6972
}
73+
74+
func TestEstimateNDVsBySketch(t *testing.T) {
75+
rootCollector := statistics.NewReservoirRowSampleCollector(1, 2)
76+
rootCollector.Count = 100
77+
rootCollector.FMSketches = append(rootCollector.FMSketches,
78+
mustBuildFMSketch(t, 1, 2, 3, 4),
79+
mustBuildFMSketch(t, 10, 11),
80+
)
81+
82+
nodeNDVSketches := [][]*statistics.FMSketch{
83+
{mustBuildFMSketch(t, 1, 2, 3), mustBuildFMSketch(t, 10)},
84+
{mustBuildFMSketch(t, 2, 3, 4), mustBuildFMSketch(t, 11)},
85+
}
86+
nodeSingletonSketches := [][]*statistics.FMSketch{
87+
{mustBuildFMSketch(t, 1, 2, 3), mustBuildFMSketch(t, 10)},
88+
{mustBuildFMSketch(t, 2, 4), mustBuildFMSketch(t, 11)},
89+
}
90+
91+
estimateNDVs := estimateNDVsBySketch(rootCollector, nodeNDVSketches, nodeSingletonSketches, nil, []int{3, 3}, 1, []bool{true})
92+
require.Equal(t, int64(statistics.EstimateNDVByGEE(4, 2, 6, 100)), estimateNDVs[0])
93+
require.Zero(t, estimateNDVs[1], "special index NDV should be left to the pushed-down index result")
94+
}
95+
96+
func mustBuildFMSketch(t *testing.T, values ...int64) *statistics.FMSketch {
97+
t.Helper()
98+
sketch := statistics.NewFMSketch(1000)
99+
sc := mock.NewContext().GetSessionVars().StmtCtx
100+
for _, value := range values {
101+
require.NoError(t, sketch.InsertValue(sc, types.NewIntDatum(value)))
102+
}
103+
return sketch
104+
}

pkg/executor/builder.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3169,6 +3169,7 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(
31693169
modifyCount = int64(val.(int))
31703170
})
31713171
sampleRate := new(float64)
3172+
ndvRate := float64(statistics.DefaultNDVSampleRate)
31723173
var sampleRateReason string
31733174
if opts[ast.AnalyzeOptNumSamples] == 0 {
31743175
*sampleRate = math.Float64frombits(opts[ast.AnalyzeOptSampleRate])
@@ -3229,6 +3230,7 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(
32293230
BucketSize: int64(opts[ast.AnalyzeOptNumBuckets]),
32303231
SampleSize: int64(opts[ast.AnalyzeOptNumSamples]),
32313232
SampleRate: sampleRate,
3233+
NdvRate: &ndvRate,
32323234
SketchSize: statistics.MaxSketchSize,
32333235
ColumnsInfo: util.ColumnsToProto(task.ColsInfo, task.TblInfo.PKIsHandle, false, false),
32343236
ColumnGroups: colGroups,

pkg/store/mockstore/unistore/cophandler/analyze.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,7 @@ func handleAnalyzeFullSamplingReq(
457457
MaxSampleSize: int(colReq.SampleSize),
458458
MaxFMSketchSize: int(colReq.SketchSize),
459459
SampleRate: colReq.GetSampleRate(),
460+
NDVSampleRate: colReq.GetNdvRate(),
460461
Rng: rand.New(rand.NewSource(time.Now().UnixNano())),
461462
}
462463
collector, err := builder.Collect()

0 commit comments

Comments
 (0)