Skip to content

Add pickup and duration time metrics for processed jobs #4312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/cloud_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ module VCAP::CloudController; end
require 'delayed_job_plugins/deserialization_retry'
require 'delayed_job_plugins/before_enqueue_hook'
require 'delayed_job_plugins/after_enqueue_hook'
require 'delayed_job_plugins/delayed_jobs_metrics'
require 'sequel_plugins/sequel_plugins'
require 'vcap/sequel_add_association_dependencies_monkeypatch'
require 'access/access'
Expand Down
11 changes: 9 additions & 2 deletions lib/cloud_controller/metrics/prometheus_updater.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def self.allow_pid_label

DURATION_BUCKETS = [5, 10, 30, 60, 300, 600, 890].freeze
CONNECTION_DURATION_BUCKETS = [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10].freeze
DELAYED_JOB_METRIC_BUCKETS = [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 300, 600].freeze

METRICS = [
{ type: :gauge, name: :cc_job_queues_length_total, docstring: 'Job queues length of worker processes', labels: [:queue], aggregation: :most_recent },
Expand Down Expand Up @@ -67,13 +68,19 @@ def self.allow_pid_label
buckets: CONNECTION_DURATION_BUCKETS }
].freeze

DELAYED_JOB_METRICS = [
{ type: :histogram, name: :cc_job_pickup_delay_seconds, docstring: 'Job pickup time (from enqueue to start)', labels: %i[queue worker], buckets: DELAYED_JOB_METRIC_BUCKETS },
{ type: :histogram, name: :cc_job_duration_seconds, docstring: 'Job processing time (start to finish)', labels: %i[queue worker], buckets: DELAYED_JOB_METRIC_BUCKETS }
].freeze

def initialize(registry: Prometheus::Client.registry, cc_worker: false)
self.class.allow_pid_label

@registry = registry

# Register all metrics, to initialize them for discoverability
DB_CONNECTION_POOL_METRICS.each { |metric| register(metric) }
DELAYED_JOB_METRICS.each { |metric| register(metric) }

return if cc_worker

Expand All @@ -98,8 +105,8 @@ def increment_counter_metric(metric)
@registry.get(metric).increment
end

def update_histogram_metric(metric, value)
@registry.get(metric).observe(value)
def update_histogram_metric(metric, value, labels: {})
@registry.get(metric).observe(value, labels:)
end

def update_summary_metric(metric, value)
Expand Down
21 changes: 21 additions & 0 deletions lib/delayed_job_plugins/delayed_jobs_metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module DelayedJobMetrics
class Plugin < Delayed::Plugin
def self.prometheus
@prometheus ||= CloudController::DependencyLocator.instance.cc_worker_prometheus_updater
end

callbacks do |lifecycle|
lifecycle.after(:perform) do |worker, job|
labels = { queue: job.queue, worker: worker.name }

job_pickup_delay = job.locked_at && job.run_at ? job.locked_at - job.run_at : nil
prometheus.update_histogram_metric(:cc_job_pickup_delay_seconds, job_pickup_delay, labels:) if job_pickup_delay

job_duration = job.locked_at ? Time.now.utc - job.locked_at : nil
prometheus.update_histogram_metric(:cc_job_duration_seconds, job_duration, labels:) if job_duration
end
end
end
end

Delayed::Worker.plugins << DelayedJobMetrics::Plugin
1 change: 1 addition & 0 deletions spec/db_spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
require 'delayed_job_plugins/deserialization_retry'
require 'delayed_job_plugins/after_enqueue_hook'
require 'delayed_job_plugins/before_enqueue_hook'
require 'delayed_job_plugins/delayed_jobs_metrics'

require 'support/fakes/blueprints'

Expand Down
42 changes: 42 additions & 0 deletions spec/unit/lib/delayed_job_plugins/delayed_jobs_metrics_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
require 'spec_helper'

RSpec.describe DelayedJobMetrics::Plugin do
let(:prometheus) { instance_double(VCAP::CloudController::Metrics::PrometheusUpdater) }
let(:worker) { instance_double(Delayed::Worker, name: 'test_worker') }

before do
allow(CloudController::DependencyLocator.instance).to receive(:cc_worker_prometheus_updater).and_return(prometheus)
allow(prometheus).to receive(:update_histogram_metric)
end

it 'loads the plugin' do
expect(Delayed::Worker.plugins).to include(DelayedJobMetrics::Plugin)
end

it 'processes a job and updates Prometheus metrics with simulated time delay' do
Timecop.freeze(Time.now) do
events_cleanup_job = VCAP::CloudController::Jobs::Runtime::EventsCleanup.new(10_000)
VCAP::CloudController::Jobs::Enqueuer.new({ queue: VCAP::CloudController::Jobs::Queues.generic }).enqueue(events_cleanup_job)

events_cleanup_job = Delayed::Job.last
expect(events_cleanup_job).not_to be_nil

allow(Time).to receive(:now).and_return(Time.now + 10.seconds)
worker = Delayed::Worker.new
worker.name = 'test_worker'
worker.work_off(1)

expect(prometheus).to have_received(:update_histogram_metric).with(
:cc_job_pickup_delay_seconds,
be_within(0.5).of(10.0),
labels: { queue: VCAP::CloudController::Jobs::Queues.generic, worker: 'test_worker' }
).once

expect(prometheus).to have_received(:update_histogram_metric).with(
:cc_job_duration_seconds,
kind_of(Numeric),
labels: { queue: VCAP::CloudController::Jobs::Queues.generic, worker: 'test_worker' }
).once
end
end
end
Loading