Skip to content

Commit 113730d

Browse files
committed
Draft: Initial get_connection wrapper for redis metrics.
1 parent d75194a commit 113730d

File tree

3 files changed

+102
-14
lines changed

3 files changed

+102
-14
lines changed

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

+61-14
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ def response_hook(span, instance, response):
9191
"""
9292
import typing
9393
from typing import Any, Collection
94-
9594
import redis
9695
from wrapt import wrap_function_wrapper
9796

@@ -106,6 +105,7 @@ def response_hook(span, instance, response):
106105
from opentelemetry.instrumentation.utils import unwrap
107106
from opentelemetry.semconv.trace import SpanAttributes
108107
from opentelemetry.trace import Span
108+
from opentelemetry.metrics import UpDownCounter, get_meter
109109

110110
_DEFAULT_SERVICE = "redis"
111111

@@ -119,6 +119,7 @@ def response_hook(span, instance, response):
119119
]
120120

121121
_REDIS_ASYNCIO_VERSION = (4, 2, 0)
122+
122123
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
123124
import redis.asyncio
124125

@@ -137,6 +138,7 @@ def _set_connection_attributes(span, conn):
137138

138139
def _instrument(
139140
tracer,
141+
connections_usage: UpDownCounter,
140142
request_hook: _RequestHookT = None,
141143
response_hook: _ResponseHookT = None,
142144
):
@@ -147,18 +149,33 @@ def _traced_execute_command(func, instance, args, kwargs):
147149
name = args[0]
148150
else:
149151
name = instance.connection_pool.connection_kwargs.get("db", 0)
150-
with tracer.start_as_current_span(
151-
name, kind=trace.SpanKind.CLIENT
152-
) as span:
153-
if span.is_recording():
154-
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
155-
_set_connection_attributes(span, instance)
156-
span.set_attribute("db.redis.args_length", len(args))
157-
if callable(request_hook):
158-
request_hook(span, instance, args, kwargs)
159-
response = func(*args, **kwargs)
160-
if callable(response_hook):
161-
response_hook(span, instance, response)
152+
153+
try:
154+
with tracer.start_as_current_span(
155+
name, kind=trace.SpanKind.CLIENT
156+
) as span:
157+
if span.is_recording():
158+
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
159+
_set_connection_attributes(span, instance)
160+
span.set_attribute("db.redis.args_length", len(args))
161+
if callable(request_hook):
162+
request_hook(span, instance, args, kwargs)
163+
response = func(*args, **kwargs)
164+
connections_usage.add(
165+
1,
166+
{
167+
"db.client.connection.usage.state": "used",
168+
"db.client.connection.usage.name": instance.connection_pool.pid,
169+
})
170+
if callable(response_hook):
171+
response_hook(span, instance, response)
172+
finally:
173+
connections_usage.add(
174+
-1,
175+
{
176+
"db.client.connection.usage.state": "idle",
177+
"db.client.connection.usage.name": instance.connection_pool.pid,
178+
})
162179
return response
163180

164181
def _traced_execute_pipeline(func, instance, args, kwargs):
@@ -200,13 +217,26 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
200217
response_hook(span, instance, response)
201218
return response
202219

220+
def _traced_get_connection(func, connection_pool, command_name, *keys, **options):
221+
response = func(command_name, *keys, **options)
222+
connections_usage.add(
223+
1,
224+
{
225+
"db.client.connection.usage.state": "used",
226+
"db.client.connection.usage.name": connection_pool.pid,
227+
})
228+
return response
229+
230+
203231
pipeline_class = (
204232
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
205233
)
206234
redis_class = "StrictRedis" if redis.VERSION < (3, 0, 0) else "Redis"
207235

208236
wrap_function_wrapper(
209-
"redis", f"{redis_class}.execute_command", _traced_execute_command
237+
"redis",
238+
f"{redis_class}.execute_command",
239+
_traced_execute_command
210240
)
211241
wrap_function_wrapper(
212242
"redis.client",
@@ -229,6 +259,11 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
229259
"ClusterPipeline.execute",
230260
_traced_execute_pipeline,
231261
)
262+
wrap_function_wrapper(
263+
"redis",
264+
"ConnectionPool.get_connection",
265+
_traced_get_connection
266+
)
232267
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
233268
wrap_function_wrapper(
234269
"redis.asyncio",
@@ -278,8 +313,20 @@ def _instrument(self, **kwargs):
278313
tracer = trace.get_tracer(
279314
__name__, __version__, tracer_provider=tracer_provider
280315
)
316+
meter_provider = kwargs.get("meter_provider")
317+
meter = get_meter(
318+
__name__,
319+
__version__,
320+
meter_provider
321+
)
322+
connections_usage = meter.create_up_down_counter(
323+
name="db.client.connection.usage",
324+
description="The number of connections that are currently in state described",
325+
unit="1",
326+
)
281327
_instrument(
282328
tracer,
329+
connections_usage,
283330
request_hook=kwargs.get("request_hook"),
284331
response_hook=kwargs.get("response_hook"),
285332
)

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/package.py

+1
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414

1515

1616
_instruments = ("redis >= 2.6",)
17+
_supports_metrics = True

instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py

+40
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,43 @@ def request_hook(span, conn, args, kwargs):
146146

147147
span = spans[0]
148148
self.assertEqual(span.attributes.get(custom_attribute_name), "GET")
149+
150+
151+
class TestRedisIntegrationMetric(TestBase):
152+
153+
def setUp(self):
154+
super().setUp()
155+
RedisInstrumentor().instrument(meter_provider=self.meter_provider)
156+
157+
def tearDown(self):
158+
super().tearDown()
159+
RedisInstrumentor().uninstrument()
160+
161+
@staticmethod
162+
def redis_get():
163+
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
164+
redis_client = redis.Redis(connection_pool=pool)
165+
redis_client.get('foo')
166+
return pool.pid
167+
168+
def test_multiple_connections_metric_success_redis(self):
169+
pid = self.redis_get()
170+
expected_metric_names = {
171+
"db.client.connection.usage",
172+
}
173+
expected_metric_attributes = {
174+
"db.client.connection.usage.state": "used",
175+
"db.client.connection.usage.name": pid,
176+
}
177+
for (
178+
resource_metrics
179+
) in self.memory_metrics_reader.get_metrics_data().resource_metrics:
180+
for scope_metrics in resource_metrics.scope_metrics:
181+
for metric in scope_metrics.metrics:
182+
self.assertIn(metric.name, expected_metric_names)
183+
for data_point in metric.data.data_points:
184+
for attr in data_point.attributes:
185+
self.assertIn(
186+
attr, expected_metric_attributes
187+
)
188+

0 commit comments

Comments
 (0)