Skip to content

Commit 2273931

Browse files
committed
executor: estimate analyze NDV from singleton sketches
1 parent 47a9bc6 commit 2273931

2 files changed

Lines changed: 165 additions & 16 deletions

File tree

pkg/executor/analyze_col_sampling.go

Lines changed: 130 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import (
3535
"github.com/pingcap/tidb/pkg/tablecodec"
3636
"github.com/pingcap/tidb/pkg/types"
3737
"github.com/pingcap/tidb/pkg/util"
38-
"github.com/pingcap/tidb/pkg/util/channel"
3938
"github.com/pingcap/tidb/pkg/util/chunk"
4039
"github.com/pingcap/tidb/pkg/util/codec"
4140
"github.com/pingcap/tidb/pkg/util/collate"
@@ -222,6 +221,10 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
222221
for range l {
223222
rootRowCollector.Base().FMSketches = append(rootRowCollector.Base().FMSketches, statistics.NewFMSketch(statistics.MaxSketchSize))
224223
}
224+
nodeNDVSketches := make([][]*statistics.FMSketch, 0, samplingStatsConcurrency)
225+
nodeSingletonSketches := make([][]*statistics.FMSketch, 0, samplingStatsConcurrency)
226+
nodeSampleSizes := make([]int, 0, samplingStatsConcurrency)
227+
nodeSketchSampleCounts := make([]int, 0, samplingStatsConcurrency)
225228

226229
sc := e.ctx.GetSessionVars().StmtCtx
227230

@@ -281,6 +284,10 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
281284
printAnalyzeMergeCollectorLog(oldRootCollectorCount, newRootCollectorCount,
282285
mergeResult.collector.Base().Count, e.tableID.TableID, e.tableID.PartitionID, e.tableID.IsPartitionTable(),
283286
"merge subMergeWorker in AnalyzeColumnsExec", -1)
287+
nodeNDVSketches = append(nodeNDVSketches, copySketches(mergeResult.collector.Base().FMSketches))
288+
nodeSingletonSketches = append(nodeSingletonSketches, copySketches(mergeResult.collector.Base().SingletonSketches))
289+
nodeSampleSizes = append(nodeSampleSizes, mergeResult.collector.Base().Samples.Len())
290+
nodeSketchSampleCounts = append(nodeSketchSampleCounts, int(mergeResult.collector.Base().SketchSampleCount))
284291
e.memTracker.Consume(rootRowCollector.Base().MemSize - oldRootCollectorSize - mergeResult.collector.Base().MemSize)
285292
mergeResult.collector.DestroyAndPutToPool()
286293
}
@@ -304,6 +311,24 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
304311
return 0, nil, nil, nil, err
305312
}
306313

314+
colLen := len(e.colsInfo)
315+
indexPushedDownResult := <-idxNDVPushDownCh
316+
if indexPushedDownResult.err != nil {
317+
return 0, nil, nil, nil, indexPushedDownResult.err
318+
}
319+
for _, offset := range indexesWithVirtualColOffsets {
320+
ret := indexPushedDownResult.results[e.indexes[offset].ID]
321+
rootRowCollector.Base().NullCount[colLen+offset] = ret.Count
322+
rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
323+
}
324+
isSpecialIndex := make([]bool, len(e.indexes))
325+
for _, offset := range indexesWithVirtualColOffsets {
326+
if offset >= 0 && offset < len(isSpecialIndex) {
327+
isSpecialIndex[offset] = true
328+
}
329+
}
330+
estimateNDVs := estimateNDVsBySketch(rootRowCollector, nodeNDVSketches, nodeSingletonSketches, nodeSampleSizes, nodeSketchSampleCounts, colLen, isSpecialIndex)
331+
307332
// Decode the data from sample collectors.
308333
virtualColIdx := buildVirtualColumnIndex(e.schemaForVirtualColEval, e.colsInfo)
309334
// Filling virtual columns is necessary here because these samples are used to build statistics for indexes that are constructed from virtual columns.
@@ -335,7 +360,6 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
335360
return 0, nil, nil, nil, err
336361
}
337362
}
338-
colLen := len(e.colsInfo)
339363
// The order of the samples is broken when merging samples from sub-collectors.
340364
// So now we need to sort the samples according to the handle in order to calculate correlation.
341365
slices.SortFunc(rootRowCollector.Base().Samples, func(i, j *statistics.ReservoirRowSampleItem) int {
@@ -358,7 +382,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
358382
// Start workers to build stats.
359383
for range samplingStatsConcurrency {
360384
e.samplingBuilderWg.Run(func() {
361-
e.subBuildWorker(ctx, buildResultChan, buildTaskChan, hists, topns, exitCh)
385+
e.subBuildWorker(ctx, buildResultChan, buildTaskChan, hists, topns, estimateNDVs, exitCh)
362386
})
363387
}
364388
// Generate tasks for building stats.
@@ -373,18 +397,6 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
373397
fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[i])
374398
}
375399

