Skip to content

Commit d6634c8

Browse files
authored
chore: Ensure consistent DT header insertion/replacement logic across all wrappers (#3512)
* chore: Ensure consistent DT header insertion/replacement logic across all wrappers. * Fix flaky container test * test: Fix Kafka container test race condition and byte count range Move existing-headers produce+consume before cancellation-token consumers to prevent race where cancellation consumer could grab the larger existing-headers message. Widen byte count assertion range to accommodate non-deterministic span ordering from FirstOrDefault. * Rework dt propagation tests for SQS
1 parent 9c0a755 commit d6634c8

23 files changed

Lines changed: 686 additions & 33 deletions

File tree

src/Agent/NewRelic/Agent/Extensions/NewRelic.Agent.Extensions/AwsSdk/SqsHelper.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,6 @@ public static void InsertDistributedTraceHeaders(ITransaction transaction, objec
7878
var getMessageAttributes = _getMessageAttributes.GetOrAdd(smr.GetType(), t => VisibilityBypasser.Instance.GeneratePropertyAccessor<IDictionary>(t, "MessageAttributes"));
7979
var messageAttributes = getMessageAttributes(smr);
8080

81-
// if we can't add all DT headers, don't add any
82-
if ((messageAttributes.Count + dtHeaderCount - headersInserted) > MaxSQSMessageAttributes)
83-
return;
84-
8581
// create a new MessageAttributeValue instance
8682
var messageAttributeValueTypeFactory = _messageAttributeValueTypeFactory ??= VisibilityBypasser.Instance.GenerateTypeFactory(smr.GetType().Assembly.FullName, "Amazon.SQS.Model.MessageAttributeValue");
8783
object newMessageAttributeValue = messageAttributeValueTypeFactory.Invoke();
@@ -92,7 +88,19 @@ public static void InsertDistributedTraceHeaders(ITransaction transaction, objec
9288
var stringValuePropertySetter = VisibilityBypasser.Instance.GeneratePropertySetter<string>(newMessageAttributeValue, "StringValue");
9389
stringValuePropertySetter(value);
9490

95-
messageAttributes.Add(key, newMessageAttributeValue);
91+
if (messageAttributes.Contains(key))
92+
{
93+
// Replace existing header in place — no count change, no limit check needed
94+
messageAttributes[key] = newMessageAttributeValue;
95+
}
96+
else
97+
{
98+
// New header — check that adding all remaining DT headers won't exceed the SQS limit
99+
if ((messageAttributes.Count + dtHeaderCount - headersInserted) > MaxSQSMessageAttributes)
100+
return;
101+
102+
messageAttributes.Add(key, newMessageAttributeValue);
103+
}
96104

97105
++headersInserted;
98106
});

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/RestSharp/AppendHeaders.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,17 @@ public AfterWrappedMethodDelegate BeforeWrappedMethod(InstrumentedMethodCall ins
2626
{
2727
var httpWebRequest = (HttpWebRequest)instrumentedMethodCall.MethodCall.MethodArguments[0];
2828

29-
var setHeaders = new Action<HttpWebRequest, string, string>((carrier, key, value) =>
29+
// Insert DT headers after AppendHeaders completes, so they overwrite any
30+
// pre-existing DT headers that RestSharp copied from the RestRequest.
31+
return Delegates.GetDelegateFor(onComplete: () =>
3032
{
31-
// 'Set' will replace an existing value
32-
httpWebRequest.Headers?.Set(key, value);
33-
});
34-
35-
agent.CurrentTransaction.InsertDistributedTraceHeaders(httpWebRequest, setHeaders);
33+
var setHeaders = new Action<HttpWebRequest, string, string>((carrier, key, value) =>
34+
{
35+
// 'Set' will replace an existing value
36+
carrier.Headers?.Set(key, value);
37+
});
3638

37-
return Delegates.NoOp;
39+
agent.CurrentTransaction.InsertDistributedTraceHeaders(httpWebRequest, setHeaders);
40+
});
3841
}
3942
}

