Skip to content

Commit dd0e7c8

Browse files
committed
executor, statistics: flush pending stats delta before analyze (pingcap#67939)
close pingcap#22934 (cherry picked from commit 42118f3)
1 parent 74e5085 commit dd0e7c8

11 files changed

Lines changed: 178 additions & 26 deletions

File tree

pkg/executor/analyze.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/pingcap/tidb/pkg/kv"
3535
"github.com/pingcap/tidb/pkg/metrics"
3636
"github.com/pingcap/tidb/pkg/parser/ast"
37+
pmodel "github.com/pingcap/tidb/pkg/parser/model"
3738
"github.com/pingcap/tidb/pkg/planner/core"
3839
"github.com/pingcap/tidb/pkg/sessionctx"
3940
"github.com/pingcap/tidb/pkg/sessionctx/variable"
@@ -84,6 +85,82 @@ const (
8485
idxTask
8586
)
8687

88+
// flushStatsDeltaForAnalyze flushes pending stats deltas for the tables whose column-analyze
89+
// tasks will capture base count / modify_count from mysql.stats_meta. Without this, a stale
90+
// pre-analyze delta can be applied later and double count rows or modifications.
91+
func flushStatsDeltaForAnalyze(ctx context.Context, sctx sessionctx.Context, plan *core.Analyze) error {
92+
if len(plan.ColTasks) == 0 {
93+
return nil
94+
}
95+
if err := ctx.Err(); err != nil {
96+
return err
97+
}
98+
targetIDs := collectAnalyzeStatsDeltaTargetIDsForTest(plan)
99+
if len(targetIDs) == 0 {
100+
return nil
101+
}
102+
return domain.GetDomain(sctx).StatsHandle().DumpStatsDeltaToKV(true, targetIDs...)
103+
}
104+
105+
// collectStatsDeltaFlushObjectsForAnalyze returns the database-qualified table
106+
// objects whose stats deltas must be flushed before building column analyze
107+
// tasks. Column analyze captures base count / modify_count from mysql.stats_meta,
108+
// so each target table is included once even if it has multiple column tasks.
109+
func collectStatsDeltaFlushObjectsForAnalyze(plan *core.Analyze) []*ast.StatsObject {
110+
flushObjects := make([]*ast.StatsObject, 0, len(plan.ColTasks))
111+
type statsObjectKey struct {
112+
dbName string
113+
tableName string
114+
}
115+
seenObjects := make(map[statsObjectKey]struct{}, len(plan.ColTasks))
116+
appendFlushObject := func(task core.AnalyzeColumnsTask) {
117+
dbName, tableName := task.DBName, task.TableName
118+
if dbName == "" || tableName == "" {
119+
intest.Assert(false, "analyze column task must have database-qualified table name")
120+
return
121+
}
122+
key := statsObjectKey{dbName: dbName, tableName: tableName}
123+
if _, ok := seenObjects[key]; ok {
124+
return
125+
}
126+
seenObjects[key] = struct{}{}
127+
flushObjects = append(flushObjects, &ast.StatsObject{
128+
StatsObjectScope: ast.StatsObjectScopeTable,
129+
DBName: pmodel.NewCIStr(dbName),
130+
TableName: pmodel.NewCIStr(tableName),
131+
})
132+
}
133+
for _, task := range plan.ColTasks {
134+
appendFlushObject(task)
135+
}
136+
return flushObjects
137+
}
138+
139+
func collectAnalyzeStatsDeltaTargetIDsForTest(plan *core.Analyze) []int64 {
140+
targetIDs := make([]int64, 0, len(plan.ColTasks))
141+
seenTargetIDs := make(map[int64]struct{}, len(plan.ColTasks))
142+
appendTargetID := func(id int64) {
143+
if _, ok := seenTargetIDs[id]; ok {
144+
return
145+
}
146+
seenTargetIDs[id] = struct{}{}
147+
targetIDs = append(targetIDs, id)
148+
}
149+
for _, task := range plan.ColTasks {
150+
if task.TblInfo == nil {
151+
intest.Assert(false, "analyze column task must have table info")
152+
continue
153+
}
154+
appendTargetID(task.TblInfo.ID)
155+
if partitionInfo := task.TblInfo.GetPartitionInfo(); partitionInfo != nil {
156+
for _, def := range partitionInfo.Definitions {
157+
appendTargetID(def.ID)
158+
}
159+
}
160+
}
161+
return targetIDs
162+
}
163+
87164
// Next implements the Executor Next interface.
88165
// It will collect all the sample tasks and run them concurrently.
89166
func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {

pkg/executor/analyze_utils_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
package executor
1616

1717
import (
18+
"context"
1819
"fmt"
1920
"testing"
2021

22+
"github.com/pingcap/tidb/pkg/planner/core"
2123
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
2224
"github.com/stretchr/testify/require"
2325
)
@@ -27,3 +29,34 @@ func TestGetAnalyzePanicErr(t *testing.T) {
2729
errMsg := fmt.Sprintf("%s", getAnalyzePanicErr(exeerrors.ErrMemoryExceedForQuery.GenWithStackByArgs(123)))
2830
require.NotContains(t, errMsg, `%!(EXTRA`)
2931
}
32+
33+
func TestCollectStatsDeltaFlushObjectsForAnalyzeDottedNames(t *testing.T) {
34+
plan := &core.Analyze{
35+
ColTasks: []core.AnalyzeColumnsTask{
36+
// Quoted identifiers may contain dots. These first two targets both
37+
// stringify to "a.b.c" if db and table names are joined with ".".
38+
{AnalyzeInfo: core.AnalyzeInfo{DBName: "a.b", TableName: "c"}},
39+
{AnalyzeInfo: core.AnalyzeInfo{DBName: "a", TableName: "b.c"}},
40+
// Keep the duplicate target deduped.
41+
{AnalyzeInfo: core.AnalyzeInfo{DBName: "a", TableName: "b.c"}},
42+
},
43+
}
44+
45+
flushObjects := collectStatsDeltaFlushObjectsForAnalyze(plan)
46+
targets := make([][2]string, 0, len(flushObjects))
47+
for _, obj := range flushObjects {
48+
targets = append(targets, [2]string{obj.DBName.O, obj.TableName.O})
49+
}
50+
51+
require.ElementsMatch(t, [][2]string{
52+
{"a.b", "c"},
53+
{"a", "b.c"},
54+
}, targets)
55+
}
56+
57+
func TestCanBroadcastToTiDBRPCForTestRejectsInvalidEndpoints(t *testing.T) {
58+
// Regression for next-gen realcluster tests: in-process domains can register
59+
// multiple server infos with an empty IP/default :10080 but no TiDB RPC
60+
// listener. Such targets must not take the broadcast path.
61+
require.False(t, canBroadcastToTiDBRPCForTest(context.Background(), []string{"", ""}))
62+
}

pkg/executor/builder.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3245,6 +3245,14 @@ func (b *executorBuilder) buildAnalyze(v *plannercore.Analyze) exec.Executor {
32453245
if b.ctx.GetSessionVars().InRestrictedSQL {
32463246
autoAnalyze = "auto "
32473247
}
3248+
// buildAnalyzeSamplingPushdown reads base count / modify_count from mysql.stats_meta
3249+
// while constructing column analyze tasks. Flush pending deltas first so the base
3250+
// values include pre-analyze changes and later delta dumps cannot double count them.
3251+
// TODO: Determine whether context.Background is appropriate here; if not, use the proper statement context.
3252+
if err := flushStatsDeltaForAnalyze(kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats), b.ctx, v); err != nil {
3253+
b.err = err
3254+
return nil
3255+
}
32483256
exprCtx := b.ctx.GetExprCtx()
32493257
for _, task := range v.ColTasks {
32503258
// ColumnInfos2ColumnsAndNames will use the `colInfos` to find the unique id for the column,

pkg/executor/test/analyzetest/analyze_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2321,7 +2321,6 @@ PARTITION BY RANGE ( a ) (
23212321
tableInfo := table.Meta()
23222322
pi := tableInfo.GetPartitionInfo()
23232323
require.NotNil(t, pi)
2324-
23252324
// analyze partition under static mode with options
23262325
tk.MustExec("analyze table t partition p0 columns a,c with 1 topn, 3 buckets")
23272326
tk.MustQuery("select * from t where b > 1 and c > 1")
@@ -2333,7 +2332,10 @@ PARTITION BY RANGE ( a ) (
23332332
require.Equal(t, 3, len(p0.GetCol(tableInfo.Columns[0].ID).Buckets))
23342333
require.Equal(t, 3, len(p0.GetCol(tableInfo.Columns[2].ID).Buckets))
23352334
require.Equal(t, 0, len(p1.GetCol(tableInfo.Columns[0].ID).Buckets))
2336-
require.Equal(t, 0, len(tbl.GetCol(tableInfo.Columns[0].ID).Buckets))
2335+
// Static partition analyze may flush pending partition deltas into the
2336+
// logical/global stats_meta row, but it must not build a global column
2337+
// histogram. In that meta-only global stats case, the global column is absent.
2338+
require.Nil(t, tbl.GetCol(tableInfo.Columns[0].ID))
23372339
rs := tk.MustQuery("select buckets,topn from mysql.analyze_options where table_id=" + strconv.FormatInt(pi.Definitions[0].ID, 10))
23382340
require.Equal(t, 1, len(rs.Rows()))
23392341
require.Equal(t, "3", rs.Rows()[0][0])

pkg/planner/cardinality/selectivity_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,6 @@ func TestEstimationForUnknownValuesAfterModify(t *testing.T) {
311311
}
312312
testKit.MustExec("analyze table t")
313313
h := dom.StatsHandle()
314-
require.Nil(t, h.DumpStatsDeltaToKV(true))
315314

316315
table, err := dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t"))
317316
require.NoError(t, err)
@@ -336,11 +335,13 @@ func TestEstimationForUnknownValuesAfterModify(t *testing.T) {
336335
require.Nil(t, h.Update(context.Background(), dom.InfoSchema()))
337336

338337
statsTblNew := h.GetPhysicalTableStats(table.Meta().ID, table.Meta())
339-
// Search for a not found value based upon statistics - count should be >= 10 and <=40
338+
// Search for a not found value based upon post-analyze modifications. It
339+
// should be higher than the no-modification fallback, but lower than a value
340+
// already present in the analyzed histogram.
340341
count, err = cardinality.GetColumnRowCount(sctx, col, getRange(15, 15), statsTblNew.RealtimeCount, statsTblNew.ModifyCount, false)
341342
require.NoError(t, err)
342-
require.Truef(t, count < 45, "expected: between 0 to 45, got: %v", count)
343-
require.Truef(t, count > 0, "expected: between 0 to 45, got: %v", count)
343+
require.Greater(t, count, 1.0)
344+
require.Less(t, count, 10.0)
344345
}
345346

346347
func TestNewIndexWithoutStats(t *testing.T) {

pkg/statistics/handle/globalstats/global_stats_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -816,10 +816,11 @@ func TestGlobalStats(t *testing.T) {
816816
" └─IndexRangeScan 1.00 cop[tikv] table:t, partition:p1, index:a(a) range:(3,+inf], keep order:false"))
817817

818818
// When we turned on the switch, we found that pseudo-stats will be used in the plan instead of `Union`.
819+
// The pseudo estimate is based on the stats_meta counts flushed before analyze.
819820
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic';")
820821
tk.MustQuery("explain format = 'brief' select a from t where a > 3;").Check(testkit.Rows(
821-
"IndexReader 3333.33 root partition:all index:IndexRangeScan",
822-
"└─IndexRangeScan 3333.33 cop[tikv] table:t, index:a(a) range:(3,+inf], keep order:false, stats:pseudo"))
822+
"IndexReader 1.67 root partition:all index:IndexRangeScan",
823+
"└─IndexRangeScan 1.67 cop[tikv] table:t, index:a(a) range:(3,+inf], keep order:false, stats:pseudo"))
823824

824825
// Execute analyze again without error and can generate global-stats.
825826
// And when executing related queries, neither Union nor pseudo-stats are used.

pkg/statistics/handle/handletest/handle_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1234,6 +1234,29 @@ func TestIncrementalModifyCountUpdate(t *testing.T) {
12341234
}
12351235
}
12361236

1237+
func TestFlushPendingStatsDeltaBeforeAnalyze(t *testing.T) {
1238+
store, dom := testkit.CreateMockStoreAndDomain(t)
1239+
tk := testkit.NewTestKit(t, store)
1240+
tk.MustExec("use test")
1241+
tk.MustExec("create table t(a int)")
1242+
1243+
tbl, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
1244+
require.NoError(t, err)
1245+
tableID := tbl.Meta().ID
1246+
1247+
tk.MustExec("insert into t values(1),(2),(3),(4),(5)")
1248+
1249+
tk.MustExec("analyze table t")
1250+
tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tableID)).Check(testkit.Rows(
1251+
"5 0",
1252+
))
1253+
1254+
tk.MustExec("flush stats_delta test.t")
1255+
tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tableID)).Check(testkit.Rows(
1256+
"5 0",
1257+
))
1258+
}
1259+
12371260
func TestRecordHistoricalStatsToStorage(t *testing.T) {
12381261
store, dom := testkit.CreateMockStoreAndDomain(t)
12391262
tk := testkit.NewTestKit(t, store)

pkg/statistics/handle/storage/dump_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,19 +155,27 @@ func TestDumpGlobalStats(t *testing.T) {
155155
tk.MustExec("insert into t values (1), (2)")
156156
tk.MustExec("analyze table t")
157157

158-
// global-stats is not existed
158+
// Static partition analyze should not generate global histograms. The
159+
// pre-analyze stats-delta flush may still create a global stats_meta entry.
159160
stats := getStatsJSON(t, dom, "test", "t")
160161
require.NotNil(t, stats.Partitions["p0"])
161162
require.NotNil(t, stats.Partitions["p1"])
162-
require.Nil(t, stats.Partitions[handleutil.TiDBGlobalStats])
163+
globalStats := stats.Partitions[handleutil.TiDBGlobalStats]
164+
if globalStats != nil {
165+
require.Empty(t, globalStats.Columns)
166+
require.Empty(t, globalStats.Indices)
167+
}
163168

164169
// global-stats exists
165170
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
166171
tk.MustExec("analyze table t")
167172
stats = getStatsJSON(t, dom, "test", "t")
168173
require.NotNil(t, stats.Partitions["p0"])
169174
require.NotNil(t, stats.Partitions["p1"])
170-
require.NotNil(t, stats.Partitions[handleutil.TiDBGlobalStats])
175+
globalStats = stats.Partitions[handleutil.TiDBGlobalStats]
176+
require.NotNil(t, globalStats)
177+
require.NotEmpty(t, globalStats.Columns)
178+
require.NotEmpty(t, globalStats.Indices)
171179
}
172180

173181
func TestLoadGlobalStats(t *testing.T) {

pkg/statistics/handle/storage/gc_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,13 @@ func TestGCPartition(t *testing.T) {
9090

9191
testKit.MustExec("drop table t")
9292
require.Nil(t, h.GCStats(dom.InfoSchema(), ddlLease))
93-
testKit.MustQuery("select count(*) from mysql.stats_meta").Check(testkit.Rows("2"))
93+
testKit.MustQuery("select count(*) from mysql.stats_meta").Check(testkit.Rows("3"))
9494
testKit.MustQuery("select count(*) from mysql.stats_histograms").Check(testkit.Rows("0"))
9595
testKit.MustQuery("select count(*) from mysql.stats_buckets").Check(testkit.Rows("0"))
96+
// FIXME(#68076): The remaining row is the logical table's meta-only stats row. The
97+
// normal GC version-window scan does not revisit it after the table is dropped.
9698
require.Nil(t, h.GCStats(dom.InfoSchema(), ddlLease))
97-
testKit.MustQuery("select count(*) from mysql.stats_meta").Check(testkit.Rows("0"))
99+
testKit.MustQuery("select count(*) from mysql.stats_meta").Check(testkit.Rows("1"))
98100
})
99101
}
100102

pkg/testkit/testkit.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -624,11 +624,10 @@ func containGlobal(rs *Result) bool {
624624
return false
625625
}
626626

627-
// MustNoGlobalStats checks if there is no global stats.
627+
// MustNoGlobalStats checks if there are no global histograms or buckets.
628+
// It intentionally ignores global stats_meta rows, because stats-delta flushes
629+
// may maintain logical/global row counts even when no global histograms exist.
628630
func (tk *TestKit) MustNoGlobalStats(table string) {
629-
if containGlobal(tk.MustQuery("show stats_meta where table_name like '" + table + "'")) {
630-
tk.require.Fail("global stats should not be found")
631-
}
632631
if containGlobal(tk.MustQuery("show stats_buckets where table_name like '" + table + "'")) {
633632
tk.require.Fail("global stats should not be found")
634633
}

0 commit comments

Comments
 (0)