Skip to content

Commit ac77c4f

Browse files
authored
compactor scheduler: rework active and pending job metrics (#15112)
#### What this PR does * Reworks `cortex_compactor_scheduler_(pending|active)_jobs` metrics in the following way: * Before: ``` cortex_compactor_scheduler_pending_jobs{user} cortex_compactor_scheduler_active_jobs{user} ``` * Now: ``` cortex_compactor_scheduler_pending_jobs{job_type} cortex_compactor_scheduler_active_jobs{job_type} cortex_compactor_scheduler_pending_jobs_by_user{user} cortex_compactor_scheduler_active_jobs_by_user{user} ``` Main purpose is to have a low cardinality version of the original metrics, for lighter use when the user breakdown is not needed (e.g. autoscaling). Bonus change: add `job_type` label to the low cardinality one, will be nice for dashboards. * Drop `cortex_compactor_incomplete_plan_jobs`, it's now redundant, calculable from the new metrics above. #### Which issue(s) this PR fixes or relates to n/a #### Checklist - [x] Tests updated. - [n/a] Documentation added. - [n/a] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR. - [n/a] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features. <!-- CURSOR_SUMMARY --> --- > [!NOTE] > **Medium Risk** > Changes Prometheus metric names/label sets for scheduler job counts and alters how per-tenant contributions are tracked/cleared, which can break dashboards/alerts and risks gauge mis-accounting if transitions are wrong. > > **Overview** > Reworks scheduler job-count metrics to provide **low-cardinality** `cortex_compactor_scheduler_(pending|active)_jobs{job_type}` gauges alongside new per-tenant breakdowns `cortex_compactor_scheduler_(pending|active)_jobs_by_user{user}`. 
> > Removes the redundant `cortex_compactor_incomplete_plan_jobs` gauge and updates queue/tracker metric bookkeeping (including `Clear()` subtraction logic) plus related tests to assert the new pending/active plan vs compaction accounting and tenant cleanup behavior. > > <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 0f16860. Bugbot is set up for automated code reviews on this repo. Configure [here](https://www.cursor.com/dashboard/bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 8f9a24b commit ac77c4f

2 files changed

Lines changed: 159 additions & 85 deletions

File tree

pkg/compactor/scheduler/job_tracker_test.go

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -277,44 +277,49 @@ func TestJobTracker_PlanJobTracking(t *testing.T) {
277277
clk.Set(at(3, 0))
278278
jt, reg := newTestJobTracker(clk)
279279

280-
assertIncompletePlanJobs := func(label string, expected int) {
280+
assertPlanJobLocation := func(label string, pending, active int) {
281281
t.Helper()
282282
require.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
283-
# HELP cortex_compactor_incomplete_plan_jobs The total number of plan jobs that have not yet completed (pending or active).
284-
# TYPE cortex_compactor_incomplete_plan_jobs gauge
285-
cortex_compactor_incomplete_plan_jobs %d
286-
`, expected)), "cortex_compactor_incomplete_plan_jobs"), label)
283+
# HELP cortex_compactor_scheduler_pending_jobs The number of queued pending jobs.
284+
# TYPE cortex_compactor_scheduler_pending_jobs gauge
285+
cortex_compactor_scheduler_pending_jobs{job_type="compaction"} 0
286+
cortex_compactor_scheduler_pending_jobs{job_type="plan"} %d
287+
# HELP cortex_compactor_scheduler_active_jobs The number of jobs active in workers.
288+
# TYPE cortex_compactor_scheduler_active_jobs gauge
289+
cortex_compactor_scheduler_active_jobs{job_type="compaction"} 0
290+
cortex_compactor_scheduler_active_jobs{job_type="plan"} %d
291+
`, pending, active)), "cortex_compactor_scheduler_pending_jobs", "cortex_compactor_scheduler_active_jobs"), label)
287292
}
288293

289-
assertIncompletePlanJobs("no plan jobs yet", 0)
294+
assertPlanJobLocation("no plan jobs yet", 0, 0)
290295

291296
_, err := jt.Maintenance(time.Minute, false, true, time.Hour, 0)
292297
require.NoError(t, err)
293-
assertIncompletePlanJobs("plan job pending", 1)
298+
assertPlanJobLocation("plan job pending", 1, 0)
294299

295300
leaseResp, _, err := jt.Lease()
296301
require.NoError(t, err)
297302
require.Equal(t, planJobId, leaseResp.Key.Id)
298-
assertIncompletePlanJobs("plan job active (still incomplete)", 1)
303+
assertPlanJobLocation("plan job active", 0, 1)
299304

