4
4
#
5
5
# SPDX-License-Identifier: Apache-2.0
6
6
7
+ require_relative '../common'
8
+
7
9
module OpenTelemetry
8
10
module Instrumentation
9
11
module Sidekiq
@@ -12,6 +14,7 @@ module Server
12
14
# TracerMiddleware propagates context and instruments Sidekiq requests
13
15
# by way of its middleware system
14
16
class TracerMiddleware
17
+ include Common
15
18
include ::Sidekiq ::ServerMiddleware if defined? ( ::Sidekiq ::ServerMiddleware )
16
19
17
20
def call ( _worker , msg , _queue )
@@ -32,40 +35,91 @@ def call(_worker, msg, _queue)
32
35
33
36
extracted_context = OpenTelemetry . propagation . extract ( msg )
34
37
OpenTelemetry ::Context . with_current ( extracted_context ) do
35
- if instrumentation_config [ :propagation_style ] == :child
36
- tracer . in_span ( span_name , attributes : attributes , kind : :consumer ) do |span |
37
- span . add_event ( 'created_at' , timestamp : msg [ 'created_at' ] )
38
- span . add_event ( 'enqueued_at' , timestamp : msg [ 'enqueued_at' ] )
39
- yield
40
- end
41
- else
42
- links = [ ]
43
- span_context = OpenTelemetry ::Trace . current_span ( extracted_context ) . context
44
- links << OpenTelemetry ::Trace ::Link . new ( span_context ) if instrumentation_config [ :propagation_style ] == :link && span_context . valid?
45
- span = tracer . start_root_span ( span_name , attributes : attributes , links : links , kind : :consumer )
46
- OpenTelemetry ::Trace . with_span ( span ) do
47
- span . add_event ( 'created_at' , timestamp : msg [ 'created_at' ] )
48
- span . add_event ( 'enqueued_at' , timestamp : msg [ 'enqueued_at' ] )
49
- yield
50
- rescue Exception => e # rubocop:disable Lint/RescueException
51
- span . record_exception ( e )
52
- span . status = OpenTelemetry ::Trace ::Status . error ( "Unhandled exception of type: #{ e . class } " )
53
- raise e
54
- ensure
55
- span . finish
38
+ track_queue_latency ( msg )
39
+
40
+ timed ( track_process_time_callback ( msg ) ) do
41
+ if instrumentation_config [ :propagation_style ] == :child
42
+ tracer . in_span ( span_name , attributes : attributes , kind : :consumer ) do |span |
43
+ span . add_event ( 'created_at' , timestamp : msg [ 'created_at' ] )
44
+ span . add_event ( 'enqueued_at' , timestamp : msg [ 'enqueued_at' ] )
45
+ yield
46
+ end
47
+ else
48
+ links = [ ]
49
+ span_context = OpenTelemetry ::Trace . current_span ( extracted_context ) . context
50
+ links << OpenTelemetry ::Trace ::Link . new ( span_context ) if instrumentation_config [ :propagation_style ] == :link && span_context . valid?
51
+ span = tracer . start_root_span ( span_name , attributes : attributes , links : links , kind : :consumer )
52
+ OpenTelemetry ::Trace . with_span ( span ) do
53
+ span . add_event ( 'created_at' , timestamp : msg [ 'created_at' ] )
54
+ span . add_event ( 'enqueued_at' , timestamp : msg [ 'enqueued_at' ] )
55
+ yield
56
+ rescue Exception => e # rubocop:disable Lint/RescueException
57
+ span . record_exception ( e )
58
+ span . status = OpenTelemetry ::Trace ::Status . error ( "Unhandled exception of type: #{ e . class } " )
59
+ raise e
60
+ ensure
61
+ span . finish
62
+ end
56
63
end
57
64
end
65
+
66
+ count_consumed_message ( msg )
58
67
end
59
68
end
60
69
61
70
private
62
71
63
- def instrumentation_config
64
- Sidekiq ::Instrumentation . instance . config
72
+ def track_queue_latency ( msg )
73
+ with_meter do
74
+ return unless ( enqueued_at = msg [ 'enqueued_at' ] )
75
+ return unless enqueued_at . is_a? ( Numeric )
76
+
77
+ latency = ( realtime_now - enqueued_at ) . abs
78
+
79
+ queue_latency_gauge &.record ( latency , attributes : metrics_attributes ( msg ) )
80
+ end
81
+ end
82
+
83
+ def track_process_time_callback ( msg )
84
+ -> ( duration ) { track_process_time ( msg , duration ) }
85
+ end
86
+
87
+ def track_process_time ( msg , duration )
88
+ with_meter do
89
+ attributes = metrics_attributes ( msg ) . merge (
90
+ { 'messaging.operation.name' => 'process' }
91
+ )
92
+ messaging_process_duration_histogram &.record ( duration , attributes : attributes )
93
+ end
94
+ end
95
+
96
+ def messaging_process_duration_histogram
97
+ instrumentation . histogram ( 'messaging.process.duration' )
98
+ end
99
+
100
+ def count_consumed_message ( msg )
101
+ with_meter do
102
+ messaging_client_consumed_messages_counter . add ( 1 , attributes : metrics_attributes ( msg ) )
103
+ end
65
104
end
66
105
67
- def tracer
68
- Sidekiq ::Instrumentation . instance . tracer
106
+ def messaging_client_consumed_messages_counter
107
+ instrumentation . counter ( 'messaging.client.consumed.messages' )
108
+ end
109
+
110
+ def queue_latency_gauge
111
+ instrumentation . gauge ( 'messaging.queue.latency' )
112
+ end
113
+
114
+ # FIXME: dedupe
115
+ def metrics_attributes ( msg )
116
+ {
117
+ 'messaging.system' => 'sidekiq' , # FIXME: metrics semconv
118
+ 'messaging.destination.name' => msg [ 'queue' ] # FIXME: metrics semconv
119
+ # server.address => # FIXME: required if available
120
+ # messaging.destination.partition.id => FIXME: recommended
121
+ # server.port => # FIXME: recommended
122
+ }
69
123
end
70
124
end
71
125
end
0 commit comments