376-
indexPushedDownResult := <-idxNDVPushDownCh
377-
if indexPushedDownResult.err != nil {
378-
close(exitCh)
379-
channel.Clear(buildResultChan)
380-
return 0, nil, nil, nil, indexPushedDownResult.err
381-
}
382-
for _, offset := range indexesWithVirtualColOffsets {
383-
ret := indexPushedDownResult.results[e.indexes[offset].ID]
384-
rootRowCollector.Base().NullCount[colLen+offset] = ret.Count
385-
rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
386-
}
387-
388400
// Generate tasks for building stats for indexes.
389401
for i, idx := range e.indexes {
390402
buildTaskChan <- &samplingBuildTask{
@@ -713,7 +725,7 @@ func drainPendingSamplingMergeTasks(taskCh <-chan []byte, memTracker *memory.Tra
713725
}
714726
}
715727

716-
func (e *AnalyzeColumnsExec) subBuildWorker(ctx context.Context, resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, exitCh chan struct{}) {
728+
func (e *AnalyzeColumnsExec) subBuildWorker(ctx context.Context, resultCh chan error, taskCh chan *samplingBuildTask, hists []*statistics.Histogram, topns []*statistics.TopN, estimateNDVs []int64, exitCh chan struct{}) {
717729
defer func() {
718730
if r := recover(); r != nil {
719731
logutil.BgLogger().Warn("analyze subBuildWorker panicked", zap.Any("recover", r), zap.Stack("stack"))
@@ -887,6 +899,9 @@ workLoop:
887899
releaseCollectorMemory()
888900
continue
889901
}
902+
if hist != nil && task.slicePos < len(estimateNDVs) && estimateNDVs[task.slicePos] > hist.NDV {
903+
hist.NDV = estimateNDVs[task.slicePos]
904+
}
890905
finalMemSize := hist.MemoryUsage() + topn.MemoryUsage()
891906
e.memTracker.Consume(finalMemSize)
892907
hists[task.slicePos] = hist
@@ -920,6 +935,105 @@ type samplingBuildTask struct {
920935
slicePos int
921936
}
922937

938+
func copySketches(src []*statistics.FMSketch) []*statistics.FMSketch {
939+
if len(src) == 0 {
940+
return nil
941+
}
942+
dst := make([]*statistics.FMSketch, len(src))
943+
for i, sketch := range src {
944+
if sketch != nil {
945+
dst[i] = sketch.Copy()
946+
}
947+
}
948+
return dst
949+
}
950+
951+
func estimateNDVsBySketch(
952+
rootCollector statistics.RowSampleCollector,
953+
nodeNDVSketches [][]*statistics.FMSketch,
954+
nodeSingletonSketches [][]*statistics.FMSketch,
955+
nodeSampleSizes []int,
956+
nodeSketchSampleCounts []int,
957+
colLen int,
958+
isSpecialIndex []bool,
959+
) []int64 {
960+
totalLen := len(rootCollector.Base().FMSketches)
961+
estimateNDVs := make([]int64, totalLen)
962+
if totalLen == 0 || len(nodeNDVSketches) == 0 || len(nodeNDVSketches) != len(nodeSingletonSketches) {
963+
return estimateNDVs
964+
}
965+
var sampleSize uint64
966+
for _, size := range nodeSketchSampleCounts {
967+
sampleSize += uint64(size)
968+
}
969+
if sampleSize == 0 {
970+
for _, size := range nodeSampleSizes {
971+
sampleSize += uint64(size)
972+
}
973+
}
974+
if sampleSize == 0 {
975+
return estimateNDVs
976+
}
977+
for i := range totalLen {
978+
if i >= colLen {
979+
idxOffset := i - colLen
980+
if idxOffset >= 0 && idxOffset < len(isSpecialIndex) && isSpecialIndex[idxOffset] {
981+
continue
982+
}
983+
}
984+
sketch := rootCollector.Base().FMSketches[i]
985+
if sketch == nil {
986+
continue
987+
}
988+
sampleNDV := uint64(sketch.NDV())
989+
if sampleNDV == 0 {
990+
continue
991+
}
992+
rowCount := uint64(0)
993+
if i < len(rootCollector.Base().NullCount) {
994+
count := rootCollector.Base().Count - rootCollector.Base().NullCount[i]
995+
if count > 0 {
996+
rowCount = uint64(count)
997+
}
998+
} else if rootCollector.Base().Count > 0 {
999+
rowCount = uint64(rootCollector.Base().Count)
1000+
}
1001+
if rowCount == 0 {
1002+
continue
1003+
}
1004+
if sampleNDV > rowCount {
1005+
sampleNDV = rowCount
1006+
}
1007+
ndvSketches := make([]*statistics.FMSketch, 0, len(nodeNDVSketches))
1008+
singletonSketches := make([]*statistics.FMSketch, 0, len(nodeSingletonSketches))
1009+
for nodeIdx := range nodeNDVSketches {
1010+
if i >= len(nodeNDVSketches[nodeIdx]) || i >= len(nodeSingletonSketches[nodeIdx]) {
1011+
continue
1012+
}
1013+
ndvSketch := nodeNDVSketches[nodeIdx][i]
1014+
singletonSketch := nodeSingletonSketches[nodeIdx][i]
1015+
if ndvSketch == nil || singletonSketch == nil {
1016+
continue
1017+
}
1018+
ndvSketches = append(ndvSketches, ndvSketch)
1019+
singletonSketches = append(singletonSketches, singletonSketch)
1020+
}
1021+
if len(ndvSketches) == 0 {
1022+
continue
1023+
}
1024+
singletonItems := statistics.EstimateGlobalSingletonBySketches(ndvSketches, singletonSketches)
1025+
logutil.BgLogger().Debug("analyze sampling ndv estimation",
1026+
zap.Int("item_index", i),
1027+
zap.Uint64("sample_ndv", sampleNDV),
1028+
zap.Uint64("singleton_items", singletonItems),
1029+
zap.Uint64("sample_size", sampleSize),
1030+
zap.Uint64("row_count", rowCount),
1031+
)
1032+
estimateNDVs[i] = int64(statistics.EstimateNDVByGEE(sampleNDV, singletonItems, sampleSize, rowCount))
1033+
}
1034+
return estimateNDVs
1035+
}
1036+
9231037
func readDataAndSendTask(ctx context.Context, sctx sessionctx.Context, handler *tableResultHandler, mergeTaskCh chan []byte, memTracker *memory.Tracker) error {
9241038
// After all tasks are sent, close the mergeTaskCh to notify the mergeWorker that all tasks have been sent.
9251039
defer close(mergeTaskCh)

pkg/executor/analyze_utils_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import (
2020
"testing"
2121

2222
"github.com/pingcap/tidb/pkg/planner/core"
23+
"github.com/pingcap/tidb/pkg/statistics"
24+
"github.com/pingcap/tidb/pkg/types"
2325
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
26+
"github.com/pingcap/tidb/pkg/util/mock"
2427
"github.com/stretchr/testify/require"
2528
)
2629

@@ -67,3 +70,35 @@ func BuildExecutorForTest(ctx context.Context, stmt *ExecStmt) error {
6770
_, err := stmt.buildExecutor(ctx)
6871
return err
6972
}
73+
74+
func TestEstimateNDVsBySketch(t *testing.T) {
75+
rootCollector := statistics.NewReservoirRowSampleCollector(1, 2)
76+
rootCollector.Count = 100
77+
rootCollector.FMSketches = append(rootCollector.FMSketches,
78+
mustBuildFMSketch(t, 1, 2, 3, 4),
79+
mustBuildFMSketch(t, 10, 11),
80+
)
81+
82+
nodeNDVSketches := [][]*statistics.FMSketch{
83+
{mustBuildFMSketch(t, 1, 2, 3), mustBuildFMSketch(t, 10)},
84+
{mustBuildFMSketch(t, 2, 3, 4), mustBuildFMSketch(t, 11)},
85+
}
86+
nodeSingletonSketches := [][]*statistics.FMSketch{
87+
{mustBuildFMSketch(t, 1, 2, 3), mustBuildFMSketch(t, 10)},
88+
{mustBuildFMSketch(t, 2, 4), mustBuildFMSketch(t, 11)},
89+
}
90+
91+
estimateNDVs := estimateNDVsBySketch(rootCollector, nodeNDVSketches, nodeSingletonSketches, nil, []int{3, 3}, 1, []bool{true})
92+
require.Equal(t, int64(statistics.EstimateNDVByGEE(4, 2, 6, 100)), estimateNDVs[0])
93+
require.Zero(t, estimateNDVs[1], "special index NDV should be left to the pushed-down index result")
94+
}
95+
96+
func mustBuildFMSketch(t *testing.T, values ...int64) *statistics.FMSketch {
97+
t.Helper()
98+
sketch := statistics.NewFMSketch(1000)
99+
sc := mock.NewContext().GetSessionVars().StmtCtx
100+
for _, value := range values {
101+
require.NoError(t, sketch.InsertValue(sc, types.NewIntDatum(value)))
102+
}
103+
return sketch
104+
}

0 commit comments

Comments
 (0)