Skip to content

Commit 129889c

Browse files
committed
feat: metrics integration for sidekiq
1 parent ef747e7 commit 129889c

File tree

11 files changed

+335
-49
lines changed

11 files changed

+335
-49
lines changed

instrumentation/base/lib/opentelemetry/instrumentation/metrics.rb

+5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7+
begin
8+
require 'opentelemetry-metrics-api'
9+
rescue LoadError
10+
end
11+
712
module OpenTelemetry
813
module Instrumentation
914
# Extensions to Instrumentation::Base that handle metrics instruments.

instrumentation/concurrent_ruby/opentelemetry-instrumentation-concurrent_ruby.gemspec

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Gem::Specification.new do |spec|
2727

2828
spec.add_dependency 'opentelemetry-api', '~> 1.0'
2929
spec.add_dependency 'opentelemetry-instrumentation-base', '~> 0.23.0'
30+
spec.add_dependency 'opentelemetry-metrics-api', '~> 1.0'
3031

3132
spec.add_development_dependency 'appraisal', '~> 2.5'
3233
spec.add_development_dependency 'bundler', '~> 2.4'

instrumentation/sidekiq/Appraisals

+34-19
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,39 @@
11
# frozen_string_literal: true
22

3-
appraise 'sidekiq-7.0' do
4-
gem 'sidekiq', '~> 7.0'
5-
end
6-
7-
appraise 'sidekiq-6.5' do
8-
gem 'sidekiq', '>= 6.5', '< 7.0'
9-
end
3+
{
4+
'sidekiq-7.0' => [['sidekiq', '~> 7.0']],
5+
'sidekiq-6.5' => [['sidekiq', '>= 6.5', '< 7.0']],
6+
'sidekiq-6.0' => [
7+
['sidekiq', '>= 6.0', '< 6.5'],
8+
['redis', '< 4.8']
9+
],
10+
'sidekiq-5.2' => [
11+
['sidekiq', '~> 5.2'],
12+
['redis', '< 4.8']
13+
],
14+
'sidekiq-4.2' => [
15+
['sidekiq', '~> 4.2'],
16+
['redis', '< 4.8']
17+
]
18+
}.each do |gemfile_name, specs|
19+
appraise gemfile_name do
20+
specs.each do |spec|
21+
gem(*spec)
22+
remove_gem 'opentelemetry-metrics-api'
23+
remove_gem 'opentelemetry-metrics-sdk'
24+
end
25+
end
1026

11-
appraise 'sidekiq-6.0' do
12-
gem 'sidekiq', '>= 6.0', '< 6.5'
13-
gem 'redis', '< 4.8'
14-
end
15-
16-
appraise 'sidekiq-5.2' do
17-
gem 'sidekiq', '~> 5.2'
18-
gem 'redis', '< 4.8'
19-
end
27+
appraise "#{gemfile_name}-metrics-api" do
28+
specs.each do |spec|
29+
gem(*spec)
30+
remove_gem 'opentelemetry-metrics-sdk'
31+
end
32+
end
2033

21-
appraise 'sidekiq-4.2' do
22-
gem 'sidekiq', '~> 4.2'
23-
gem 'redis', '< 4.8'
34+
appraise "#{gemfile_name}-metrics-sdk" do
35+
specs.each do |spec|
36+
gem(*spec)
37+
end
38+
end
2439
end

instrumentation/sidekiq/Gemfile

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@
66

77
source 'https://rubygems.org'
88

9+
gem 'opentelemetry-metrics-api', '~> 0.2.0'
10+
911
gemspec
1012

1113
group :test do
1214
gem 'opentelemetry-instrumentation-base', path: '../base'
1315
gem 'opentelemetry-instrumentation-redis', path: '../redis'
14-
gem 'pry-byebug'
16+
gem 'opentelemetry-metrics-sdk'
17+
gem 'opentelemetry-metrics-test-helpers', path: '../../helpers/metrics-test-helpers', require: false
1518
end

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/instrumentation.rb

