@@ -91,7 +91,6 @@ def response_hook(span, instance, response):
91
91
"""
92
92
import typing
93
93
from typing import Any , Collection
94
-
95
94
import redis
96
95
from wrapt import wrap_function_wrapper
97
96
@@ -106,6 +105,7 @@ def response_hook(span, instance, response):
106
105
from opentelemetry .instrumentation .utils import unwrap
107
106
from opentelemetry .semconv .trace import SpanAttributes
108
107
from opentelemetry .trace import Span
108
+ from opentelemetry .metrics import UpDownCounter , get_meter
109
109
110
110
_DEFAULT_SERVICE = "redis"
111
111
@@ -119,6 +119,7 @@ def response_hook(span, instance, response):
119
119
]
120
120
121
121
_REDIS_ASYNCIO_VERSION = (4 , 2 , 0 )
122
+
122
123
if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
123
124
import redis .asyncio
124
125
@@ -137,6 +138,7 @@ def _set_connection_attributes(span, conn):
137
138
138
139
def _instrument (
139
140
tracer ,
141
+ connections_usage : UpDownCounter ,
140
142
request_hook : _RequestHookT = None ,
141
143
response_hook : _ResponseHookT = None ,
142
144
):
@@ -147,18 +149,33 @@ def _traced_execute_command(func, instance, args, kwargs):
147
149
name = args [0 ]
148
150
else :
149
151
name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
150
- with tracer .start_as_current_span (
151
- name , kind = trace .SpanKind .CLIENT
152
- ) as span :
153
- if span .is_recording ():
154
- span .set_attribute (SpanAttributes .DB_STATEMENT , query )
155
- _set_connection_attributes (span , instance )
156
- span .set_attribute ("db.redis.args_length" , len (args ))
157
- if callable (request_hook ):
158
- request_hook (span , instance , args , kwargs )
159
- response = func (* args , ** kwargs )
160
- if callable (response_hook ):
161
- response_hook (span , instance , response )
152
+
153
+ try :
154
+ with tracer .start_as_current_span (
155
+ name , kind = trace .SpanKind .CLIENT
156
+ ) as span :
157
+ if span .is_recording ():
158
+ span .set_attribute (SpanAttributes .DB_STATEMENT , query )
159
+ _set_connection_attributes (span , instance )
160
+ span .set_attribute ("db.redis.args_length" , len (args ))
161
+ if callable (request_hook ):
162
+ request_hook (span , instance , args , kwargs )
163
+ response = func (* args , ** kwargs )
164
+ connections_usage .add (
165
+ 1 ,
166
+ {
167
+ "db.client.connection.usage.state" : "used" ,
168
+ "db.client.connection.usage.name" : instance .connection_pool .pid ,
169
+ })
170
+ if callable (response_hook ):
171
+ response_hook (span , instance , response )
172
+ finally :
173
+ connections_usage .add (
174
+ - 1 ,
175
+ {
176
+ "db.client.connection.usage.state" : "idle" ,
177
+ "db.client.connection.usage.name" : instance .connection_pool .pid ,
178
+ })
162
179
return response
163
180
164
181
def _traced_execute_pipeline (func , instance , args , kwargs ):
@@ -200,13 +217,26 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
200
217
response_hook (span , instance , response )
201
218
return response
202
219
220
+ def _traced_get_connection (func , connection_pool , command_name , * keys , ** options ):
221
+ response = func (command_name , * keys , ** options )
222
+ connections_usage .add (
223
+ 1 ,
224
+ {
225
+ "db.client.connection.usage.state" : "used" ,
226
+ "db.client.connection.usage.name" : connection_pool .pid ,
227
+ })
228
+ return response
229
+
230
+
203
231
pipeline_class = (
204
232
"BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
205
233
)
206
234
redis_class = "StrictRedis" if redis .VERSION < (3 , 0 , 0 ) else "Redis"
207
235
208
236
wrap_function_wrapper (
209
- "redis" , f"{ redis_class } .execute_command" , _traced_execute_command
237
+ "redis" ,
238
+ f"{ redis_class } .execute_command" ,
239
+ _traced_execute_command
210
240
)
211
241
wrap_function_wrapper (
212
242
"redis.client" ,
@@ -229,6 +259,11 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
229
259
"ClusterPipeline.execute" ,
230
260
_traced_execute_pipeline ,
231
261
)
262
+ wrap_function_wrapper (
263
+ "redis" ,
264
+ "ConnectionPool.get_connection" ,
265
+ _traced_get_connection
266
+ )
232
267
if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
233
268
wrap_function_wrapper (
234
269
"redis.asyncio" ,
@@ -278,8 +313,20 @@ def _instrument(self, **kwargs):
278
313
tracer = trace .get_tracer (
279
314
__name__ , __version__ , tracer_provider = tracer_provider
280
315
)
316
+ meter_provider = kwargs .get ("meter_provider" )
317
+ meter = get_meter (
318
+ __name__ ,
319
+ __version__ ,
320
+ meter_provider
321
+ )
322
+ connections_usage = meter .create_up_down_counter (
323
+ name = "db.client.connection.usage" ,
324
+ description = "The number of connections that are currently in state described" ,
325
+ unit = "1" ,
326
+ )
281
327
_instrument (
282
328
tracer ,
329
+ connections_usage ,
283
330
request_hook = kwargs .get ("request_hook" ),
284
331
response_hook = kwargs .get ("response_hook" ),
285
332
)
0 commit comments