-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcache.go
More file actions
578 lines (494 loc) · 20 KB
/
cache.go
File metadata and controls
578 lines (494 loc) · 20 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
// Package daramjwee contains the core implementation of the Cache interface.
package daramjwee
import (
"context"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/mrchypark/daramjwee/internal/worker"
)
// ErrCacheClosed is returned by all cache operations after Close has been called.
var ErrCacheClosed = errors.New("daramjwee: cache is closed")

// ErrNilMetadata indicates that a store returned a stream without metadata,
// which the cache treats as a store failure rather than a usable hit.
var ErrNilMetadata = errors.New("daramjwee: nil metadata encountered")
// DaramjweeCache is a concrete implementation of the Cache interface.
// It layers an optional cold store beneath a hot store and uses a background
// worker for refreshes and hot-to-cold copies.
type DaramjweeCache struct {
	// HotStore is the primary (fast) tier.
	HotStore Store
	// ColdStore is the secondary tier. Optional.
	ColdStore Store // Optional
	// Logger receives structured log events for all cache operations.
	Logger log.Logger
	// Worker runs background jobs (refreshes, cold-store copies).
	Worker *worker.Manager
	// DefaultTimeout is applied to operations whose context has no deadline.
	DefaultTimeout time.Duration
	// ShutdownTimeout bounds how long Close waits for the worker to drain.
	ShutdownTimeout time.Duration
	// PositiveFreshFor / NegativeFreshFor control hot-tier freshness for
	// normal and negative (not-found) entries respectively.
	PositiveFreshFor time.Duration
	NegativeFreshFor time.Duration
	// ColdStorePositiveFreshFor / ColdStoreNegativeFreshFor control when the
	// cold-tier copy is considered stale and re-copied.
	ColdStorePositiveFreshFor time.Duration
	ColdStoreNegativeFreshFor time.Duration
	// isClosed flips to true once Close is called; checked by every operation.
	isClosed atomic.Bool
}

