diff --git a/newrelic/api/transaction.py b/newrelic/api/transaction.py index 9d8b9e3f8..2d46b8439 100644 --- a/newrelic/api/transaction.py +++ b/newrelic/api/transaction.py @@ -66,7 +66,9 @@ _logger = logging.getLogger(__name__) DISTRIBUTED_TRACE_KEYS_REQUIRED = ("ty", "ac", "ap", "tr", "ti") -DISTRIBUTED_TRACE_TRANSPORT_TYPES = set(("HTTP", "HTTPS", "Kafka", "JMS", "IronMQ", "AMQP", "Queue", "Other")) +DISTRIBUTED_TRACE_TRANSPORT_TYPES = set( + ("HTTP", "HTTPS", "Kafka", "JMS", "IronMQ", "AMQP", "Queue", "PYAMQP", "SQS", "REDIS", "ZooKeeper", "Other") +) DELIMITER_FORMAT_RE = re.compile("[ \t]*,[ \t]*") ACCEPTED_DISTRIBUTED_TRACE = 1 CREATED_DISTRIBUTED_TRACE = 2 diff --git a/newrelic/config.py b/newrelic/config.py index 1bebd7918..a944e7cff 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -2846,7 +2846,10 @@ def _process_module_builtin_defaults(): _process_module_definition( "kafka.coordinator.heartbeat", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_heartbeat" ) - + _process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging") + _process_module_definition( + "kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion" + ) _process_module_definition("logging", "newrelic.hooks.logger_logging", "instrument_logging") _process_module_definition("loguru", "newrelic.hooks.logger_loguru", "instrument_loguru") @@ -3983,7 +3986,9 @@ def _process_module_builtin_defaults(): ) _process_module_definition("gearman.worker", "newrelic.hooks.application_gearman", "instrument_gearman_worker") - _process_module_definition("aiobotocore.client", "newrelic.hooks.external_aiobotocore", "instrument_aiobotocore_client") + _process_module_definition( + "aiobotocore.client", "newrelic.hooks.external_aiobotocore", "instrument_aiobotocore_client" + ) _process_module_definition( "aiobotocore.endpoint", "newrelic.hooks.external_aiobotocore", "instrument_aiobotocore_endpoint" diff --git a/newrelic/core/attribute.py b/newrelic/core/attribute.py index d1ecdd462..acdd75b8d 100644 --- a/newrelic/core/attribute.py +++ b/newrelic/core/attribute.py @@ -75,6 +75,10 @@ "host.displayName", "http.statusCode", "http.url", + "kafka.consume.channel_id", + "kafka.consume.byteCount", + "kombu.consume.channel_id", + "kombu.consume.byteCount", "llm", "message.queueName", "message.routingKey", diff --git a/newrelic/core/config.py b/newrelic/core/config.py index 65af62a82..e35097987 100644 --- a/newrelic/core/config.py +++ b/newrelic/core/config.py @@ -71,6 +71,10 @@ def emit(self, record): _logger.addHandler(_NullHandler()) +def parse_space_separated_into_list(string): + return string.split() + + def _map_aws_account_id(s, logger): # The AWS account id must be a 12 digit number. # See https://docs.aws.amazon.com/accounts/latest/reference/manage-acct-identifiers.html#awsaccountid. @@ -148,6 +152,18 @@ class AttributesSettings(Settings): pass +class KombuSettings(Settings): + pass + + +class IgnoredExchangesSettings(Settings): + pass + + +class KombuConsumerSettings(Settings): + enabled = False + + class GCRuntimeMetricsSettings(Settings): enabled = False @@ -483,6 +499,9 @@ class EventHarvestConfigHarvestLimitSettings(Settings): _settings.event_harvest_config.harvest_limits = EventHarvestConfigHarvestLimitSettings() _settings.event_loop_visibility = EventLoopVisibilitySettings() _settings.gc_runtime_metrics = GCRuntimeMetricsSettings() +_settings.kombu = KombuSettings() +_settings.kombu.ignored_exchanges = IgnoredExchangesSettings() +_settings.kombu.consumer = KombuConsumerSettings() _settings.memory_runtime_pid_metrics = MemoryRuntimeMetricsSettings() _settings.heroku = HerokuSettings() _settings.infinite_tracing = InfiniteTracingSettings() @@ -775,6 +794,11 @@ def default_otlp_host(host): _settings.thread_profiler.enabled = True _settings.cross_application_tracer.enabled = False +# celeryev is the monitoring queue for rabbitmq which we do not need to monitor-it just makes a lot of noise. +_settings.kombu.ignored_exchanges = parse_space_separated_into_list( + os.environ.get("NEW_RELIC_KOMBU_IGNORED_EXCHANGES", "celeryev") +) +_settings.kombu.consumer.enabled = _environ_as_bool("NEW_RELIC_KOMBU_PRODUCER_ENABLED", default=False) _settings.gc_runtime_metrics.enabled = _environ_as_bool("NEW_RELIC_GC_RUNTIME_METRICS_ENABLED", default=False) _settings.gc_runtime_metrics.top_object_count_limit = 5 diff --git a/newrelic/hooks/messagebroker_kombu.py b/newrelic/hooks/messagebroker_kombu.py new file mode 100644 index 000000000..8f54f5a32 --- /dev/null +++ b/newrelic/hooks/messagebroker_kombu.py @@ -0,0 +1,227 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import sys +import logging + +from newrelic.api.application import application_instance +from newrelic.api.function_trace import FunctionTrace +from newrelic.api.message_trace import MessageTrace +from newrelic.api.message_transaction import MessageTransaction +from newrelic.api.time_trace import current_trace, notice_error +from newrelic.api.transaction import current_transaction +from newrelic.api.application import application_settings +from newrelic.common.object_wrapper import ObjectProxy, function_wrapper, wrap_function_wrapper +from newrelic.common.package_version_utils import get_package_version +from newrelic.common.signature import bind_args + +_logger = logging.getLogger(__name__) + +""" +The following are unsported transport types since the libraries are too old: +* librabbitmq +* qpid +* amqp uses librabbitmq or py-amqp +""" +AVAILABLE_TRANSPORTS = { + "py-amqp": "PYAMQP", + "sqs": "SQS", + "redis": "REDIS", + "zookeeper": "ZooKeeper", + "confluentkafka": "Kafka", +} + + +def wrap_Producer_publish(wrapped, instance, args, kwargs): + transaction = current_transaction() + + if transaction is None: + return wrapped(*args, **kwargs) + + bound_args = bind_args(wrapped, args, kwargs) + headers = bound_args["headers"] + headers = headers if headers else {} + value = bound_args["body"] + key = bound_args["routing_key"] + exchange = getattr(bound_args["exchange"], "name", None) or "Default" + + transaction.add_messagebroker_info("Kombu", get_package_version("kombu")) + + with MessageTrace( + library="Kombu", + operation="Produce", + destination_type="Exchange", + destination_name=exchange, + source=wrapped, + terminal=False, + ): + dt_headers = {k: v.encode("utf-8") for k, v in MessageTrace.generate_request_headers(transaction)} + if headers: + dt_headers.update(headers) + + try: + bound_args["headers"] = dt_headers + return wrapped(**bound_args) + except Exception: + notice_error() + raise + + +def wrap_consumer_recieve_callback(wrapped, instance, args, kwargs): + # In cases where Kombu is being used to talk to the queue via Celery (aka Celery + # is the toplevel api) a transaction will be created for Kombu and a separate + # transaction will be created for Celery. If kombu.consumer.enabled is disabled, + # do not create the duplicate Kombu transaction. + settings = application_settings() or global_settings() + if not settings.kombu.consumer.enabled: + return wrapped(*args, **kwargs) + + # This will be the transaction if any that is created by this wrapper. + created_transaction = None + + bound_args = bind_args(wrapped, args, kwargs) + message = bound_args["message"] + if message: + # In Kombu there is not iterator, instead there is a callback that + # is called inside wrapped. + # This callback can be called either outside of a transaction, or + # within the context of an existing transaction. There are 3 + # possibilities we need to handle: (Note that this is similar to + # our Pika and Celery instrumentation) + # + # 1. In an inactive transaction + # + # If the end_of_transaction() or ignore_transaction() API + # calls have been invoked, this iterator may be called in the + # context of an inactive transaction. In this case, don't wrap + # the callback in any way. + # + # 2. In an active transaction + # + # Do nothing. + # + # 3. Outside of a transaction + # + # Since it's not running inside of an existing transaction, we + # want to create a new background transaction for it. + body = getattr(message, "body", None) + key = getattr(message, "delivery_info", {}).get("routing_key") + library = "Kombu" + destination_type = "Exchange" + destination_name = getattr(message, "delivery_info", {}).get("exchange") or "Default" + received_bytes = len(str(body).encode("utf-8")) + message_count = 1 + transaction = current_transaction(active_only=False) + if not transaction and destination_name not in settings.kombu.ignored_exchanges: + # Try to get the transport type. The default for kombu is py-amqp. + # If not in the known transport type list, fallback to "Other". + try: + transport_name = getattr( + getattr(getattr(instance, "connection", None), "transport", None), "driver_name", "py-amqp" + ) + transport_type = AVAILABLE_TRANSPORTS.get(transport_name.lower(), "Other") + except Exception: + _logger.debug("Failed to determine transport type.", exc_info=True) + transport_type = "Other" + created_transaction = MessageTransaction( + application=application_instance(), + library=library, + destination_type=destination_type, + destination_name=destination_name, + headers=dict(getattr(message, "headers", {})), + transport_type=transport_type, + routing_key=key, + source=wrapped, + ) + created_transaction.__enter__() # pylint: disable=C2801 + created_transaction.destination_name = destination_name + + # Obtain consumer client_id to send up as agent attribute + if hasattr(message, "channel") and hasattr(message.channel, "channel_id"): + channel_id = message.channel.channel_id + created_transaction._add_agent_attribute("kombu.consume.channel_id", channel_id) + if received_bytes: + created_transaction._add_agent_attribute("kombu.consume.byteCount", received_bytes) + + transaction = current_transaction() + if transaction: # If there is an active transaction now. + # Add metrics whether or not a transaction was already active, or one was just started. + # Don't add metrics if there was an inactive transaction. + # Name the metrics using the same format as the transaction, but in case the active transaction + # was an existing one and not a message transaction, reproduce the naming logic here. + group = f"Message/{library}/{destination_type}" + name = f"Named/{destination_name}" + if received_bytes: + transaction.record_custom_metric(f"{group}/{name}/Received/Bytes", received_bytes) + if message_count: + transaction.record_custom_metric(f"{group}/{name}/Received/Messages", message_count) + transaction.add_messagebroker_info("Kombu", get_package_version("kombu")) + + try: + return_val = wrapped(*args, **kwargs) + except Exception: + if current_transaction(): + # Report error on existing transaction if there is one + notice_error() + else: + # Report error on application + notice_error(application=application_instance(activate=False)) + raise + + if created_transaction and not created_transaction.stopped: + created_transaction.__exit__(*sys.exc_info()) + + return return_val + + +def wrap_serialize(wrapped, instance, args, kwargs): + transaction = current_transaction() + if not transaction: + return wrapped(*args, **kwargs) + + exchange = "Unknown" + if isinstance(transaction, MessageTransaction): + exchange = transaction.destination_name + else: + # Find parent message trace to retrieve topic + message_trace = current_trace() + while message_trace is not None and not isinstance(message_trace, MessageTrace): + message_trace = message_trace.parent + if message_trace: + exchange = message_trace.destination_name + + group = f"MessageBroker/Kombu/Exchange" + name = f"Named/{exchange}/Serialization/Value" + + with FunctionTrace(name=name, group=group) as ft: + return wrapped(*args, **kwargs) + + +def instrument_kombu_messaging(module): + if hasattr(module, "Producer"): + wrap_function_wrapper(module, "Producer.publish", wrap_Producer_publish) + if hasattr(module, "Consumer"): + wrap_function_wrapper(module, "Consumer._receive_callback", wrap_consumer_recieve_callback) + # This is a little unorthodox but because Kombu creates an object on import we + # have to instrument it where it's used/imported as opposed to where the class is + # defined. + if hasattr(module, "dumps"): + wrap_function_wrapper(module, "dumps", wrap_serialize) + + +def instrument_kombu_serializaion(module): + # This is a little unorthodox but because Kombu creates an object on import we + # have to instrument it where it's used/imported as opposed to where the class is + # defined. + if hasattr(module, "loads"): + wrap_function_wrapper(module, "loads", wrap_serialize) diff --git a/tests/messagebroker_kombu/conftest.py b/tests/messagebroker_kombu/conftest.py new file mode 100644 index 000000000..12437c541 --- /dev/null +++ b/tests/messagebroker_kombu/conftest.py @@ -0,0 +1,203 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import uuid +import pickle + +import kombu +import pytest +from kombu import messaging +from kombu import serialization +from testing_support.db_settings import rabbitmq_settings +from testing_support.fixtures import ( # noqa: F401; pylint: disable=W0611 + collector_agent_registration_fixture, + collector_available_fixture, +) +from testing_support.validators.validate_distributed_trace_accepted import validate_distributed_trace_accepted + +from newrelic.api.transaction import current_transaction +from newrelic.common.object_wrapper import transient_function_wrapper + +TRANSPORT_TYPES = {"pyamqp": "PYAMQP", "amqp": "AMQP"} +DB_SETTINGS = rabbitmq_settings()[0] + + +@pytest.fixture( + scope="session", + params=[ + "pyamqp" # , "amqp", #"qpid", "redis" + ], +) +def transport_type(request): + return request.param + + +@pytest.fixture(scope="session") +def producer_connection(transport_type): + host = DB_SETTINGS["host"] + with kombu.Connection(f"{transport_type}://{host}") as conn: + yield conn + + +@pytest.fixture(scope="session") +def consumer_connection(transport_type): + with kombu.Connection(DB_SETTINGS["host"]) as conn: + yield conn + + +_default_settings = { + "package_reporting.enabled": False, # Turn off package reporting for testing as it causes slow downs. + "transaction_tracer.explain_threshold": 0.0, + "transaction_tracer.transaction_threshold": 0.0, + "transaction_tracer.stack_trace_threshold": 0.0, + "debug.log_data_collector_payloads": True, + "debug.record_transaction_failure": True, + "kombu.consumer.enabled": True, +} + +collector_agent_registration = collector_agent_registration_fixture( + app_name="Python Agent Test (messagebroker_kombu)", + default_settings=_default_settings, + linked_applications=["Python Agent Test (messagebroker_kombu)"], +) + + +@pytest.fixture(scope="function") +def producer(producer_connection): + # Purge the queue. + channel = producer_connection.channel() + channel.queue_purge("bar") + + producer = producer_connection.Producer(serializer="json") + + yield producer + + +@pytest.fixture(scope="function") +def consumer(producer, consumer_connection, queue, consume): + # According to the docs: + # Note that a Consumer does not need the serialization method specified. They can + # auto-detect the serialization method as the content-type is sent as a message + # header. + consumer = consumer_connection.Consumer(queue, callbacks=[consume]) + with consumer as con: + yield con + + +@pytest.fixture(scope="function") +def consumer_callback_error(producer, consumer_connection, queue, consume_error): + consumer = consumer_connection.Consumer(queue, callbacks=[consume_error]) + with consumer as con: + yield con + + +@pytest.fixture(scope="function") +def consumer_validate_dt(producer, consumer_connection, queue, consume_validate_dt): + consumer = consumer_connection.Consumer(queue, callbacks=[consume_validate_dt]) + with consumer as con: + yield con + + +@pytest.fixture +def consume(events): + def _consume(body, message): + message.ack() + events.append({"body": body, "routing_key": message.delivery_info["routing_key"]}) + + return _consume + + +@pytest.fixture +def consume_error(events): + def _consume(body, message): + message.ack() + events.append({"body": body, "routing_key": message.delivery_info["routing_key"]}) + raise RuntimeError("Error in consumer callback") + + return _consume + + +@pytest.fixture +def consume_validate_dt(events, transport_type): + expected_transport_type = TRANSPORT_TYPES[transport_type] + + @validate_distributed_trace_accepted(transport_type=expected_transport_type) + def _consume(body, message): + # Capture headers to validate dt headers. + txn = current_transaction() + txn._test_request_headers = message.headers + + message.ack() + events.append({"body": body, "routing_key": message.delivery_info["routing_key"]}) + + return _consume + + +@pytest.fixture +def events(): + return [] + + +@pytest.fixture +def exchange(): + return kombu.Exchange("exchange", "direct", durable=True) + + +@pytest.fixture +def queue(exchange): + return kombu.Queue("bar", exchange=exchange, routing_key="bar") + + +@pytest.fixture +def send_producer_message(producer, exchange, queue): + def _test(): + producer.publish({"foo": 1}, exchange=exchange, routing_key="bar", declare=[queue]) + + return _test + + +@pytest.fixture +def get_consumer_record(send_producer_message, consumer_connection, consumer): + def _test(): + send_producer_message() + + consumer_connection.drain_events(timeout=5) + + return _test + + +@pytest.fixture +def get_consumer_record_error(send_producer_message, consumer_connection, consumer_callback_error): + def _test(): + send_producer_message() + + consumer_connection.drain_events(timeout=5) + + return _test + + +@transient_function_wrapper(messaging, "Producer.publish.__wrapped__") +# Place transient wrapper underneath instrumentation +def cache_kombu_producer_headers(wrapped, instance, args, kwargs): + transaction = current_transaction() + + if transaction is None: + return wrapped(*args, **kwargs) + + ret = wrapped(*args, **kwargs) + + headers = kwargs.get("headers", []) + transaction._test_request_headers = headers + return ret diff --git a/tests/messagebroker_kombu/test_consumer.py b/tests/messagebroker_kombu/test_consumer.py new file mode 100644 index 000000000..f7b98efc1 --- /dev/null +++ b/tests/messagebroker_kombu/test_consumer.py @@ -0,0 +1,151 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from testing_support.fixtures import override_application_settings +from testing_support.fixtures import reset_core_stats_engine, validate_attributes +from testing_support.validators.validate_error_event_attributes_outside_transaction import ( + validate_error_event_attributes_outside_transaction, +) +from testing_support.validators.validate_transaction_count import validate_transaction_count +from testing_support.validators.validate_transaction_errors import validate_transaction_errors +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics +from newrelic.common.package_version_utils import get_package_version + +from newrelic.api.background_task import background_task +from newrelic.api.transaction import end_of_transaction +from newrelic.common.object_names import callable_name + + +def test_custom_metrics(get_consumer_record, events): + @validate_transaction_metrics( + "Named/exchange", + group="Message/Kombu/Exchange", + custom_metrics=[ + ("Message/Kombu/Exchange/Named/exchange/Received/Bytes", 1), + ("Message/Kombu/Exchange/Named/exchange/Received/Messages", 1), + ("MessageBroker/Kombu/Exchange/Named/exchange/Serialization/Value", 1), + ], + background_task=True, + ) + def _test(): + get_consumer_record() + + assert len(events) == 1 + assert events[0]["routing_key"] + assert events[0]["body"] == {"foo": 1} + + _test() + + +@override_application_settings({"kombu.consumer.enabled": False}) +@validate_transaction_count(0) +def test_no_transaction_created_when_kombu_consumer_disabled(get_consumer_record): + get_consumer_record() + + +@validate_transaction_count(2) +def test_multiple_transactions(get_consumer_record): + get_consumer_record() + get_consumer_record() + + +def test_custom_metrics_on_existing_transaction(get_consumer_record): + version = get_package_version("kombu") + + @validate_transaction_metrics( + "test_consumer:test_custom_metrics_on_existing_transaction.._test", + custom_metrics=[ + ("Message/Kombu/Exchange/Named/exchange/Received/Bytes", 1), + ("Message/Kombu/Exchange/Named/exchange/Received/Messages", 1), + (f"Python/MessageBroker/Kombu/{version}", 1), + ("MessageBroker/Kombu/Exchange/Named/exchange/Serialization/Value", 1), + ("MessageBroker/Kombu/Exchange/Named/Unknown/Serialization/Value", 1), + ], + background_task=True, + ) + @validate_transaction_count(1) + @background_task() + def _test(): + get_consumer_record() + + _test() + + +def test_custom_metrics_inactive_transaction(get_consumer_record): # , expected_missing_broker_metrics): + @validate_transaction_metrics( + "test_consumer:test_custom_metrics_inactive_transaction.._test", + custom_metrics=[ + ("Message/Kombu/Exchange/Named/exchange/Received/Bytes", None), + ("Message/Kombu/Exchange/Named/exchange/Received/Messages", None), + ], + # + expected_missing_broker_metrics, + background_task=True, + ) + @validate_transaction_count(1) + @background_task() + def _test(): + end_of_transaction() + get_consumer_record() + + _test() + + +def test_agent_attributes(get_consumer_record): + @validate_attributes("agent", ["kombu.consume.channel_id", "kombu.consume.byteCount"]) + def _test(): + get_consumer_record() + + _test() + + +def test_consumer_errors(get_consumer_record_error): + exc_class = RuntimeError + + @reset_core_stats_engine() + @validate_error_event_attributes_outside_transaction( + num_errors=1, exact_attrs={"intrinsic": {"error.class": callable_name(exc_class)}, "agent": {}, "user": {}} + ) + def _test(): + with pytest.raises(exc_class): + get_consumer_record_error() + + _test() + + +def test_distributed_tracing_headers(send_producer_message, consumer_connection, consumer_validate_dt): + # Produce the messages inside a transaction, making sure to close it. + @background_task() + def _produce(): + send_producer_message() + + @validate_transaction_metrics( + "Named/exchange", + group="Message/Kombu/Exchange", + rollup_metrics=[ + ("Supportability/DistributedTrace/AcceptPayload/Success", None), + ("Supportability/TraceContext/Accept/Success", 1), + ], + background_task=True, + ) + @validate_transaction_count(1) + def _consume(): + def _test(): + consumer_connection.drain_events(timeout=5) + + _test() + + _produce() + _consume() diff --git a/tests/messagebroker_kombu/test_producer.py b/tests/messagebroker_kombu/test_producer.py new file mode 100644 index 000000000..591d7aca4 --- /dev/null +++ b/tests/messagebroker_kombu/test_producer.py @@ -0,0 +1,96 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from conftest import cache_kombu_producer_headers +from kombu.exceptions import EncodeError +from testing_support.validators.validate_messagebroker_headers import validate_messagebroker_headers +from testing_support.validators.validate_transaction_errors import validate_transaction_errors +from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics + +from newrelic.api.background_task import background_task +from newrelic.api.function_trace import FunctionTrace +from newrelic.common.object_names import callable_name + + +def test_trace_metrics(send_producer_message): + import kombu + + version = kombu.__version__ + + scoped_metrics = [("MessageBroker/Kombu/Exchange/Produce/Named/exchange", 1)] + unscoped_metrics = scoped_metrics + + @validate_transaction_metrics( + "test_producer:test_trace_metrics..test", + scoped_metrics=scoped_metrics, + rollup_metrics=unscoped_metrics, + custom_metrics=[ + (f"Python/MessageBroker/Kombu/{version}", 1), + ("MessageBroker/Kombu/Exchange/Named/exchange/Serialization/Value", 1), + ], + background_task=True, + ) + @background_task() + def test(): + send_producer_message() + + test() + + +def test_distributed_tracing_headers(exchange, send_producer_message): + @validate_transaction_metrics( + "test_producer:test_distributed_tracing_headers..test", + rollup_metrics=[ + ("Supportability/TraceContext/Create/Success", 1), + ("Supportability/DistributedTrace/CreatePayload/Success", 1), + ], + background_task=True, + ) + @background_task() + @cache_kombu_producer_headers + @validate_messagebroker_headers + def test(): + send_producer_message() + + test() + + +def test_distributed_tracing_headers_under_terminal(exchange, send_producer_message): + @validate_transaction_metrics( + "test_distributed_tracing_headers_under_terminal", + rollup_metrics=[ + ("Supportability/TraceContext/Create/Success", 1), + ("Supportability/DistributedTrace/CreatePayload/Success", 1), + ], + background_task=True, + ) + @background_task(name="test_distributed_tracing_headers_under_terminal") + @cache_kombu_producer_headers + @validate_messagebroker_headers + def test(): + with FunctionTrace(name="terminal_trace", terminal=True): + send_producer_message() + + test() + + +def test_producer_errors(exchange, producer, queue, monkeypatch): + @validate_transaction_errors([callable_name(EncodeError)]) + @background_task() + def test(): + with pytest.raises(EncodeError): + producer.publish({"foo": object()}, exchange=exchange, routing_key="bar", declare=[queue]) + + test() diff --git a/tox.ini b/tox.ini index ad1e7d5d0..5922c11fd 100644 --- a/tox.ini +++ b/tox.ini @@ -54,6 +54,7 @@ envlist = kafka-messagebroker_kafkapython-py38-kafkapython{020001,020000}, kafka-messagebroker_kafkapython-{py37,py38,pypy310}-kafkapythonlatest, kafka-messagebroker_kafkapython-{py38,py39,py310,py311,py312,py313,pypy310}-kafkapythonnglatest, + kombu-messagebroker_kombu-{py38,py39,py310,py311,py312,py313,pypy310}-kombulatest, memcached-datastore_aiomcache-{py38,py39,py310,py311,py312,py313}, memcached-datastore_bmemcached-{py37,py38,py39,py310,py311,py312,py313}, memcached-datastore_memcache-{py37,py38,py39,py310,py311,py312,py313,pypy310}-memcached01, @@ -417,6 +418,7 @@ deps = messagebroker_kafkapython-kafkapythonlatest: kafka-python messagebroker_kafkapython-kafkapython020001: kafka-python<2.0.2 messagebroker_kafkapython-kafkapython020000: kafka-python<2.0.1 + messagebroker_kombu-kombulatest: kombu template_genshi-genshilatest: genshi template_jinja2-jinja2latest: Jinja2 template_jinja2-jinja2030103: Jinja2<3.1.4 @@ -533,6 +535,7 @@ changedir = logger_structlog: tests/logger_structlog messagebroker_confluentkafka: tests/messagebroker_confluentkafka messagebroker_kafkapython: tests/messagebroker_kafkapython + messagebroker_kombu: tests/messagebroker_kombu messagebroker_pika: tests/messagebroker_pika mlmodel_langchain: tests/mlmodel_langchain mlmodel_openai: tests/mlmodel_openai