Skip to content

Commit 43dddc7

Browse files
authored
feat: add schedule health monitoring metrics to cron scheduler (#2166)
* feat: add schedule health monitoring metrics to cron scheduler Adds Prometheus metrics for cron schedule execution health: - cron_execution_duration_seconds: histogram of execution latency (scheduler/tenant/schedule/status labels) - cron_executions_total: counter of completed/failed executions - cron_lock_contention_total: counter of distributed lock skips - cron_concurrency_rejections_total: counter for global and per-tenant semaphore rejections - cron_last_execution_timestamp: gauge tracking last execution time per schedule - cron_active_schedules: gauge reflecting registered schedule count after each refresh Instruments cron.go at all execution paths: global semaphore rejection, per-tenant semaphore rejection, lock contention skip, execution completion/failure, and schedule refresh. Tests cover each metric function and verify label independence. * fix: address CodeRabbit review feedback on schedule metrics - Extend histogram buckets from 10 to 13 (max ~409s, covers 5m default timeout) - Add DeleteCronScheduleMetrics to remove per-schedule series on deregistration - Call DeleteCronScheduleMetrics in refreshSchedules schedule removal path - Fix TestRecordCronExecution_failed to use delta-based assertion * fix: remove schedule_id from execution histogram to bound cardinality Histogram vectors create N series per unique label combination; including schedule_id in a multi-tenant system creates unbounded cardinality. Labels are now scheduler/tenant_id/status (matching cronExecutionsTotal). Per-schedule staleness tracking is retained via cronLastExecutionTimestamp (a gauge, one series per schedule - acceptable cost). DeleteCronScheduleMetrics signature simplified accordingly; only cleans up the per-schedule timestamp gauge series on deregistration. * fix: bound metric cardinality and guard per-schedule series lifecycle - Remove tenant_id from histogram and counter labels; use scheduler/status only to keep series count predictable regardless of tenant or schedule growth - Split RecordCronLastExecutionTimestamp from RecordCronExecution so call sites can guard against resurrecting series for deregistered schedules - Guard timestamp write in acquireLockAndExecute: only write when schedule is still present in s.schedules at completion time - Call DeleteCronScheduleMetrics on the re-register path (scheduleChanged) in addition to the removal path, so stale series are cleaned up when a schedule is reconfigured --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 06bac4a commit 43dddc7

3 files changed

Lines changed: 253 additions & 0 deletions

File tree

shared/platform/scheduler/cron.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ func (s *CronScheduler) refreshSchedules(ctx context.Context) error {
286286
s.cron.Remove(entryID)
287287
delete(s.entryIDs, id)
288288
delete(s.schedules, id)
289+
DeleteCronScheduleMetrics(s.config.Name, id)
289290
s.logger.Info("removed schedule", "schedule_id", id)
290291
}
291292
}
@@ -300,6 +301,7 @@ func (s *CronScheduler) refreshSchedules(ctx context.Context) error {
300301
s.cron.Remove(s.entryIDs[sched.ID])
301302
delete(s.entryIDs, sched.ID)
302303
delete(s.schedules, sched.ID)
304+
DeleteCronScheduleMetrics(s.config.Name, sched.ID)
303305
s.logger.Info("schedule changed, re-registering",
304306
"schedule_id", sched.ID,
305307
"old_cron_expr", prev.CronExpr,
@@ -323,6 +325,8 @@ func (s *CronScheduler) refreshSchedules(ctx context.Context) error {
323325
"tenant_id", sched.TenantID)
324326
}
325327

328+
UpdateCronActiveSchedules(s.config.Name, float64(len(s.entryIDs)))
329+
326330
return nil
327331
}
328332

