Skip to content

fix: Support multi-transport queue name parsing for MassTransit#3523

Merged
tippmar-nr merged 12 commits intomainfrom
fix/masstransit-multi-transport-queue-parsing
Apr 8, 2026
Merged

fix: Support multi-transport queue name parsing for MassTransit#3523
tippmar-nr merged 12 commits intomainfrom
fix/masstransit-multi-transport-queue-parsing

Conversation

@tippmar-nr
Copy link
Copy Markdown
Member

@tippmar-nr tippmar-nr commented Apr 3, 2026

Summary

Resolves #3519. MassTransit's Kafka transport (and other non-RabbitMQ transports) reported queue names as Unknown with destination type Queue instead of Topic.

Root causes:

  • GetQueueData() only understood RabbitMQ-style underscore-delimited URIs
  • NewRelicFilter used SourceAddress exclusively, which for Kafka Rider is the bus endpoint (no queue/topic info)
  • The consume path in both wrappers hardcoded MessageBrokerDestinationType.Queue

Fixes:

  • Rewrote GetQueueData() as a scheme-aware parser supporting all MassTransit transports (Kafka, RabbitMQ, Azure Service Bus, Amazon SQS, ActiveMQ, InMemory)
  • Added fallback to DestinationAddress when SourceAddress yields Unknown — handles Kafka Rider where SourceAddress is the bus endpoint
  • Detect MassTransit Rider path prefixes (/kafka/, /event-hub/) regardless of bus transport scheme
  • Fixed hardcoded MessageBrokerDestinationType.Queue in the consume path of both modern and legacy NewRelicFilter

Tests:

  • 54 unit tests covering all transport URI formats, fallback behavior, and Rider prefix detection
  • New multi-transport container integration test exercising Kafka, RabbitMQ, and InMemory transports simultaneously
  • Updated existing MassTransit integration test regex for compatibility with the new parser

Test plan

  • Unit tests pass (54 tests)
  • FullAgent.sln builds
  • Container integration test: MassTransitDotNet10Test
  • Container integration test: MassTransitDotNet8Test
  • Existing MassTransit integration tests (StartHost/StartBus variants)
  • CI pipeline

MassTransitHelpers.GetQueueData() only understood RabbitMQ-style URIs,
causing all other transports (Kafka, Azure Service Bus, SQS, ActiveMQ,
In-Memory) to report queue name as "Unknown" with incorrect destination
type. Rewrote the parser to be scheme-aware, handling each transport's
URI format. Also fixed hardcoded MessageBrokerDestinationType.Queue in
the consume path of both modern and legacy NewRelicFilter.
…uration

Add MassTransitTestApp project and MassTransit.Kafka/MassTransit.RabbitMQ
packages to Dotty tracking. Both packages are pinned to 8.x (9.x requires
a paid license) and net8.0 TFM is excluded from updates.
…3519)

GetQueueData now accepts an optional fallback URI (DestinationAddress) for
when SourceAddress yields Unknown — needed for Kafka Rider where the
consume context SourceAddress is the bus endpoint with no queue info.

Rider path prefixes (/kafka/, /event-hub/) are detected regardless of
the bus transport scheme, so Kafka topics are correctly identified as
Topic destination type whether the bus uses RabbitMQ, InMemory, or any
other transport.
)

New MassTransitTestApp exercises three transports in a single container test:
- Kafka Rider: validates Topic destination type and correct topic names
- RabbitMQ: validates Queue publish/send/consume through MassTransit filter
- InMemory MultiBus: validates loopback:// URI parsing via DI-registered bus

Also updates existing MassTransit integration test regex to accommodate
the parser change from underscore-split names to full path segments, and
adds unit tests for fallback address and Rider prefix detection (49 total).
…compose

Aligns with the refactor in main that extracts common service config
into docker-compose-base.yml.
@tippmar-nr tippmar-nr marked this pull request as ready for review April 6, 2026 17:41
@tippmar-nr tippmar-nr requested a review from a team as a code owner April 6, 2026 17:41
@tippmar-nr
Copy link
Copy Markdown
Member Author

