Skip to content

Commit e504d75

Browse files
chore: collect stats for reporting event sampler (#5357)
1 parent 78fb917 commit e504d75

File tree

8 files changed

+192
-19
lines changed

8 files changed

+192
-19
lines changed

enterprise/reporting/error_reporting.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func NewErrorDetailReporter(
124124

125125
if eventSamplingEnabled.Load() {
126126
var err error
127-
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.BadgerEventSamplerErrorsPathName, conf, log)
127+
eventSampler, err = event_sampler.NewEventSampler(ctx, eventSamplingDuration, eventSamplerType, eventSamplingCardinality, event_sampler.ErrorsReporting, conf, log, stats)
128128
if err != nil {
129129
panic(err)
130130
}

enterprise/reporting/event_sampler/badger_event_sampler.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/rudderlabs/rudder-go-kit/config"
1414
"github.com/rudderlabs/rudder-go-kit/logger"
15+
"github.com/rudderlabs/rudder-go-kit/stats"
1516
"github.com/rudderlabs/rudder-server/rruntime"
1617
"github.com/rudderlabs/rudder-server/utils/misc"
1718
)
@@ -23,6 +24,11 @@ type BadgerEventSampler struct {
2324
ctx context.Context
2425
cancel context.CancelFunc
2526
wg sync.WaitGroup
27+
sc *StatsCollector
28+
}
29+
30+
func GetPathName(module string) string {
31+
return "/" + module + "-badger"
2632
}
2733

2834
func DefaultPath(pathName string) (string, error) {
@@ -33,8 +39,15 @@ func DefaultPath(pathName string) (string, error) {
3339
return fmt.Sprintf(`%v%v`, tmpDirPath, pathName), nil
3440
}
3541

36-
func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.ValueLoader[time.Duration], conf *config.Config, log logger.Logger) (*BadgerEventSampler, error) {
37-
dbPath, err := DefaultPath(pathName)
42+
func NewBadgerEventSampler(
43+
ctx context.Context,
44+
module string,
45+
ttl config.ValueLoader[time.Duration],
46+
conf *config.Config,
47+
log logger.Logger,
48+
stats stats.Stats,
49+
) (*BadgerEventSampler, error) {
50+
dbPath, err := DefaultPath(GetPathName(module))
3851
if err != nil || dbPath == "" {
3952
return nil, err
4053
}
@@ -63,6 +76,7 @@ func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.Valu
6376
ctx: ctx,
6477
cancel: cancel,
6578
wg: sync.WaitGroup{},
79+
sc: NewStatsCollector(BadgerTypeEventSampler, module, stats),
6680
}
6781

6882
if err != nil {
@@ -81,6 +95,9 @@ func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.Valu
8195
func (es *BadgerEventSampler) Get(key string) (bool, error) {
8296
es.mu.Lock()
8397
defer es.mu.Unlock()
98+
start := time.Now()
99+
defer es.sc.RecordGetDuration(start)
100+
es.sc.RecordGet()
84101

85102
var found bool
86103

@@ -106,6 +123,9 @@ func (es *BadgerEventSampler) Get(key string) (bool, error) {
106123
func (es *BadgerEventSampler) Put(key string) error {
107124
es.mu.Lock()
108125
defer es.mu.Unlock()
126+
start := time.Now()
127+
defer es.sc.RecordPutDuration(start)
128+
es.sc.RecordPut()
109129

110130
return es.db.Update(func(txn *badger.Txn) error {
111131
entry := badger.NewEntry([]byte(key), []byte{1}).WithTTL(es.ttl.Load())

enterprise/reporting/event_sampler/event_sampler.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import (
66

77
"github.com/rudderlabs/rudder-go-kit/config"
88
"github.com/rudderlabs/rudder-go-kit/logger"
9+
"github.com/rudderlabs/rudder-go-kit/stats"
910
)
1011

1112
const (
12-
BadgerTypeEventSampler = "badger"
13-
InMemoryCacheTypeEventSampler = "in_memory_cache"
14-
BadgerEventSamplerMetricsPathName = "/metrics-reporting-badger"
15-
BadgerEventSamplerErrorsPathName = "/errors-reporting-badger"
13+
BadgerTypeEventSampler = "badger"
14+
InMemoryCacheTypeEventSampler = "in_memory_cache"
15+
MetricsReporting = "metrics-reporting"
16+
ErrorsReporting = "errors-reporting"
1617
)
1718

1819
//go:generate mockgen -destination=../../../mocks/enterprise/reporting/event_sampler/mock_event_sampler.go -package=mocks github.com/rudderlabs/rudder-server/enterprise/reporting/event_sampler EventSampler
@@ -27,18 +28,19 @@ func NewEventSampler(
2728
ttl config.ValueLoader[time.Duration],
2829
eventSamplerType config.ValueLoader[string],
2930
eventSamplingCardinality config.ValueLoader[int],
30-
badgerDBPath string,
31+
module string,
3132
conf *config.Config,
3233
log logger.Logger,
34+
stats stats.Stats,
3335
) (es EventSampler, err error) {
3436
switch eventSamplerType.Load() {
3537
case BadgerTypeEventSampler:
36-
es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log)
38+
es, err = NewBadgerEventSampler(ctx, module, ttl, conf, log, stats)
3739
case InMemoryCacheTypeEventSampler:
38-
es, err = NewInMemoryCacheEventSampler(ctx, ttl, eventSamplingCardinality)
40+
es, err = NewInMemoryCacheEventSampler(ctx, module, ttl, eventSamplingCardinality, stats)
3941
default:
4042
log.Warnf("invalid event sampler type: %s. Using default badger event sampler", eventSamplerType.Load())
41-
es, err = NewBadgerEventSampler(ctx, badgerDBPath, ttl, conf, log)
43+
es, err = NewBadgerEventSampler(ctx, module, ttl, conf, log, stats)
4244
}
4345

4446
if err != nil {

enterprise/reporting/event_sampler/event_sampler_test.go

Lines changed: 83 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"github.com/rudderlabs/rudder-go-kit/config"
1313
"github.com/rudderlabs/rudder-go-kit/logger"
14+
"github.com/rudderlabs/rudder-go-kit/stats"
15+
"github.com/rudderlabs/rudder-go-kit/stats/memstats"
1416
)
1517

1618
func TestBadger(t *testing.T) {
@@ -23,15 +25,40 @@ func TestBadger(t *testing.T) {
2325

2426
t.Run("should put and get keys", func(t *testing.T) {
2527
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
26-
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
28+
statsStore, err := memstats.New()
29+
require.NoError(t, err)
30+
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
2731
_ = es.Put("key1")
2832
_ = es.Put("key2")
2933
_ = es.Put("key3")
34+
35+
require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
36+
"type": BadgerTypeEventSampler,
37+
"module": MetricsReporting,
38+
"operation": "put",
39+
}).LastValue(), float64(3))
40+
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
41+
"type": BadgerTypeEventSampler,
42+
"module": MetricsReporting,
43+
"operation": "put",
44+
}).Durations()), 3)
45+
3046
val1, _ := es.Get("key1")
3147
val2, _ := es.Get("key2")
3248
val3, _ := es.Get("key3")
3349
val4, _ := es.Get("key4")
3450

51+
require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
52+
"type": BadgerTypeEventSampler,
53+
"module": MetricsReporting,
54+
"operation": "get",
55+
}).LastValue(), float64(4))
56+
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
57+
"type": BadgerTypeEventSampler,
58+
"module": MetricsReporting,
59+
"operation": "get",
60+
}).Durations()), 4)
61+
3562
assert.True(t, val1, "Expected key1 to be present")
3663
assert.True(t, val2, "Expected key2 to be present")
3764
assert.True(t, val3, "Expected key3 to be present")
@@ -43,7 +70,7 @@ func TestBadger(t *testing.T) {
4370
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
4471
assert.Equal(t, 100*time.Millisecond, ttl.Load())
4572

46-
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
73+
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, stats.NOP)
4774
defer es.Close()
4875