@@ -373,6 +377,7 @@ func (s *CronScheduler) acquireGlobalSemaphore(ctx context.Context, schedule Sch
373377
"schedule_id", schedule.ID,
374378
"tenant_id", schedule.TenantID)
375379
s.recordExecution(ctx, schedule, ExecutionStatusSkipped, nil, strPtr("concurrency limit reached"))
380+
RecordCronConcurrencyRejection(s.config.Name, "global")
376381
return nil, false
377382
}
378383
}
@@ -393,6 +398,7 @@ func (s *CronScheduler) acquireTenantSemaphore(ctx context.Context, schedule Sch
393398
"tenant_id", schedule.TenantID)
394399
s.recordExecution(ctx, schedule, ExecutionStatusSkipped, nil,
395400
strPtr(fmt.Sprintf("per-tenant concurrency limit reached for tenant %s", schedule.TenantID)))
401+
RecordCronConcurrencyRejection(s.config.Name, "per_tenant")
396402
return nil, false
397403
}
398404
}
@@ -458,6 +464,7 @@ func (s *CronScheduler) acquireLockAndExecute(ctx context.Context, schedule Sche
458464
s.logger.Debug("lock not acquired, skipping",
459465
"schedule_id", schedule.ID)
460466
s.recordExecution(ctx, schedule, ExecutionStatusSkipped, nil, strPtr("lock not acquired"))
467+
RecordCronLockContention(s.config.Name)
461468
return
462469
}
463470
defer release()
@@ -467,7 +474,26 @@ func (s *CronScheduler) acquireLockAndExecute(ctx context.Context, schedule Sche
467474
now := time.Now().UTC()
468475
s.recordExecutionStart(ctx, execID, schedule, now)
469476

477+
execStart := time.Now()
470478
err := s.executor.Execute(ctx, schedule)
479+
execDuration := time.Since(execStart)
480+
481+
execStatus := ExecutionStatusCompleted
482+
if err != nil {
483+
execStatus = ExecutionStatusFailed
484+
}
485+
RecordCronExecution(s.config.Name, execStatus, execDuration)
486+
487+
// Only update the per-schedule timestamp gauge when the schedule is still registered.
488+
// This prevents resurrecting Prometheus series for schedules that were removed
489+
// while an execution was in flight.
490+
s.mu.Lock()
491+
_, stillRegistered := s.schedules[schedule.ID]
492+
s.mu.Unlock()
493+
if stillRegistered {
494+
RecordCronLastExecutionTimestamp(s.config.Name, schedule.ID)
495+
}
496+
471497
s.recordExecutionResult(ctx, execID, schedule, err)
472498

473499
if err != nil {

shared/platform/scheduler/metrics.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package scheduler
22

33
import (
4+
"time"
5+
46
"github.com/prometheus/client_golang/prometheus"
57
"github.com/prometheus/client_golang/prometheus/promauto"
68
)
@@ -81,3 +83,90 @@ func RecordInFlightWork(worker string, count float64) {
8183
func RecordPoll(worker string) {
8284
workerPollTotal.WithLabelValues(worker).Inc()
8385
}
86+
87+
// Prometheus metrics for cron schedule execution health.
88+
var (
89+
cronExecutionDurationSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
90+
Namespace: "meridian",
91+
Subsystem: "scheduler",
92+
Name: "cron_execution_duration_seconds",
93+
Help: "Duration of cron schedule executions in seconds",
94+
Buckets: prometheus.ExponentialBuckets(0.1, 2, 13), // 0.1s to ~409s, covers 5-minute default timeout
95+
}, []string{"scheduler", "status"}) // tenant_id and schedule_id omitted: cardinality scales with tenants × schedules
96+
97+
cronExecutionsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
98+
Namespace: "meridian",
99+
Subsystem: "scheduler",
100+
Name: "cron_executions_total",
101+
Help: "Total number of cron schedule executions by status",
102+
}, []string{"scheduler", "status"}) // tenant_id omitted: per-schedule detail is in the execution store
103+
104+
cronLockContentionTotal = promauto.NewCounterVec(prometheus.CounterOpts{
105+
Namespace: "meridian",
106+
Subsystem: "scheduler",
107+
Name: "cron_lock_contention_total",
108+
Help: "Number of times a schedule was skipped because the distributed lock was already held",
109+
}, []string{"scheduler"})
110+
111+
cronConcurrencyRejectionsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
112+
Namespace: "meridian",
113+
Subsystem: "scheduler",
114+
Name: "cron_concurrency_rejections_total",
115+
Help: "Number of executions skipped due to concurrency limits",
116+
}, []string{"scheduler", "limit_type"})
117+
118+
cronLastExecutionTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{
119+
Namespace: "meridian",
120+
Subsystem: "scheduler",
121+
Name: "cron_last_execution_timestamp",
122+
Help: "Unix timestamp of the most recent completed or failed execution for each schedule",
123+
}, []string{"scheduler", "schedule_id"})
124+
125+
cronActiveSchedules = promauto.NewGaugeVec(prometheus.GaugeOpts{
126+
Namespace: "meridian",
127+
Subsystem: "scheduler",
128+
Name: "cron_active_schedules",
129+
Help: "Number of active schedules currently registered in the cron runner",
130+
}, []string{"scheduler"})
131+
)
132+
133+
// RecordCronExecution records duration and count metrics for a completed or failed
134+
// schedule execution. Uses only bounded labels (scheduler, status) to keep cardinality
135+
// predictable. Per-schedule/tenant detail is available in the execution store.
136+
func RecordCronExecution(schedulerName string, status ExecutionStatus, duration time.Duration) {
137+
statusStr := string(status)
138+
cronExecutionDurationSeconds.WithLabelValues(schedulerName, statusStr).Observe(duration.Seconds())
139+
cronExecutionsTotal.WithLabelValues(schedulerName, statusStr).Inc()
140+
}
141+
142+
// RecordCronLastExecutionTimestamp updates the last-execution gauge for a schedule.
143+
// Callers should only invoke this when the schedule is confirmed to still be registered,
144+
// to avoid resurrecting Prometheus series for deregistered schedules.
145+
func RecordCronLastExecutionTimestamp(schedulerName, scheduleID string) {
146+
cronLastExecutionTimestamp.WithLabelValues(schedulerName, scheduleID).SetToCurrentTime()
147+
}
148+
149+
// RecordCronLockContention increments the counter for schedules skipped due to distributed lock contention.
150+
func RecordCronLockContention(schedulerName string) {
151+
cronLockContentionTotal.WithLabelValues(schedulerName).Inc()
152+
}
153+
154+
// RecordCronConcurrencyRejection increments the counter for schedules skipped due to concurrency limits.
155+
// limitType should be "global" or "per_tenant".
156+
func RecordCronConcurrencyRejection(schedulerName, limitType string) {
157+
cronConcurrencyRejectionsTotal.WithLabelValues(schedulerName, limitType).Inc()
158+
}
159+
160+
// UpdateCronActiveSchedules sets the gauge for the number of active schedules registered.
161+
func UpdateCronActiveSchedules(schedulerName string, count float64) {
162+
cronActiveSchedules.WithLabelValues(schedulerName).Set(count)
163+
}
164+
165+
// DeleteCronScheduleMetrics removes per-schedule Prometheus series for a schedule that
166+
// has been deregistered. This prevents stale series from persisting after schedules are
167+
// removed from the provider.
168+
func DeleteCronScheduleMetrics(schedulerName, scheduleID string) {
169+
// The histogram is labeled by tenant_id only (not schedule_id), so no per-schedule
170+
// cleanup is needed there. Only the last-execution timestamp gauge is per-schedule.
171+
cronLastExecutionTimestamp.DeleteLabelValues(schedulerName, scheduleID)
172+
}