+10
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,15 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base
107107
option :trace_poller_wait, default: false, validate: :boolean
108108
option :trace_processor_process_one, default: false, validate: :boolean
109109
option :peer_service, default: nil, validate: :string
110+
option :metrics, default: false, validate: :boolean
111+
112+
counter 'messaging.client.sent.messages'
113+
histogram 'messaging.client.operation.duration', unit: 's'
114+
counter 'messaging.client.consumed.messages'
115+
histogram 'messaging.process.duration', unit: 's'
116+
117+
# TODO: https://github.com/open-telemetry/semantic-conventions/pull/1812
118+
gauge 'messaging.queue.latency', unit: 's'
110119

111120
private
112121

@@ -115,6 +124,7 @@ def gem_version
115124
end
116125

117126
def require_dependencies
127+
require_relative 'middlewares/common'
118128
require_relative 'middlewares/client/tracer_middleware'
119129
require_relative 'middlewares/server/tracer_middleware'
120130

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/client/tracer_middleware.rb

+38-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7+
require_relative '../common'
8+
79
module OpenTelemetry
810
module Instrumentation
911
module Sidekiq
@@ -12,6 +14,7 @@ module Client
1214
# TracerMiddleware propagates context and instruments Sidekiq client
1315
# by way of its middleware system
1416
class TracerMiddleware
17+
include Common
1518
include ::Sidekiq::ClientMiddleware if defined?(::Sidekiq::ClientMiddleware)
1619

1720
def call(_worker_class, job, _queue, _redis_pool)
@@ -33,17 +36,49 @@ def call(_worker_class, job, _queue, _redis_pool)
3336
OpenTelemetry.propagation.inject(job)
3437
span.add_event('created_at', timestamp: job['created_at'])
3538
yield
39+
end.tap do # rubocop: disable Style/MultilineBlockChain
40+
count_sent_message(job)
3641
end
3742
end
3843

3944
private
4045

41-
def instrumentation_config
42-
Sidekiq::Instrumentation.instance.config
46+
def count_sent_message(job)
47+
with_meter do |_meter|
48+
counter_attributes = metrics_attributes(job).merge(
49+
{
50+
'messaging.operation.name' => 'create'
51+
# server.address => # FIXME: required if available
52+
# messaging.destination.partition.id => FIXME: recommended
53+
# server.port => # FIXME: recommended
54+
}
55+
)
56+
57+
counter = messaging_client_sent_messages_counter
58+
counter.add(1, attributes: counter_attributes)
59+
end
60+
end
61+
62+
def messaging_client_sent_messages_counter
63+
instrumentation.counter('messaging.client.sent.messages')
4364
end
4465

4566
def tracer
46-
Sidekiq::Instrumentation.instance.tracer
67+
instrumentation.tracer
68+
end
69+
70+
def with_meter(&block)
71+
instrumentation.with_meter(&block)
72+
end
73+
74+
def metrics_attributes(job)
75+
{
76+
'messaging.system' => 'sidekiq', # FIXME: metrics semconv
77+
'messaging.destination.name' => job['queue'] # FIXME: metrics semconv
78+
# server.address => # FIXME: required if available
79+
# messaging.destination.partition.id => FIXME: recommended
80+
# server.port => # FIXME: recommended
81+
}
4782
end
4883
end
4984
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Instrumentation
9+
module Sidekiq
10+
module Middlewares
11+
# Common logic for server and client middlewares
12+
module Common
13+
private
14+
15+
def instrumentation
16+
Sidekiq::Instrumentation.instance
17+
end
18+
19+
def instrumentation_config
20+
Sidekiq::Instrumentation.instance.config
21+
end
22+
23+
# Bypasses _all_ enclosed logic unless metrics are enabled
24+
def with_meter(&block)
25+
instrumentation.with_meter(&block)
26+
end
27+
28+
# time an inner block and yield the duration to the given callback
29+
def timed(callback)
30+
return yield unless metrics_enabled?
31+
32+
t0 = monotonic_now
33+
34+
yield.tap do
35+
callback.call(monotonic_now - t0)
36+
end
37+
end
38+
39+
def realtime_now
40+
Process.clock_gettime(Process::CLOCK_REALTIME)
41+
end
42+
43+
def monotonic_now
44+
Process.clock_gettime(Process::CLOCK_MONOTONIC)
45+
end
46+
47+
def tracer
48+
instrumentation.tracer
49+
end
50+
51+
def metrics_enabled?
52+
instrumentation.metrics_enabled?
53+
end
54+
end
55+
end
56+
end
57+
end
58+
end

instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb

