Skip to content

Commit 6521959

Browse files
committed
fix: improved cache
1 parent 4eb40c2 commit 6521959

File tree

4 files changed

+40
-7
lines changed

4 files changed

+40
-7
lines changed

openmeter/entitlement/balanceworker/entitlementhandler.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ import (
2424
"github.com/samber/lo"
2525
)
2626

27+
const (
28+
metricAttributeAction string = "action"
29+
)
30+
2731
type handleEntitlementEventOptions struct {
2832
// Source is the source of the event, e.g. the "subject" field from the upstream cloudevents event causing the change
2933
source string
@@ -181,15 +185,17 @@ func (w *Worker) processEntitlementEntity(ctx context.Context, entitlementEntity
181185
type recalcAction string
182186

183187
const (
184-
recalcActionRecalculate recalcAction = "recalculate"
185-
recalcActionUseCache recalcAction = "use-cache"
188+
recalcActionRecalculate recalcAction = "recalculate"
189+
recalcActionRecalculateOnHit recalcAction = "recalculate-on-hit"
190+
recalcActionUseCache recalcAction = "use-cache"
186191
)
187192

188193
func (w *Worker) createSnapshotEventEstimator(ctx context.Context, entitlementEntity *entitlement.Entitlement, calculatedAt time.Time, opts handleEntitlementEventOptions) (marshaler.Event, error) {
189194
if len(opts.rawIngestedEvents) == 0 {
190195
return nil, fmt.Errorf("no raw ingested events provided")
191196
}
192197

198+
// TODO: These should come from outside of the call
193199
feature, err := w.entitlement.Feature.GetFeature(ctx, entitlementEntity.Namespace, entitlementEntity.FeatureID, feature.IncludeArchivedFeatureTrue)
194200
if err != nil {
195201
return nil, fmt.Errorf("failed to get feature: %w", err)
@@ -213,6 +219,8 @@ func (w *Worker) createSnapshotEventEstimator(ctx context.Context, entitlementEn
213219
Meter: meterEntity,
214220
}
215221

222+
w.metricEstimatorRequestsTotal.Add(ctx, 1)
223+
216224
action := recalcActionUseCache
217225

218226
ent, err := w.estimator.HandleEntitlementEvent(ctx, estimator.IngestEventInput{
@@ -245,7 +253,7 @@ func (w *Worker) createSnapshotEventEstimator(ctx context.Context, entitlementEn
245253
}
246254

247255
if thHit {
248-
action = recalcActionRecalculate
256+
action = recalcActionRecalculateOnHit
249257
}
250258
}
251259

@@ -254,7 +262,11 @@ func (w *Worker) createSnapshotEventEstimator(ctx context.Context, entitlementEn
254262
// this should be a percentage and later we can say that for 1% of the events we should validate
255263
// cache consistency
256264

257-
if action == recalcActionRecalculate {
265+
w.metricEstimatorActionTotal.Add(ctx, 1, metric.WithAttributes(
266+
attribute.String(metricAttributeAction, string(action)),
267+
))
268+
269+
if action == recalcActionRecalculate || action == recalcActionRecalculateOnHit {
258270
value, err := w.estimator.HandleRecalculation(ctx, target, func(ctx context.Context) (*snapshot.EntitlementValue, error) {
259271
res, err := w.entitlement.Entitlement.GetEntitlementValue(ctx, entitlementEntity.Namespace, entitlementEntity.SubjectKey, entitlementEntity.ID, calculatedAt)
260272
if err != nil {
@@ -302,6 +314,7 @@ func (w *Worker) createSnapshotEventEstimator(ctx context.Context, entitlementEn
302314
balance := estimator.NewInfDecimal(lo.FromPtr(value.Balance))
303315

304316
if balance.GreaterThan(cacheEntry.ApproxUsage) {
317+
w.metricEstimatorValidationErrorsTotal.Add(ctx, 1)
305318
w.opts.Logger.Error("cache entry is inconsistent", "error", err)
306319
}
307320
}

openmeter/entitlement/balanceworker/estimator/backend.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"log/slog"
89
"time"
910

1011
"github.com/go-redsync/redsync/v4"
@@ -23,11 +24,13 @@ type redisCacheBackend struct {
2324

2425
redsync *redsync.Redsync
2526
lockTimeout time.Duration
27+
logger *slog.Logger
2628
}
2729

2830
type RedisCacheBackendOptions struct {
2931
RedisURL string
3032
LockTimeout time.Duration
33+
Logger *slog.Logger
3134
}
3235

3336
func (o *RedisCacheBackendOptions) Validate() error {
@@ -39,6 +42,10 @@ func (o *RedisCacheBackendOptions) Validate() error {
3942
return errors.New("lockTimeout must be greater than 0")
4043
}
4144

45+
if o.Logger == nil {
46+
return errors.New("logger is required")
47+
}
48+
4249
return nil
4350
}
4451

@@ -58,6 +65,7 @@ func NewRedisCacheBackend(in RedisCacheBackendOptions) (CacheBackend, error) {
5865
redis: redis,
5966
redsync: redsync.New(goredis.NewPool(redis)),
6067
lockTimeout: in.LockTimeout,
68+
logger: in.Logger,
6169
}, nil
6270
}
6371

openmeter/entitlement/balanceworker/estimator/estimator.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (t *TargetEntitlement) GetEntryHash() hasher.Hash {
6262
val := strings.Join(
6363
[]string{
6464
t.Entitlement.ID, t.Entitlement.UpdatedAt.String(), lo.FromPtr(t.Entitlement.DeletedAt).String(),
65-
// TODO: NR of voided and active grants
65+
// TODO: NR of voided and active grants !!!!
6666
// Entitlement usage period change
6767
t.Entitlement.CurrentUsagePeriod.From.String(), t.Entitlement.CurrentUsagePeriod.To.String(),
6868
// Feature changes
@@ -89,6 +89,7 @@ type Cache interface {
8989
type CacheOptions struct {
9090
RedisURL string
9191
LockTimeout time.Duration
92+
Logger *slog.Logger
9293
}
9394

9495
func (o *CacheOptions) Validate() error {
@@ -100,6 +101,10 @@ func (o *CacheOptions) Validate() error {
100101
return errors.New("lockTimeout must be greater than 0")
101102
}
102103

104+
if o.Logger == nil {
105+
return errors.New("logger is required")
106+
}
107+
103108
return nil
104109
}
105110

@@ -111,20 +116,22 @@ func NewCache(in CacheOptions) (Cache, error) {
111116
backend, err := NewRedisCacheBackend(RedisCacheBackendOptions{
112117
RedisURL: in.RedisURL,
113118
LockTimeout: in.LockTimeout,
119+
Logger: in.Logger,
114120
})
115121
if err != nil {
116122
return nil, err
117123
}
118124

119125
return &cache{
120126
backend: backend,
127+
logger: in.Logger,
121128
}, nil
122129
}
123130

124131
type cache struct {
125132
backend CacheBackend
126133

127-
validationRate float64
134+
logger *slog.Logger
128135
}
129136

130137
var _ Cache = (*cache)(nil)
@@ -151,7 +158,7 @@ func (c *cache) HandleEntitlementEvent(ctx context.Context, event IngestEventInp
151158
getChange, err := c.getChange(event.Target.Meter, event.DedupedEvents)
152159
if err != nil {
153160
// TODO: more details
154-
slog.Warn("failed to get change for entitlement", "error", err)
161+
c.logger.Warn("failed to get change for entitlement", "error", err)
155162
// We should not fail the whole calculation, instead if we have any pending
156163
// thresholds, this will force a recalculation of the entitlement balance.
157164
approxIncrease = infinite

openmeter/entitlement/balanceworker/worker.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ type Worker struct {
158158
metricRecalculationTime metric.Int64Histogram
159159
metricHighWatermarkCacheStats metric.Int64Counter
160160

161+
// Estimator metrics
162+
metricEstimatorRequestsTotal metric.Int64Counter
163+
metricEstimatorValidationErrorsTotal metric.Int64Counter
164+
metricEstimatorActionTotal metric.Int64Counter
165+
161166
// Handlers
162167
nonPublishingHandler *grouphandler.NoPublishingHandler
163168
}

0 commit comments

Comments
 (0)