@@ -91,7 +91,6 @@ def response_hook(span, instance, response):
91
91
"""
92
92
import typing
93
93
from typing import Any , Collection
94
-
95
94
import redis
96
95
from wrapt import wrap_function_wrapper
97
96
@@ -106,6 +105,7 @@ def response_hook(span, instance, response):
106
105
from opentelemetry .instrumentation .utils import unwrap
107
106
from opentelemetry .semconv .trace import SpanAttributes
108
107
from opentelemetry .trace import Span
108
+ from opentelemetry .metrics import UpDownCounter , get_meter
109
109
110
110
_DEFAULT_SERVICE = "redis"
111
111
@@ -119,6 +119,7 @@ def response_hook(span, instance, response):
119
119
]
120
120
121
121
_REDIS_ASYNCIO_VERSION = (4 , 2 , 0 )
122
+
122
123
if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
123
124
import redis .asyncio
124
125
@@ -137,6 +138,7 @@ def _set_connection_attributes(span, conn):
137
138
138
139
def _instrument (
139
140
tracer ,
141
+ connections_usage : UpDownCounter ,
140
142
request_hook : _RequestHookT = None ,
141
143
response_hook : _ResponseHookT = None ,
142
144
):
@@ -147,6 +149,7 @@ def _traced_execute_command(func, instance, args, kwargs):
147
149
name = args [0 ]
148
150
else :
149
151
name = instance .connection_pool .connection_kwargs .get ("db" , 0 )
152
+
150
153
with tracer .start_as_current_span (
151
154
name , kind = trace .SpanKind .CLIENT
152
155
) as span :
@@ -200,13 +203,34 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
200
203
response_hook (span , instance , response )
201
204
return response
202
205
206
+ def _traced_get_connection (func , connection_pool , command_name , * keys , ** options ):
207
+ span_name = "random-test"
208
+ with tracer .start_as_current_span (
209
+ span_name , kind = trace .SpanKind .CLIENT
210
+ ) as span :
211
+ response = func (command_name , * keys , ** options )
212
+ metric_labels = {
213
+ SpanAttributes .DB_CLIENT_CONNECTIONS_USAGE : connection_pool ._created_connections ,
214
+ }
215
+ connections_usage .add (1 , metric_labels )
216
+ metric_labels [
217
+ SpanAttributes .DB_CLIENT_CONNECTIONS_USAGE
218
+ ] = connection_pool ._created_connections
219
+ with open ('/Users/diego_canizales/Documents/EPAM/Disney/Nucleus/playground/redis/log.txt' , 'a' ) as f :
220
+ for k , v in metric_labels .items ():
221
+ f .write (f'k:{ k } , v:{ v } \n ' )
222
+ return response
223
+
224
+
203
225
pipeline_class = (
204
226
"BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
205
227
)
206
228
redis_class = "StrictRedis" if redis .VERSION < (3 , 0 , 0 ) else "Redis"
207
229
208
230
wrap_function_wrapper (
209
- "redis" , f"{ redis_class } .execute_command" , _traced_execute_command
231
+ "redis" ,
232
+ f"{ redis_class } .execute_command" ,
233
+ _traced_execute_command
210
234
)
211
235
wrap_function_wrapper (
212
236
"redis.client" ,
@@ -229,6 +253,11 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
229
253
"ClusterPipeline.execute" ,
230
254
_traced_execute_pipeline ,
231
255
)
256
+ wrap_function_wrapper (
257
+ "redis" ,
258
+ "ConnectionPool.get_connection" ,
259
+ _traced_get_connection
260
+ )
232
261
if redis .VERSION >= _REDIS_ASYNCIO_VERSION :
233
262
wrap_function_wrapper (
234
263
"redis.asyncio" ,
@@ -278,8 +307,20 @@ def _instrument(self, **kwargs):
278
307
tracer = trace .get_tracer (
279
308
__name__ , __version__ , tracer_provider = tracer_provider
280
309
)
310
+ meter_provider = kwargs .get ("meter_provider" )
311
+ meter = get_meter (
312
+ __name__ ,
313
+ __version__ ,
314
+ meter_provider
315
+ )
316
+ connections_usage = meter .create_up_down_counter (
317
+ name = "db.client.connections.usage" ,
318
+ unit = "connections" ,
319
+ description = "The number of connections that are currently in state described"
320
+ )
281
321
_instrument (
282
322
tracer ,
323
+ connections_usage ,
283
324
request_hook = kwargs .get ("request_hook" ),
284
325
response_hook = kwargs .get ("response_hook" ),
285
326
)
0 commit comments