Skip to content

Commit 2f0ae36

Browse files
committed
Fixes to address comments
1 parent b5235f8 commit 2f0ae36

File tree

8 files changed

+160
-10
lines changed

8 files changed

+160
-10
lines changed

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,11 @@ public async Task FlushAsync()
260260
{
261261
_aggregator.AddBacklog(in backlogPoint);
262262
}
263+
264+
while (_transactionBuffer.TryDequeue(out var transactionPoint))
265+
{
266+
_aggregator.AddTransaction(transactionPoint);
267+
}
263268
}
264269
catch (Exception ex)
265270
{

tracer/src/Datadog.Trace/DataStreamsMonitoring/SpanExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ internal static class SpanExtensions
1818
/// <param name="checkpointName">The checkpoint name at which the transaction is being tracked</param>
1919
internal static void TrackTransaction(this Span span, DataStreamsManager? manager, string transactionId, string checkpointName)
2020
{
21-
if (manager is null)
21+
if (manager is null || !manager.IsTransactionTrackingEnabled)
2222
{
2323
return;
2424
}

tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsExtractorRegistry.cs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,17 @@
44
// </copyright>
55
#nullable enable
66

7+
using System;
78
using System.Collections.Generic;
9+
using Datadog.Trace.Logging;
810
using Datadog.Trace.Util.Json;
911

1012
namespace Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
1113

1214
internal sealed class DataStreamsExtractorRegistry
1315
{
16+
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<DataStreamsExtractorRegistry>();
17+
1418
private readonly Dictionary<DataStreamsTransactionExtractor.Type, List<DataStreamsTransactionExtractor>> _extractors = new();
1519

1620
internal DataStreamsExtractorRegistry(string extractorsJson)
@@ -20,7 +24,17 @@ internal DataStreamsExtractorRegistry(string extractorsJson)
2024
return;
2125
}
2226

23-
var deserialized = JsonHelper.DeserializeObject<List<DataStreamsTransactionExtractor>>(extractorsJson);
27+
List<DataStreamsTransactionExtractor>? deserialized;
28+
try
29+
{
30+
deserialized = JsonHelper.DeserializeObject<List<DataStreamsTransactionExtractor>>(extractorsJson);
31+
}
32+
catch (Exception ex)
33+
{
34+
Log.Warning(ex, "Failed to parse DD_DATA_STREAMS_TRANSACTION_EXTRACTORS value. Transaction tracking extractors will be disabled.");
35+
return;
36+
}
37+
2438
if (deserialized == null)
2539
{
2640
return;

tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionInfo.cs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ namespace Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
1313

1414
internal readonly struct DataStreamsTransactionInfo
1515
{
16+
private const int MaxIdBytes = 255;
17+
1618
private static readonly ConcurrentDictionary<string, int> Cache = new();
1719
private static int _counter;
1820

@@ -22,22 +24,35 @@ internal readonly struct DataStreamsTransactionInfo
2224

2325
internal DataStreamsTransactionInfo(string id, long timestamp, string checkpoint)
2426
{
25-
_idBytes = Encoding.UTF8.GetBytes(id);
27+
var encoded = Encoding.UTF8.GetBytes(id);
28+
_idBytes = Truncate(encoded);
2629
_timestamp = timestamp;
27-
_checkpointId = Cache.GetOrAdd(checkpoint, Interlocked.Increment(ref _counter));
30+
_checkpointId = Cache.GetOrAdd(checkpoint, _ => Interlocked.Increment(ref _counter));
2831
}
2932

3033
internal DataStreamsTransactionInfo(byte[] idBytes, long timestamp, string checkpoint)
3134
{
32-
_idBytes = idBytes;
35+
_idBytes = Truncate(idBytes);
3336
_timestamp = timestamp;
34-
_checkpointId = Cache.GetOrAdd(checkpoint, Interlocked.Increment(ref _counter));
37+
_checkpointId = Cache.GetOrAdd(checkpoint, _ => Interlocked.Increment(ref _counter));
3538
}
3639

3740
internal long TimestampNs { get => _timestamp; }
3841

3942
internal string TransactionId { get => Encoding.UTF8.GetString(_idBytes); }
4043

44+
private static byte[] Truncate(byte[] source)
45+
{
46+
if (source.Length <= MaxIdBytes)
47+
{
48+
return source;
49+
}
50+
51+
var truncated = new byte[MaxIdBytes];
52+
Array.Copy(source, truncated, MaxIdBytes);
53+
return truncated;
54+
}
55+
4156
internal static byte[] GetCacheBytes()
4257
{
4358
var result = new byte[512];
@@ -92,7 +107,7 @@ internal void WriteTo(byte[] buffer, int offset)
92107
buffer[offset + 7] = (byte)(_timestamp >> 8);
93108
buffer[offset + 8] = (byte)_timestamp;
94109

95-
// id size, up to 256 bytes
110+
// id size, up to 255 bytes
96111
buffer[offset + 9] = (byte)_idBytes.Length;
97112

98113
// copy the ID

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsExtractorRegistryTest.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,29 @@ public void DeserializeCorrectly()
1818
var registry = new DataStreamsExtractorRegistry("[{\"name\": \"transaction-origin\", \"type\": \"HTTP_OUT_HEADERS\", \"value\": \"transaction-id\"}]");
1919
registry.AsJson().Should().Be("{\"HttpOutHeaders\":[{\"name\":\"transaction-origin\",\"type\":\"HTTP_OUT_HEADERS\",\"value\":\"transaction-id\",\"ExtractorType\":1}]}");
2020
}
21+
22+
[Fact]
23+
public void GetExtractorsByType_ReturnsAllExtractors_ForSameType()
24+
{
25+
var registry = new DataStreamsExtractorRegistry(
26+
"[" +
27+
"{\"name\": \"n1\", \"type\": \"HTTP_OUT_HEADERS\", \"value\": \"v1\"}," +
28+
"{\"name\": \"n2\", \"type\": \"HTTP_OUT_HEADERS\", \"value\": \"v2\"}" +
29+
"]");
30+
31+
var extractors = registry.GetExtractorsByType(DataStreamsTransactionExtractor.Type.HttpOutHeaders);
32+
33+
extractors.Should().HaveCount(2);
34+
extractors.Should().Contain(e => e.Name == "n1" && e.Value == "v1");
35+
extractors.Should().Contain(e => e.Name == "n2" && e.Value == "v2");
36+
}
37+
38+
[Fact]
39+
public void GetExtractorsByType_DoesNotReturnExtractors_ForOtherType()
40+
{
41+
var registry = new DataStreamsExtractorRegistry(
42+
"[{\"name\": \"n1\", \"type\": \"HTTP_OUT_HEADERS\", \"value\": \"v1\"}]");
43+
44+
registry.GetExtractorsByType(DataStreamsTransactionExtractor.Type.HttpInHeaders).Should().BeNull();
45+
}
2146
}

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,14 @@ public void WhenEnabled_TrackTransaction_AddsTransactionAndTagsSpan()
274274
[Fact]
275275
public void WhenDisabled_TrackTransaction_DoesNothing()
276276
{
277-
var dsm = GetDataStreamManager(false, out _);
277+
var dsm = GetDataStreamManager(false, out var writer);
278278
var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow);
279279

280-
var act = () => span.TrackTransaction(dsm, "tx-abc", "some-checkpoint");
281-
act.Should().NotThrow();
280+
span.TrackTransaction(dsm, "tx-abc", "some-checkpoint");
281+
282+
span.Tags.GetTag("dsm.transaction.id").Should().BeNull();
283+
// writer is null when DSM is disabled, so nothing could have been enqueued
284+
writer.Should().BeNull();
282285
}
283286

284287
[Fact]

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionInfoTest.cs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,57 @@ public void GetByteCount_MatchesGetBytesLength()
6363
var transaction = new DataStreamsTransactionInfo("hello-world", 12345L, "my-cp");
6464
transaction.GetByteCount().Should().Be(transaction.GetBytes().Length);
6565
}
66+
67+
[Fact]
68+
public void CheckpointId_DoesNotIncrement_WhenCheckpointAlreadyCached()
69+
{
70+
DataStreamsTransactionInfo.ClearCache();
71+
72+
// First use of "cp-a" gets id 1, first use of "cp-b" gets id 2.
73+
var a1 = new DataStreamsTransactionInfo("tx1", 1, "cp-a");
74+
var b1 = new DataStreamsTransactionInfo("tx2", 1, "cp-b");
75+
76+
// Repeated uses of existing checkpoint names must not increment the counter.
77+
for (var i = 0; i < 10; i++)
78+
{
79+
_ = new DataStreamsTransactionInfo("tx3", 1, "cp-a");
80+
_ = new DataStreamsTransactionInfo("tx4", 1, "cp-b");
81+
}
82+
83+
// A brand-new checkpoint must still get the next sequential id (3), not some higher value.
84+
var c1 = new DataStreamsTransactionInfo("tx5", 1, "cp-c");
85+
86+
a1.GetBytes()[0].Should().Be(1);
87+
b1.GetBytes()[0].Should().Be(2);
88+
c1.GetBytes()[0].Should().Be(3, "counter must only advance for new checkpoint names");
89+
}
90+
91+
[Theory]
92+
[InlineData(255)] // exact limit — no truncation
93+
[InlineData(256)] // one byte over
94+
[InlineData(512)] // well over
95+
public void LongTransactionId_IsTruncatedTo255Bytes(int idByteLength)
96+
{
97+
var id = new string('a', idByteLength);
98+
var transaction = new DataStreamsTransactionInfo(id, 1L, "cp");
99+
var bytes = transaction.GetBytes();
100+
101+
// byte at offset 9 is the encoded length; must never exceed 255
102+
bytes[9].Should().Be(255);
103+
// total payload length must match: 10 header bytes + encoded id length
104+
bytes.Length.Should().Be(10 + 255);
105+
// GetByteCount must agree
106+
transaction.GetByteCount().Should().Be(bytes.Length);
107+
}
108+
109+
[Fact]
110+
public void LongTransactionId_FromByteArray_IsTruncatedTo255Bytes()
111+
{
112+
var idBytes = new byte[300];
113+
var transaction = new DataStreamsTransactionInfo(idBytes, 1L, "cp");
114+
var bytes = transaction.GetBytes();
115+
116+
bytes[9].Should().Be(255);
117+
bytes.Length.Should().Be(10 + 255);
118+
}
66119
}

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsWriterTests.cs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,41 @@ public async Task WhenNotSupported_DoesNotReportAnyBucketsOnClose(bool? isSuppor
230230
api.Sent.Should().BeEmpty();
231231
}
232232

