Skip to content

Commit 097c220

Browse files
dinalDina Nimrodi
and
Dina Nimrodi
authored
don't remove in flight metric from cache (#538) (#539)
* don't remove in flight metric from cache * rename * remove only unused metrics from cache * add push to channel counter * lint * more lint * remove unneeded check * move to used list when touching an element * typo * use for loop instead of recursion * remove null check * add comment on log * go mod tidy * add a print of cache size Co-authored-by: Dina Nimrodi <[email protected]> Co-authored-by: Dina Nimrodi <[email protected]>
1 parent a91b462 commit 097c220

File tree

5 files changed

+111
-19
lines changed

5 files changed

+111
-19
lines changed

go.mod

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ require (
66
github.com/cespare/xxhash v1.1.0
77
github.com/cpuguy83/go-md2man v1.0.10 // indirect
88
github.com/ghodss/yaml v1.0.0
9-
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9
109
github.com/imdario/mergo v0.3.7
1110
github.com/nuclio/logger v0.0.1
1211
github.com/nuclio/nuclio-sdk-go v0.0.0-20190205170814-3b507fbd0324

pkg/appender/appender.go

+8-11
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"sync"
2626
"time"
2727

28-
"github.com/golang/groupcache/lru"
2928
"github.com/nuclio/logger"
3029
"github.com/pkg/errors"
3130
"github.com/v3io/v3io-go/pkg/dataplane"
@@ -107,7 +106,6 @@ func (m *MetricState) error() error {
107106
type MetricsCache struct {
108107
cfg *config.V3ioConfig
109108
partitionMngr *partmgr.PartitionManager
110-
mtx sync.RWMutex
111109
container v3io.Container
112110
logger logger.Logger
113111
started bool
@@ -124,7 +122,7 @@ type MetricsCache struct {
124122
outstandingUpdates int64
125123
requestsInFlight int64
126124

127-
cacheMetricMap *lru.Cache
125+
cacheMetricMap *Cache
128126

129127
NameLabelMap map[string]bool // temp store all lable names
130128

@@ -138,7 +136,8 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config
138136
partMngr *partmgr.PartitionManager) *MetricsCache {
139137

140138
newCache := MetricsCache{container: container, logger: logger, cfg: cfg, partitionMngr: partMngr}
141-
newCache.cacheMetricMap = lru.New(cfg.MetricCacheSize)
139+
newCache.cacheMetricMap = NewCache(cfg.MetricCacheSize)
140+
newCache.logger.DebugWith("Initializing new metric cache", "size", cfg.MetricCacheSize)
142141

143142
newCache.responseChan = make(chan *v3io.Response, channelSize)
144143
newCache.nameUpdateChan = make(chan *v3io.Response, channelSize)
@@ -173,21 +172,15 @@ func (mc *MetricsCache) Start() error {
173172

174173
// return metric struct by key
175174
func (mc *MetricsCache) getMetric(hash uint64) (*MetricState, bool) {
176-
mc.mtx.RLock()
177-
defer mc.mtx.RUnlock()
178-
179175
metric, ok := mc.cacheMetricMap.Get(hash)
180176
if ok {
181-
return metric.(*MetricState), ok
177+
return metric, ok
182178
}
183179
return nil, ok
184180
}
185181

186182
// create a new metric and save in the map
187183
func (mc *MetricsCache) addMetric(hash uint64, name string, metric *MetricState) {
188-
mc.mtx.Lock()
189-
defer mc.mtx.Unlock()
190-
191184
mc.cacheMetricMap.Add(hash, metric)
192185
if _, ok := mc.NameLabelMap[name]; !ok {
193186
metric.newName = true
@@ -197,6 +190,9 @@ func (mc *MetricsCache) addMetric(hash uint64, name string, metric *MetricState)
197190

198191
// Push append to async channel
199192
func (mc *MetricsCache) appendTV(metric *MetricState, t int64, v interface{}) {
193+
metric.Lock()
194+
defer metric.Unlock()
195+
metric.store.numNotProcessed++
200196
mc.asyncAppendChan <- &asyncAppend{metric: metric, t: t, v: v}
201197
}
202198

@@ -280,6 +276,7 @@ func (mc *MetricsCache) AddFast(ref uint64, t int64, v interface{}) error {
280276
}
281277
metric, ok := mc.getMetric(ref)
282278
if !ok {
279+
// do not change error msg, it's parsed by prom
283280
return fmt.Errorf(fmt.Sprintf("metric not found. ref=%v", ref))
284281
}
285282

pkg/appender/ingest.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (mc *MetricsCache) metricFeed(index int) {
7676
// Handle append requests (Add / AddFast)
7777
metric := app.metric
7878
metric.Lock()
79-
79+
metric.store.numNotProcessed--
8080
metric.store.Append(app.t, app.v)
8181
numPushed++
8282
dataQueued += metric.store.samplesQueueLength()
@@ -126,7 +126,6 @@ func (mc *MetricsCache) metricFeed(index int) {
126126
}
127127
}
128128
}
129-
130129
// If we have too much work, stall the queue for some time
131130
if numPushed > mc.cfg.BatchSize/2 && dataQueued/numPushed > 64 {
132131
switch {
@@ -331,6 +330,7 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
331330
metric.store = newChunkStore(mc.logger, metric.Lset.LabelNames(), metric.store.isAggr())
332331
metric.retryCount = 0
333332
metric.setState(storeStateInit)
333+
mc.cacheMetricMap.ResetMetric(metric.hash)
334334
}
335335

336336
// Count errors
@@ -380,6 +380,9 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
380380
mc.metricQueue.Push(metric)
381381
metric.setState(storeStateAboutToUpdate)
382382
}
383+
if !sent && metric.store.numNotProcessed == 0 && metric.store.pending.Len() == 0 {
384+
mc.cacheMetricMap.ResetMetric(metric.hash)
385+
}
383386

384387
return sent
385388
}

pkg/appender/lru_cache.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package appender
2+
3+
import (
4+
clist "container/list"
5+
"sync"
6+
)
7+
8+
// Cache is an LRU cache. It is not safe for concurrent access.
9+
type Cache struct {
10+
maxEntries int
11+
free *clist.List
12+
used *clist.List
13+
cache map[uint64]*clist.Element
14+
cond *sync.Cond
15+
mtx sync.Mutex
16+
}
17+
18+
type entry struct {
19+
key uint64
20+
value *MetricState
21+
}
22+
23+
func NewCache(max int) *Cache {
24+
newCache := Cache{
25+
maxEntries: max,
26+
free: clist.New(),
27+
used: clist.New(),
28+
cache: make(map[uint64]*clist.Element),
29+
}
30+
newCache.cond = sync.NewCond(&newCache.mtx)
31+
return &newCache
32+
}
33+
34+
// Add adds a value to the cache.
35+
func (c *Cache) Add(key uint64, value *MetricState) {
36+
c.mtx.Lock()
37+
defer c.mtx.Unlock()
38+
if ee, ok := c.cache[key]; ok {
39+
c.free.Remove(ee)
40+
//check if element was already in list and if not push to front
41+
c.used.MoveToFront(ee)
42+
if c.used.Front() != ee {
43+
c.used.PushFront(ee)
44+
}
45+
ee.Value.(*entry).value = value
46+
return
47+
}
48+
ele := c.used.PushFront(&entry{key, value})
49+
c.cache[key] = ele
50+
if c.maxEntries != 0 && c.free.Len()+c.used.Len() > c.maxEntries {
51+
c.removeOldest()
52+
}
53+
}
54+
55+
// Get looks up a key's value from the cache.
56+
func (c *Cache) Get(key uint64) (value *MetricState, ok bool) {
57+
c.mtx.Lock()
58+
defer c.mtx.Unlock()
59+
if ele, hit := c.cache[key]; hit {
60+
c.free.Remove(ele)
61+
//check if element was already in list and if not push to front
62+
c.used.MoveToFront(ele)
63+
if c.used.Front() != ele {
64+
c.used.PushFront(ele)
65+
}
66+
return ele.Value.(*entry).value, true
67+
}
68+
return
69+
}
70+
71+
func (c *Cache) removeOldest() {
72+
for {
73+
ele := c.free.Back()
74+
if ele != nil {
75+
c.free.Remove(ele)
76+
kv := ele.Value.(*clist.Element).Value.(*entry)
77+
delete(c.cache, kv.key)
78+
return
79+
}
80+
c.cond.Wait()
81+
}
82+
}
83+
84+
func (c *Cache) ResetMetric(key uint64) {
85+
c.mtx.Lock()
86+
defer c.mtx.Unlock()
87+
if ele, ok := c.cache[key]; ok {
88+
c.used.Remove(ele)
89+
c.free.PushFront(ele)
90+
c.cond.Signal()
91+
}
92+
}

pkg/appender/store.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,12 @@ type chunkStore struct {
6666
lastTid int64
6767
chunks [2]*attrAppender
6868

69-
labelNames []string
70-
aggrList *aggregate.AggregatesList
71-
pending pendingList
72-
maxTime int64
73-
delRawSamples bool // TODO: for metrics w aggregates only
69+
labelNames []string
70+
aggrList *aggregate.AggregatesList
71+
pending pendingList
72+
maxTime int64
73+
delRawSamples bool // TODO: for metrics w aggregates only
74+
numNotProcessed int64
7475
}
7576

7677
func (cs *chunkStore) isAggr() bool {

0 commit comments

Comments
 (0)