Skip to content

Commit aef4271

Browse files
committed
fix: Add fallback address and Rider prefix detection for MassTransit (#3519)
GetQueueData now accepts an optional fallback URI (DestinationAddress) for when SourceAddress yields Unknown — needed for Kafka Rider where the consume context SourceAddress is the bus endpoint with no queue info. Rider path prefixes (/kafka/, /event-hub/) are detected regardless of the bus transport scheme, so Kafka topics are correctly identified as Topic destination type whether the bus uses RabbitMQ, InMemory, or any other transport.
1 parent f631e55 commit aef4271

3 files changed

Lines changed: 50 additions & 7 deletions

File tree

src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/Helpers/MassTransit.cs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,21 @@ public class MassTransitQueueData
1515

1616
public class MassTransitHelpers
1717
{
18-
public static MassTransitQueueData GetQueueData(Uri sourceAddress)
18+
public static MassTransitQueueData GetQueueData(Uri sourceAddress, Uri fallbackAddress = null)
19+
{
20+
var data = GetQueueDataFromUri(sourceAddress);
21+
22+
// If the primary address didn't yield a meaningful name, try the fallback.
23+
// This handles transports like Kafka Rider where SourceAddress may be the
24+
// bus endpoint (e.g. loopback://localhost/) while DestinationAddress contains
25+
// the actual topic/queue information.
26+
if (data.QueueName == "Unknown" && fallbackAddress != null)
27+
data = GetQueueDataFromUri(fallbackAddress);
28+
29+
return data;
30+
}
31+
32+
private static MassTransitQueueData GetQueueDataFromUri(Uri sourceAddress)
1933
{
2034
var data = new MassTransitQueueData();
2135

@@ -41,6 +55,11 @@ public static MassTransitQueueData GetQueueData(Uri sourceAddress)
4155
return data;
4256
}
4357

58+
// MassTransit Rider embeds path prefixes (e.g. /kafka/, /event-hub/) regardless
59+
// of the bus transport scheme. Check for these before scheme-specific parsing.
60+
if (TryParseRiderPrefix(sourceAddress, data))
61+
return data;
62+
4463
// Transport-specific parsing by URI scheme
4564
switch (scheme)
4665
{
@@ -86,6 +105,30 @@ public static MassTransitQueueData GetQueueData(Uri sourceAddress)
86105
return data;
87106
}
88107

108+
private static bool TryParseRiderPrefix(Uri sourceAddress, MassTransitQueueData data)
109+
{
110+
// MassTransit Rider transports (Kafka, Event Hubs) embed a path prefix
111+
// in the URI regardless of the underlying bus transport scheme.
112+
// e.g. loopback://localhost/kafka/{topic}, rabbitmq://host/kafka/{topic}
113+
var path = sourceAddress.AbsolutePath.TrimStart('/');
114+
115+
if (path.StartsWith("kafka/", StringComparison.OrdinalIgnoreCase))
116+
{
117+
data.QueueName = path.Substring("kafka/".Length).TrimEnd('/');
118+
data.DestinationType = MessageBrokerDestinationType.Topic;
119+
return true;
120+
}
121+
122+
if (path.StartsWith("event-hub/", StringComparison.OrdinalIgnoreCase))
123+
{
124+
data.QueueName = path.Substring("event-hub/".Length).TrimEnd('/');
125+
data.DestinationType = MessageBrokerDestinationType.Topic;
126+
return true;
127+
}
128+
129+
return false;
130+
}
131+
89132
private static void ParseRabbitMqUri(Uri sourceAddress, MassTransitQueueData data)
90133
{
91134
// RabbitMQ uses underscore-delimited names:

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/MassTransit/NewRelicFilter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
4040

4141
var mc = new MethodCall(_consumeMethod, context, default(string[]), true);
4242

43-
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
43+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress, context.DestinationAddress);
4444

4545
var transaction = _agent.CreateTransaction(
4646
destinationType: queueData.DestinationType,
@@ -86,7 +86,7 @@ public async Task Send(PublishContext context, IPipe<PublishContext> next)
8686

8787
var mc = new MethodCall(_publishMethod, context, default(string[]), true);
8888

89-
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
89+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress, context.DestinationAddress);
9090

9191
var transaction = _agent.CurrentTransaction;
9292
InsertDistributedTraceHeaders(context.Headers, transaction);
@@ -103,7 +103,7 @@ public async Task Send(SendContext context, IPipe<SendContext> next)
103103

104104
var mc = new MethodCall(_sendMethod, context, default(string[]), true);
105105

106-
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
106+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress, context.DestinationAddress);
107107

108108
var transaction = _agent.CurrentTransaction;
109109
InsertDistributedTraceHeaders(context.Headers, transaction);

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/MassTransitLegacy/NewRelicFilter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
4141

4242
var mc = new MethodCall(_consumeMethod, context, default(string[]), true);
4343

44-
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
44+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress, context.DestinationAddress);
4545

4646
var transaction = _agent.CreateTransaction(
4747
destinationType: queueData.DestinationType,
@@ -87,7 +87,7 @@ public async Task Send(PublishContext context, IPipe<PublishContext> next)
8787

8888
var mc = new MethodCall(_publishMethod, context, default(string[]), true);
8989

90-
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
90+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress, context.DestinationAddress);
9191

9292
var transaction = _agent.CurrentTransaction;
9393
InsertDistributedTraceHeaders(context.Headers, transaction);
@@ -104,7 +104,7 @@ public async Task Send(SendContext context, IPipe<SendContext> next)
104104

105105
var mc = new MethodCall(_sendMethod, context, default(string[]), true);
106106

107-
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress);
107+
var queueData = MassTransitHelpers.GetQueueData(context.SourceAddress, context.DestinationAddress);
108108

109109
var transaction = _agent.CurrentTransaction;
110110
InsertDistributedTraceHeaders(context.Headers, transaction);

0 commit comments

Comments
 (0)