233+
[Fact]
234+
public async Task WhenSupported_WritesTransaction_OnClose()
235+
{
236+
// mirrors WhenSupported_WritesAStatsPoint_OnClose for transactions
237+
var bucketDuration = 100_000_000;
238+
var api = new StubApi();
239+
var writer = CreateWriter(api, out var discovery, bucketDuration);
240+
TriggerSupportUpdate(discovery, isSupported: true);
241+
242+
writer.AddTransaction(new DataStreamsTransactionInfo("tx-id", 1L, "cp"));
243+
244+
await writer.DisposeAsync();
245+
246+
api.Sent.Should().ContainSingle();
247+
}
248+
249+
[Fact]
250+
public async Task FlushAsync_DrainsPendingTransactions()
251+
{
252+
// int.MaxValue bucket: timer will never fire and ShouldFlushTransactions
253+
// won't trigger for a small payload. FlushAsync must drain _transactionBuffer
254+
// directly — it is called immediately so ProcessQueueLoop has not yet woken
255+
// from its initial 10 ms sleep.
256+
var bucketDuration = int.MaxValue;
257+
var api = new StubApi();
258+
var writer = CreateWriter(api, out var discovery, bucketDuration);
259+
TriggerSupportUpdate(discovery, isSupported: true);
260+
261+
writer.AddTransaction(new DataStreamsTransactionInfo("tx-id", 1L, "cp"));
262+
await writer.FlushAsync();
263+
264+
api.Sent.Should().NotBeEmpty("FlushAsync must drain the transaction buffer");
265+
await writer.DisposeAsync();
266+
}
267+
233268
[Fact]
234269
public async Task CanCreateWriterWithDefaultBucket()
235270
{

0 commit comments

Comments
 (0)