shared/platform/scheduler/metrics_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package scheduler
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/prometheus/client_golang/prometheus"
78
io_prometheus_client "github.com/prometheus/client_model/go"
@@ -25,6 +26,38 @@ func getGaugeValue(t *testing.T, vec *prometheus.GaugeVec, label string) float64
2526
return m.GetGauge().GetValue()
2627
}
2728

29+
func getCounterValueMulti(t *testing.T, vec *prometheus.CounterVec, labels ...string) float64 {
30+
t.Helper()
31+
m := &io_prometheus_client.Metric{}
32+
err := vec.WithLabelValues(labels...).(prometheus.Metric).Write(m)
33+
require.NoError(t, err)
34+
return m.GetCounter().GetValue()
35+
}
36+
37+
func getGaugeValueMulti(t *testing.T, vec *prometheus.GaugeVec, labels ...string) float64 {
38+
t.Helper()
39+
m := &io_prometheus_client.Metric{}
40+
err := vec.WithLabelValues(labels...).(prometheus.Metric).Write(m)
41+
require.NoError(t, err)
42+
return m.GetGauge().GetValue()
43+
}
44+
45+
func getHistogramCount(t *testing.T, vec *prometheus.HistogramVec, labels ...string) uint64 {
46+
t.Helper()
47+
m := &io_prometheus_client.Metric{}
48+
err := vec.WithLabelValues(labels...).(prometheus.Metric).Write(m)
49+
require.NoError(t, err)
50+
return m.GetHistogram().GetSampleCount()
51+
}
52+
53+
func getHistogramSum(t *testing.T, vec *prometheus.HistogramVec, labels ...string) float64 {
54+
t.Helper()
55+
m := &io_prometheus_client.Metric{}
56+
err := vec.WithLabelValues(labels...).(prometheus.Metric).Write(m)
57+
require.NoError(t, err)
58+
return m.GetHistogram().GetSampleSum()
59+
}
60+
2861
func TestRecordWorkerStart(t *testing.T) {
2962
before := getCounterValue(t, workerStartsTotal, "test-worker-start")
3063
RecordWorkerStart("test-worker-start")
@@ -80,3 +113,108 @@ func TestMetrics_different_workers_are_independent(t *testing.T) {
80113

81114
assert.Greater(t, aVal, bVal)
82115
}
116+
117+
// --- Cron execution metrics ---
118+
119+
func TestRecordCronExecution_completed(t *testing.T) {
120+
sched := "test-cron-completed"
121+
122+
// Histogram and counter are labeled by scheduler/status only (tenant_id and schedule_id
123+
// omitted to keep cardinality bounded regardless of tenant/schedule count).
124+
beforeHistogram := getHistogramCount(t, cronExecutionDurationSeconds, sched, "COMPLETED")
125+
beforeCounter := getCounterValueMulti(t, cronExecutionsTotal, sched, "COMPLETED")
126+
127+
RecordCronExecution(sched, ExecutionStatusCompleted, 500*time.Millisecond)
128+
129+
assert.Equal(t, beforeHistogram+1, getHistogramCount(t, cronExecutionDurationSeconds, sched, "COMPLETED"))
130+
assert.Equal(t, beforeCounter+1, getCounterValueMulti(t, cronExecutionsTotal, sched, "COMPLETED"))
131+
}
132+
133+
func TestRecordCronLastExecutionTimestamp(t *testing.T) {
134+
sched := "test-cron-ts"
135+
sid := "sched-ts"
136+
137+
RecordCronLastExecutionTimestamp(sched, sid)
138+
assert.Greater(t, getGaugeValueMulti(t, cronLastExecutionTimestamp, sched, sid), 0.0)
139+
}
140+
141+
func TestRecordCronExecution_failed(t *testing.T) {
142+
sched := "test-cron-failed"
143+
144+
beforeHistogram := getHistogramCount(t, cronExecutionDurationSeconds, sched, "FAILED")
145+
beforeCounter := getCounterValueMulti(t, cronExecutionsTotal, sched, "FAILED")
146+
147+
RecordCronExecution(sched, ExecutionStatusFailed, 100*time.Millisecond)
148+
149+
assert.Equal(t, beforeHistogram+1, getHistogramCount(t, cronExecutionDurationSeconds, sched, "FAILED"))
150+
assert.Equal(t, beforeCounter+1, getCounterValueMulti(t, cronExecutionsTotal, sched, "FAILED"))
151+
}
152+
153+
func TestRecordCronExecution_duration_is_observed(t *testing.T) {
154+
sched := "test-cron-duration"
155+
156+
RecordCronExecution(sched, ExecutionStatusCompleted, 2*time.Second)
157+
158+
assert.GreaterOrEqual(t, getHistogramSum(t, cronExecutionDurationSeconds, sched, "COMPLETED"), 2.0)
159+
}
160+
161+
func TestRecordCronLockContention(t *testing.T) {
162+
sched := "test-cron-lock"
163+
164+
before := getCounterValueMulti(t, cronLockContentionTotal, sched)
165+
RecordCronLockContention(sched)
166+
assert.Equal(t, before+1, getCounterValueMulti(t, cronLockContentionTotal, sched))
167+
}
168+
169+
func TestRecordCronConcurrencyRejection_global(t *testing.T) {
170+
sched := "test-cron-conc-global"
171+
172+
before := getCounterValueMulti(t, cronConcurrencyRejectionsTotal, sched, "global")
173+
RecordCronConcurrencyRejection(sched, "global")
174+
assert.Equal(t, before+1, getCounterValueMulti(t, cronConcurrencyRejectionsTotal, sched, "global"))
175+
}
176+
177+
func TestRecordCronConcurrencyRejection_per_tenant(t *testing.T) {
178+
sched := "test-cron-conc-tenant"
179+
180+
before := getCounterValueMulti(t, cronConcurrencyRejectionsTotal, sched, "per_tenant")
181+
RecordCronConcurrencyRejection(sched, "per_tenant")
182+
assert.Equal(t, before+1, getCounterValueMulti(t, cronConcurrencyRejectionsTotal, sched, "per_tenant"))
183+
}
184+
185+
func TestUpdateCronActiveSchedules(t *testing.T) {
186+
sched := "test-cron-active"
187+
188+
UpdateCronActiveSchedules(sched, 5)
189+
assert.Equal(t, 5.0, getGaugeValueMulti(t, cronActiveSchedules, sched))
190+
191+
UpdateCronActiveSchedules(sched, 0)
192+
assert.Equal(t, 0.0, getGaugeValueMulti(t, cronActiveSchedules, sched))
193+
}
194+
195+
func TestDeleteCronScheduleMetrics_removes_series(t *testing.T) {
196+
sched := "test-cron-delete"
197+
sid := "sched-del"
198+
199+
// Record a timestamp so the per-schedule series exists
200+
RecordCronLastExecutionTimestamp(sched, sid)
201+
assert.Greater(t, getGaugeValueMulti(t, cronLastExecutionTimestamp, sched, sid), 0.0)
202+
203+
// Delete should not panic and removes the per-schedule timestamp series
204+
assert.NotPanics(t, func() {
205+
DeleteCronScheduleMetrics(sched, sid)
206+
})
207+
208+
// After deletion, the timestamp series resets to zero (re-created fresh)
209+
assert.Equal(t, 0.0, getGaugeValueMulti(t, cronLastExecutionTimestamp, sched, sid))
210+
}
211+
212+
func TestCronMetrics_independent_schedulers(t *testing.T) {
213+
RecordCronLockContention("sched-x")
214+
RecordCronLockContention("sched-x")
215+
RecordCronLockContention("sched-y")
216+
217+
xVal := getCounterValueMulti(t, cronLockContentionTotal, "sched-x")
218+
yVal := getCounterValueMulti(t, cronLockContentionTotal, "sched-y")
219+
assert.Greater(t, xVal, yVal)
220+
}

0 commit comments

Comments
 (0)