Skip to content

Commit 6c7f5ce

Browse files
authored
feat: add sync cache metrics (#3510)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent b9b5af5 commit 6c7f5ce

File tree

3 files changed

+78
-1
lines changed

3 files changed

+78
-1
lines changed

internal/server/promMetrics/metrcis.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414

1515
package promMetrics
1616

17-
import "github.com/prometheus/client_golang/prometheus"
17+
import (
18+
"github.com/prometheus/client_golang/prometheus"
19+
20+
"github.com/lf-edge/ekuiper/metrics"
21+
)
1822

1923
const (
2024
LblStatusType = "status"
@@ -47,6 +51,7 @@ func InitServerMetrics() {
4751

4852
func RegisterMetrics() {
4953
InitServerMetrics()
54+
metrics.RegisterSyncCacheMetrics()
5055
prometheus.MustRegister(RuleStatusCountGauge)
5156
prometheus.MustRegister(RuleStatusGauge)
5257
}

internal/topo/node/cache/sync_cache.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,21 @@ import (
2121

2222
"github.com/lf-edge/ekuiper/internal/conf"
2323
"github.com/lf-edge/ekuiper/internal/pkg/store"
24+
"github.com/lf-edge/ekuiper/metrics"
2425
"github.com/lf-edge/ekuiper/pkg/api"
2526
"github.com/lf-edge/ekuiper/pkg/infra"
2627
"github.com/lf-edge/ekuiper/pkg/kv"
2728
)
2829

30+
const (
31+
addLbl = "add"
32+
sendLbl = "send"
33+
delLbl = "del"
34+
ackLbl = "ack"
35+
loadLbl = "load"
36+
flushLbl = "flush"
37+
)
38+
2939
// page Rotates storage for in memory cache
3040
// Not thread safe!
3141
type page struct {
@@ -92,6 +102,8 @@ func (p *page) reset() {
92102
}
93103

94104
type SyncCache struct {
105+
ruleID string
106+
opID string
95107
// The input data to the cache
96108
in <-chan []map[string]interface{}
97109
Out chan []map[string]interface{}
@@ -119,6 +131,8 @@ type SyncCache struct {
119131
func NewSyncCacheWithExitChanel(ctx api.StreamContext, in <-chan []map[string]interface{}, errCh chan<- error, cacheConf *conf.SinkConf, bufferLength int, exitCh chan<- struct{}) *SyncCache {
120132
c := NewSyncCache(ctx, in, errCh, cacheConf, bufferLength)
121133
c.exitCh = exitCh
134+
c.ruleID = ctx.GetRuleId()
135+
c.opID = ctx.GetOpId()
122136
return c
123137
}
124138

@@ -171,6 +185,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
171185
c.send(ctx)
172186
}
173187
case isSuccess := <-c.Ack:
188+
metrics.SyncCacheOpCnter.WithLabelValues(ackLbl, c.ruleID, c.opID).Inc()
174189
// only send the next sink after receiving an ack
175190
ctx.GetLogger().Debugf("cache ack")
176191
if isSuccess {
@@ -194,6 +209,7 @@ func (c *SyncCache) run(ctx api.StreamContext) {
194209
}
195210

196211
func (c *SyncCache) send(ctx api.StreamContext) {
212+
metrics.SyncCacheOpCnter.WithLabelValues(sendLbl, c.ruleID, c.opID).Inc()
197213
if c.CacheLength > 1 && c.cacheConf.ResendInterval > 0 {
198214
time.Sleep(time.Duration(c.cacheConf.ResendInterval) * time.Millisecond)
199215
}
@@ -215,6 +231,7 @@ func (c *SyncCache) send(ctx api.StreamContext) {
215231

216232
// addCache not thread safe!
217233
func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{}) {
234+
metrics.SyncCacheOpCnter.WithLabelValues(addLbl, c.ruleID, c.opID).Inc()
218235
isNotFull := c.appendMemCache(item)
219236
if !isNotFull {
220237
if c.diskBufferPage == nil {
@@ -227,6 +244,10 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{
227244
c.loadFromDisk(ctx)
228245
ctx.GetLogger().Debug("disk full, remove the last page")
229246
}
247+
start := time.Now()
248+
defer func() {
249+
metrics.SyncCacheDurationHist.WithLabelValues(flushLbl, c.ruleID, c.opID).Observe(float64(time.Since(start).Microseconds()))
250+
}()
230251
err := c.store.Set(strconv.Itoa(c.diskPageTail), c.diskBufferPage)
231252
if err != nil {
232253
ctx.GetLogger().Errorf("fail to store disk cache %v", err)
@@ -258,6 +279,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{
258279

259280
// deleteCache not thread safe!
260281
func (c *SyncCache) deleteCache(ctx api.StreamContext) {
282+
metrics.SyncCacheOpCnter.WithLabelValues(delLbl, c.ruleID, c.opID).Inc()
261283
ctx.GetLogger().Debugf("deleting cache. CacheLength: %d, diskSize: %d", c.CacheLength, c.diskSize)
262284
if len(c.memCache) == 0 {
263285
ctx.GetLogger().Debug("mem cache is empty")
@@ -282,6 +304,11 @@ func (c *SyncCache) deleteCache(ctx api.StreamContext) {
282304
}
283305

284306
func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
307+
metrics.SyncCacheOpCnter.WithLabelValues(loadLbl, c.ruleID, c.opID).Inc()
308+
start := time.Now()
309+
defer func() {
310+
metrics.SyncCacheDurationHist.WithLabelValues(loadLbl, c.ruleID, c.opID).Observe(float64(time.Since(start).Microseconds()))
311+
}()
285312
// load page from the disk
286313
ctx.GetLogger().Debugf("loading from disk %d. CacheLength: %d, diskSize: %d", c.diskPageTail, c.CacheLength, c.diskSize)
287314
hotPage := newPage(c.cacheConf.BufferPageSize)

metrics/sync_cache.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright 2025 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package metrics
16+
17+
import "github.com/prometheus/client_golang/prometheus"
18+
19+
const (
20+
LblType = "type"
21+
LblRuleIDType = "rule"
22+
LblOpIDType = "op"
23+
)
24+
25+
var (
26+
SyncCacheDurationHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
27+
Namespace: "kuiper",
28+
Subsystem: "sync_cache",
29+
Name: "duration",
30+
Buckets: prometheus.ExponentialBuckets(10, 2, 20), // 10us ~ 5s
31+
Help: "hist of sync cache",
32+
}, []string{LblType, LblRuleIDType, LblOpIDType})
33+
34+
SyncCacheOpCnter = prometheus.NewCounterVec(prometheus.CounterOpts{
35+
Namespace: "kuiper",
36+
Subsystem: "sync_cache",
37+
Name: "counter",
38+
Help: "counter of sync cache",
39+
}, []string{LblType, LblRuleIDType, LblOpIDType})
40+
)
41+
42+
func RegisterSyncCacheMetrics() {
43+
prometheus.MustRegister(SyncCacheOpCnter)
44+
prometheus.MustRegister(SyncCacheDurationHist)
45+
}

0 commit comments

Comments
 (0)