Skip to content

Commit 2ecea0f

Browse files
committed
Enable persisting cache to disk
If implemented, the cache will be persisted to disk by calling `Persist`. It will be loaded when instantiating a cache by calling `New` if an existing `path` is provided. Signed-off-by: Soule BA <[email protected]>
1 parent a108669 commit 2ecea0f

File tree

3 files changed

+355
-18
lines changed

3 files changed

+355
-18
lines changed

cache/cache.go

Lines changed: 217 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@ limitations under the License.
1717
package cache
1818

1919
import (
20+
"bufio"
2021
"cmp"
22+
"encoding/binary"
23+
"encoding/json"
24+
"errors"
25+
"io"
26+
"os"
2127
"slices"
2228
"sort"
2329
"sync"
@@ -40,34 +46,37 @@ type Cache[T any] struct {
4046
// item is an item stored in the cache.
4147
type item[T any] struct {
4248
key string
43-
// object is the item's object.
44-
object T
4549
// expiration is the item's expiration time.
4650
expiration int64
51+
// object is the item's object.
52+
object T
4753
}
4854

4955
type cache[T any] struct {
5056
// index holds the cache index.
5157
index map[string]*item[T]
5258
// items is the store of elements in the cache.
5359
items []*item[T]
54-
// sorted indicates whether the items are sorted by expiration time.
55-
// It is initially true, and set to false when the items are not sorted.
56-
sorted bool
60+
5761
// capacity is the maximum number of index the cache can hold.
5862
capacity int
5963
metrics *cacheMetrics
6064
labelsFunc GetLvsFunc[T]
6165
janitor *janitor[T]
62-
closed bool
66+
path string
67+
buf buffer
68+
// sorted indicates whether the items are sorted by expiration time.
69+
// It is initially true, and set to false when the items are not sorted.
70+
sorted bool
71+
closed bool
6372

6473
mu sync.RWMutex
6574
}
6675

6776
var _ Expirable[any] = &Cache[any]{}
6877

6978
// New creates a new cache with the given configuration.
70-
func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
79+
func New[T any](capacity int, path string, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
7180
opt := storeOptions[T]{}
7281
for _, o := range opts {
7382
err := o(&opt)
@@ -83,6 +92,7 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
8392
items: make([]*item[T], 0, capacity),
8493
sorted: true,
8594
capacity: capacity,
95+
path: path,
8696
metrics: newCacheMetrics(opt.registerer, opt.extraLabels...),
8797
labelsFunc: opt.labelsFunc,
8898
janitor: &janitor[T]{
@@ -93,6 +103,16 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
93103

94104
C := &Cache[T]{cache: c, keyFunc: keyFunc}
95105

106+
if c.path != "" {
107+
// load the cache from the file if it exists
108+
if _, err := os.Stat(c.path); err == nil {
109+
err = c.load()
110+
if err != nil {
111+
return nil, err
112+
}
113+
}
114+
}
115+
96116
if opt.interval > 0 {
97117
go c.janitor.run(c)
98118
}
@@ -341,8 +361,8 @@ func (c *cache[T]) Resize(size int) int {
341361
}
342362

343363
// delete the overflow indexes
344-
for _, v := range c.items[:overflow] {
345-
delete(c.index, v.key)
364+
for _, item := range c.items[:overflow] {
365+
delete(c.index, item.key)
346366
c.metrics.incCacheEvictions()
347367
c.metrics.decCacheItems()
348368
}
@@ -494,3 +514,191 @@ func (j *janitor[T]) run(c *cache[T]) {
494514
}
495515
}
496516
}
517+
518+
// buffer is a growable byte slice used as scratch space when serializing
// the cache. All methods append to (or truncate) the slice in place.
type buffer []byte

// clear truncates the buffer to zero length while keeping its backing
// array, so that the next serialization can reuse the allocation.
func (b *buffer) clear() {
	cur := *b
	*b = cur[:0]
}

// writeByteSlice appends the bytes of v to the buffer.
func (b *buffer) writeByteSlice(v []byte) {
	cur := *b
	cur = append(cur, v...)
	*b = cur
}

// writeUint64 appends v to the buffer as 8 bytes in little endian order.
func (b *buffer) writeUint64(v uint64) {
	*b = binary.LittleEndian.AppendUint64(*b, v)
}
538+
539+
// writeBuf writes the buffer to the file
540+
func (c *cache[T]) writeBuf(file *os.File) error {
541+
if _, err := file.Write(c.buf); err != nil {
542+
return err
543+
}
544+
// sync the file to disk straight away
545+
file.Sync()
546+
return nil
547+
}
548+
549+
// Persist writes the cache to disk
550+
// The cache is written to a temporary file first
551+
// and then renamed to the final file name to atomically
552+
// update the cache file. This is done to avoid corrupting
553+
// the cache file in case of a crash while writing to the file. If a file
554+
// with the same name exists, it is overwritten.
555+
// The cache file is written in the following format:
556+
// key length, key, expiration, data length, data // repeat for each item
557+
// The key length and data length are written as uint64 in little endian format
558+
// The expiration is written as a unix timestamp in seconds as uint64 in little endian format
559+
// The key is written as a byte slice
560+
// The data is written as a json encoded byte slice
561+
func (c *cache[T]) Persist() error {
562+
c.mu.Lock()
563+
defer c.mu.Unlock()
564+
565+
if err := c.writeToBuf(); err != nil {
566+
return err
567+
}
568+
569+
// create new temp file
570+
newFile, err := os.Create(c.path + ".tmp")
571+
if err != nil {
572+
errf := os.Remove(c.path + ".tmp")
573+
return errors.Join(err, errf)
574+
}
575+
576+
if err := c.writeBuf(newFile); err != nil {
577+
errf := os.Remove(c.path + ".tmp")
578+
return errors.Join(err, errf)
579+
}
580+
581+
// close the file
582+
if err := newFile.Close(); err != nil {
583+
errf := os.Remove(c.path + ".tmp")
584+
return errors.Join(err, errf)
585+
}
586+
587+
if err := os.Rename(c.path+".tmp", c.path); err != nil {
588+
panic("shrink failed: " + err.Error())
589+
}
590+
591+
return nil
592+
}
593+
594+
// writeToBuf writes the cache to the buffer
595+
func (c *cache[T]) writeToBuf() error {
596+
c.buf.clear()
597+
for _, item := range c.items {
598+
data, err := json.Marshal(item.object)
599+
if err != nil {
600+
return err
601+
}
602+
603+
// write the key, expiration and data to the buffer
604+
// format: key length, key, expiration, data length, data
605+
// doing this this way, gives us the ability to read the file
606+
// without having to read the entire file into memory. This is
607+
// done for possible future use cases e.g. where the cache file
608+
// could be very large or for range queries.
609+
c.buf.writeUint64(uint64(len(item.key)))
610+
c.buf.writeByteSlice([]byte(item.key))
611+
c.buf.writeUint64(uint64(item.expiration))
612+
c.buf.writeUint64(uint64(len(data)))
613+
c.buf.writeByteSlice(data)
614+
}
615+
return nil
616+
}
617+
618+
// load reads the cache from disk
619+
// The cache file is read in the following format:
620+
// key length, key, expiration, data length, data // repeat for each item
621+
// This function cannot be called concurrently, and should be called
622+
// before the cache is used.
623+
func (c *cache[T]) load() error {
624+
file, err := os.Open(c.path)
625+
if err != nil {
626+
return err
627+
}
628+
defer file.Close()
629+
630+
rd := bufio.NewReader(file)
631+
items, err := c.readFrom(rd)
632+
if err != nil {
633+
return err
634+
}
635+
636+
for _, item := range items {
637+
if len(c.items) >= c.capacity {
638+
break
639+
}
640+
c.items = append(c.items, item)
641+
c.index[item.key] = item
642+
}
643+
644+
if len(c.items) > 0 {
645+
c.metrics.setCachedItems(float64(len(c.items)))
646+
c.sorted = false
647+
}
648+
return nil
649+
}
650+
651+
func (c *cache[T]) readFrom(rd io.Reader) ([]*item[T], error) {
652+
items := make([]*item[T], 0)
653+
for {
654+
// read until EOF
655+
item, err := c.readItem(rd)
656+
if err != nil {
657+
if err == io.EOF {
658+
break
659+
}
660+
return nil, err
661+
}
662+
items = append(items, item)
663+
}
664+
return items, nil
665+
}
666+
667+
func (c *cache[T]) readItem(rd io.Reader) (*item[T], error) {
668+
var (
669+
buf = make([]byte, 8)
670+
item item[T]
671+
)
672+
if _, err := io.ReadFull(rd, buf); err != nil {
673+
if err == io.EOF {
674+
return nil, err
675+
}
676+
return nil, err
677+
}
678+
keyLen := binary.LittleEndian.Uint64(buf)
679+
key := make([]byte, keyLen)
680+
if _, err := io.ReadFull(rd, key); err != nil {
681+
return nil, err
682+
}
683+
item.key = string(key)
684+
685+
if _, err := io.ReadFull(rd, buf); err != nil {
686+
return nil, err
687+
}
688+
item.expiration = int64(binary.LittleEndian.Uint64(buf))
689+
690+
if _, err := io.ReadFull(rd, buf); err != nil {
691+
return nil, err
692+
}
693+
dataLen := binary.LittleEndian.Uint64(buf)
694+
data := make([]byte, dataLen)
695+
if _, err := io.ReadFull(rd, data); err != nil {
696+
return nil, err
697+
}
698+
699+
if err := json.Unmarshal(data, &item.object); err != nil {
700+
return nil, err
701+
}
702+
703+
return &item, nil
704+
}

0 commit comments

Comments
 (0)