4976
_ = es.Put("key1")
@@ -65,15 +92,40 @@ func TestInMemoryCache(t *testing.T) {
6592

6693
t.Run("should put and get keys", func(t *testing.T) {
6794
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
68-
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
95+
statsStore, err := memstats.New()
96+
require.NoError(t, err)
97+
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
6998
_ = es.Put("key1")
7099
_ = es.Put("key2")
71100
_ = es.Put("key3")
101+
102+
require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
103+
"type": InMemoryCacheTypeEventSampler,
104+
"module": MetricsReporting,
105+
"operation": "put",
106+
}).LastValue(), float64(3))
107+
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
108+
"type": InMemoryCacheTypeEventSampler,
109+
"module": MetricsReporting,
110+
"operation": "put",
111+
}).Durations()), 3)
112+
72113
val1, _ := es.Get("key1")
73114
val2, _ := es.Get("key2")
74115
val3, _ := es.Get("key3")
75116
val4, _ := es.Get("key4")
76117

118+
require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
119+
"type": InMemoryCacheTypeEventSampler,
120+
"module": MetricsReporting,
121+
"operation": "get",
122+
}).LastValue(), float64(4))
123+
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
124+
"type": InMemoryCacheTypeEventSampler,
125+
"module": MetricsReporting,
126+
"operation": "get",
127+
}).Durations()), 4)
128+
77129
assert.True(t, val1, "Expected key1 to be present")
78130
assert.True(t, val2, "Expected key2 to be present")
79131
assert.True(t, val3, "Expected key3 to be present")
@@ -83,7 +135,7 @@ func TestInMemoryCache(t *testing.T) {
83135
t.Run("should not get evicted keys", func(t *testing.T) {
84136
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
85137
assert.Equal(t, 100*time.Millisecond, ttl.Load())
86-
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
138+
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, stats.NOP)
87139
_ = es.Put("key1")
88140

89141
require.Eventually(t, func() bool {
@@ -95,19 +147,43 @@ func TestInMemoryCache(t *testing.T) {
95147
t.Run("should not add keys if length exceeds", func(t *testing.T) {
96148
conf.Set("Reporting.eventSampling.durationInMinutes", 3000)
97149
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
98-
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, BadgerEventSamplerMetricsPathName, conf, log)
150+
statsStore, err := memstats.New()
151+
require.NoError(t, err)
152+
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, MetricsReporting, conf, log, statsStore)
99153
_ = es.Put("key1")
100154
_ = es.Put("key2")
101155
_ = es.Put("key3")
102156
_ = es.Put("key4")
103157
_ = es.Put("key5")
104158

159+
require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
160+
"type": InMemoryCacheTypeEventSampler,
161+
"module": MetricsReporting,
162+
"operation": "put",
163+
}).LastValue(), float64(3))
164+
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
165+
"type": InMemoryCacheTypeEventSampler,
166+
"module": MetricsReporting,
167+
"operation": "put",
168+
}).Durations()), 3)
169+
105170
val1, _ := es.Get("key1")
106171
val2, _ := es.Get("key2")
107172
val3, _ := es.Get("key3")
108173
val4, _ := es.Get("key4")
109174
val5, _ := es.Get("key5")
110175