src/Agent/NewRelic/Agent/Extensions/Providers/Wrapper/Wcf3/ChannelFactoryWrapper.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,13 @@ private void SetHeaders (HttpRequestMessageProperty carrier, string key, string
119119

120120
private void SetHeaders(Message carrier, string key, string value)
121121
{
122-
// 'Set' will replace an existing value
122+
// Remove existing header if present, then add the new one
123+
var headerIndex = carrier.Headers.FindHeader(key, string.Empty);
124+
if (headerIndex >= 0)
125+
{
126+
carrier.Headers.RemoveAt(headerIndex);
127+
}
128+
123129
carrier.Headers.Add(MessageHeader.CreateHeader(key, string.Empty, value));
124130
}
125131

tests/Agent/IntegrationTests/Applications/BasicMvcApplication/Controllers/DefaultController.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,21 @@ public string ChainedWebRequest(string chainedServerName, string chainedPortNumb
160160
return "Worked";
161161
}
162162

163+
public string ChainedWebRequestWithExistingHeaders(string chainedServerName, string chainedPortNumber, string chainedAction)
164+
{
165+
var address = $"http://{chainedServerName}:{chainedPortNumber}/Default/{chainedAction}";
166+
#pragma warning disable SYSLIB0014 // obsolete usage is ok here
167+
var httpWebRequest = (HttpWebRequest)WebRequest.Create(address);
168+
// Pre-populate with stale DT headers — agent should replace them
169+
httpWebRequest.Headers.Set("traceparent", "00-stale0000000000000000000000000-stale000000000-01");
170+
httpWebRequest.Headers.Set("tracestate", "stale=value");
171+
httpWebRequest.Headers.Set("newrelic", "stale-newrelic-payload");
172+
httpWebRequest.GetResponse();
173+
#pragma warning restore SYSLIB0014
174+
175+
return "Worked";
176+
}
177+
163178
public async Task<string> ChainedHttpClient(string chainedServerName, string chainedPortNumber, string chainedAction)
164179
{
165180
var address = $"http://{chainedServerName}:{chainedPortNumber}/Default/{chainedAction}";

tests/Agent/IntegrationTests/Applications/BasicMvcApplication/Controllers/DistributedTracingController.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,23 @@ public async Task<string> MakeExternalCallUsingRestClient(string externalCallUrl
6262
}
6363
return "Worked";
6464
}
65+
66+
[Route("DistributedTracing/MakeExternalCallUsingRestClientWithExistingHeaders")]
67+
public async Task<string> MakeExternalCallUsingRestClientWithExistingHeaders(string externalCallUrl)
68+
{
69+
var uri = new Uri(externalCallUrl);
70+
var client = new RestClient($"http://{uri.Host}:{uri.Port}");
71+
var restRequest = new RestRequest(uri.PathAndQuery);
72+
// Pre-populate with stale DT headers — agent should replace them
73+
restRequest.AddHeader("traceparent", "00-stale0000000000000000000000000-stale000000000-01");
74+
restRequest.AddHeader("tracestate", "stale=value");
75+
restRequest.AddHeader("newrelic", "stale-newrelic-payload");
76+
var response = await client.ExecuteTaskAsync<IEnumerable<Bird>>(restRequest);
77+
78+
if ((response.StatusCode != HttpStatusCode.OK) && (response.StatusCode != HttpStatusCode.NoContent))
79+
{
80+
return $"Unexpected HTTP status code {response.StatusCode}";
81+
}
82+
return "Worked";
83+
}
6584
}

tests/Agent/IntegrationTests/Applications/Owin4WebApi/Controllers/DistributedTracingSenderController.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,29 @@ public async Task<string> CallNext(string nextUrl)
3030
return result;
3131
}
3232
}
33+
34+
[HttpGet]
35+
[Route("api/CallNextWithExistingHeaders")]
36+
public async Task<string> CallNextWithExistingHeaders(string nextUrl)
37+
{
38+
try
39+
{
40+
using (var client = new HttpClient())
41+
{
42+
var request = new HttpRequestMessage(HttpMethod.Get, nextUrl);
43+
// Pre-populate with stale DT headers — agent should replace them
44+
request.Headers.Add("traceparent", "00-stale0000000000000000000000000-stale000000000-01");
45+
request.Headers.Add("tracestate", "stale=value");
46+
request.Headers.Add("newrelic", "stale-newrelic-payload");
47+
var response = await client.SendAsync(request);
48+
var result = await response.Content.ReadAsStringAsync();
49+
return result;
50+
}
51+
}
52+
catch (Exception ex)
53+
{
54+
var result = $"Exception occurred in {nameof(DistributedTracingSenderController)} calling [{nextUrl}]: {ex}";
55+
return result;
56+
}
57+
}
3358
}

tests/Agent/IntegrationTests/ContainerApplications/AwsSdkTestApp/AwsSdkExercisers/AwsSdkSQSExerciser.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,29 @@ await _amazonSqsClient.DeleteMessageAsync(new DeleteMessageRequest
142142
}
143143
}
144144

