Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Initial get_connection wrapper for redis metrics closes #1148 #1194

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ def response_hook(span, instance, response):
"""
import typing
from typing import Any, Collection

import redis
from wrapt import wrap_function_wrapper

Expand All @@ -106,6 +105,7 @@ def response_hook(span, instance, response):
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span
from opentelemetry.metrics import UpDownCounter, get_meter

_DEFAULT_SERVICE = "redis"

Expand All @@ -119,6 +119,7 @@ def response_hook(span, instance, response):
]

_REDIS_ASYNCIO_VERSION = (4, 2, 0)

if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
import redis.asyncio

Expand All @@ -137,6 +138,7 @@ def _set_connection_attributes(span, conn):

def _instrument(
tracer,
connections_usage: UpDownCounter,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
):
Expand All @@ -146,18 +148,33 @@ def _traced_execute_command(func, instance, args, kwargs):
name = args[0]
else:
name = instance.connection_pool.connection_kwargs.get("db", 0)
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
_set_connection_attributes(span, instance)
span.set_attribute("db.redis.args_length", len(args))
if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = func(*args, **kwargs)
if callable(response_hook):
response_hook(span, instance, response)

try:
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT
) as span:
if span.is_recording():
span.set_attribute(SpanAttributes.DB_STATEMENT, query)
_set_connection_attributes(span, instance)
span.set_attribute("db.redis.args_length", len(args))
if callable(request_hook):
request_hook(span, instance, args, kwargs)
response = func(*args, **kwargs)
connections_usage.add(
1,
{
"db.client.connection.usage.state": "used",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the spec, the prefix "db.client.connection.usage" should be omitted. look the comment here: open-telemetry/opentelemetry-js-contrib#1220 (comment)
The name of the attribute is just "state"

"db.client.connection.usage.name": instance.connection_pool.pid,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

according to the spec, there is no attribute named db.client.connection.usage.name. But there is pool.name. see here: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/database-metrics.md

You can also see the conversation about pool.name I had here: open-telemetry/opentelemetry-specification#2958
It was specific for mysql in jacascript but still can give an idea about what is pool name.

})
if callable(response_hook):
response_hook(span, instance, response)
finally:
connections_usage.add(
-1,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a cycle of using a connection:

  1. First you create a connection and it becomes part of the pool ( -> should add +1 to 'idle').
  2. Then you start using it ( -> should add +1 to 'used' and -1 to 'idle').
  3. Then you release/finish using it ( -> should add -1 to 'used', and +1 to 'idle').
  4. Finally it is removed from the pool ( -> should add -1 to 'idle')

You should find these events in the redis code and understand when a connection is created, used, removed etc. each of these events should be reflected in the number of 'idle' and/or 'used' connection.

{
"db.client.connection.usage.state": "idle",
"db.client.connection.usage.name": instance.connection_pool.pid,
})
Comment on lines +163 to +177
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you decide the state of connection yourself based on the command execution? This can't be correct right?

return response

def _traced_execute_pipeline(func, instance, args, kwargs):
Expand Down Expand Up @@ -199,13 +216,26 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
response_hook(span, instance, response)
return response

def _traced_get_connection(func, connection_pool, command_name, *keys, **options):
response = func(command_name, *keys, **options)
connections_usage.add(
1,
{
"db.client.connection.usage.state": "used",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is get connection same as used?

"db.client.connection.usage.name": connection_pool.pid,
})
return response


pipeline_class = (
"BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline"
)
redis_class = "StrictRedis" if redis.VERSION < (3, 0, 0) else "Redis"

wrap_function_wrapper(
"redis", f"{redis_class}.execute_command", _traced_execute_command
"redis",
f"{redis_class}.execute_command",
_traced_execute_command
)
wrap_function_wrapper(
"redis.client",
Expand All @@ -228,6 +258,11 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
"ClusterPipeline.execute",
_traced_execute_pipeline,
)
wrap_function_wrapper(
"redis",
"ConnectionPool.get_connection",
_traced_get_connection
)
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
wrap_function_wrapper(
"redis.asyncio",
Expand Down Expand Up @@ -277,8 +312,20 @@ def _instrument(self, **kwargs):
tracer = trace.get_tracer(
__name__, __version__, tracer_provider=tracer_provider
)
meter_provider = kwargs.get("meter_provider")
meter = get_meter(
__name__,
__version__,
meter_provider
)
connections_usage = meter.create_up_down_counter(
name="db.client.connection.usage",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's "db.client.connections.usage"

description="The number of connections that are currently in state described",
unit="1",
)
_instrument(
tracer,
connections_usage,
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@


_instruments = ("redis >= 2.6",)
_supports_metrics = True
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,43 @@ def request_hook(span, conn, args, kwargs):

span = spans[0]
self.assertEqual(span.attributes.get(custom_attribute_name), "GET")


class TestRedisIntegrationMetric(TestBase):

def setUp(self):
super().setUp()
RedisInstrumentor().instrument(meter_provider=self.meter_provider)

def tearDown(self):
super().tearDown()
RedisInstrumentor().uninstrument()

@staticmethod
def redis_get():
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
redis_client = redis.Redis(connection_pool=pool)
redis_client.get('foo')
return pool.pid

def test_multiple_connections_metric_success_redis(self):
pid = self.redis_get()
expected_metric_names = {
"db.client.connection.usage",
}
expected_metric_attributes = {
"db.client.connection.usage.state": "used",
"db.client.connection.usage.name": pid,
}
for (
resource_metrics
) in self.memory_metrics_reader.get_metrics_data().resource_metrics:
for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
self.assertIn(metric.name, expected_metric_names)
for data_point in metric.data.data_points:
for attr in data_point.attributes:
self.assertIn(
attr, expected_metric_attributes
)