176+
require.Equal(t, statsStore.Get(StatReportingEventSamplerRequestsTotal, map[string]string{
177+
"type": InMemoryCacheTypeEventSampler,
178+
"module": MetricsReporting,
179+
"operation": "get",
180+
}).LastValue(), float64(5))
181+
require.Equal(t, len(statsStore.Get(StatReportingEventSamplerRequestDuration, map[string]string{
182+
"type": InMemoryCacheTypeEventSampler,
183+
"module": MetricsReporting,
184+
"operation": "get",
185+
}).Durations()), 5)
186+
111187
assert.True(t, val1, "Expected key1 to be present")
112188
assert.True(t, val2, "Expected key2 to be present")
113189
assert.True(t, val3, "Expected key3 to be present")
@@ -147,9 +223,10 @@ func BenchmarkEventSampler(b *testing.B) {
147223
ttl,
148224
eventSamplerType,
149225
eventSamplingCardinality,
150-
BadgerEventSamplerMetricsPathName,
226+
MetricsReporting,
151227
conf,
152228
log,
229+
stats.NOP,
153230
)
154231
require.NoError(b, err)
155232

enterprise/reporting/event_sampler/in_memory_cache_event_sampler.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/rudderlabs/rudder-go-kit/cachettl"
88
"github.com/rudderlabs/rudder-go-kit/config"
9+
"github.com/rudderlabs/rudder-go-kit/stats"
910
)
1011