tippmar-nr commented Apr 7, 2026

MassTransit URI Format Reference for GetQueueData()

This document maps each MassTransit transport URI format to its source code in the MassTransit repo and shows how our GetQueueDataFromUri() implementation handles it.

Agent source: src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Helpers/MassTransit.cs (branch fix/masstransit-multi-transport-queue-parsing)


1. RabbitMQ

MassTransit source: RabbitMqEndpointAddress.cs

Schemes: rabbitmq, rabbitmqs (SSL, verified), amqp, amqps

URI format:

rabbitmq://host/{name}                          (default vhost "/")
rabbitmq://host/{vhost}/{name}                  (named vhost)
rabbitmq://host/{vhost}/{Hostname}_{AppName}_bus_{id}?temporary=true   (auto-generated bus endpoint)
rabbitmqs://host/{vhost}/{name}                 (SSL connection, port 5671)

Our parsing (ParseRabbitMqUri): Splits by _ to extract the last segment (e.g. the random id suffix). If no underscores, falls back to last path segment. Checks ?temporary=true for temp queue detection.

Query params: temporary, durable, autodelete, type, bind, queue, delayedtype, alternateexchange, bindexchange, singleactiveconsumer


2. Kafka Rider

MassTransit source: KafkaTopicAddress.cs

