-
Notifications
You must be signed in to change notification settings - Fork 389
/
Copy pathcache.go
205 lines (177 loc) · 5.58 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// Copyright 2017 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package cacheqm contains a caching quota.Manager implementation.
package cacheqm
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/google/trillian/quota"
"k8s.io/klog/v2"
)
const (
// DefaultMinBatchSize is the suggested default for minBatchSize.
DefaultMinBatchSize = 100
// DefaultMaxCacheEntries is the suggested default for maxEntries.
DefaultMaxCacheEntries = 1000
)
// now is used in place of time.Now to allow tests to take control of time.
var now = time.Now
type manager struct {
quota.Manager
minBatchSize, maxEntries int
// mu guards cache
mu sync.Mutex
cache map[quota.Spec]*bucket
// evictWg tracks evict() goroutines.
evictWg sync.WaitGroup
}
type bucket struct {
tokens int
lastModified time.Time
}
// NewCachedManager wraps a quota.Manager with an implementation that caches tokens locally.
//
// minBatchSize determines the minimum number of tokens requested from qm for each GetTokens()
// request.
//
// maxEntries determines the maximum number of cache entries, apart from global quotas. The oldest
// entries are evicted as necessary, their tokens replenished via PutTokens() to avoid excessive
// leakage.
func NewCachedManager(qm quota.Manager, minBatchSize, maxEntries int) (quota.Manager, error) {
switch {
case minBatchSize <= 0:
return nil, fmt.Errorf("invalid minBatchSize: %v", minBatchSize)
case maxEntries <= 0:
return nil, fmt.Errorf("invalid maxEntries: %v", minBatchSize)
}
return &manager{
Manager: qm,
minBatchSize: minBatchSize,
maxEntries: maxEntries,
cache: make(map[quota.Spec]*bucket),
}, nil
}
// GetTokens implements Manager.GetTokens.
func (m *manager) GetTokens(ctx context.Context, numTokens int, specs []quota.Spec) error {
m.mu.Lock()
defer m.mu.Unlock()
// Verify which buckets need more tokens, if any
specsToRefill := []quota.Spec{}
for _, spec := range specs {
bucket, ok := m.cache[spec]
if !ok || bucket.tokens < numTokens {
specsToRefill = append(specsToRefill, spec)
}
}
// Request the required number of tokens and add them to buckets
if len(specsToRefill) != 0 {
defer func() {
// Do not hold GetTokens on eviction, it won't change the result.
m.evictWg.Add(1)
go func() {
m.evict(ctx)
m.evictWg.Done()
}()
}()
// A more accurate count would be numTokens+m.minBatchSize-bucket.tokens, but that might
// force us to make a GetTokens call for each spec. A single call is likely to be more
// efficient.
tokens := numTokens + m.minBatchSize
if err := m.Manager.GetTokens(ctx, tokens, specsToRefill); err != nil {
return err
}
for _, spec := range specsToRefill {
b, ok := m.cache[spec]
if !ok {
b = &bucket{}
m.cache[spec] = b
}
b.tokens += tokens
}
}
// Subtract tokens from cache
lastModified := now()
for _, spec := range specs {
bucket, ok := m.cache[spec]
// Sanity check
if !ok || bucket.tokens < 0 || bucket.tokens < numTokens {
klog.Errorf("Bucket invariants failed for spec %+v: ok = %v, bucket = %+v", spec, ok, bucket)
return nil // Something is wrong with the implementation, let requests go through.
}
bucket.tokens -= numTokens
bucket.lastModified = lastModified
}
return nil
}
func (m *manager) evict(ctx context.Context) {
m.mu.Lock()
// m.mu is explicitly unlocked, so we don't have to hold it while we wait for goroutines to
// complete.
if len(m.cache) <= m.maxEntries {
m.mu.Unlock()
return
}
// Find and evict the oldest entries. To avoid excessive token leakage, let's try and
// replenish the tokens held for the evicted entries.
var buckets bucketsByTime = make([]specBucket, 0, len(m.cache))
for spec, b := range m.cache {
if spec.Group != quota.Global {
buckets = append(buckets, specBucket{bucket: b, spec: spec})
}
}
sort.Sort(buckets)
wg := sync.WaitGroup{}
evicts := len(m.cache) - m.maxEntries
for i := 0; i < evicts; i++ {
b := buckets[i]
klog.V(1).Infof("Too many tokens cached, returning least recently used (%v tokens for %+v)", b.tokens, b.spec)
delete(m.cache, b.spec)
// goroutines must not access the cache, the lock is released before they complete.
wg.Add(1)
go func() {
if err := m.PutTokens(ctx, b.tokens, []quota.Spec{b.spec}); err != nil {
klog.Warningf("Error replenishing tokens from evicted bucket (spec = %+v, bucket = %+v): %v", b.spec, b.bucket, err)
}
wg.Done()
}()
}
m.mu.Unlock()
wg.Wait()
}
// wait waits for spawned goroutines to complete. Used by eviction tests.
func (m *manager) wait() {
m.evictWg.Wait()
}
// specBucket is a bucket with the corresponding spec.
type specBucket struct {
*bucket
spec quota.Spec
}
// bucketsByTime is a sortable slice of specBuckets.
type bucketsByTime []specBucket
// Len provides sort.Interface.Len.
func (b bucketsByTime) Len() int {
return len(b)
}
// Less provides sort.Interface.Less.
func (b bucketsByTime) Less(i, j int) bool {
return b[i].lastModified.Before(b[j].lastModified)
}
// Swap provides sort.Interface.Swap.
func (b bucketsByTime) Swap(i, j int) {
b[i], b[j] = b[j], b[i]
}