Skip to content

Commit 6a6bb42

Browse files
committed
Add pickup and duration time metrics for processed jobs
This change adds a new delayed job plugin which generates the following metrics after a cc-worker processed a job. - cc_job_pickup_delay_seconds: the difference of `run_at` and `locked_at` - cc_job_duration_seconds: Time difference of `locked_at` and current time. Both metrics are labeled with the name of the worker and the queue of the job to allow further aggregation etc.
1 parent dde675b commit 6a6bb42

File tree

5 files changed

+71
-2
lines changed

5 files changed

+71
-2
lines changed

Diff for: lib/cloud_controller.rb

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ module VCAP::CloudController; end
3434
require 'delayed_job_plugins/deserialization_retry'
3535
require 'delayed_job_plugins/before_enqueue_hook'
3636
require 'delayed_job_plugins/after_enqueue_hook'
37+
require 'delayed_job_plugins/delayed_jobs_metrics'
3738
require 'sequel_plugins/sequel_plugins'
3839
require 'vcap/sequel_add_association_dependencies_monkeypatch'
3940
require 'access/access'

Diff for: lib/cloud_controller/metrics/prometheus_updater.rb

+9-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def self.allow_pid_label
1818

1919
DURATION_BUCKETS = [5, 10, 30, 60, 300, 600, 890].freeze
2020
CONNECTION_DURATION_BUCKETS = [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10].freeze
21+
DELAYED_JOB_METRIC_BUCKETS = [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300, 600].freeze
2122

2223
METRICS = [
2324
{ type: :gauge, name: :cc_job_queues_length_total, docstring: 'Job queues length of worker processes', labels: [:queue], aggregation: :most_recent },
@@ -67,13 +68,19 @@ def self.allow_pid_label
6768
buckets: CONNECTION_DURATION_BUCKETS }
6869
].freeze
6970

71+
DELAYED_JOB_METRICS = [
72+
{ type: :histogram, name: :cc_job_pickup_delay_seconds, docstring: 'Job pickup time (from enqueue to start)', labels: %i[queue worker], buckets: DELAYED_JOB_METRIC_BUCKETS },
73+
{ type: :histogram, name: :cc_job_duration_seconds, docstring: 'Job processing time (start to finish)', labels: %i[queue worker], buckets: DELAYED_JOB_METRIC_BUCKETS }
74+
].freeze
75+
7076
def initialize(registry: Prometheus::Client.registry, cc_worker: false)
7177
self.class.allow_pid_label
7278

7379
@registry = registry
7480

7581
# Register all metrics, to initialize them for discoverability
7682
DB_CONNECTION_POOL_METRICS.each { |metric| register(metric) }
83+
DELAYED_JOB_METRICS.each { |metric| register(metric) }
7784

7885
return if cc_worker
7986

@@ -98,8 +105,8 @@ def increment_counter_metric(metric)
98105
@registry.get(metric).increment
99106
end
100107

101-
def update_histogram_metric(metric, value)
102-
@registry.get(metric).observe(value)
108+
def update_histogram_metric(metric, value, labels: {})
109+
@registry.get(metric).observe(value, labels:)
103110
end
104111

105112
def update_summary_metric(metric, value)

Diff for: lib/delayed_job_plugins/delayed_jobs_metrics.rb

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
module DelayedJobMetrics
2+
class Plugin < Delayed::Plugin
3+
def self.prometheus
4+
@prometheus ||= CloudController::DependencyLocator.instance.cc_worker_prometheus_updater
5+
end
6+
7+
callbacks do |lifecycle|
8+
lifecycle.after(:perform) do |worker, job|
9+
labels = { queue: job.queue, worker: worker.name }
10+
11+
job_pickup_delay = job.locked_at && job.run_at ? job.locked_at - job.run_at : nil
12+
prometheus.update_histogram_metric(:cc_job_pickup_delay_seconds, job_pickup_delay, labels:) if job_pickup_delay
13+
14+
job_duration = job.locked_at ? Time.now.utc - job.locked_at : nil
15+
prometheus.update_histogram_metric(:cc_job_duration_seconds, job_duration, labels:) if job_duration
16+
end
17+
end
18+
end
19+
end
20+
21+
Delayed::Worker.plugins << DelayedJobMetrics::Plugin

Diff for: spec/db_spec_helper.rb

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
require 'delayed_job_plugins/deserialization_retry'
1717
require 'delayed_job_plugins/after_enqueue_hook'
1818
require 'delayed_job_plugins/before_enqueue_hook'
19+
require 'delayed_job_plugins/delayed_jobs_metrics'
1920

2021
require 'support/fakes/blueprints'
2122

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
require 'spec_helper'
2+
3+
RSpec.describe DelayedJobMetrics::Plugin do
4+
let(:prometheus) { instance_double(VCAP::CloudController::Metrics::PrometheusUpdater) }
5+
let(:worker) { instance_double(Delayed::Worker, name: 'test_worker') }
6+
7+
before do
8+
allow(CloudController::DependencyLocator.instance).to receive(:cc_worker_prometheus_updater).and_return(prometheus)
9+
allow(prometheus).to receive(:update_histogram_metric)
10+
end
11+
12+
it 'processes a job and updates Prometheus metrics with simulated time delay' do
13+
Timecop.freeze(Time.now) do
14+
events_cleanup_job = VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(10_000)
15+
VCAP::CloudController::Jobs::Enqueuer.new({ queue: VCAP::CloudController::Jobs::Queues.generic }).enqueue(events_cleanup_job)
16+
17+
events_cleanup_job = Delayed::Job.last
18+
expect(events_cleanup_job).not_to be_nil
19+
20+
Timecop.travel(10.seconds) do
21+
worker = Delayed::Worker.new
22+
worker.name = 'test_worker'
23+
worker.work_off(1)
24+
25+
expect(prometheus).to have_received(:update_histogram_metric).with(
26+
:cc_job_pickup_delay_seconds,
27+
be_within(0.5).of(10.0),
28+
labels: { queue: VCAP::CloudController::Jobs::Queues.generic, worker: 'test_worker' }
29+
).once
30+
31+
expect(prometheus).to have_received(:update_histogram_metric).with(
32+
:cc_job_duration_seconds,
33+
kind_of(Numeric),
34+
labels: { queue: VCAP::CloudController::Jobs::Queues.generic, worker: 'test_worker' }
35+
).once
36+
end
37+
end
38+
end
39+
end

0 commit comments

Comments
 (0)