300305
canceled, _, err := jt.CancelLease(leaseResp.Key.Id, leaseResp.Key.Epoch)
301306
require.NoError(t, err)
302307
require.True(t, canceled)
303-
assertIncompletePlanJobs("plan job revived to pending (unchanged)", 1)
308+
assertPlanJobLocation("plan job revived to pending", 1, 0)
304309

305310
leaseResp, _, err = jt.Lease()
306311
require.NoError(t, err)
307312
_, _, err = jt.Remove(leaseResp.Key.Id, leaseResp.Key.Epoch, true)
308313
require.NoError(t, err)
309-
assertIncompletePlanJobs("plan job complete", 0)
314+
assertPlanJobLocation("plan job complete", 0, 0)
310315
}
311316

312317
func TestJobTracker_Cleanup(t *testing.T) {
313318
clk := clock.NewMock()
314319
reg := prometheus.NewPedanticRegistry()
315320
sm := newSchedulerMetrics(reg)
316321

317-
// Two tenants share the same incompleteJobsBytes and incompletePlanJobs gauges.
322+
// Two tenants share the same aggregate gauges (incompleteJobsBytes, pendingJobs, activeJobs).
318323
jt1 := NewJobTracker(&NopJobPersister{}, "tenant1", clk, infiniteLeases, infiniteLeases, sm.newTrackerMetricsForTenant("tenant1"), log.NewNopLogger())
319324
jt2 := NewJobTracker(&NopJobPersister{}, "tenant2", clk, infiniteLeases, infiniteLeases, sm.newTrackerMetricsForTenant("tenant2"), log.NewNopLogger())
320325

@@ -333,25 +338,41 @@ func TestJobTracker_Cleanup(t *testing.T) {
333338
_, err = jt2.Maintenance(time.Minute, false, true, time.Hour, 0)
334339
require.NoError(t, err)
335340

341+
// Lease both of tenant1's jobs
342+
for range 2 {
343+
_, _, err := jt1.Lease()
344+
require.NoError(t, err)
345+
}
346+
336347
require.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
337-
# HELP cortex_compactor_incomplete_plan_jobs The total number of plan jobs that have not yet completed (pending or active).
338-
# TYPE cortex_compactor_incomplete_plan_jobs gauge
339-
cortex_compactor_incomplete_plan_jobs 2
340-
`), "cortex_compactor_incomplete_plan_jobs"), "both tenants have a pending plan job")
348+
# HELP cortex_compactor_scheduler_pending_jobs The number of queued pending jobs.
349+
# TYPE cortex_compactor_scheduler_pending_jobs gauge
350+
cortex_compactor_scheduler_pending_jobs{job_type="compaction"} 1
351+
cortex_compactor_scheduler_pending_jobs{job_type="plan"} 1
352+
# HELP cortex_compactor_scheduler_active_jobs The number of jobs active in workers.
353+
# TYPE cortex_compactor_scheduler_active_jobs gauge
354+
cortex_compactor_scheduler_active_jobs{job_type="compaction"} 1
355+
cortex_compactor_scheduler_active_jobs{job_type="plan"} 1
356+
`), "cortex_compactor_scheduler_pending_jobs", "cortex_compactor_scheduler_active_jobs"), "tenant1 active, tenant2 pending")
341357

342358
// Cleaning up tenant1 should only subtract its contribution, not zero the shared gauges.
343359
jt1.CleanupMetrics()
344360
assertTrackerBytes(t, reg, "only tenant1 bytes removed", 0, 200)
361+
require.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
362+
# HELP cortex_compactor_scheduler_pending_jobs_by_user The number of queued pending jobs, broken down by user.
363+
# TYPE cortex_compactor_scheduler_pending_jobs_by_user gauge
364+
cortex_compactor_scheduler_pending_jobs_by_user{user="tenant2"} 2
365+
`), "cortex_compactor_scheduler_pending_jobs_by_user"), "only tenant2 pending jobs remain")
345366
require.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
346367
# HELP cortex_compactor_scheduler_pending_jobs The number of queued pending jobs.
347368
# TYPE cortex_compactor_scheduler_pending_jobs gauge
348-
cortex_compactor_scheduler_pending_jobs{user="tenant2"} 2
349-
`), "cortex_compactor_scheduler_pending_jobs"), "only tenant2 pending jobs remain")
350-
require.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
351-
# HELP cortex_compactor_incomplete_plan_jobs The total number of plan jobs that have not yet completed (pending or active).
352-
# TYPE cortex_compactor_incomplete_plan_jobs gauge
353-
cortex_compactor_incomplete_plan_jobs 1
354-
`), "cortex_compactor_incomplete_plan_jobs"), "only tenant2 plan job remains")
369+
cortex_compactor_scheduler_pending_jobs{job_type="compaction"} 1
370+
cortex_compactor_scheduler_pending_jobs{job_type="plan"} 1
371+
# HELP cortex_compactor_scheduler_active_jobs The number of jobs active in workers.
372+
# TYPE cortex_compactor_scheduler_active_jobs gauge
373+
cortex_compactor_scheduler_active_jobs{job_type="compaction"} 0
374+
cortex_compactor_scheduler_active_jobs{job_type="plan"} 0
375+
`), "cortex_compactor_scheduler_pending_jobs", "cortex_compactor_scheduler_active_jobs"), "tenant1's active contribution removed, tenant2's pending preserved")
355376
}
356377

