Skip to content

Commit e8b7970

Browse files
committed
statistics: skip stats cache update when incoming table is older
A stale async histogram load could overwrite a newer analyzed cache entry because the cache accepted any table object regardless of version. Guard `Update` and `CopyAndUpdate` with a version check so an older table cannot replace a newer one for the same physical table. Issue Number: ref #67355
1 parent 511dba1 commit e8b7970

3 files changed

Lines changed: 50 additions & 6 deletions

File tree

pkg/statistics/handle/cache/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ go_test(
4545
],
4646
embed = [":cache"],
4747
flaky = True,
48-
shard_count = 3,
48+
shard_count = 4,
4949
deps = [
5050
"//pkg/config",
5151
"//pkg/statistics",

pkg/statistics/handle/cache/statscache_test.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,30 @@ func TestUpdateStatsHealthyMetrics(t *testing.T) {
118118
}
119119
}
120120

121+
func TestUpdateSkipsOlderTableVersion(t *testing.T) {
122+
current := newMockTable(t, 1, false, 1280, 0, 1)
123+
current.Version = 2
124+
stale := newMockTable(t, 1, false, 640, 0, 1)
125+
stale.Version = 1
126+
127+
cache := &StatsCache{
128+
c: newMockStatsCacheInner(current),
129+
}
130+
131+
cache.Update([]*statistics.Table{stale}, nil, false)
132+
133+
got, ok := cache.c.Get(current.PhysicalID)
134+
require.True(t, ok)
135+
require.Equal(t, uint64(2), got.Version)
136+
require.Equal(t, int64(1280), got.RealtimeCount)
137+
138+
copied := cache.CopyAndUpdate([]*statistics.Table{stale}, nil)
139+
got, ok = copied.c.Get(current.PhysicalID)
140+
require.True(t, ok)
141+
require.Equal(t, uint64(2), got.Version)
142+
require.Equal(t, int64(1280), got.RealtimeCount)
143+
}
144+
121145
func newMockTable(t *testing.T, physicalID int64, pseudo bool, realtimeCount, modifyCount int64, analyzeVersion uint64) *statistics.Table {
122146
hist := statistics.NewHistColl(physicalID, realtimeCount, modifyCount, 0, 0)
123147
table := &statistics.Table{
@@ -162,27 +186,33 @@ func (m *mockStatsCacheInner) Values() []*statistics.Table {
162186
}
163187

164188
func (m *mockStatsCacheInner) Get(id int64) (*statistics.Table, bool) {
165-
panic("not implemented")
189+
tbl, ok := m.items[id]
190+
return tbl, ok
166191
}
167192

168193
func (m *mockStatsCacheInner) Put(id int64, tbl *statistics.Table) bool {
169-
panic("not implemented")
194+
m.items[id] = tbl
195+
return true
170196
}
171197

172198
func (m *mockStatsCacheInner) Del(id int64) {
173-
panic("not implemented")
199+
delete(m.items, id)
174200
}
175201

176202
func (m *mockStatsCacheInner) Cost() int64 {
177203
panic("not implemented")
178204
}
179205

180206
func (m *mockStatsCacheInner) Len() int {
181-
panic("not implemented")
207+
return len(m.items)
182208
}
183209

184210
func (m *mockStatsCacheInner) Copy() internal.StatsCacheInner {
185-
panic("not implemented")
211+
copied := make(map[int64]*statistics.Table, len(m.items))
212+
for id, tbl := range m.items {
213+
copied[id] = tbl
214+
}
215+
return &mockStatsCacheInner{items: copied}
186216
}
187217

188218
func (*mockStatsCacheInner) SetCapacity(int64) {

pkg/statistics/handle/cache/statscacheinner.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,22 @@ func (sc *StatsCache) Version() uint64 {
143143
return sc.maxTblStatsVer.Load()
144144
}
145145

146+
// shouldSkipTableUpdate reports whether the incoming table stats are older than
147+
// what is already cached for the same physical table. Skipping the update
148+
// prevents a stale async histogram load from overwriting a newer analyze result.
149+
func (sc *StatsCache) shouldSkipTableUpdate(tbl *statistics.Table) bool {
150+
current, ok := sc.c.Get(tbl.PhysicalID)
151+
return ok && current != nil && current.Version > tbl.Version
152+
}
153+
146154
// CopyAndUpdate copies a new cache and updates the new statistics table cache. It is only used in the COW mode.
147155
func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int64) *StatsCache {
148156
newCache := &StatsCache{c: sc.c.Copy()}
149157
newCache.maxTblStatsVer.Store(sc.maxTblStatsVer.Load())
150158
for _, tbl := range tables {
159+
if newCache.shouldSkipTableUpdate(tbl) {
160+
continue
161+
}
151162
id := tbl.PhysicalID
152163
newCache.c.Put(id, tbl)
153164
}
@@ -167,6 +178,9 @@ func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int
167178
// Update updates the new statistics table cache.
168179
func (sc *StatsCache) Update(tables []*statistics.Table, deletedIDs []int64, skipMoveForwardStatsCache bool) {
169180
for _, tbl := range tables {
181+
if sc.shouldSkipTableUpdate(tbl) {
182+
continue
183+
}
170184
id := tbl.PhysicalID
171185
metrics.UpdateCounter.Inc()
172186
sc.c.Put(id, tbl)

0 commit comments

Comments
 (0)