+79-25
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7+
require_relative '../common'
8+
79
module OpenTelemetry
810
module Instrumentation
911
module Sidekiq
@@ -12,6 +14,7 @@ module Server
1214
# TracerMiddleware propagates context and instruments Sidekiq requests
1315
# by way of its middleware system
1416
class TracerMiddleware
17+
include Common
1518
include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware)
1619

1720
def call(_worker, msg, _queue)
@@ -32,40 +35,91 @@ def call(_worker, msg, _queue)
3235

3336
extracted_context = OpenTelemetry.propagation.extract(msg)
3437
OpenTelemetry::Context.with_current(extracted_context) do
35-
if instrumentation_config[:propagation_style] == :child
36-
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
37-
span.add_event('created_at', timestamp: msg['created_at'])
38-
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
39-
yield
40-
end
41-
else
42-
links = []
43-
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
44-
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
45-
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
46-
OpenTelemetry::Trace.with_span(span) do
47-
span.add_event('created_at', timestamp: msg['created_at'])
48-
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
49-
yield
50-
rescue Exception => e # rubocop:disable Lint/RescueException
51-
span.record_exception(e)
52-
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
53-
raise e
54-
ensure
55-
span.finish
38+
track_queue_latency(msg)
39+
40+
timed(track_process_time_callback(msg)) do
41+
if instrumentation_config[:propagation_style] == :child
42+
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
43+
span.add_event('created_at', timestamp: msg['created_at'])
44+
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
45+
yield
46+
end
47+
else
48+
links = []
49+
span_context = OpenTelemetry::Trace.current_span(extracted_context).context
50+
links << OpenTelemetry::Trace::Link.new(span_context) if instrumentation_config[:propagation_style] == :link && span_context.valid?
51+
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
52+
OpenTelemetry::Trace.with_span(span) do
53+
span.add_event('created_at', timestamp: msg['created_at'])
54+
span.add_event('enqueued_at', timestamp: msg['enqueued_at'])
55+
yield
56+
rescue Exception => e # rubocop:disable Lint/RescueException
57+
span.record_exception(e)
58+
span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
59+
raise e
60+
ensure
61+
span.finish
62+
end
5663
end
5764
end
65+
66+
count_consumed_message(msg)
5867
end
5968
end
6069

6170
private
6271

63-
def instrumentation_config
64-
Sidekiq::Instrumentation.instance.config
72+
def track_queue_latency(msg)
73+
with_meter do
74+
return unless (enqueued_at = msg['enqueued_at'])
75+
return unless enqueued_at.is_a?(Numeric)
76+
77+
latency = (realtime_now - enqueued_at).abs
78+
79+
queue_latency_gauge&.record(latency, attributes: metrics_attributes(msg))
80+
end
81+
end
82+
83+
def track_process_time_callback(msg)
84+
->(duration) { track_process_time(msg, duration) }
85+
end
86+
87+
def track_process_time(msg, duration)
88+
with_meter do
89+
attributes = metrics_attributes(msg).merge(
90+
{ 'messaging.operation.name' => 'process' }
91+
)
92+
messaging_process_duration_histogram&.record(duration, attributes: attributes)
93+
end
94+
end
95+
96+
def messaging_process_duration_histogram
97+
instrumentation.histogram('messaging.process.duration')
98+
end
99+
100+
def count_consumed_message(msg)
101+
with_meter do
102+
messaging_client_consumed_messages_counter.add(1, attributes: metrics_attributes(msg))
103+
end
65104
end
66105

67-
def tracer
68-
Sidekiq::Instrumentation.instance.tracer
106+
def messaging_client_consumed_messages_counter
107+
instrumentation.counter('messaging.client.consumed.messages')
108+
end
109+
110+
def queue_latency_gauge
111+
instrumentation.gauge('messaging.queue.latency')
112+
end
113+
114+
# FIXME: dedupe
115+
def metrics_attributes(msg)
116+
{
117+
'messaging.system' => 'sidekiq', # FIXME: metrics semconv
118+
'messaging.destination.name' => msg['queue'] # FIXME: metrics semconv
119+
# server.address => # FIXME: required if available
120+
# messaging.destination.partition.id => FIXME: recommended
121+
# server.port => # FIXME: recommended
122+
}
69123
end
70124
end
71125
end

0 commit comments

Comments
 (0)