@@ -35,7 +35,6 @@ import (
3535 "github.com/pingcap/tidb/pkg/tablecodec"
3636 "github.com/pingcap/tidb/pkg/types"
3737 "github.com/pingcap/tidb/pkg/util"
38- "github.com/pingcap/tidb/pkg/util/channel"
3938 "github.com/pingcap/tidb/pkg/util/chunk"
4039 "github.com/pingcap/tidb/pkg/util/codec"
4140 "github.com/pingcap/tidb/pkg/util/collate"
@@ -222,6 +221,17 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
222221 for range l {
223222 rootRowCollector .Base ().FMSketches = append (rootRowCollector .Base ().FMSketches , statistics .NewFMSketch (statistics .MaxSketchSize ))
224223 }
224+ needNDVSampling := statistics .NeedNDVSampling (e .analyzePB .ColReq .GetNdvRate ())
225+ var nodeNDVSketches [][]* statistics.FMSketch
226+ var nodeSingletonSketches [][]* statistics.FMSketch
227+ var nodeSampleSizes []int
228+ var nodeSketchSampleCounts []int
229+ if needNDVSampling {
230+ nodeNDVSketches = make ([][]* statistics.FMSketch , 0 , samplingStatsConcurrency )
231+ nodeSingletonSketches = make ([][]* statistics.FMSketch , 0 , samplingStatsConcurrency )
232+ nodeSampleSizes = make ([]int , 0 , samplingStatsConcurrency )
233+ nodeSketchSampleCounts = make ([]int , 0 , samplingStatsConcurrency )
234+ }
225235
226236 sc := e .ctx .GetSessionVars ().StmtCtx
227237
@@ -281,6 +291,12 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
281291 printAnalyzeMergeCollectorLog (oldRootCollectorCount , newRootCollectorCount ,
282292 mergeResult .collector .Base ().Count , e .tableID .TableID , e .tableID .PartitionID , e .tableID .IsPartitionTable (),
283293 "merge subMergeWorker in AnalyzeColumnsExec" , - 1 )
294+ if needNDVSampling {
295+ nodeNDVSketches = append (nodeNDVSketches , copySketches (mergeResult .collector .Base ().FMSketches ))
296+ nodeSingletonSketches = append (nodeSingletonSketches , copySketches (mergeResult .collector .Base ().SingletonSketches ))
297+ nodeSampleSizes = append (nodeSampleSizes , mergeResult .collector .Base ().Samples .Len ())
298+ nodeSketchSampleCounts = append (nodeSketchSampleCounts , int (mergeResult .collector .Base ().SketchSampleCount ))
299+ }
284300 e .memTracker .Consume (rootRowCollector .Base ().MemSize - oldRootCollectorSize - mergeResult .collector .Base ().MemSize )
285301 mergeResult .collector .DestroyAndPutToPool ()
286302 }
@@ -304,6 +320,27 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
304320 return 0 , nil , nil , nil , err
305321 }
306322
323+ colLen := len (e .colsInfo )
324+ indexPushedDownResult := <- idxNDVPushDownCh
325+ if indexPushedDownResult .err != nil {
326+ return 0 , nil , nil , nil , indexPushedDownResult .err
327+ }
328+ for _ , offset := range indexesWithVirtualColOffsets {
329+ ret := indexPushedDownResult .results [e .indexes [offset ].ID ]
330+ rootRowCollector .Base ().NullCount [colLen + offset ] = ret .Count
331+ rootRowCollector .Base ().FMSketches [colLen + offset ] = ret .Ars [0 ].Fms [0 ]
332+ }
333+ isSpecialIndex := make ([]bool , len (e .indexes ))
334+ for _ , offset := range indexesWithVirtualColOffsets {
335+ if offset >= 0 && offset < len (isSpecialIndex ) {
336+ isSpecialIndex [offset ] = true
337+ }
338+ }
339+ estimateNDVs := make ([]int64 , l )
340+ if needNDVSampling {
341+ estimateNDVs = estimateNDVsBySketch (rootRowCollector , nodeNDVSketches , nodeSingletonSketches , nodeSampleSizes , nodeSketchSampleCounts , colLen , isSpecialIndex )
342+ }
343+
307344 // Decode the data from sample collectors.
308345 virtualColIdx := buildVirtualColumnIndex (e .schemaForVirtualColEval , e .colsInfo )
309346 // Filling virtual columns is necessary here because these samples are used to build statistics for indexes that are constructed from virtual columns.
@@ -335,7 +372,6 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
335372 return 0 , nil , nil , nil , err
336373 }
337374 }
338- colLen := len (e .colsInfo )
339375 // The order of the samples is broken when merging samples from sub-collectors.
340376 // So now we need to sort the samples according to the handle in order to calculate correlation.
341377 slices .SortFunc (rootRowCollector .Base ().Samples , func (i , j * statistics.ReservoirRowSampleItem ) int {
@@ -358,7 +394,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
358394 // Start workers to build stats.
359395 for range samplingStatsConcurrency {
360396 e .samplingBuilderWg .Run (func () {
361- e .subBuildWorker (ctx , buildResultChan , buildTaskChan , hists , topns , exitCh )
397+ e .subBuildWorker (ctx , buildResultChan , buildTaskChan , hists , topns , estimateNDVs , exitCh )
362398 })
363399 }
364400 // Generate tasks for building stats.
@@ -373,18 +409,6 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
373409 fmSketches = append (fmSketches , rootRowCollector .Base ().FMSketches [i ])
374410 }
375411
376- indexPushedDownResult := <- idxNDVPushDownCh
377- if indexPushedDownResult .err != nil {
378- close (exitCh )
379- channel .Clear (buildResultChan )
380- return 0 , nil , nil , nil , indexPushedDownResult .err
381- }
382- for _ , offset := range indexesWithVirtualColOffsets {
383- ret := indexPushedDownResult .results [e .indexes [offset ].ID ]
384- rootRowCollector .Base ().NullCount [colLen + offset ] = ret .Count
385- rootRowCollector .Base ().FMSketches [colLen + offset ] = ret .Ars [0 ].Fms [0 ]
386- }
387-
388412 // Generate tasks for building stats for indexes.
389413 for i , idx := range e .indexes {
390414 buildTaskChan <- & samplingBuildTask {
@@ -713,7 +737,7 @@ func drainPendingSamplingMergeTasks(taskCh <-chan []byte, memTracker *memory.Tra
713737 }
714738}
715739
716- func (e * AnalyzeColumnsExec ) subBuildWorker (ctx context.Context , resultCh chan error , taskCh chan * samplingBuildTask , hists []* statistics.Histogram , topns []* statistics.TopN , exitCh chan struct {}) {
740+ func (e * AnalyzeColumnsExec ) subBuildWorker (ctx context.Context , resultCh chan error , taskCh chan * samplingBuildTask , hists []* statistics.Histogram , topns []* statistics.TopN , estimateNDVs [] int64 , exitCh chan struct {}) {
717741 defer func () {
718742 if r := recover (); r != nil {
719743 logutil .BgLogger ().Warn ("analyze subBuildWorker panicked" , zap .Any ("recover" , r ), zap .Stack ("stack" ))
@@ -887,6 +911,9 @@ workLoop:
887911 releaseCollectorMemory ()
888912 continue
889913 }
914+ if hist != nil && task .slicePos < len (estimateNDVs ) && estimateNDVs [task .slicePos ] > hist .NDV {
915+ hist .NDV = estimateNDVs [task .slicePos ]
916+ }
890917 finalMemSize := hist .MemoryUsage () + topn .MemoryUsage ()
891918 e .memTracker .Consume (finalMemSize )
892919 hists [task .slicePos ] = hist
@@ -920,6 +947,105 @@ type samplingBuildTask struct {
920947 slicePos int
921948}
922949
950+ func copySketches (src []* statistics.FMSketch ) []* statistics.FMSketch {
951+ if len (src ) == 0 {
952+ return nil
953+ }
954+ dst := make ([]* statistics.FMSketch , len (src ))
955+ for i , sketch := range src {
956+ if sketch != nil {
957+ dst [i ] = sketch .Copy ()
958+ }
959+ }
960+ return dst
961+ }
962+
963+ func estimateNDVsBySketch (
964+ rootCollector statistics.RowSampleCollector ,
965+ nodeNDVSketches [][]* statistics.FMSketch ,
966+ nodeSingletonSketches [][]* statistics.FMSketch ,
967+ nodeSampleSizes []int ,
968+ nodeSketchSampleCounts []int ,
969+ colLen int ,
970+ isSpecialIndex []bool ,
971+ ) []int64 {
972+ totalLen := len (rootCollector .Base ().FMSketches )
973+ estimateNDVs := make ([]int64 , totalLen )
974+ if totalLen == 0 || len (nodeNDVSketches ) == 0 || len (nodeNDVSketches ) != len (nodeSingletonSketches ) {
975+ return estimateNDVs
976+ }
977+ var sampleSize uint64
978+ for _ , size := range nodeSketchSampleCounts {
979+ sampleSize += uint64 (size )
980+ }
981+ if sampleSize == 0 {
982+ for _ , size := range nodeSampleSizes {
983+ sampleSize += uint64 (size )
984+ }
985+ }
986+ if sampleSize == 0 {
987+ return estimateNDVs
988+ }
989+ for i := range totalLen {
990+ if i >= colLen {
991+ idxOffset := i - colLen
992+ if idxOffset >= 0 && idxOffset < len (isSpecialIndex ) && isSpecialIndex [idxOffset ] {
993+ continue
994+ }
995+ }
996+ sketch := rootCollector .Base ().FMSketches [i ]
997+ if sketch == nil {
998+ continue
999+ }
1000+ sampleNDV := uint64 (sketch .NDV ())
1001+ if sampleNDV == 0 {
1002+ continue
1003+ }
1004+ rowCount := uint64 (0 )
1005+ if i < len (rootCollector .Base ().NullCount ) {
1006+ count := rootCollector .Base ().Count - rootCollector .Base ().NullCount [i ]
1007+ if count > 0 {
1008+ rowCount = uint64 (count )
1009+ }
1010+ } else if rootCollector .Base ().Count > 0 {
1011+ rowCount = uint64 (rootCollector .Base ().Count )
1012+ }
1013+ if rowCount == 0 {
1014+ continue
1015+ }
1016+ if sampleNDV > rowCount {
1017+ sampleNDV = rowCount
1018+ }
1019+ ndvSketches := make ([]* statistics.FMSketch , 0 , len (nodeNDVSketches ))
1020+ singletonSketches := make ([]* statistics.FMSketch , 0 , len (nodeSingletonSketches ))
1021+ for nodeIdx := range nodeNDVSketches {
1022+ if i >= len (nodeNDVSketches [nodeIdx ]) || i >= len (nodeSingletonSketches [nodeIdx ]) {
1023+ continue
1024+ }
1025+ ndvSketch := nodeNDVSketches [nodeIdx ][i ]
1026+ singletonSketch := nodeSingletonSketches [nodeIdx ][i ]
1027+ if ndvSketch == nil || singletonSketch == nil {
1028+ continue
1029+ }
1030+ ndvSketches = append (ndvSketches , ndvSketch )
1031+ singletonSketches = append (singletonSketches , singletonSketch )
1032+ }
1033+ if len (ndvSketches ) == 0 {
1034+ continue
1035+ }
1036+ singletonItems := statistics .EstimateGlobalSingletonBySketches (ndvSketches , singletonSketches )
1037+ logutil .BgLogger ().Debug ("analyze sampling ndv estimation" ,
1038+ zap .Int ("item_index" , i ),
1039+ zap .Uint64 ("sample_ndv" , sampleNDV ),
1040+ zap .Uint64 ("singleton_items" , singletonItems ),
1041+ zap .Uint64 ("sample_size" , sampleSize ),
1042+ zap .Uint64 ("row_count" , rowCount ),
1043+ )
1044+ estimateNDVs [i ] = int64 (statistics .EstimateNDVByGEE (sampleNDV , singletonItems , sampleSize , rowCount ))
1045+ }
1046+ return estimateNDVs
1047+ }
1048+
9231049func readDataAndSendTask (ctx context.Context , sctx sessionctx.Context , handler * tableResultHandler , mergeTaskCh chan []byte , memTracker * memory.Tracker ) error {
9241050 // After all tasks are sent, close the mergeTaskCh to notify the mergeWorker that all tasks have been sent.
9251051 defer close (mergeTaskCh )
0 commit comments