145+
[MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
146+
public async Task SQS_SendMessageWithExistingDTHeadersAsync(string message)
147+
{
148+
if (_sqsQueueUrl == null)
149+
{
150+
throw new InvalidOperationException("Queue URL is not set. Call SQS_Initialize or SQS_SetQueueUrl first.");
151+
}
152+
153+
var request = new SendMessageRequest
154+
{
155+
QueueUrl = _sqsQueueUrl,
156+
MessageBody = message,
157+
MessageAttributes = new Dictionary<string, MessageAttributeValue>
158+
{
159+
{ "traceparent", new MessageAttributeValue { DataType = "String", StringValue = "00-stale0000000000000000000000000-stale000000000-01" } },
160+
{ "tracestate", new MessageAttributeValue { DataType = "String", StringValue = "stale=value" } },
161+
{ "newrelic", new MessageAttributeValue { DataType = "String", StringValue = "stale-newrelic-payload" } },
162+
}
163+
};
164+
165+
await _amazonSqsClient.SendMessageAsync(request);
166+
}
167+
145168
// send message batch
146169
[MethodImpl(MethodImplOptions.NoOptimization | MethodImplOptions.NoInlining)]
147170
public async Task SQS_SendMessageBatchAsync(string[] messages)

tests/Agent/IntegrationTests/ContainerApplications/AwsSdkTestApp/Controllers/AwsSdkSQSController.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,18 @@ public async Task SQS_SendMessageToQueueAsync([Required]string message, [Require
8282
_logger.LogInformation("Message {Message} sent to {Queue}", message, messageQueueUrl);
8383
}
8484

85+
// GET: /AwsSdk/SQS_SendMessageWithExistingDTHeadersToQueue?message=Hello&messageQueueUrl=MyQueue
86+
[HttpGet("SQS_SendMessageWithExistingDTHeadersToQueue")]
87+
public async Task SQS_SendMessageWithExistingDTHeadersToQueueAsync([Required]string message, [Required]string messageQueueUrl)
88+
{
89+
_logger.LogInformation("Sending message with existing DT headers {Message} to {Queue}", message, messageQueueUrl);
90+
using var awsSdkSQSExerciser = new AwsSdkSQSExerciser();
91+
awsSdkSQSExerciser.SQS_SetQueueUrl(messageQueueUrl);
92+
93+
await awsSdkSQSExerciser.SQS_SendMessageWithExistingDTHeadersAsync(message);
94+
_logger.LogInformation("Message with existing DT headers {Message} sent to {Queue}", message, messageQueueUrl);
95+
}
96+
8597
// GET: /AwsSdk/SQS_SendMessageBatchToQueue?messageQueueUrl=MyQueue
8698
[HttpGet("SQS_ReceiveMessageFromQueue")]
8799
public async Task<IEnumerable<Message>> SQS_ReceiveMessageFromQueueAsync([Required]string messageQueueUrl)

tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Controllers/KafkaController.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ public async Task<string> ProduceAsync()
3838
return "Complete";
3939
}
4040

41+
[HttpGet("produceasyncwithexistingheaders")]
42+
public async Task<string> ProduceAsyncWithExistingHeaders()
43+
{
44+
await _producer.ProduceAsyncWithExistingHeaders();
45+
return "Complete";
46+
}
47+
4148
[HttpGet("bootstrap_server")]
4249
public string GetBootstrapServer() => Program.GetBootstrapServer();
4350

tests/Agent/IntegrationTests/ContainerApplications/KafkaTestApp/Producer.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
using System.Runtime.CompilerServices;
5+
using System.Text;
56
using System.Threading.Tasks;
67
using Confluent.Kafka;
78
using Confluent.Kafka.Admin;
@@ -66,4 +67,25 @@ public async Task ProduceAsync()
6667

6768
await _producer.ProduceAsync(_topic, new Message<string, string> { Key = user, Value = item });
6869
}
70+
71+
[MethodImpl(MethodImplOptions.NoInlining)]
72+
public async Task ProduceAsyncWithExistingHeaders()
73+
{
74+
var user = "asyncExistingHeadersUser";
75+
var item = "asyncExistingHeadersItem";
76+
77+
var message = new Message<string, string>
78+
{
79+
Key = user,
80+
Value = item,
81+
Headers = new Headers
82+
{
83+
{ "traceparent", Encoding.ASCII.GetBytes("00-stale0000000000000000000000000-stale000000000-01") },
84+
{ "tracestate", Encoding.ASCII.GetBytes("stale=value") },
85+
{ "newrelic", Encoding.ASCII.GetBytes("stale-newrelic-payload") }
86+
}
87+
};
88+
89+
await _producer.ProduceAsync(_topic, message);
90+
}
6991
}

0 commit comments

Comments
 (0)