Skip to content

Commit 39f7ea5

Browse files
authored
Merge pull request #152 from codex-team/master
Update prod
2 parents 724e2d5 + 9cf0d57 commit 39f7ea5

2 files changed

Lines changed: 24 additions & 7 deletions

File tree

pkg/redis/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -346,15 +346,15 @@ func (r *RedisClient) TSAdd(
346346
return res.Err()
347347
}
348348

349-
// SafeTSAdd ensures that a TS key exists and adds a sample safely
349+
// SafeTSAdd ensures that a TS key exists and adds a sample safely.
350+
// timestamp is the bucket start time in milliseconds.
350351
func (r *RedisClient) SafeTSAdd(
351352
key string,
352353
value int64,
353354
labels map[string]string,
354355
retention time.Duration,
356+
timestamp int64,
355357
) error {
356-
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
357-
358358
err := r.TSAdd(key, value, timestamp, labels)
359359
if err != nil && strings.Contains(err.Error(), "TSDB: key does not exist") {
360360
log.Warnf("TS key %s does not exist, creating it...", key)

pkg/server/errorshandler/handler.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,24 @@ func getTimeSeriesKey(projectId, metricType, granularity string, isSystemMetric
144144
return fmt.Sprintf("ts:project-%s:%s:%s", metricType, projectId, granularity)
145145
}
146146

147+
// bucketTimestampMs returns the current UTC time truncated to the start of the
148+
// given granularity bucket, in milliseconds. Truncating ensures that all events
149+
// within the same bucket share one timestamp so ON_DUPLICATE SUM accumulates
150+
// them into a single sample instead of creating a separate sample per event.
151+
func bucketTimestampMs(granularity string) int64 {
152+
now := time.Now().UTC()
153+
var t time.Time
154+
switch granularity {
155+
case "hourly":
156+
t = now.Truncate(time.Hour)
157+
case "daily":
158+
t = now.Truncate(24 * time.Hour)
159+
default: // minutely
160+
t = now.Truncate(time.Minute)
161+
}
162+
return t.UnixNano() / int64(time.Millisecond)
163+
}
164+
147165
// recordProjectMetrics records project metrics to Redis TimeSeries
148166
// metricType can be: "events-accepted", "events-rate-limited", etc.
149167
func (handler *Handler) recordProjectMetrics(projectId, metricType string, isSystemMetric bool) {
@@ -158,18 +176,17 @@ func (handler *Handler) recordProjectMetrics(projectId, metricType string, isSys
158176
}
159177

160178
// minutely: store for 24 hours
161-
// Use TS.ADD with ON_DUPLICATE SUM to accumulate events within the same timestamp
162-
if err := handler.RedisClient.SafeTSAdd(minutelyKey, 1, labels, 24*time.Hour); err != nil {
179+
if err := handler.RedisClient.SafeTSAdd(minutelyKey, 1, labels, 24*time.Hour, bucketTimestampMs("minutely")); err != nil {
163180
log.Errorf("failed to add minutely TS for %s: %v", metricType, err)
164181
}
165182

166183
// hourly: store for 7 days
167-
if err := handler.RedisClient.SafeTSAdd(hourlyKey, 1, labels, 7*24*time.Hour); err != nil {
184+
if err := handler.RedisClient.SafeTSAdd(hourlyKey, 1, labels, 7*24*time.Hour, bucketTimestampMs("hourly")); err != nil {
168185
log.Errorf("failed to add hourly TS for %s: %v", metricType, err)
169186
}
170187

171188
// daily: store for 90 days
172-
if err := handler.RedisClient.SafeTSAdd(dailyKey, 1, labels, 90*24*time.Hour); err != nil {
189+
if err := handler.RedisClient.SafeTSAdd(dailyKey, 1, labels, 90*24*time.Hour, bucketTimestampMs("daily")); err != nil {
173190
log.Errorf("failed to add daily TS for %s: %v", metricType, err)
174191
}
175192
}

0 commit comments

Comments
 (0)