Scheme: Inherits from host bus (NOT kafka://) -- this is the critical distinction

PathPrefix constant: "kafka"

URI format:

{bus-scheme}://host/kafka/{topic}

Examples:

loopback://localhost/kafka/my-topic          (InMemory bus + Kafka Rider)
rabbitmq://broker/kafka/my-topic             (RabbitMQ bus + Kafka Rider)

Our parsing (TryParseRiderPrefix): Checks AbsolutePath for kafka/ prefix before any scheme-specific parsing. Strips the prefix and sets DestinationType = Topic. This runs early in the parsing pipeline so it takes priority over scheme-specific handlers.

Note: The kafka:// scheme is also handled in the scheme switch as a fallback, but MassTransit does not currently generate this format.


3. Azure Event Hubs Rider

MassTransit source: EventHubEndpointAddress.cs

Scheme: Inherits from host bus

PathPrefix constant: "event-hub" (verified: public const string PathPrefix = "event-hub")

URI format:

{bus-scheme}://host/event-hub/{hub-name}
sb://namespace.servicebus.windows.net/event-hub/{hub-name}

Our parsing (TryParseRiderPrefix): Same early check for event-hub/ prefix. Strips prefix, sets DestinationType = Topic. Also handled in ParseServiceBusUri for the sb:// scheme case.


4. Azure Service Bus

MassTransit source: ServiceBusEndpointAddress.cs

Scheme: sb

URI format:

sb://namespace.servicebus.windows.net/{name}                (no scope)
sb://namespace.servicebus.windows.net/{scope}/{name}        (with scope)
sb://namespace.servicebus.windows.net/{name}?type=topic     (topic)

Our parsing (ParseServiceBusUri): First checks for event-hub/ path prefix (Event Hubs Rider over SB transport). Otherwise extracts last path segment. Uses ?type=topic query param for destination type. Treats ?autodelete as temp queue.

Query params: autodelete (seconds), type (queue or topic)


5. Amazon SQS

MassTransit source: AmazonSqsEndpointAddress.cs

Scheme: amazonsqs

URI format:

amazonsqs://region/{name}                       (no scope)
amazonsqs://region/{scope}/{name}               (with scope)
amazonsqs://region/{name}?type=topic            (SNS topic)

Our parsing (scheme switch): Extracts last path segment. Checks ?type=topic for topic detection. Checks ?temporary=true for temp queue.

Query params: durable, autodelete, temporary, type


6. ActiveMQ

MassTransit source: ActiveMqEndpointAddress.cs

Schemes: activemq, amqp (shared with RabbitMQ -- but amqp is handled in our ActiveMQ branch, while rabbitmq has its own parser)

URI format:

activemq://host/{name}                          (default vhost "/")
activemq://host/{vhost}/{name}                  (named vhost)
activemq://host/{name}?type=topic               (topic destination)

Default port: 61616 (omitted from URI)

Our parsing (scheme switch): Extracts last path segment. Checks ?type=topic for topic detection. Checks ?temporary=true for temp queue.

Query params: durable, autodelete, temporary, type

Note on amqp/amqps ambiguity: Both RabbitMQ and ActiveMQ can use amqp:// scheme. Our code handles amqp/amqps in the ActiveMQ branch (last-path-segment extraction), not the RabbitMQ branch (underscore splitting). This is correct because the underscore-delimited naming is a RabbitMQ convention, and amqp:// URIs from either transport work fine with last-path-segment extraction.


7. InMemory / Loopback

MassTransit source: InMemoryEndpointAddress.cs

Scheme: loopback

URI format:

loopback://localhost/{name}                     (default vhost "/")
loopback://localhost/{vhost}/{name}             (named vhost)

Our parsing (scheme switch): Extracts last path segment. No temp queue detection (InMemory doesn't support persistent queues).

Query params: bind, queue, type (exchange type)


8. SQL Transport

MassTransit source: SqlEndpointAddress.cs, SqlHostAddress.cs

Scheme: db (verified: public const string DbScheme = "db" in SqlHostAddress.cs)

URI format:

db://host/{virtualhost}/{name}                  (queue)
db://host/{virtualhost}.{area}/{name}           (queue with area/scope)
db://host/{virtualhost}/{name}?type=topic       (topic)

Our parsing: No explicit case "db": — falls through to default, which calls GetLastPathSegment(). This correctly extracts the queue name. However, ?type=topic and ?autodelete query params are not evaluated in the default branch, so SQL transport topics would be reported as Queue instead of Topic, and auto-delete queues would not be detected as TempQueue.

Query params: autodelete, type, instance


9. WebJobs Transports

MassTransit source: MassTransit.WebJobs.EventHubsIntegration/ and MassTransit.WebJobs.ServiceBusIntegration/

These are Azure Functions (WebJobs) bindings for Event Hubs and Service Bus. They do not define their own endpoint address classes — they reuse the address classes from their parent transports (EventHubEndpointAddress and ServiceBusEndpointAddress respectively). No additional URI formats to handle.


10. Short-form Addressing (All Transports)

MassTransit source: All endpoint address classes support short-form pseudo-schemes.

Schemes: queue, topic, exchange

URI format:

queue:{name}
topic:{name}
exchange:{name}

Our parsing (scheme switch, first priority): Detected by scheme before any transport-specific parsing. topic: sets DestinationType = Topic. queue: and exchange: set DestinationType = Queue.

Verification note: Short-form support was verified in the source for RabbitMQ, ActiveMQ, InMemory, SQS, and Service Bus address classes. It was not independently verified for Kafka or Event Hub address classes.


Unknown / Unrecognized URI Formats

Behavior: The default case in the scheme switch calls GetLastPathSegment(). If the URI has no usable path segments (e.g. loopback://localhost/ with just a trailing slash), GetLastPathSegment() returns "Unknown".

Null URI: If sourceAddress is null, the method returns immediately with the default MassTransitQueueData (QueueName = "Unknown", DestinationType = Queue).

Fallback address: GetQueueData(sourceAddress, fallbackAddress) tries the primary address first. If it results in "Unknown", it tries the fallback. This handles Kafka Rider consume scenarios where SourceAddress is the bus endpoint (e.g. loopback://localhost/) and DestinationAddress has the actual topic.


Parsing Priority Order

  1. Null check -- returns "Unknown" immediately
  2. Short-form schemes (topic:, queue:, exchange:) -- highest priority
  3. Rider path prefixes (kafka/, event-hub/) -- checked before scheme-specific parsing, because Rider URIs inherit the bus transport scheme
  4. Transport scheme (rabbitmq, sb, amazonsqs, activemq, loopback, etc.)
  5. Default fallback -- last path segment extraction for any unrecognized scheme
  6. Fallback address -- if primary yields "Unknown", tries the secondary URI

Exhaustive Transport Enumeration

All 10 transports under src/Transports/ in the MassTransit repo have been enumerated and analyzed:

Transport Address class Scheme / Prefix Our handling
RabbitMQ RabbitMqEndpointAddress rabbitmq, rabbitmqs ParseRabbitMqUri
ActiveMQ ActiveMqEndpointAddress activemq, amqp, amqps scheme switch
Amazon SQS AmazonSqsEndpointAddress amazonsqs scheme switch
Azure Service Bus ServiceBusEndpointAddress sb ParseServiceBusUri
Kafka Rider KafkaTopicAddress path prefix kafka/ TryParseRiderPrefix
Event Hubs Rider EventHubEndpointAddress path prefix event-hub/ TryParseRiderPrefix
InMemory/Loopback InMemoryEndpointAddress loopback scheme switch
SQL Transport SqlEndpointAddress db default fallback
WebJobs.EventHubs (no own address class) reuses EventHubEndpointAddress inherited
WebJobs.ServiceBus (no own address class) reuses ServiceBusEndpointAddress inherited

No other Rider-style path prefixes exist beyond kafka/ and event-hub/.


Known Gaps

  1. SQL transport ?type=topic and ?autodelete not evaluated: The db:// scheme falls through to the default branch, which extracts the queue name correctly via GetLastPathSegment() but does not check query params. SQL transport topics would be reported as Queue instead of Topic, and auto-delete queues would not be detected as TempQueue. This is a relatively new MassTransit feature with limited adoption.

  2. Short-form pseudo-schemes for Kafka/Event Hub: The queue:, topic:, and exchange: short-form schemes were verified in the source for RabbitMQ, ActiveMQ, InMemory, SQS, and Service Bus. They were not independently verified in the Kafka or Event Hub address classes. Our code handles them regardless since the scheme check runs first, but the claim that "all endpoint address classes support short-form" is not fully verified.

MassTransit generates rabbitmqs:// URIs for SSL connections to RabbitMQ.
Add case "rabbitmqs" fallthrough to ParseRabbitMqUri so these URIs get
the underscore-splitting logic instead of falling through to the default
last-path-segment extraction.
@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 87.70492% with 15 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.77%. Comparing base (407eae5) to head (cbc925c).

Files with missing lines Patch % Lines
...s/NewRelic.Agent.Extensions/Helpers/MassTransit.cs 87.70% 11 Missing and 4 partials ⚠️
Additional details and impacted files
@@           Coverage Diff            @@
##             main    #3523    +/-   ##
========================================
  Coverage   81.77%   81.77%            
========================================
  Files         508      508            
  Lines       34235    34345   +110     
  Branches     4041     4061    +20     
========================================
+ Hits        27995    28086    +91     
- Misses       5275     5289    +14     
- Partials      965      970     +5     
Flag Coverage Δ
Agent 82.76% <87.70%> (-0.01%) ⬇️
Profiler 71.75% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...s/NewRelic.Agent.Extensions/Helpers/MassTransit.cs 88.18% <87.70%> (-11.82%) ⬇️

... and 1 file with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Copy Markdown
Member

@nrcventura nrcventura left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this code should not throw an exception if it encounters a uri with an unexpected format. However, there is a lot of code to wade through, and the explanation from claude describing where the Uri structure is defined, doesn't seem to be accurate, or at least does not seem to be complete enough for me to understand the expected formats. From what I've seen in the MassTransit repo I do not think that this parsing covers all of the possibilities, but I do think that it is an improvement over what was previously there.

@tippmar-nr tippmar-nr merged commit db48d08 into main Apr 8, 2026
118 checks passed
@tippmar-nr tippmar-nr deleted the fix/masstransit-multi-transport-queue-parsing branch April 8, 2026 14:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

MassTransit Kafka transport reports queue name as "Unknown" in transaction traces

3 participants