// Compile-time check that *DaramjweeCache satisfies the Cache interface.
var _ Cache = (*DaramjweeCache)(nil)
// Get retrieves data based on the requested caching strategy.
// It first checks the hot cache, then the cold cache, and finally fetches
// from the origin. The returned stream must be closed by the caller.
func (c *DaramjweeCache) Get(ctx context.Context, key string, fetcher Fetcher) (io.ReadCloser, error) {
	if c.isClosed.Load() {
		return nil, ErrCacheClosed
	}
	ctx, cancel := c.newCtxWithTimeout(ctx)
	defer cancel()
	// 1. Check Hot Cache.
	hotStream, hotMeta, err := c.getStreamFromStore(ctx, c.HotStore, key)
	if err == nil && hotMeta == nil {
		// Guard: handleHotHit dereferences the metadata unconditionally, so a
		// stream without metadata must be treated as a store failure.
		// (The cold path below already guards the same way.)
		err = ErrNilMetadata
		if hotStream != nil {
			hotStream.Close()
		}
	}
	if err == nil {
		return c.handleHotHit(ctx, key, fetcher, hotStream, hotMeta)
	}
	if !errors.Is(err, ErrNotFound) {
		level.Error(c.Logger).Log("msg", "hot store get failed", "key", key, "err", err)
	}
	// 2. Check Cold Cache. The cold store is optional, so skip this tier when
	// it is not configured instead of dereferencing a nil Store.
	if c.ColdStore != nil {
		coldStream, coldMeta, err := c.getStreamFromStore(ctx, c.ColdStore, key)
		if err == nil && coldMeta == nil {
			err = ErrNilMetadata
			if coldStream != nil {
				coldStream.Close()
			}
		}
		if err == nil {
			return c.handleColdHit(ctx, key, coldStream, coldMeta)
		}
		if !errors.Is(err, ErrNotFound) {
			level.Error(c.Logger).Log("msg", "cold store get failed", "key", key, "err", err)
		}
	}
	// 3. Fetch from Origin.
	return c.handleMiss(ctx, key, fetcher)
}
// Set returns a WriteCloser that writes data directly into the hot store
// under the given key. The metadata's CachedAt is stamped with the current
// time; closing the returned writer releases the associated context.
func (c *DaramjweeCache) Set(ctx context.Context, key string, metadata *Metadata) (io.WriteCloser, error) {
	if c.isClosed.Load() {
		return nil, ErrCacheClosed
	}
	if c.HotStore == nil {
		return nil, &ConfigError{"hotStore is not configured"}
	}
	ctx, cancel := c.newCtxWithTimeout(ctx)
	meta := metadata
	if meta == nil {
		meta = &Metadata{}
	}
	meta.CachedAt = time.Now()
	writer, err := c.setStreamToStore(ctx, c.HotStore, key, meta)
	if err != nil {
		cancel()
		return nil, err
	}
	// Tie the context's lifetime to the writer: cancel fires on Close.
	return newCancelWriteCloser(writer, cancel), nil
}
// Delete sequentially deletes an object from all cache tiers to prevent deadlocks.
// It attempts to delete from the hot store first, then the cold store, and
// returns the first error that is not ErrNotFound (nil when both succeed).
func (c *DaramjweeCache) Delete(ctx context.Context, key string) error {
	if c.isClosed.Load() {
		return ErrCacheClosed
	}
	ctx, cancel := c.newCtxWithTimeout(ctx)
	defer cancel()
	var firstErr error
	// 1. Always delete from Hot Store first.
	if c.HotStore != nil {
		if err := c.deleteFromStore(ctx, c.HotStore, key); err != nil && !errors.Is(err, ErrNotFound) {
			level.Error(c.Logger).Log("msg", "failed to delete from hot store", "key", key, "err", err)
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	// 2. Then delete from Cold Store. The cold store is optional, so it needs
	// the same nil guard as the hot store above to avoid a nil dereference.
	if c.ColdStore != nil {
		if err := c.deleteFromStore(ctx, c.ColdStore, key); err != nil && !errors.Is(err, ErrNotFound) {
			level.Error(c.Logger).Log("msg", "failed to delete from cold store", "key", key, "err", err)
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}
// ScheduleRefresh submits a background cache refresh job to the worker.
// The job re-fetches the object from the origin (handing the current hot-store
// metadata to the fetcher so it can do a conditional request), rewrites the hot
// entry, and — if the cold copy looks stale — schedules a hot-to-cold copy.
// It returns an error only if the cache is closed or no worker is configured;
// failures inside the job itself are logged, not returned.
func (c *DaramjweeCache) ScheduleRefresh(ctx context.Context, key string, fetcher Fetcher) error {
	if c.isClosed.Load() {
		return ErrCacheClosed
	}
	if c.Worker == nil {
		return errors.New("worker is not configured, cannot schedule refresh")
	}
	job := func(jobCtx context.Context) {
		level.Info(c.Logger).Log("msg", "starting background refresh", "key", key)
		// Pass the existing hot metadata (if any) so the fetcher can perform a
		// conditional fetch (e.g. If-None-Match / If-Modified-Since).
		var oldMetadata *Metadata
		if meta, err := c.statFromStore(jobCtx, c.HotStore, key); err == nil && meta != nil {
			oldMetadata = meta
		}
		result, err := fetcher.Fetch(jobCtx, oldMetadata)
		if err != nil {
			// Cacheable-not-found and not-modified are expected outcomes, not errors.
			if errors.Is(err, ErrCacheableNotFound) {
				level.Debug(c.Logger).Log("msg", "re-caching as negative entry during background refresh", "key", key)
				c.handleNegativeCache(jobCtx, key)
			} else if errors.Is(err, ErrNotModified) {
				level.Debug(c.Logger).Log("msg", "background refresh: object not modified", "key", key)
			} else {
				level.Error(c.Logger).Log("msg", "background fetch failed", "key", key, "err", err)
			}
			return
		}
		defer result.Body.Close()
		if result.Metadata == nil {
			result.Metadata = &Metadata{}
		}
		result.Metadata.CachedAt = time.Now()
		writer, err := c.setStreamToStore(jobCtx, c.HotStore, key, result.Metadata)
		if err != nil {
			level.Error(c.Logger).Log("msg", "failed to get cache writer for refresh", "key", key, "err", err)
			return
		}
		_, copyErr := io.Copy(writer, result.Body)
		closeErr := writer.Close()
		if copyErr != nil || closeErr != nil {
			level.Error(c.Logger).Log("msg", "failed background set", "key", key, "copyErr", copyErr, "closeErr", closeErr)
		} else {
			level.Info(c.Logger).Log("msg", "background set successful", "key", key)
			// WARN: if oldMetadata's data is changed in Fetcher, we need to be careful about stale data
			// Staleness is judged from the pre-refresh metadata snapshot.
			isStale := c.isColdStoreCachedStale(oldMetadata)
			if isStale {
				// schedule background copy to cold store
				c.scheduleSetToStore(context.Background(), c.ColdStore, key)
			}
		}
	}
	c.Worker.Submit(job)
	return nil
}
// isColdStoreCachedStale reports whether the cold-tier copy described by
// oldMeta should be refreshed. Missing metadata is always considered stale.
func (c *DaramjweeCache) isColdStoreCachedStale(oldMeta *Metadata) bool {
	if oldMeta == nil {
		return true
	}
	ttl := c.ColdStorePositiveFreshFor
	if oldMeta.IsNegative {
		ttl = c.ColdStoreNegativeFreshFor
	}
	expiry := oldMeta.CachedAt.Add(ttl)
	return time.Now().After(expiry)
}
// Close safely shuts down the worker. It is idempotent: only the first call
// performs the shutdown; subsequent calls return immediately.
func (c *DaramjweeCache) Close() {
	alreadyClosed := c.isClosed.Swap(true)
	if alreadyClosed {
		// Duplicate call; nothing left to do.
		return
	}
	if c.Worker == nil {
		return
	}
	level.Info(c.Logger).Log("msg", "shutting down daramjwee cache")
	if err := c.Worker.Shutdown(c.ShutdownTimeout); err != nil {
		level.Error(c.Logger).Log("msg", "graceful shutdown failed", "err", err)
		return
	}
	level.Info(c.Logger).Log("msg", "daramjwee cache shutdown complete")
}
// handleHotHit processes the logic when an object is found in the hot cache.
// Fresh positive entries are returned directly; stale entries are returned
// immediately but a background refresh is scheduled when the stream is closed;
// negative entries yield ErrNotFound.
func (c *DaramjweeCache) handleHotHit(_ context.Context, key string, fetcher Fetcher, hotStream io.ReadCloser, meta *Metadata) (io.ReadCloser, error) {
	level.Debug(c.Logger).Log("msg", "hot cache hit", "key", key)
	if meta == nil {
		// Defensive guard: the freshness logic below dereferences meta, and a
		// nil here would otherwise panic the whole request.
		if hotStream != nil {
			hotStream.Close()
		}
		return nil, ErrNilMetadata
	}
	// Select the freshness lifetime for this entry type; the staleness rule is
	// identical for positive and negative entries, so compute it once.
	freshFor := c.PositiveFreshFor
	if meta.IsNegative {
		freshFor = c.NegativeFreshFor
	}
	// A zero lifetime means "always stale"; a negative lifetime means "never stale".
	isStale := freshFor == 0 || (freshFor > 0 && time.Now().After(meta.CachedAt.Add(freshFor)))
	hotStreamCloser := newSafeCloser(hotStream, func() {})
	if isStale {
		level.Debug(c.Logger).Log("msg", "hot cache is stale, scheduling refresh", "key", key)
		// Defer the refresh until the caller finishes reading the stream.
		hotStreamCloser = newSafeCloser(hotStream, func() { c.ScheduleRefresh(context.Background(), key, fetcher) })
	}
	if meta.IsNegative {
		// Negative entries are served as "not found"; closing still triggers
		// the refresh callback when the entry was stale.
		hotStreamCloser.Close()
		return nil, ErrNotFound
	}
	return hotStreamCloser, nil
}
// handleColdHit processes the logic when an object is found in the cold cache.
// It promotes the object to the hot store (write-then-read) and serves the
// caller from the freshly promoted hot copy, falling back to the cold store
// when any step of the promotion fails.
func (c *DaramjweeCache) handleColdHit(ctx context.Context, key string, coldStream io.ReadCloser, coldMeta *Metadata) (io.ReadCloser, error) {
	level.Debug(c.Logger).Log("msg", "cold cache hit, promoting to hot", "key", key)
	// Create a copy of the metadata to promote to the hot cache.
	// This prevents data races if multiple goroutines handle Cold Hit concurrently
	// by not modifying the original coldMeta object directly.
	metaToPromote := &Metadata{}
	if coldMeta != nil {
		// Copy values from existing metadata.
		*metaToPromote = *coldMeta
	}
	// Write-then-read approach: First save to hot cache completely
	writer, err := c.setStreamToStore(ctx, c.HotStore, key, metaToPromote)
	if err != nil {
		level.Error(c.Logger).Log("msg", "failed to get hot store writer for promotion", "key", key, "err", err)
		return coldStream, nil // Return original cold stream if promotion fails
	}
	// Copy the entire stream from cold to hot cache
	_, copyErr := io.Copy(writer, coldStream)
	closeErr := writer.Close()
	coldStreamCloseErr := coldStream.Close()
	if copyErr != nil || closeErr != nil {
		level.Error(c.Logger).Log("msg", "failed to promote to hot cache", "key", key, "copyErr", copyErr, "closeErr", closeErr, "coldStreamCloseErr", coldStreamCloseErr)
		// It's safer to delete the key from the hot store to avoid leaving a partial/corrupt object.
		if delErr := c.deleteFromStore(context.Background(), c.HotStore, key); delErr != nil && !errors.Is(delErr, ErrNotFound) {
			level.Warn(c.Logger).Log("msg", "failed to clean up partially promoted key from hot cache", "key", key, "err", delErr)
		}
		// Since we already closed coldStream, we need to read from cold again
		coldStream, _, err := c.getStreamFromStore(ctx, c.ColdStore, key)
		if err != nil {
			level.Error(c.Logger).Log("msg", "failed to re-read from cold cache after promotion failure", "key", key, "err", err)
			return nil, err
		}
		return coldStream, nil
	}
	if coldStreamCloseErr != nil {
		// Promotion succeeded; a close failure on the source stream is only a warning.
		level.Warn(c.Logger).Log("msg", "closing cold stream after promotion failed", "key", key, "err", coldStreamCloseErr)
	}
	// Now read from hot cache and return to user
	hotStream, _, err := c.getStreamFromStore(ctx, c.HotStore, key)
	if err != nil {
		level.Error(c.Logger).Log("msg", "failed to read from hot cache after promotion", "key", key, "err", err)
		// Fallback to reading from cold cache again to maintain availability
		level.Info(c.Logger).Log("msg", "falling back to cold cache after read-after-write failure", "key", key)
		coldStream, _, fallbackErr := c.getStreamFromStore(ctx, c.ColdStore, key)
		if fallbackErr != nil {
			level.Error(c.Logger).Log("msg", "fallback to cold cache failed", "key", key, "originalErr", err, "fallbackErr", fallbackErr)
			return nil, fmt.Errorf("failed to read from hot cache after promotion: %w; fallback to cold cache also failed: %v", err, fallbackErr)
		}
		return coldStream, nil
	}
	return hotStream, nil
}
// handleMiss processes the logic when an object is not found in either hot or
// cold cache. It fetches from the origin, writes the result to the hot store,
// schedules an asynchronous copy to the cold store, and then serves the object
// back from the hot store (write-then-read) so the caller always reads a fully
// cached object.
func (c *DaramjweeCache) handleMiss(ctx context.Context, key string, fetcher Fetcher) (io.ReadCloser, error) {
	level.Debug(c.Logger).Log("msg", "full cache miss, fetching from origin", "key", key)
	// Pass any existing hot metadata so the fetcher can do a conditional request.
	var oldMetadata *Metadata
	if meta, err := c.statFromStore(ctx, c.HotStore, key); err == nil {
		oldMetadata = meta
	}
	result, err := fetcher.Fetch(ctx, oldMetadata)
	if err != nil {
		if errors.Is(err, ErrCacheableNotFound) {
			return c.handleNegativeCache(ctx, key)
		}
		if errors.Is(err, ErrNotModified) {
			level.Debug(c.Logger).Log("msg", "object not modified, serving from hot cache again", "key", key)
			stream, meta, err := c.getStreamFromStore(ctx, c.HotStore, key)
			if err != nil {
				level.Warn(c.Logger).Log("msg", "failed to refetch from hot cache after 304", "key", key, "err", err)
				return nil, ErrNotFound
			}
			if meta == nil {
				// Guard: without metadata the entry cannot be classified as
				// positive/negative; treat it as a miss instead of panicking below.
				stream.Close()
				return nil, ErrNotFound
			}
			if meta.IsNegative {
				stream.Close()
				return nil, ErrNotFound
			}
			return stream, nil
		}
		return nil, err
	}
	if result.Metadata == nil {
		result.Metadata = &Metadata{}
	}
	result.Metadata.CachedAt = time.Now()
	// Write-then-read approach: First save to hot cache completely.
	writer, err := c.setStreamToStore(ctx, c.HotStore, key, result.Metadata)
	if err != nil {
		level.Error(c.Logger).Log("msg", "failed to get cache writer", "key", key, "err", err)
		return result.Body, nil // Return original stream if cache write fails
	}
	// Copy the entire stream to hot cache.
	_, copyErr := io.Copy(writer, result.Body)
	closeErr := writer.Close()
	bodyCloseErr := result.Body.Close()
	if copyErr != nil || closeErr != nil {
		level.Error(c.Logger).Log("msg", "failed to write to hot cache", "key", key, "copyErr", copyErr, "closeErr", closeErr, "bodyCloseErr", bodyCloseErr)
		// It's safer to delete the key from the hot store to avoid leaving a partial/corrupt object.
		if delErr := c.deleteFromStore(context.Background(), c.HotStore, key); delErr != nil && !errors.Is(delErr, ErrNotFound) {
			level.Warn(c.Logger).Log("msg", "failed to clean up partially written key from hot cache", "key", key, "err", delErr)
		}
		// Since we already closed result.Body, we need to return an error that preserves context.
		writeErr := copyErr
		if writeErr == nil {
			writeErr = closeErr
		}
		return nil, fmt.Errorf("cache write failed and original stream consumed: %w", writeErr)
	}
	if bodyCloseErr != nil {
		level.Warn(c.Logger).Log("msg", "closing result body after caching failed", "key", key, "err", bodyCloseErr)
	}
	// Schedule background copy to cold store.
	c.scheduleSetToStore(context.Background(), c.ColdStore, key)
	// Now read from hot cache and return to user.
	hotStream, _, err := c.getStreamFromStore(ctx, c.HotStore, key)
	if err != nil {
		level.Error(c.Logger).Log("msg", "failed to read from hot cache after write", "key", key, "err", err)
		// Fallback to fetching from origin again, as the original stream is consumed.
		// This maintains availability for the client at the cost of a second fetch.
		level.Info(c.Logger).Log("msg", "refetching from origin after read-after-write failure", "key", key)
		result, fetchErr := fetcher.Fetch(ctx, nil) // Pass nil for oldMetadata as cache state is uncertain.
		if fetchErr != nil {
			level.Error(c.Logger).Log("msg", "fallback fetch failed", "key", key, "originalErr", err, "fallbackErr", fetchErr)
			return nil, fmt.Errorf("failed to read from hot cache after write: %w; fallback fetch also failed: %v", err, fetchErr)
		}
		// Do not attempt to cache this second result to avoid potential loops.
		return result.Body, nil
	}
	return hotStream, nil
}
// handleNegativeCache stores a negative ("known missing") entry in the hot
// store and reports ErrNotFound to the caller. Write failures are logged but
// never change the returned result.
func (c *DaramjweeCache) handleNegativeCache(ctx context.Context, key string) (io.ReadCloser, error) {
	level.Debug(c.Logger).Log("msg", "caching as negative entry", "key", key, "NegativeFreshFor", c.NegativeFreshFor)
	meta := &Metadata{IsNegative: true, CachedAt: time.Now()}
	writer, err := c.setStreamToStore(ctx, c.HotStore, key, meta)
	if err != nil {
		level.Warn(c.Logger).Log("msg", "failed to get writer for negative cache entry", "key", key, "err", err)
		return nil, ErrNotFound
	}
	// A negative entry has no body; closing the writer commits the metadata.
	if closeErr := writer.Close(); closeErr != nil {
		level.Warn(c.Logger).Log("msg", "failed to close writer for negative cache entry", "key", key, "err", closeErr)
	}
	return nil, ErrNotFound
}
// newCtxWithTimeout applies the default timeout to the context if no deadline
// is already set. When the caller supplied a deadline the context is returned
// unchanged together with a no-op cancel func.
func (c *DaramjweeCache) newCtxWithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
	_, hasDeadline := ctx.Deadline()
	if hasDeadline {
		noop := func() {}
		return ctx, noop
	}
	return context.WithTimeout(ctx, c.DefaultTimeout)
}
// getStreamFromStore is a wrapper that calls the Store interface's GetStream method.
// These thin wrappers give the cache logic a single seam per store operation.
func (c *DaramjweeCache) getStreamFromStore(ctx context.Context, store Store, key string) (io.ReadCloser, *Metadata, error) {
	return store.GetStream(ctx, key)
}

// setStreamToStore is a wrapper that calls the Store interface's SetWithWriter method.
func (c *DaramjweeCache) setStreamToStore(ctx context.Context, store Store, key string, metadata *Metadata) (io.WriteCloser, error) {
	return store.SetWithWriter(ctx, key, metadata)
}

// deleteFromStore is a wrapper that calls the Store interface's Delete method.
func (c *DaramjweeCache) deleteFromStore(ctx context.Context, store Store, key string) error {
	return store.Delete(ctx, key)
}

// statFromStore is a wrapper that calls the Store interface's Stat method.
func (c *DaramjweeCache) statFromStore(ctx context.Context, store Store, key string) (*Metadata, error) {
	return store.Stat(ctx, key)
}
// scheduleSetToStore schedules an asynchronous copy of the hot cache content
// to the destination (cold) store. If no worker is configured the request is
// logged and dropped; failures inside the job are logged, never surfaced.
func (c *DaramjweeCache) scheduleSetToStore(_ context.Context, destStore Store, key string) {
	if c.Worker == nil {
		level.Warn(c.Logger).Log("msg", "worker is not configured, cannot schedule set", "key", key)
		return
	}
	job := func(jobCtx context.Context) {
		level.Info(c.Logger).Log("msg", "starting background set", "key", key, "dest", "cold")
		// Re-read the object from the hot store at job time, so the copy
		// reflects whatever the hot tier currently holds.
		srcStream, meta, err := c.getStreamFromStore(jobCtx, c.HotStore, key)
		if err != nil {
			level.Error(c.Logger).Log("msg", "failed to get stream from hot store for background set", "key", key, "err", err)
			return
		}
		defer srcStream.Close()
		destWriter, err := c.setStreamToStore(jobCtx, destStore, key, meta)
		if err != nil {
			level.Error(c.Logger).Log("msg", "failed to get writer for dest store for background set", "key", key, "err", err)
			return
		}
		_, copyErr := io.Copy(destWriter, srcStream)
		closeErr := destWriter.Close()
		if copyErr != nil || closeErr != nil {
			level.Error(c.Logger).Log("msg", "failed background set", "key", key, "copyErr", copyErr, "closeErr", closeErr)
		} else {
			level.Info(c.Logger).Log("msg", "background set successful", "key", key, "dest", "cold")
		}
	}
	c.Worker.Submit(job)
}
// cancelWriteCloser ties a context's lifetime to a WriteCloser: the cancel
// function fires after the wrapped writer is closed.
type cancelWriteCloser struct {
	io.WriteCloser
	cancel context.CancelFunc
}

// newCancelWriteCloser wraps wc so that cancel runs when Close is called.
func newCancelWriteCloser(wc io.WriteCloser, cancel context.CancelFunc) io.WriteCloser {
	w := &cancelWriteCloser{
		WriteCloser: wc,
		cancel:      cancel,
	}
	return w
}

// Close closes the wrapped writer, then cancels the associated context.
// The deferred cancel runs even if the inner Close panics.
func (w *cancelWriteCloser) Close() error {
	defer w.cancel()
	return w.WriteCloser.Close()
}
// safeCloser wraps an io.ReadCloser and runs a callback exactly once after the
// underlying reader has been closed. Reaching io.EOF closes the reader
// automatically, and duplicate Close calls are made safe via sync.Once.
type safeCloser struct {
	io.ReadCloser
	callback  func()
	closeOnce sync.Once
	closeErr  error
}

// newSafeCloser returns a ReadCloser that invokes cb once after rc is closed,
// with automatic close-on-EOF and tolerance for repeated Close calls.
func newSafeCloser(rc io.ReadCloser, cb func()) *safeCloser {
	return &safeCloser{
		ReadCloser: rc,
		callback:   cb,
	}
}

// Read forwards to the wrapped reader; when the reader reports io.EOF the
// safeCloser closes itself so the callback fires without an explicit Close.
func (c *safeCloser) Read(p []byte) (int, error) {
	n, err := c.ReadCloser.Read(p)
	if err == io.EOF {
		c.Close() // auto-close at end of stream
	}
	return n, err
}

// Close closes the wrapped reader and then runs the callback. sync.Once
// guarantees both happen at most once; the first close error is remembered
// and returned on every subsequent call.
func (c *safeCloser) Close() error {
	c.closeOnce.Do(func() {
		defer c.callback()
		c.closeErr = c.ReadCloser.Close()
	})
	return c.closeErr
}
// ReadAll drains the safeCloser and returns everything read as a byte slice.
// Because Read auto-closes on EOF, a successful drain also releases the
// underlying reader and fires the close callback; on any other error the
// reader is closed explicitly and the partial data is returned with the error.
func (c *safeCloser) ReadAll() ([]byte, error) {
	// Start with a modest capacity to keep small reads allocation-light.
	out := make([]byte, 0, 512)
	chunk := make([]byte, 512)
	for {
		n, err := c.Read(chunk)
		out = append(out, chunk[:n]...)
		switch {
		case err == io.EOF:
			// Read already closed the safeCloser when it hit EOF.
			return out, nil
		case err != nil:
			c.Close()
			return out, err
		}
	}
}
// ReadAll drains rc, preferring safeCloser's own ReadAll (which auto-closes
// the reader and fires its callback) and falling back to io.ReadAll for any
// other ReadCloser type.
func ReadAll(rc io.ReadCloser) ([]byte, error) {
	switch r := rc.(type) {
	case *safeCloser:
		return r.ReadAll()
	default:
		return io.ReadAll(r)
	}
}