forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathplan_cache_instance.go
More file actions
353 lines (325 loc) · 11.4 KB
/
Copy pathplan_cache_instance.go
File metadata and controls
353 lines (325 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"fmt"
"runtime"
"sort"
"sync"
"time"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/sessionctx"
"go.uber.org/atomic"
)
// init registers this package's constructor with the domain package so that
// domain can build an instance plan cache through its NewInstancePlanCache hook.
func init() {
	domain.NewInstancePlanCache = NewInstancePlanCache
}
// NewInstancePlanCache creates a new instance level plan cache.
// The returned cache starts empty with the given soft and hard memory limits.
func NewInstancePlanCache(softMemLimit, hardMemLimit int64) sessionctx.InstancePlanCache {
	pc := &instancePlanCache{}
	pc.SetLimits(softMemLimit, hardMemLimit)
	return pc
}
// instancePCNode is one element of a per-key singly linked list of cached plans.
// Each list starts with a head node that carries no plan; plan nodes hang off head.next.
type instancePCNode struct {
// value is the cached plan. It is nil on head nodes and on the deleted sentinel.
value *PlanCacheValue
// lastUsed is set when the node is created and refreshed by getPlanFromList on a
// matching lookup (unless inEvict is set). Evict compares it against its threshold.
lastUsed atomic.Time
// next points to the following node in the list, nil at the end of the list, or
// deletedInstancePCNode on a head whose bucket has been deleted.
next atomic.Pointer[instancePCNode]
// deleted marks a bucket head as logically removed before it is detached from the map.
// It is only meaningful on head nodes and prevents Get/Put from treating a deleted bucket
// as a normal empty bucket that can be reused immediately.
deleted atomic.Bool
// accountingState tracks whether a published node has finished updating totCost/totPlan.
// Delete must distinguish "detached but not counted yet" from "detached and already counted"
// to keep Size/MemUsage correct while racing with Put's publish path.
// It holds one of the accounting* constants.
accountingState atomic.Uint32
}
// Values of instancePCNode.accountingState. Put moves a node
// NotCounted -> InProgress -> Counted; Delete moves it to one of the Removed states.
const (
// accountingNotCounted is the zero value: the node has not been added to totCost/totPlan.
accountingNotCounted uint32 = iota
// accountingInProgress means Put has claimed the node and is updating totCost/totPlan.
accountingInProgress
// accountingCounted means totCost/totPlan include this node.
accountingCounted
// accountingRemovedUncounted means Delete detached the node before Put counted it,
// so Put must not add it to the totals.
accountingRemovedUncounted
// accountingRemovedCounted means Delete detached the node and subtracted it from the totals.
accountingRemovedCounted
)
// deletedInstancePCNode is a unique sentinel used to mark that a bucket has been removed.
// Traversals must stop on this exact pointer instead of treating it like an empty bucket:
// Delete sets head.deleted before swapping the sentinel into head.next, so concurrent Get/Put
// can reliably tell "deleted bucket" from "bucket with no cached plans" without ABA reuse.
// The sentinel is a zero-valued node (nil value, nil next); it is compared by pointer
// identity only and never holds a plan.
var deletedInstancePCNode = &instancePCNode{}
// instancePlanCache is a lock-free implementation of InstancePlanCache interface.
// [key1] --> [headNode1] --> [node1] --> [node2] --> [node3]
// [key2] --> [headNode2] --> [node4] --> [node5]
// [key3] --> [headNode3] --> [node6] --> [node7] --> [node8]
// headNode.value is always empty, headNode is designed to make it easier to implement.
type instancePlanCache struct {
// heads maps a cache key (string) to the *instancePCNode head of its bucket.
heads sync.Map
// totCost is the running sum of MemoryUsage() over the plans that have been counted.
totCost atomic.Int64
// totPlan is the running number of plans that have been counted.
totPlan atomic.Int64
// evictMutex serializes Evict and Delete against each other.
evictMutex sync.Mutex
// inEvict is true while Evict or Delete is running. Put gives up while it is set and
// getPlanFromList skips refreshing lastUsed.
inEvict atomic.Bool
// softMemLimit is the level Evict tries to bring totCost back under.
softMemLimit atomic.Int64
// hardMemLimit is the ceiling checked by Put: a plan is rejected if adding its
// memory usage to totCost would exceed it.
hardMemLimit atomic.Int64
}
// getHead returns the head node of the bucket stored under key.
// When create is false it returns nil if no head is stored or if the stored head is
// marked deleted. When create is true it loops until it can return a head that was
// not marked deleted when checked, unmapping deleted heads and installing a fresh
// head as needed.
func (pc *instancePlanCache) getHead(key string, create bool) *instancePCNode {
for {
headNode, ok := pc.heads.Load(key)
if ok { // cache hit
head := headNode.(*instancePCNode)
if !head.deleted.Load() {
return head
}
if !create {
return nil
}
// The mapped head has been retired: unmap exactly that head (a no-op if some
// other goroutine already replaced it) and retry from the top.
pc.heads.CompareAndDelete(key, headNode)
continue
}
if !create { // cache miss
return nil
}
newHeadNode := pc.createNode(nil)
actual, loaded := pc.heads.LoadOrStore(key, newHeadNode)
if !loaded {
return newHeadNode
}
// Another goroutine installed a head first; use it unless it is already deleted.
if headNode, ok := actual.(*instancePCNode); ok && !headNode.deleted.Load() { // for safety
return headNode
}
// The head that won the race is unusable: unmap it and retry.
pc.heads.CompareAndDelete(key, actual)
}
}
// Get gets the cached value according to key and paramTypes.
// It reports a miss when the key has no live bucket or when no plan in the
// bucket has parameter types compatible with paramTypes.
func (pc *instancePlanCache) Get(key string, paramTypes any) (value any, ok bool) {
	head := pc.getHead(key, false)
	if head != nil && !head.deleted.Load() {
		return pc.getPlanFromList(head, paramTypes)
	}
	return nil, false // cache miss
}
// getPlanFromList scans the bucket that starts at headNode and returns the first
// plan whose ParamTypes are compatible with paramTypes. The scan stops at the end
// of the list or at the deleted-bucket sentinel. On a hit the node's lastUsed is
// refreshed, unless an eviction/deletion is in progress.
func (pc *instancePlanCache) getPlanFromList(headNode *instancePCNode, paramTypes any) (any, bool) {
	cur := headNode.next.Load()
	for cur != nil && cur != deletedInstancePCNode {
		if !checkTypesCompatibility4PC(cur.value.ParamTypes, paramTypes) {
			cur = cur.next.Load()
			continue
		}
		// The cached plan is read-only, so no lock is needed to hand it out.
		if !pc.inEvict.Load() {
			cur.lastUsed.Store(time.Now()) // atomically update the lastUsed field
		}
		return cur.value, true
	}
	return nil, false
}
// Put puts the key and values into the cache.
// Due to some thread-safety issues, this Put operation might fail, use the returned succ to indicate it.
// succ is true only when the new node was linked at the front of the bucket and
// totCost/totPlan were updated for it. It stays false when an eviction or deletion
// is in progress, when the hard memory limit would be exceeded, when a compatible
// plan is already cached, when the bucket has been deleted, or when another
// goroutine changed the bucket head concurrently.
// value must be a *PlanCacheValue; any other dynamic type makes the type assertion panic.
func (pc *instancePlanCache) Put(key string, value, paramTypes any) (succ bool) {
if pc.inEvict.Load() {
return // do nothing if eviction or deletion is in progress
}
vMem := value.(*PlanCacheValue).MemoryUsage()
if vMem+pc.totCost.Load() > pc.hardMemLimit.Load() {
return // do nothing if it exceeds the hard limit
}
headNode := pc.getHead(key, true)
if headNode == nil || headNode.deleted.Load() {
return false // for safety
}
if _, ok := pc.getPlanFromList(headNode, paramTypes); ok {
return // some other thread has inserted the same plan before
}
// Re-check after the lookup: an eviction/deletion may have started, or this
// bucket may have been deleted, since the checks above.
if pc.inEvict.Load() || headNode.deleted.Load() {
return // do nothing if eviction is in progress or the bucket is deleted
}
firstNode := headNode.next.Load()
if firstNode == deletedInstancePCNode {
return // the bucket has been deleted; never link behind the sentinel
}
currNode := pc.createNode(value)
currNode.next.Store(firstNode)
// Publish the node at the front of the list. If the CAS fails, some other thread has
// changed the head, so skip this Put and wait for the next time.
if headNode.next.CompareAndSwap(firstNode, currNode) {
// Claim the accounting step. This CAS fails only if Delete has already marked the
// node as removed-uncounted, in which case the totals must not be touched.
if !currNode.accountingState.CompareAndSwap(accountingNotCounted, accountingInProgress) {
return
}
failpoint.InjectCall("instancePlanCachePutAccountingPause")
pc.totCost.Add(vMem)
pc.totPlan.Add(1)
// Let a concurrent Delete (spinning on accountingInProgress) proceed.
currNode.accountingState.Store(accountingCounted)
succ = true
}
return
}
// Delete removes all cached values under the exact cache key.
// It returns the number of plan nodes that were detached from the bucket.
//
// The bucket head is first marked deleted, then its list is replaced by the
// deleted sentinel so that concurrent Get/Put stop using it, and finally the
// head is unmapped. Accounting is rolled back per node according to how far a
// racing Put got with it.
func (pc *instancePlanCache) Delete(key string) (numDeleted int) {
	pc.evictMutex.Lock() // serialize against Evict and key deletion for safety
	defer pc.evictMutex.Unlock()
	pc.inEvict.Store(true)
	defer pc.inEvict.Store(false)

	headNode := pc.getHead(key, false)
	if headNode == nil {
		return 0
	}
	headNode.deleted.Store(true)
	firstNode := headNode.next.Swap(deletedInstancePCNode)
	// Unmap only the head retired above. Once the head is marked deleted, a racing
	// Put may already have unmapped it and installed a fresh head through
	// getHead(key, true); an unconditional Delete would drop that fresh head and
	// leave any plan later linked into it unreachable yet still counted.
	pc.heads.CompareAndDelete(key, headNode)

	for node := firstNode; node != nil && node != deletedInstancePCNode; node = node.next.Load() {
		numDeleted++
		for done := false; !done; {
			switch node.accountingState.Load() {
			case accountingNotCounted:
				// Put has not counted this node yet: make sure it never does.
				done = node.accountingState.CompareAndSwap(accountingNotCounted, accountingRemovedUncounted)
			case accountingInProgress:
				// Put is in the middle of updating the totals; wait for it to finish.
				runtime.Gosched()
			case accountingCounted:
				// The node is included in the totals: take it back out exactly once.
				if node.accountingState.CompareAndSwap(accountingCounted, accountingRemovedCounted) {
					pc.totCost.Sub(node.value.MemoryUsage())
					pc.totPlan.Sub(1)
					done = true
				}
			case accountingRemovedUncounted, accountingRemovedCounted:
				done = true
			}
		}
	}
	return numDeleted
}
// All returns all cached values.
// All returned values are read-only, don't modify them.
// The result is a snapshot taken while other goroutines may be modifying the cache.
func (pc *instancePlanCache) All() (values []any) {
	// Size() is only a capacity hint. totPlan is updated by independent atomic
	// adds/subs (Evict may subtract a node before the Put that published it has
	// added it), so it can be transiently negative, and make panics on a negative
	// capacity.
	capHint := pc.Size()
	if capHint < 0 {
		capHint = 0
	}
	values = make([]any, 0, capHint)
	pc.foreach(func(_, this *instancePCNode) bool {
		values = append(values, this.value)
		return false // nothing is removed during this walk
	})
	return values
}
// Evict evicts some values. There should be a background thread to perform the eviction.
// step 1: iterate all values to collect their last_used
// step 2: estimate an eviction threshold time based on all last_used values
// step 3: iterate all values again and evict qualified values
// step 4: retire buckets that have no plans left
//
// When evictAll is false and totCost is below the soft limit, nothing is evicted.
// When evictAll is true, every plan is a candidate regardless of its last_used.
// detailInfo is a human-readable description of the decision; numEvicted is the
// number of plans unlinked.
func (pc *instancePlanCache) Evict(evictAll bool) (detailInfo string, numEvicted int) {
	pc.evictMutex.Lock() // make sure only one thread to trigger eviction for safety
	defer pc.evictMutex.Unlock()
	pc.inEvict.Store(true)
	defer pc.inEvict.Store(false)

	currentTot, softLimit := pc.totCost.Load(), pc.softMemLimit.Load()
	if !evictAll && currentTot < softLimit {
		detailInfo = fmt.Sprintf("memory usage is below the soft limit, currentTot: %v, softLimit: %v", currentTot, softLimit)
		return detailInfo, numEvicted
	}

	lastUsedTimes := make([]time.Time, 0, 64)
	pc.foreach(func(_, this *instancePCNode) bool { // step 1
		lastUsedTimes = append(lastUsedTimes, this.lastUsed.Load())
		return false
	})

	var threshold time.Time
	if evictAll {
		threshold = time.Now().Add(time.Hour * 24) // a future time
	} else {
		threshold = pc.calcEvictionThreshold(lastUsedTimes) // step 2
	}
	detailInfo = fmt.Sprintf("evict threshold: %v", threshold)

	pc.foreach(func(prev, this *instancePCNode) bool { // step 3
		if this.lastUsed.Load().After(threshold) {
			return false // used more recently than the threshold, keep it
		}
		// lastUsed <= threshold: unlink this value. CAS is required because a
		// concurrent Put may have changed prev.next in the meantime.
		if !prev.next.CompareAndSwap(this, this.next.Load()) {
			return false
		}
		pc.totCost.Sub(this.value.MemoryUsage())
		pc.totPlan.Sub(1)
		numEvicted++
		return true
	})

	// step 4: clear empty heads in pc.heads.
	keys, headNodes := pc.headNodes()
	for i, headNode := range headNodes {
		// Seal the empty bucket before unmapping it. A Put that passed its inEvict
		// check just before this Evict started may still be about to CAS a node
		// into this head; unmapping an unsealed head would orphan that plan while
		// it stays counted in totCost/totPlan. With the sentinel in place that CAS
		// fails, and if the Put won the race instead, this CAS fails and the
		// bucket is kept.
		if !headNode.next.CompareAndSwap(nil, deletedInstancePCNode) {
			continue
		}
		// Mark the head deleted only after the seal succeeded, so a live bucket is
		// never flagged; getHead recreates the bucket on demand from here on.
		headNode.deleted.Store(true)
		pc.heads.CompareAndDelete(keys[i], headNode)
	}
	return detailInfo, numEvicted
}
// MemUsage returns the memory usage of this plan cache.
// The value is the current reading of the running totCost counter.
func (pc *instancePlanCache) MemUsage() int64 {
	usage := pc.totCost.Load()
	return usage
}
// Size returns the number of plans in this plan cache.
// The value is the current reading of the running totPlan counter.
func (pc *instancePlanCache) Size() int64 {
	count := pc.totPlan.Load()
	return count
}
// calcEvictionThreshold estimates the last-used time at or before which plans
// should be evicted to bring totCost back under the soft memory limit.
//
// lastUsedTimes holds one timestamp per cached plan and is sorted in place.
// The number of plans to evict is estimated from the average cost per plan.
// The zero time.Time is returned when there is nothing to evict (no samples,
// a non-positive average cost, or no memory to release).
func (pc *instancePlanCache) calcEvictionThreshold(lastUsedTimes []time.Time) (t time.Time) {
	numPlans := int64(len(lastUsedTimes))
	if numPlans == 0 {
		return t
	}
	totCost, softMemLimit := pc.totCost.Load(), pc.softMemLimit.Load()
	avgPerPlan := totCost / numPlans
	if avgPerPlan <= 0 {
		return t
	}
	memToRelease := totCost - softMemLimit
	// (... +avgPerPlan-1) is used to try to keep the final memory usage below the soft mem limit.
	numToEvict := (memToRelease + avgPerPlan - 1) / avgPerPlan
	if numToEvict <= 0 {
		return t
	}
	// avgPerPlan is rounded down, so the rounded-up estimate can exceed the number
	// of plans. Clamp it: evicting every plan is the best that can be done, whereas
	// returning the zero time here would evict nothing exactly when the cache is
	// furthest over its limit. The clamp also keeps the index below in range.
	if numToEvict > numPlans {
		numToEvict = numPlans
	}
	sort.Slice(lastUsedTimes, func(i, j int) bool {
		return lastUsedTimes[i].Before(lastUsedTimes[j])
	})
	return lastUsedTimes[numToEvict-1]
}
// foreach visits every plan node of every bucket in a snapshot of the heads and
// calls callback with the node and its predecessor. Each walk stops at the end of
// the list or at the deleted-bucket sentinel. callback returns true when it has
// unlinked the visited node; in that case the predecessor stays the same for the
// next node, otherwise the visited node becomes the new predecessor.
func (pc *instancePlanCache) foreach(callback func(prev, this *instancePCNode) (thisRemoved bool)) {
	_, heads := pc.headNodes()
	for _, head := range heads {
		before := head
		cur := head.next.Load()
		for cur != nil && cur != deletedInstancePCNode {
			removed := callback(before, cur)
			if !removed {
				// cur is still linked, so it precedes whatever comes next.
				before = cur
			}
			cur = cur.next.Load()
		}
	}
}
// headNodes returns a snapshot of the bucket map as two parallel slices:
// the i-th key corresponds to the i-th head node.
func (pc *instancePlanCache) headNodes() ([]string, []*instancePCNode) {
	const initialCap = 64
	bucketKeys := make([]string, 0, initialCap)
	bucketHeads := make([]*instancePCNode, 0, initialCap)
	collect := func(rawKey, rawHead any) bool {
		bucketKeys = append(bucketKeys, rawKey.(string))
		bucketHeads = append(bucketHeads, rawHead.(*instancePCNode))
		return true // keep iterating over the whole map
	}
	pc.heads.Range(collect)
	return bucketKeys, bucketHeads
}
// createNode allocates a list node whose lastUsed is the current time.
// A nil value yields a node without a plan (used for bucket heads); otherwise
// value must be a *PlanCacheValue.
func (*instancePlanCache) createNode(value any) *instancePCNode {
	fresh := &instancePCNode{}
	if value != nil {
		fresh.value = value.(*PlanCacheValue)
	}
	now := time.Now()
	fresh.lastUsed.Store(now)
	return fresh
}
// GetLimits gets the memory limit of this plan cache.
// It returns the current soft limit followed by the current hard limit.
func (pc *instancePlanCache) GetLimits() (softLimit, hardLimit int64) {
	softLimit = pc.softMemLimit.Load()
	hardLimit = pc.hardMemLimit.Load()
	return softLimit, hardLimit
}
// SetLimits sets the memory limit of this plan cache.
// Each limit is stored atomically on its own; the pair is not updated as one unit.
func (pc *instancePlanCache) SetLimits(softLimit, hardLimit int64) {
	soft, hard := &pc.softMemLimit, &pc.hardMemLimit
	soft.Store(softLimit)
	hard.Store(hardLimit)
}