Skip to content

Commit 85a31af

Browse files
authored
feat: add sync cache metrics (#3514)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent e2b06e2 commit 85a31af

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

internal/topo/node/cache/sync_cache.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828
)
2929

3030
const (
31-
addLbl = "add"
32-
sendLbl = "send"
33-
delLbl = "del"
34-
ackLbl = "ack"
35-
loadLbl = "load"
36-
flushLbl = "flush"
31+
addLbl = "add"
32+
sendLbl = "send"
33+
delLbl = "del"
34+
ackLbl = "ack"
35+
loadLbl = "load"
36+
flushLbl = "flush"
37+
lengthLbl = "len"
3738
)
3839

3940
// page Rotates storage for in memory cache
@@ -184,6 +185,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
184185
if c.sendStatus == 0 {
185186
c.send(ctx)
186187
}
188+
metrics.SyncCacheGauge.WithLabelValues(lengthLbl, c.ruleID, c.opID).Set(float64(c.CacheLength))
187189
case isSuccess := <-c.Ack:
188190
metrics.SyncCacheOpCnter.WithLabelValues(ackLbl, c.ruleID, c.opID).Inc()
189191
// only send the next sink after receiving an ack
@@ -201,6 +203,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
201203
if c.sendStatus == 0 {
202204
c.send(ctx)
203205
}
206+
metrics.SyncCacheGauge.WithLabelValues(lengthLbl, c.ruleID, c.opID).Set(float64(c.CacheLength))
204207
case <-ctx.Done():
205208
ctx.GetLogger().Infof("sink node %s instance cache %d done", ctx.GetOpId(), ctx.GetInstanceId())
206209
return

metrics/sync_cache.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,17 @@ var (
3737
Name: "counter",
3838
Help: "counter of sync cache",
3939
}, []string{LblType, LblRuleIDType, LblOpIDType})
40+
41+
SyncCacheGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
42+
Namespace: "kuiper",
43+
Subsystem: "sync_cache",
44+
Name: "gauge", // 10us ~ 5s
45+
Help: "gauge of sync cache",
46+
}, []string{LblType, LblRuleIDType, LblOpIDType})
4047
)
4148

4249
func RegisterSyncCacheMetrics() {
4350
prometheus.MustRegister(SyncCacheOpCnter)
4451
prometheus.MustRegister(SyncCacheDurationHist)
52+
prometheus.MustRegister(SyncCacheGauge)
4553
}

0 commit comments

Comments
 (0)