Commit 3505243

Merge pull request #78 from karlseguin/control_stop
Refactor control messages + Stop handling

2 parents: ece93bf + 22776be

5 files changed: +220 lines, -197 lines. Only three of the changed files appear below: cache.go, cache_test.go, and the new control.go.
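What the refactor does, in outline: the ad-hoc control chan interface{} field and the per-command plumbing in cache.go move into a dedicated control channel type (new file control.go) whose methods are promoted onto Cache[T] by embedding, and Stop switches from closing the promotables channel to sending an explicit controlStop message that the worker handles like any other command. The sketch below is a hypothetical miniature of that channel-with-methods plus embedding pattern; the names (getDroppedMsg, stopMsg, the toy cache struct) are invented for illustration and are not ccache's.

// Hypothetical sketch of the pattern, not ccache code.
package main

import "fmt"

type getDroppedMsg struct{ res chan int } // reply travels on a per-request channel
type stopMsg struct{}

// control is just a channel, but defining methods on it gives the cache a
// public API without the cache type having to re-wrap every control call.
type control chan interface{}

func (c control) GetDropped() int {
	res := make(chan int)
	c <- getDroppedMsg{res: res}
	return <-res
}

func (c control) Stop() {
	c <- stopMsg{}
}

type cache struct {
	control // embedded: cache.GetDropped() and cache.Stop() are promoted
}

// worker owns the state and answers control messages one at a time.
func (c *cache) worker() {
	dropped := 0
	for msg := range c.control {
		switch m := msg.(type) {
		case getDroppedMsg:
			m.res <- dropped
			dropped = 0
		case stopMsg:
			return
		}
	}
}

func main() {
	c := &cache{control: make(control, 5)}
	go c.worker()
	fmt.Println(c.GetDropped()) // 0, computed by the worker goroutine
	c.Stop()
}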

cache.go

Lines changed: 20 additions & 109 deletions
@@ -36,13 +36,13 @@ type gc struct {
 
 type Cache[T any] struct {
 	*Configuration[T]
+	control
 	list        *List[*Item[T]]
 	size        int64
 	buckets     []*bucket[T]
 	bucketMask  uint32
 	deletables  chan *Item[T]
 	promotables chan *Item[T]
-	control     chan interface{}
 }
 
 // Create a new cache with the specified configuration
@@ -51,16 +51,18 @@ func New[T any](config *Configuration[T]) *Cache[T] {
 	c := &Cache[T]{
 		list:          NewList[*Item[T]](),
 		Configuration: config,
+		control:       newControl(),
 		bucketMask:    uint32(config.buckets) - 1,
 		buckets:       make([]*bucket[T], config.buckets),
-		control:       make(chan interface{}),
+		deletables:    make(chan *Item[T], config.deleteBuffer),
+		promotables:   make(chan *Item[T], config.promoteBuffer),
 	}
 	for i := 0; i < config.buckets; i++ {
 		c.buckets[i] = &bucket[T]{
 			lookup: make(map[string]*Item[T]),
 		}
 	}
-	c.restart()
+	go c.worker()
 	return c
 }
 
@@ -184,94 +186,6 @@ func (c *Cache[T]) Delete(key string) bool {
 	return false
 }
 
-// Clears the cache
-// This is a control command.
-func (c *Cache[T]) Clear() {
-	done := make(chan struct{})
-	c.control <- clear{done: done}
-	<-done
-}
-
-// Stops the background worker. Operations performed on the cache after Stop
-// is called are likely to panic
-// This is a control command.
-func (c *Cache[T]) Stop() {
-	close(c.promotables)
-	<-c.control
-}
-
-// Gets the number of items removed from the cache due to memory pressure since
-// the last time GetDropped was called
-// This is a control command.
-func (c *Cache[T]) GetDropped() int {
-	return doGetDropped(c.control)
-}
-
-func doGetDropped(controlCh chan<- interface{}) int {
-	res := make(chan int)
-	controlCh <- getDropped{res: res}
-	return <-res
-}
-
-// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
-// that were done by the current goroutine up to now.
-//
-// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
-// goroutine that updates its internal data structures asynchronously. This means that the
-// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
-// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
-// application code will not care about this, but especially in a test scenario you may want to
-// be able to know when the worker has caught up.
-//
-// This applies only to cache methods that were previously called by the same goroutine that is
-// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
-// no way to know whether any of them still have pending state updates when SyncUpdates returns.
-// This is a control command.
-func (c *Cache[T]) SyncUpdates() {
-	doSyncUpdates(c.control)
-}
-
-func doSyncUpdates(controlCh chan<- interface{}) {
-	done := make(chan struct{})
-	controlCh <- syncWorker{done: done}
-	<-done
-}
-
-// Sets a new max size. That can result in a GC being run if the new maxium size
-// is smaller than the cached size
-// This is a control command.
-func (c *Cache[T]) SetMaxSize(size int64) {
-	done := make(chan struct{})
-	c.control <- setMaxSize{size: size, done: done}
-	<-done
-}
-
-// Forces GC. There should be no reason to call this function, except from tests
-// which require synchronous GC.
-// This is a control command.
-func (c *Cache[T]) GC() {
-	done := make(chan struct{})
-	c.control <- gc{done: done}
-	<-done
-}
-
-// Gets the size of the cache. This is an O(1) call to make, but it is handled
-// by the worker goroutine. It's meant to be called periodically for metrics, or
-// from tests.
-// This is a control command.
-func (c *Cache[T]) GetSize() int64 {
-	res := make(chan int64)
-	c.control <- getSize{res}
-	return <-res
-}
-
-func (c *Cache[T]) restart() {
-	c.deletables = make(chan *Item[T], c.deleteBuffer)
-	c.promotables = make(chan *Item[T], c.promoteBuffer)
-	c.control = make(chan interface{})
-	go c.worker()
-}
-
 func (c *Cache[T]) deleteItem(bucket *bucket[T], item *Item[T]) {
 	bucket.delete(item.key) //stop other GETs from getting it
 	c.deletables <- item
@@ -293,48 +207,48 @@ func (c *Cache[T]) bucket(key string) *bucket[T] {
 }
 
 func (c *Cache[T]) worker() {
-	defer close(c.control)
 	dropped := 0
+	cc := c.control
+
 	promoteItem := func(item *Item[T]) {
 		if c.doPromote(item) && c.size > c.maxSize {
 			dropped += c.gc()
 		}
 	}
+
 	for {
 		select {
-		case item, ok := <-c.promotables:
-			if ok == false {
-				goto drain
-			}
+		case item := <-c.promotables:
 			promoteItem(item)
 		case item := <-c.deletables:
 			c.doDelete(item)
-		case control := <-c.control:
+		case control := <-cc:
 			switch msg := control.(type) {
-			case getDropped:
+			case controlStop:
+				goto drain
+			case controlGetDropped:
 				msg.res <- dropped
 				dropped = 0
-			case setMaxSize:
+			case controlSetMaxSize:
 				c.maxSize = msg.size
 				if c.size > c.maxSize {
 					dropped += c.gc()
 				}
 				msg.done <- struct{}{}
-			case clear:
+			case controlClear:
 				for _, bucket := range c.buckets {
 					bucket.clear()
 				}
 				c.size = 0
 				c.list = NewList[*Item[T]]()
 				msg.done <- struct{}{}
-			case getSize:
+			case controlGetSize:
 				msg.res <- c.size
-			case gc:
+			case controlGC:
 				dropped += c.gc()
 				msg.done <- struct{}{}
-			case syncWorker:
-				doAllPendingPromotesAndDeletes(c.promotables, promoteItem,
-					c.deletables, c.doDelete)
+			case controlSyncUpdates:
+				doAllPendingPromotesAndDeletes(c.promotables, promoteItem, c.deletables, c.doDelete)
 				msg.done <- struct{}{}
 			}
 		}
@@ -346,7 +260,6 @@ drain:
 		case item := <-c.deletables:
 			c.doDelete(item)
 		default:
-			close(c.deletables)
 			return
 		}
 	}
@@ -367,9 +280,7 @@ doAllPromotes:
 	for {
 		select {
 		case item := <-promotables:
-			if item != nil {
-				promoteFn(item)
-			}
+			promoteFn(item)
 		default:
 			break doAllPromotes
 		}
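Why Stop changes: the removed Stop closed c.promotables, and the worker treated a closed promotables channel as the signal to drain and exit, which is why its old doc comment warned that operations after Stop were "likely to panic": Get and Set promote items by sending on that same channel, and a send on a closed channel panics. The new worker keeps its channels open and leaves the loop only when it receives controlStop (Stop also calls SyncUpdates first; see control.go below). Test_ConcurrentStop in the next file hammers a cache from several goroutines while stopping it to exercise exactly this. The standalone sketch that follows, with invented names (work, stop), shows the general idiom of a sentinel shutdown message that stays safe for concurrent senders.

// Hypothetical illustration, not ccache code: shut a worker down with a
// sentinel message instead of closing the channel it receives on.
package main

import (
	"fmt"
	"sync"
)

type stop struct{}

func main() {
	work := make(chan interface{}, 16)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The worker consumes messages until it sees the stop sentinel.
		for msg := range work {
			if _, ok := msg.(stop); ok {
				return
			}
			fmt.Println("processed", msg)
		}
	}()

	// Concurrent producers; any of these may still be sending when we stop.
	for i := 0; i < 3; i++ {
		go func(id int) { work <- id }(i)
	}

	work <- stop{} // explicit shutdown message, analogous to ccache's controlStop
	wg.Wait()      // late sends just sit in the open, buffered channel; nothing panics
}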

cache_test.go

Lines changed: 24 additions & 0 deletions
@@ -337,6 +337,30 @@ func Test_CachePrune(t *testing.T) {
 	}
 }
 
+func Test_ConcurrentStop(t *testing.T) {
+	for i := 0; i < 100; i++ {
+		cache := New(Configure[string]())
+		r := func() {
+			for {
+				key := strconv.Itoa(int(rand.Int31n(100)))
+				switch rand.Int31n(3) {
+				case 0:
+					cache.Get(key)
+				case 1:
+					cache.Set(key, key, time.Minute)
+				case 2:
+					cache.Delete(key)
+				}
+			}
+		}
+		go r()
+		go r()
+		go r()
+		time.Sleep(time.Millisecond * 10)
+		cache.Stop()
+	}
+}
+
 type SizedItem struct {
 	id int
 	s  int64

control.go

Lines changed: 110 additions & 0 deletions
@@ -0,0 +1,110 @@
+package ccache
+
+type controlGC struct {
+	done chan struct{}
+}
+
+type controlClear struct {
+	done chan struct{}
+}
+
+type controlStop struct {
+}
+
+type controlGetSize struct {
+	res chan int64
+}
+
+type controlGetDropped struct {
+	res chan int
+}
+
+type controlSetMaxSize struct {
+	size int64
+	done chan struct{}
+}
+
+type controlSyncUpdates struct {
+	done chan struct{}
+}
+
+type control chan interface{}
+
+func newControl() chan interface{} {
+	return make(chan interface{}, 5)
+}
+
+// Forces GC. There should be no reason to call this function, except from tests
+// which require synchronous GC.
+// This is a control command.
+func (c control) GC() {
+	done := make(chan struct{})
+	c <- controlGC{done: done}
+	<-done
+}
+
+// Sends a stop signal to the worker thread. The worker thread will shut down
+// 5 seconds after the last message is received. The cache should not be used
+// after Stop is called, but concurrently executing requests should properly finish
+// executing.
+// This is a control command.
+func (c control) Stop() {
+	c.SyncUpdates()
+	c <- controlStop{}
+}
+
+// Clears the cache
+// This is a control command.
+func (c control) Clear() {
+	done := make(chan struct{})
+	c <- controlClear{done: done}
+	<-done
+}
+
+// Gets the size of the cache. This is an O(1) call to make, but it is handled
+// by the worker goroutine. It's meant to be called periodically for metrics, or
+// from tests.
+// This is a control command.
+func (c control) GetSize() int64 {
+	res := make(chan int64)
+	c <- controlGetSize{res: res}
+	return <-res
+}
+
+// Gets the number of items removed from the cache due to memory pressure since
+// the last time GetDropped was called
+// This is a control command.
+func (c control) GetDropped() int {
+	res := make(chan int)
+	c <- controlGetDropped{res: res}
+	return <-res
+}
+
+// Sets a new max size. That can result in a GC being run if the new maxium size
+// is smaller than the cached size
+// This is a control command.
+func (c control) SetMaxSize(size int64) {
+	done := make(chan struct{})
+	c <- controlSetMaxSize{size: size, done: done}
+	<-done
+}
+
+// SyncUpdates waits until the cache has finished asynchronous state updates for any operations
+// that were done by the current goroutine up to now.
+//
+// For efficiency, the cache's implementation of LRU behavior is partly managed by a worker
+// goroutine that updates its internal data structures asynchronously. This means that the
+// cache's state in terms of (for instance) eviction of LRU items is only eventually consistent;
+// there is no guarantee that it happens before a Get or Set call has returned. Most of the time
+// application code will not care about this, but especially in a test scenario you may want to
+// be able to know when the worker has caught up.
+//
+// This applies only to cache methods that were previously called by the same goroutine that is
+// now calling SyncUpdates. If other goroutines are using the cache at the same time, there is
+// no way to know whether any of them still have pending state updates when SyncUpdates returns.
+// This is a control command.
+func (c control) SyncUpdates() {
+	done := make(chan struct{})
+	c <- controlSyncUpdates{done: done}
+	<-done
+}
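For callers, nothing changes syntactically: because the control channel is embedded in Cache[T], these methods are promoted and are still invoked directly on the cache. The test-style sketch below (the Test_ name and assertions are invented here, not part of this commit) shows the promoted methods in use, in the same in-package style as cache_test.go.

// Usage sketch, assumed illustration only.
package ccache

import (
	"testing"
	"time"
)

func Test_ControlMethodsSketch(t *testing.T) {
	cache := New(Configure[string]())
	defer cache.Stop() // sends controlStop; the worker drains pending work and exits

	cache.Set("a", "a", time.Minute)
	cache.SyncUpdates() // wait for the worker to apply this goroutine's pending updates

	if size := cache.GetSize(); size == 0 {
		t.Errorf("expected a non-zero size after Set, got %d", size)
	}

	cache.SetMaxSize(5)    // may run a gc in the worker if the cache is now over size
	_ = cache.GetDropped() // evictions due to memory pressure since the last call
	cache.Clear()          // worker clears every bucket and resets the list
	cache.GC()             // force a gc pass; mainly useful from tests
}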