357378
func TestJobTracker_CancelLease_PlanJobAlwaysRevives(t *testing.T) {

pkg/compactor/scheduler/metrics.go

Lines changed: 116 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@ const (
1717

1818
type schedulerMetrics struct {
1919
pendingJobs *prometheus.GaugeVec
20+
pendingJobsByUser *prometheus.GaugeVec
2021
incompleteJobsBytes *prometheus.GaugeVec
21-
incompletePlanJobs prometheus.Gauge
2222
activeJobs *prometheus.GaugeVec
23+
activeJobsByUser *prometheus.GaugeVec
2324
jobsCompleted *prometheus.CounterVec
2425
repeatedJobFailures prometheus.Counter
2526
}
@@ -29,18 +30,22 @@ func newSchedulerMetrics(reg prometheus.Registerer) *schedulerMetrics {
2930
pendingJobs: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
3031
Name: "cortex_compactor_scheduler_pending_jobs",
3132
Help: "The number of queued pending jobs.",
33+
}, []string{"job_type"}),
34+
pendingJobsByUser: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
35+
Name: "cortex_compactor_scheduler_pending_jobs_by_user",
36+
Help: "The number of queued pending jobs, broken down by user.",
3237
}, []string{"user"}),
3338
incompleteJobsBytes: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
3439
Name: "cortex_compactor_scheduler_incomplete_compaction_jobs_bytes",
3540
Help: "The total bytes of blocks in compaction jobs that have not yet completed (pending or active).",
3641
}, []string{"compaction_type"}),
37-
incompletePlanJobs: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
38-
Name: "cortex_compactor_incomplete_plan_jobs",
39-
Help: "The total number of plan jobs that have not yet completed (pending or active).",
40-
}),
4142
activeJobs: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
4243
Name: "cortex_compactor_scheduler_active_jobs",
4344
Help: "The number of jobs active in workers.",
45+
}, []string{"job_type"}),
46+
activeJobsByUser: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
47+
Name: "cortex_compactor_scheduler_active_jobs_by_user",
48+
Help: "The number of jobs active in workers, broken down by user.",
4449
}, []string{"user"}),
4550
jobsCompleted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
4651
Name: "cortex_compactor_scheduler_jobs_completed_total",
@@ -54,6 +59,10 @@ func newSchedulerMetrics(reg prometheus.Registerer) *schedulerMetrics {
5459
// Pre-initialize job type labels so we get zeros instead of no data.
5560
m.jobsCompleted.WithLabelValues(jobTypePlan)
5661
m.jobsCompleted.WithLabelValues(jobTypeCompaction)
62+
m.pendingJobs.WithLabelValues(jobTypePlan)
63+
m.pendingJobs.WithLabelValues(jobTypeCompaction)
64+
m.activeJobs.WithLabelValues(jobTypePlan)
65+
m.activeJobs.WithLabelValues(jobTypeCompaction)
5766
m.incompleteJobsBytes.WithLabelValues(compactionTypeSplit)
5867
m.incompleteJobsBytes.WithLabelValues(compactionTypeMerge)
5968
return m
@@ -62,14 +71,17 @@ func newSchedulerMetrics(reg prometheus.Registerer) *schedulerMetrics {
6271
func (s *schedulerMetrics) newTrackerMetricsForTenant(tenant string) *trackerMetrics {
6372
return &trackerMetrics{
6473
queue: &queueMetrics{
65-
pendingJobs: s.pendingJobs.WithLabelValues(tenant),
66-
activeJobs: s.activeJobs.WithLabelValues(tenant),
67-
incompleteSplitBytes: s.incompleteJobsBytes.WithLabelValues(compactionTypeSplit),
68-
incompleteMergeBytes: s.incompleteJobsBytes.WithLabelValues(compactionTypeMerge),
69-
incompletePlanJobs: s.incompletePlanJobs,
74+
pendingJobsByUser: s.pendingJobsByUser.WithLabelValues(tenant),
75+
activeJobsByUser: s.activeJobsByUser.WithLabelValues(tenant),
76+
pendingPlanJobs: s.pendingJobs.WithLabelValues(jobTypePlan),
77+
pendingCompactionJobs: s.pendingJobs.WithLabelValues(jobTypeCompaction),
78+
activePlanJobs: s.activeJobs.WithLabelValues(jobTypePlan),
79+
activeCompactionJobs: s.activeJobs.WithLabelValues(jobTypeCompaction),
80+
incompleteSplitBytes: s.incompleteJobsBytes.WithLabelValues(compactionTypeSplit),
81+
incompleteMergeBytes: s.incompleteJobsBytes.WithLabelValues(compactionTypeMerge),
7082
clear: func() {
71-
s.pendingJobs.DeleteLabelValues(tenant)
72-
s.activeJobs.DeleteLabelValues(tenant)
83+
s.pendingJobsByUser.DeleteLabelValues(tenant)
84+
s.activeJobsByUser.DeleteLabelValues(tenant)
7385
},
7486
},
7587
repeatedJobFailures: s.repeatedJobFailures,
@@ -84,98 +96,139 @@ type trackerMetrics struct {
8496
// Clear deletes all per-tenant label values and subtracts this tenant's contribution from the
8597
// shared gauges. Must be called when a tenant is removed.
8698
func (m *trackerMetrics) Clear() {
87-
m.queue.incompleteSplitBytes.Sub(float64(m.queue.splitBytes))
88-
m.queue.incompleteMergeBytes.Sub(float64(m.queue.mergeBytes))
89-
m.queue.incompletePlanJobs.Sub(float64(m.queue.planJobCount))
90-
m.queue.splitBytes = 0
91-
m.queue.mergeBytes = 0
92-
m.queue.planJobCount = 0
93-
m.queue.clear()
99+
q := m.queue
100+
q.incompleteSplitBytes.Sub(float64(q.splitBytes))
101+
q.incompleteMergeBytes.Sub(float64(q.mergeBytes))
102+
q.pendingPlanJobs.Sub(float64(q.pendingPlanCount))
103+
q.pendingCompactionJobs.Sub(float64(q.pendingCompactionCount))
104+
q.activePlanJobs.Sub(float64(q.activePlanCount))
105+
q.activeCompactionJobs.Sub(float64(q.activeCompactionCount))
106+
q.splitBytes = 0
107+
q.mergeBytes = 0
108+
q.pendingPlanCount = 0
109+
q.pendingCompactionCount = 0
110+
q.activePlanCount = 0
111+
q.activeCompactionCount = 0
112+
q.clear()
94113
}
95114

96115
// queueMetrics encapsulates queue-level metrics for one tenant, allowing the caller to ignore
97116
// the details of which metrics to update and how, focusing only on job state transitions.
98117
// Callers are responsible for making valid transitions. Invalid calls (e.g. DropPending on an
99118
// empty queue) will produce incorrect gauge values. Methods are not thread-safe.
100119
type queueMetrics struct {
101-
pendingJobs prometheus.Gauge
102-
activeJobs prometheus.Gauge
120+
pendingJobsByUser prometheus.Gauge
121+
activeJobsByUser prometheus.Gauge
103122

104123
// shared across tenants
105-
incompleteSplitBytes prometheus.Gauge
106-
incompleteMergeBytes prometheus.Gauge
107-
incompletePlanJobs prometheus.Gauge
108-
109-
// splitBytes, mergeBytes, and planJobCount track this tenant's contribution to the shared
110-
// incomplete gauges so we can subtract exactly the right amount on tenant removal.
111-
splitBytes uint64
112-
mergeBytes uint64
113-
planJobCount int
114-
clear func()
124+
pendingPlanJobs prometheus.Gauge
125+
pendingCompactionJobs prometheus.Gauge
126+
activePlanJobs prometheus.Gauge
127+
activeCompactionJobs prometheus.Gauge
128+
incompleteSplitBytes prometheus.Gauge
129+
incompleteMergeBytes prometheus.Gauge
130+
131+
// This tenant's contribution to the shared gauges, tracked so Clear() can subtract exactly
132+
// the right amount on tenant removal.
133+
splitBytes uint64
134+
mergeBytes uint64
135+
pendingPlanCount int
136+
pendingCompactionCount int
137+
activePlanCount int
138+
activeCompactionCount int
139+
clear func()
115140
}
116141

117142
func (q *queueMetrics) Pending(j TrackedJob) {
118-
q.pendingJobs.Inc()
119-
if j.ID() == planJobId {
120-
q.incompletePlanJobs.Inc()
121-
q.planJobCount++
122-
} else {
123-
q.addBytes(j.(*TrackedCompactionJob))
143+
q.incPending(j.ID() == planJobId)
144+
if cj, ok := j.(*TrackedCompactionJob); ok {
145+
q.addBytes(cj)
124146
}
125147
}
126148

127149
func (q *queueMetrics) Leased(j TrackedJob) {
128-
q.pendingJobs.Dec()
129-
q.activeJobs.Inc()
150+
isPlan := j.ID() == planJobId
151+
q.decPending(isPlan)
152+
q.incActive(isPlan)
130153
}
131154

132155
// Recover records jobs restored from persisted state on startup.
133156
func (q *queueMetrics) Recover(pending, leased []TrackedJob) {
134157
for _, j := range pending {
135-
q.pendingJobs.Inc()
136-
if j.ID() == planJobId {
137-
q.incompletePlanJobs.Inc()
138-
q.planJobCount++
139-
} else {
140-
q.addBytes(j.(*TrackedCompactionJob))
141-
}
158+
q.Pending(j)
142159
}
143160
for _, j := range leased {
144-
q.activeJobs.Inc()
145-
if j.ID() == planJobId {
146-
q.incompletePlanJobs.Inc()
147-
q.planJobCount++
148-
} else if cj, ok := j.(*TrackedCompactionJob); ok {
161+
q.incActive(j.ID() == planJobId)
162+
if cj, ok := j.(*TrackedCompactionJob); ok {
149163
q.addBytes(cj)
150164
}
151165
}
152166
}
153167

154168
// Revive records a job moving from active back to pending (lease expired or cancelled).
155169
func (q *queueMetrics) Revive(j TrackedJob) {
156-
q.activeJobs.Dec()
157-
q.pendingJobs.Inc()
170+
isPlan := j.ID() == planJobId
171+
q.decActive(isPlan)
172+
q.incPending(isPlan)
158173
}
159174

160175
// Complete records a job leaving the system from the active queue (success or failure).
161176
func (q *queueMetrics) Complete(j TrackedJob) {
162-
q.activeJobs.Dec()
163-
if j.ID() == planJobId {
164-
q.incompletePlanJobs.Dec()
165-
q.planJobCount--
166-
} else if cj, ok := j.(*TrackedCompactionJob); ok {
177+
q.decActive(j.ID() == planJobId)
178+
if cj, ok := j.(*TrackedCompactionJob); ok {
167179
q.subBytes(cj)
168180
}
169181
}
170182

171183
// DropPending records a job leaving the system from the pending queue.
172184
func (q *queueMetrics) DropPending(j TrackedJob) {
173-
q.pendingJobs.Dec()
174-
if j.ID() == planJobId {
175-
q.incompletePlanJobs.Dec()
176-
q.planJobCount--
185+
q.decPending(j.ID() == planJobId)
186+
if cj, ok := j.(*TrackedCompactionJob); ok {
187+
q.subBytes(cj)
188+
}
189+
}
190+
191+
func (q *queueMetrics) incPending(isPlan bool) {
192+
q.pendingJobsByUser.Inc()
193+
if isPlan {
194+
q.pendingPlanJobs.Inc()
195+
q.pendingPlanCount++
196+
} else {
197+
q.pendingCompactionJobs.Inc()
198+
q.pendingCompactionCount++
199+
}
200+
}
201+
202+
func (q *queueMetrics) decPending(isPlan bool) {
203+
q.pendingJobsByUser.Dec()
204+
if isPlan {
205+
q.pendingPlanJobs.Dec()
206+
q.pendingPlanCount--
207+
} else {
208+
q.pendingCompactionJobs.Dec()
209+
q.pendingCompactionCount--
210+
}
211+
}
212+
213+
func (q *queueMetrics) incActive(isPlan bool) {
214+
q.activeJobsByUser.Inc()
215+
if isPlan {
216+
q.activePlanJobs.Inc()
217+
q.activePlanCount++
218+
} else {
219+
q.activeCompactionJobs.Inc()
220+
q.activeCompactionCount++
221+
}
222+
}
223+
224+
func (q *queueMetrics) decActive(isPlan bool) {
225+
q.activeJobsByUser.Dec()
226+
if isPlan {
227+
q.activePlanJobs.Dec()
228+
q.activePlanCount--
177229
} else {
178-
q.subBytes(j.(*TrackedCompactionJob))
230+
q.activeCompactionJobs.Dec()
231+
q.activeCompactionCount--
179232
}
180233
}
181234

0 commit comments

Comments
 (0)