@@ -91,7 +91,6 @@ def response_hook(span, instance, response):
91
91
"""
92
92
import typing
93
93
from typing import Any , Collection
94
-
95
94
import redis
96
95
from wrapt import wrap_function_wrapper
97
96
@@ -106,6 +105,7 @@ def response_hook(span, instance, response):
106
105
from opentelemetry .instrumentation .utils import unwrap
107
106
from opentelemetry .semconv .trace import SpanAttributes
108
107
from opentelemetry .trace import Span
108
+ from opentelemetry .metrics import UpDownCounter , get_meter
109
109
110
110
_DEFAULT_SERVICE = "redis"
111
111
@@ -119,6 +119,7 @@ def response_hook(span, instance, response):
119
119
]
120
120
121
121
_REDIS_ASYNCIO_VERSION = (4 , 2 , 0 )
122
+
122
123
if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
123
124
import redis .asyncio
124
125
@@ -137,6 +138,7 @@ def _set_connection_attributes(span, conn):
137
138
138
139
def _instrument (
139
140
tracer ,
141
+ connections_usage : UpDownCounter ,
140
142
request_hook : _RequestHookT = None ,
141
143
response_hook : _ResponseHookT = None ,
142
144
):
@@ -147,6 +149,7 @@ def _traced_execute_command(func, instance, args, kwargs):
147
149
name = args [0 ]
148
150
else :
149
151
name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
152
+
150
153
with tracer .start_as_current_span (
151
154
name , kind = trace .SpanKind .CLIENT
152
155
) as span :
@@ -156,7 +159,19 @@ def _traced_execute_command(func, instance, args, kwargs):
156
159
span .set_attribute ("db.redis.args_length" , len (args ))
157
160
if callable (request_hook ):
158
161
request_hook (span , instance , args , kwargs )
162
+ connections_usage .add (
163
+ 1 ,
164
+ {
165
+ "db.client.connection.usage.state" : "used" ,
166
+ "db.client.connection.usage.name" : instance .connection_pool .pid ,
167
+ })
159
168
response = func (* args , ** kwargs )
169
+ connections_usage .add (
170
+ 1 ,
171
+ {
172
+ "db.client.connections.usage.state" : "idle" ,
173
+ "db.client.connections.usage.name" : instance .connection_pool .pid ,
174
+ })
160
175
if callable (response_hook ):
161
176
response_hook (span , instance , response )
162
177
return response
@@ -200,13 +215,26 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
200
215
response_hook (span , instance , response )
201
216
return response
202
217
218
+ def _traced_get_connection (func , connection_pool , command_name , * keys , ** options ):
219
+ response = func (command_name , * keys , ** options )
220
+ connections_usage .add (
221
+ 1 ,
222
+ {
223
+ "db.client.connections.usage.state" : "used" ,
224
+ "db.client.connections.usage.name" : connection_pool .pid ,
225
+ })
226
+ return response
227
+
228
+
203
229
pipeline_class = (
204
230
"BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
205
231
)
206
232
redis_class = "StrictRedis" if redis .VERSION < (3 , 0 , 0 ) else "Redis"
207
233
208
234
wrap_function_wrapper (
209
- "redis" , f"{ redis_class } .execute_command" , _traced_execute_command
235
+ "redis" ,
236
+ f"{ redis_class } .execute_command" ,
237
+ _traced_execute_command
210
238
)
211
239
wrap_function_wrapper (
212
240
"redis.client" ,
@@ -229,6 +257,11 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
229
257
"ClusterPipeline.execute" ,
230
258
_traced_execute_pipeline ,
231
259
)
260
+ # wrap_function_wrapper(
261
+ # "redis",
262
+ # "ConnectionPool.get_connection",
263
+ # _traced_get_connection
264
+ # )
232
265
if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
233
266
wrap_function_wrapper (
234
267
"redis.asyncio" ,
@@ -278,8 +311,20 @@ def _instrument(self, **kwargs):
278
311
tracer = trace .get_tracer (
279
312
__name__ , __version__ , tracer_provider = tracer_provider
280
313
)
314
+ meter_provider = kwargs .get ("meter_provider" )
315
+ meter = get_meter (
316
+ __name__ ,
317
+ __version__ ,
318
+ meter_provider
319
+ )
320
+ connections_usage = meter .create_up_down_counter (
321
+ name = "db.client.connections.usage" ,
322
+ unit = "1" ,
323
+ description = "The number of connections that are currently in state described"
324
+ )
281
325
_instrument (
282
326
tracer ,
327
+ connections_usage ,
283
328
request_hook = kwargs .get ("request_hook" ),
284
329
response_hook = kwargs .get ("response_hook" ),
285
330
)
0 commit comments