1112
type InMemoryCacheEventSampler struct {
@@ -15,9 +16,16 @@ type InMemoryCacheEventSampler struct {
1516
ttl config.ValueLoader[time.Duration]
1617
limit config.ValueLoader[int]
1718
length int
19+
sc *StatsCollector
1820
}
1921

20-
func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[time.Duration], limit config.ValueLoader[int]) (*InMemoryCacheEventSampler, error) {
22+
func NewInMemoryCacheEventSampler(
23+
ctx context.Context,
24+
module string,
25+
ttl config.ValueLoader[time.Duration],
26+
limit config.ValueLoader[int],
27+
stats stats.Stats,
28+
) (*InMemoryCacheEventSampler, error) {
2129
c := cachettl.New[string, bool](cachettl.WithNoRefreshTTL)
2230
ctx, cancel := context.WithCancel(ctx)
2331

@@ -28,6 +36,7 @@ func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[ti
2836
ttl: ttl,
2937
limit: limit,
3038
length: 0,
39+
sc: NewStatsCollector(InMemoryCacheTypeEventSampler, module, stats),
3140
}
3241

3342
es.cache.OnEvicted(func(key string, value bool) {
@@ -38,6 +47,10 @@ func NewInMemoryCacheEventSampler(ctx context.Context, ttl config.ValueLoader[ti
3847
}
3948

4049
func (es *InMemoryCacheEventSampler) Get(key string) (bool, error) {
50+
start := time.Now()
51+
defer es.sc.RecordGetDuration(start)
52+
es.sc.RecordGet()
53+
4154
value := es.cache.Get(key)
4255
return value, nil
4356
}
@@ -47,6 +60,10 @@ func (es *InMemoryCacheEventSampler) Put(key string) error {
4760
return nil
4861
}
4962

63+
start := time.Now()
64+
defer es.sc.RecordPutDuration(start)
65+
es.sc.RecordPut()
66+
5067
es.cache.Put(key, true, es.ttl.Load())
5168
es.length++
5269
return nil
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package event_sampler
2+
3+
import (
4+
"time"
5+
6+
"github.com/rudderlabs/rudder-go-kit/stats"
7+
)
8+
9+
const (
10+
StatReportingEventSamplerRequestsTotal = "reporting_event_sampler_requests_total"
11+
StatReportingEventSamplerRequestDuration = "reporting_event_sampler_request_duration_seconds"
12+
)
13+
14+
type StatsCollector struct {
15+
stats stats.Stats
16+
getCounter stats.Measurement
17+
putCounter stats.Measurement
18+
getDuration stats.Measurement
19+
putDuration stats.Measurement
20+
}
21+
22+
func NewStatsCollector(eventSamplerType, module string, statsFactory stats.Stats) *StatsCollector {
23+
getRequestTags := getTags(eventSamplerType, module, "get")
24+
putRequestTags := getTags(eventSamplerType, module, "put")
25+
26+
return &StatsCollector{
27+
stats: statsFactory,
28+
getCounter: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestsTotal, stats.CountType, getRequestTags),
29+
putCounter: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestsTotal, stats.CountType, putRequestTags),
30+
getDuration: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestDuration, stats.TimerType, getRequestTags),
31+
putDuration: statsFactory.NewTaggedStat(StatReportingEventSamplerRequestDuration, stats.TimerType, putRequestTags),
32+
}
33+
}
34+
35+
func (sc *StatsCollector) RecordGet() {
36+
sc.getCounter.Increment()
37+
}
38+
39+
func (sc *StatsCollector) RecordPut() {
40+
sc.putCounter.Increment()
41+
}
42+
43+
func (sc *StatsCollector) RecordGetDuration(start time.Time) {
44+
sc.getDuration.SendTiming(time.Since(start))
45+
}
46+
47+
func (sc *StatsCollector) RecordPutDuration(start time.Time) {
48+
sc.putDuration.SendTiming(time.Since(start))
49+
}
50+
51+
func getTags(eventSamplerType, module, operation string) stats.Tags {
52+
return stats.Tags{"type": eventSamplerType, "module": module, "operation": operation}
53+
}

0 commit comments

Comments
 (0)