Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 101 additions & 17 deletions pkg/planner/core/plan_cache_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ package core

import (
"fmt"
"runtime"
"sort"
"sync"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/sessionctx"
"go.uber.org/atomic"
Expand All @@ -43,8 +45,30 @@ type instancePCNode struct {
value *PlanCacheValue
lastUsed atomic.Time
next atomic.Pointer[instancePCNode]
// deleted marks a bucket head as logically removed before it is detached from the map.
// It is only meaningful on head nodes and prevents Get/Put from treating a deleted bucket
// as a normal empty bucket that can be reused immediately.
deleted atomic.Bool
// accountingState tracks whether a published node has finished updating totCost/totPlan.
// Delete must distinguish "detached but not counted yet" from "detached and already counted"
// to keep Size/MemUsage correct while racing with Put's publish path.
accountingState atomic.Uint32
}

const (
accountingNotCounted uint32 = iota
accountingInProgress
accountingCounted
accountingRemovedUncounted
accountingRemovedCounted
)

// deletedInstancePCNode is a unique sentinel used to mark that a bucket has been removed.
// Traversals must stop on this exact pointer instead of treating it like an empty bucket:
// Delete sets head.deleted before swapping the sentinel into head.next, so concurrent Get/Put
// can reliably tell "deleted bucket" from "bucket with no cached plans" without ABA reuse.
var deletedInstancePCNode = &instancePCNode{}

// instancePlanCache is a lock-free implementation of InstancePlanCache interface.
// [key1] --> [headNode1] --> [node1] --> [node2] --> [node3]
// [key2] --> [headNode2] --> [node4] --> [node5]
Expand All @@ -62,32 +86,45 @@ type instancePlanCache struct {
}

func (pc *instancePlanCache) getHead(key string, create bool) *instancePCNode {
headNode, ok := pc.heads.Load(key)
if ok { // cache hit
return headNode.(*instancePCNode)
}
if !create { // cache miss
return nil
}
newHeadNode := pc.createNode(nil)
actual, _ := pc.heads.LoadOrStore(key, newHeadNode)
if headNode, ok := actual.(*instancePCNode); ok { // for safety
return headNode
for {
headNode, ok := pc.heads.Load(key)
if ok { // cache hit
head := headNode.(*instancePCNode)
if !head.deleted.Load() {
return head
}
if !create {
return nil
}
pc.heads.CompareAndDelete(key, headNode)
continue
}
if !create { // cache miss
return nil
}
newHeadNode := pc.createNode(nil)
actual, loaded := pc.heads.LoadOrStore(key, newHeadNode)
if !loaded {
return newHeadNode
}
if headNode, ok := actual.(*instancePCNode); ok && !headNode.deleted.Load() { // for safety
return headNode
}
pc.heads.CompareAndDelete(key, actual)
}
return nil
}

// Get gets the cached value according to key and paramTypes.
func (pc *instancePlanCache) Get(key string, paramTypes any) (value any, ok bool) {
headNode := pc.getHead(key, false)
if headNode == nil { // cache miss
if headNode == nil || headNode.deleted.Load() { // cache miss
return nil, false
}
return pc.getPlanFromList(headNode, paramTypes)
}

func (pc *instancePlanCache) getPlanFromList(headNode *instancePCNode, paramTypes any) (any, bool) {
for node := headNode.next.Load(); node != nil; node = node.next.Load() {
for node := headNode.next.Load(); node != nil && node != deletedInstancePCNode; node = node.next.Load() {
if checkTypesCompatibility4PC(node.value.ParamTypes, paramTypes) { // v.Plan is read-only, no need to lock
if !pc.inEvict.Load() {
node.lastUsed.Store(time.Now()) // atomically update the lastUsed field
Expand All @@ -109,27 +146,74 @@ func (pc *instancePlanCache) Put(key string, value, paramTypes any) (succ bool)
return // do nothing if it exceeds the hard limit
}
headNode := pc.getHead(key, true)
if headNode == nil {
if headNode == nil || headNode.deleted.Load() {
return false // for safety
}
if _, ok := pc.getPlanFromList(headNode, paramTypes); ok {
return // some other thread has inserted the same plan before
}
if pc.inEvict.Load() {
if pc.inEvict.Load() || headNode.deleted.Load() {
return // do nothing if eviction is in progress
}

firstNode := headNode.next.Load()
if firstNode == deletedInstancePCNode {
return
}
currNode := pc.createNode(value)
currNode.next.Store(firstNode)
if headNode.next.CompareAndSwap(firstNode, currNode) { // if failed, some other thread has updated this node,
if !currNode.accountingState.CompareAndSwap(accountingNotCounted, accountingInProgress) {
return
}
failpoint.InjectCall("instancePlanCachePutAccountingPause")
pc.totCost.Add(vMem) // then skip this Put and wait for the next time.
pc.totPlan.Add(1)
currNode.accountingState.Store(accountingCounted)
succ = true
}
return
}

// Delete removes all cached values under the exact cache key.
func (pc *instancePlanCache) Delete(key string) (numDeleted int) {
pc.evictMutex.Lock() // serialize against Evict and key deletion for safety
defer pc.evictMutex.Unlock()
pc.inEvict.Store(true)
defer pc.inEvict.Store(false)

headNode := pc.getHead(key, false)
if headNode == nil {
return 0
}
headNode.deleted.Store(true)
firstNode := headNode.next.Swap(deletedInstancePCNode)
pc.heads.Delete(key)
for node := firstNode; node != nil && node != deletedInstancePCNode; node = node.next.Load() {
numDeleted++
done := false
for !done {
switch node.accountingState.Load() {
case accountingNotCounted:
if node.accountingState.CompareAndSwap(accountingNotCounted, accountingRemovedUncounted) {
done = true
}
case accountingInProgress:
runtime.Gosched()
case accountingCounted:
if node.accountingState.CompareAndSwap(accountingCounted, accountingRemovedCounted) {
pc.totCost.Sub(node.value.MemoryUsage())
pc.totPlan.Sub(1)
done = true
}
case accountingRemovedUncounted, accountingRemovedCounted:
done = true
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return
}

// All returns all cached values.
// All returned values are read-only, don't modify them.
func (pc *instancePlanCache) All() (values []any) {
Expand Down Expand Up @@ -226,7 +310,7 @@ func (pc *instancePlanCache) calcEvictionThreshold(lastUsedTimes []time.Time) (t
func (pc *instancePlanCache) foreach(callback func(prev, this *instancePCNode) (thisRemoved bool)) {
_, headNodes := pc.headNodes()
for _, headNode := range headNodes {
for prev, this := headNode, headNode.next.Load(); this != nil; {
for prev, this := headNode, headNode.next.Load(); this != nil && this != deletedInstancePCNode; {
thisRemoved := callback(prev, this)
if !thisRemoved { // this node is removed, no need to update the prev node in this case
prev, this = this, this.next.Load()
Expand Down
90 changes: 90 additions & 0 deletions pkg/planner/core/plan_cache_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/util/coretestsdk"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/require"
)

Expand All @@ -44,6 +47,11 @@ func _miss(t *testing.T, pc sessionctx.InstancePlanCache, testKey, statsHash int
require.False(t, ok)
}

func _putWithParamTypes(pc sessionctx.InstancePlanCache, key string, memUsage int64, paramTypes []*types.FieldType) (succ bool) {
v := &PlanCacheValue{Memory: memUsage, ParamTypes: paramTypes}
return pc.Put(key, v, paramTypes)
}

func TestInstancePlanCacheBasic(t *testing.T) {
sctx := coretestsdk.MockContext()
defer func() {
Expand Down Expand Up @@ -75,6 +83,66 @@ func TestInstancePlanCacheBasic(t *testing.T) {
_put(pc, 1, 101, 0)
require.Equal(t, pc.MemUsage(), int64(100)) // the second one will be ignored

t.Run("delete waits for published put accounting", func(t *testing.T) {
pc = NewInstancePlanCache(1000, 1000)
published := make(chan struct{}, 1)
resume := make(chan struct{})
fpName := "github.com/pingcap/tidb/pkg/planner/core/instancePlanCachePutAccountingPause"
var once sync.Once
require.NoError(t, failpoint.EnableCall(fpName, func() {
once.Do(func() {
published <- struct{}{}
<-resume
})
}))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()

putDone := make(chan bool, 1)
go func() {
putDone <- _put(pc, 1, 100, 0)
}()

select {
case <-published:
case <-time.After(time.Second):
t.Fatal("put did not reach the accounting pause")
}

deleteDone := make(chan int, 1)
go func() {
deleteDone <- pc.Delete("1-0")
}()

select {
case numDeleted := <-deleteDone:
t.Fatalf("delete finished too early with %d deleted entries", numDeleted)
case <-time.After(50 * time.Millisecond):
}

close(resume)
require.True(t, <-putDone)
require.Equal(t, 1, <-deleteDone)
require.Equal(t, int64(0), pc.MemUsage())
require.Equal(t, int64(0), pc.Size())
_miss(t, pc, 1, 0)
})

// delete an exact key
pc = NewInstancePlanCache(1000, 1000)
_put(pc, 1, 100, 0)
_put(pc, 2, 100, 0)
_put(pc, 3, 100, 0)
numDeleted := pc.Delete("2-0")
require.Equal(t, 1, numDeleted)
require.Equal(t, int64(200), pc.MemUsage())
require.Equal(t, int64(2), pc.Size())
_hit(t, pc, 1, 0)
_miss(t, pc, 2, 0)
_hit(t, pc, 3, 0)
require.Equal(t, 0, pc.Delete("not-exist"))

// eviction
pc = NewInstancePlanCache(320, 500)
_put(pc, 1, 100, 0)
Expand Down Expand Up @@ -158,6 +226,28 @@ func TestInstancePlanCacheWithMatchOpts(t *testing.T) {
_miss(t, pc, 3, 2)
_miss(t, pc, 3, 3)

// same exact key with different param types should be deleted together
pc = NewInstancePlanCache(1000, 1000)
key := "shared-key"
paramTypes1 := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
paramTypes2 := []*types.FieldType{types.NewFieldType(mysql.TypeDouble)}
require.True(t, _putWithParamTypes(pc, key, 100, paramTypes1))
require.True(t, _putWithParamTypes(pc, key, 100, paramTypes2))
_put(pc, 2, 100, 1)
_, ok := pc.Get(key, paramTypes1)
require.True(t, ok)
_, ok = pc.Get(key, paramTypes2)
require.True(t, ok)
numDeleted := pc.Delete(key)
require.Equal(t, 2, numDeleted)
require.Equal(t, int64(100), pc.MemUsage())
require.Equal(t, int64(1), pc.Size())
_, ok = pc.Get(key, paramTypes1)
require.False(t, ok)
_, ok = pc.Get(key, paramTypes2)
require.False(t, ok)
_hit(t, pc, 2, 1)

// hard limit can take effect in this case
pc = NewInstancePlanCache(200, 200)
_put(pc, 1, 100, 1)
Expand Down
2 changes: 2 additions & 0 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type InstancePlanCache interface {
Get(key string, paramTypes any) (value any, ok bool)
// Put puts the key and value into the cache.
Put(key string, value, paramTypes any) (succ bool)
// Delete removes all cached values under the exact cache key.
Delete(key string) (numDeleted int)
// All returns all cached values.
// Returned values are read-only, don't modify them.
All() (values []any)
Expand Down
Loading