diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Http/HttpClient/HttpMessageHandlerCommon.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Http/HttpClient/HttpMessageHandlerCommon.cs index 38ea90c6e1c6..aba40bc89c1b 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Http/HttpClient/HttpMessageHandlerCommon.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Http/HttpClient/HttpMessageHandlerCommon.cs @@ -8,6 +8,7 @@ using System.Threading; using Datadog.Trace.ClrProfiler.CallTarget; using Datadog.Trace.Configuration; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.Propagators; namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Http.HttpClient @@ -42,6 +43,25 @@ public static CallTargetState OnMethodBegin(TTarget instance, integrationId, out var tags); + var dataStreamsManager = tracer.TracerManager.DataStreamsManager; + if (dataStreamsManager.IsTransactionTrackingEnabled) + { + var extractors = dataStreamsManager.GetExtractorsByType(DataStreamsTransactionExtractor.Type.HttpOutHeaders); + if (extractors != null) + { + foreach (var extractor in extractors) + { + if (headers.TryGetValues(extractor.Value, out var headerValues)) + { + foreach (var headerValue in headerValues) + { + dataStreamsManager.TrackTransaction(headerValue, extractor.Name); + } + } + } + } + } + if (scope is not null) { tags.HttpClientHandlerType = instance.GetType().FullName; diff --git a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs index e77cf84309d7..ea6935e4c07d 100644 --- a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs +++ b/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs @@ -12,6 +12,7 @@ using System.Threading; using Datadog.Trace.Configuration.Schema; using Datadog.Trace.DataStreamsMonitoring; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.DataStreamsMonitoring.Utils; using Datadog.Trace.DuckTyping; using Datadog.Trace.Logging; @@ -288,6 +289,11 @@ private static long GetMessageSize(T message) var base64PathwayContext = Convert.ToBase64String(BitConverter.GetBytes(span.Context.PathwayContext.Value.Hash.Value)); message.Headers.Add(DataStreamsPropagationHeaders.TemporaryBase64PathwayContext, Encoding.UTF8.GetBytes(base64PathwayContext)); } + + if (!dataStreamsManager.IsInDefaultState) + { + ApplyDataStreamsExtractors(dataStreamsManager, DataStreamsTransactionExtractor.Type.KafkaConsumeHeaders, message); + } } } catch (Exception ex) @@ -327,6 +333,30 @@ internal static void CloseConsumerScope(Tracer tracer) } } + internal static void ApplyDataStreamsExtractors( + DataStreamsManager dataStreamsManager, + DataStreamsTransactionExtractor.Type extractorType, + TMessage? message) + where TMessage : IMessage + { + if (message?.Instance == null) + { + return; + } + + var extractors = dataStreamsManager.GetExtractorsByType(extractorType); + if (extractors != null && message.Headers != null) + { + foreach (var extractor in extractors) + { + if (message.Headers.TryGetLastBytes(extractor.Value, out var transactionId)) + { + dataStreamsManager.TrackTransaction(transactionId, extractor.Name); + } + } + } + } + /// /// Try to inject the prop /// @@ -392,6 +422,10 @@ internal static void TryInjectHeaders( } dataStreamsManager.InjectPathwayContext(span.Context.PathwayContext, adapter); + if (!dataStreamsManager.IsInDefaultState) + { + ApplyDataStreamsExtractors(dataStreamsManager, DataStreamsTransactionExtractor.Type.KafkaProduceHeaders, message); + } } } catch (Exception ex) diff --git a/tracer/src/Datadog.Trace/Configuration/TracerSettings.cs b/tracer/src/Datadog.Trace/Configuration/TracerSettings.cs index cb03b502c47f..30977cbb3ab0 100644 --- a/tracer/src/Datadog.Trace/Configuration/TracerSettings.cs +++ b/tracer/src/Datadog.Trace/Configuration/TracerSettings.cs @@ -584,6 +584,10 @@ not null when string.Equals(value, "otlp", StringComparison.OrdinalIgnoreCase) = .WithKeys(ConfigurationKeys.DataStreamsMonitoring.Enabled) .AsBool() == null; + DataStreamsTransactionExtractors = config + .WithKeys(ConfigurationKeys.DataStreamsMonitoring.TransactionExtractors) + .AsString(string.Empty); + // no legacy headers if we are in "enbaled by default" state IsDataStreamsLegacyHeadersEnabled = config .WithKeys(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled) @@ -1207,6 +1211,11 @@ not null when string.Equals(value, "otlp", StringComparison.OrdinalIgnoreCase) = /// internal bool IsDataStreamsMonitoringInDefaultState { get; } + /// + /// Gets a raw value for DSM extractors + /// + internal string DataStreamsTransactionExtractors { get; } + /// /// Gets a value indicating whether data streams schema extraction is enabled or not. /// diff --git a/tracer/src/Datadog.Trace/Configuration/supported-configurations.yaml b/tracer/src/Datadog.Trace/Configuration/supported-configurations.yaml index aa637d4bacd7..59b85f334209 100644 --- a/tracer/src/Datadog.Trace/Configuration/supported-configurations.yaml +++ b/tracer/src/Datadog.Trace/Configuration/supported-configurations.yaml @@ -524,6 +524,15 @@ supportedConfigurations: Configuration key for enabling legacy binary headers in Data Streams Monitoring. false by default if DSM is in default state, true otherwise + DD_DATA_STREAMS_TRANSACTION_EXTRACTORS: + - implementation: A + type: array + default: null + product: DataStreamsMonitoring + const_name: TransactionExtractors + documentation: |- + Configuration key for transaction extractors in Data Streams Monitoring. + DD_DBM_PROPAGATION_MODE: - implementation: A type: string diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs index acef94d865d1..5f0e6c2528c4 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsAggregator.cs @@ -8,6 +8,7 @@ using System; using System.Collections.Generic; using System.IO; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.DataStreamsMonitoring.Utils; using Datadog.Trace.SourceGenerators; @@ -17,7 +18,7 @@ namespace Datadog.Trace.DataStreamsMonitoring.Aggregation; /// Aggregates multiple s into their correct buckets /// Note that this class is *not* thread safe /// -internal sealed class DataStreamsAggregator +internal sealed class DataStreamsAggregator(DataStreamsMessagePackFormatter formatter, int bucketDurationMs) { // The inner dictionary is constrained in size by the number of unique hashes seen by the app // Unique hashes are unique paths from origin to here, which could be unbounded if there are loops @@ -30,17 +31,15 @@ internal sealed class DataStreamsAggregator private readonly Dictionary> _backlogBuckets = new(); - private readonly DataStreamsMessagePackFormatter _formatter; + private readonly DataStreamsTransactionContainer _dataStreamsTransactionContainer = new(1024); + + private readonly DataStreamsMessagePackFormatter _formatter = formatter; private readonly DDSketchPool _sketchPool = new(); - private readonly long _bucketDurationInNs; + private readonly long _bucketDurationInNs = ((long)bucketDurationMs) * 1_000_000; private List? _statsToWrite; private List? _backlogsToWrite; - public DataStreamsAggregator(DataStreamsMessagePackFormatter formatter, int bucketDurationMs) - { - _formatter = formatter; - _bucketDurationInNs = ((long)bucketDurationMs) * 1_000_000; - } + internal bool ShouldFlushTransactions => _dataStreamsTransactionContainer.ShouldFlush; /// /// Add the stats point to the aggregated stats @@ -70,6 +69,11 @@ public void AddBacklog(in BacklogPoint point) } } + public void AddTransaction(in DataStreamsTransactionInfo transaction) + { + _dataStreamsTransactionContainer.Add(transaction); + } + /// /// Serialize the aggregated results using message pack /// @@ -80,9 +84,11 @@ public bool Serialize(Stream stream, long maxBucketFlushTimeNs) { var statsToAdd = Export(maxBucketFlushTimeNs) ?? new(); var backlogsToAdd = ExportBacklogs(maxBucketFlushTimeNs) ?? new(); - if (statsToAdd.Count > 0 || backlogsToAdd.Count > 0) + + var transactionData = _dataStreamsTransactionContainer.GetDataAndReset(); + if (statsToAdd.Count > 0 || backlogsToAdd.Count > 0 || transactionData.Length > 0) { - _formatter.Serialize(stream, _bucketDurationInNs, statsToAdd, backlogsToAdd); + _formatter.Serialize(stream, _bucketDurationInNs, statsToAdd, backlogsToAdd, transactionData); Clear(statsToAdd, backlogsToAdd); return true; diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsMessagePackFormatter.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsMessagePackFormatter.cs index a519b8407150..3f82be4959c1 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsMessagePackFormatter.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsMessagePackFormatter.cs @@ -12,6 +12,7 @@ using System.Threading; using Datadog.Trace.Configuration; using Datadog.Trace.ContinuousProfiler; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.Vendors.Datadog.Sketches; using Datadog.Trace.Vendors.MessagePack; @@ -53,6 +54,9 @@ internal sealed class DataStreamsMessagePackFormatter private readonly byte[] _processTagsBytes = StringEncoding.UTF8.GetBytes("ProcessTags"); private readonly byte[] _isInDefaultStateBytes = StringEncoding.UTF8.GetBytes("IsInDefaultState"); + private readonly byte[] _transactions = StringEncoding.UTF8.GetBytes("Transactions"); + private readonly byte[] _transactionCheckpointIds = StringEncoding.UTF8.GetBytes("TransactionCheckpointIds"); + private byte[] _environmentValueBytes; private byte[] _serviceValueBytes; private ProcessTags? _processTags; @@ -116,11 +120,16 @@ private static long GetProductsMask(TracerSettings tracerSettings, ProfilerSetti return productsMask; } - public int Serialize(Stream stream, long bucketDurationNs, List statsBuckets, List backlogsBuckets) + public int Serialize( + Stream stream, + long bucketDurationNs, + List statsBuckets, + List backlogsBuckets, + byte[] transactionData) { + var hasTransactions = transactionData.Length > 0; var withProcessTags = _writeProcessTags && _processTags?.TagsList.Count > 0; var processTags = _writeProcessTags ? _processTags?.TagsList : null; - var bytesWritten = 0; // Should be in sync with Java @@ -146,11 +155,24 @@ public int Serialize(Stream stream, long bucketDurationNs, List 0 ? 1 : 0; count += backlogBucketCount > 0 ? 1 : 0; + count += hasTransactions ? 2 : 0; // 2-4 entries per StatsBucket (Backlogs and Stats are both optional): // https://github.com/DataDog/data-streams-go/blob/60ba06aec619850aef8ed0b9b1f0f5e310438362/datastreams/payload.go#L48 diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs index 907117b7f273..a026487d942e 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs @@ -6,6 +6,7 @@ #nullable enable using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Datadog.Trace.Agent.DiscoveryService; @@ -13,6 +14,7 @@ using Datadog.Trace.ContinuousProfiler; using Datadog.Trace.DataStreamsMonitoring.Aggregation; using Datadog.Trace.DataStreamsMonitoring.Hashes; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.Headers; using Datadog.Trace.Logging; using Datadog.Trace.PlatformHelpers; @@ -29,14 +31,14 @@ internal sealed class DataStreamsManager private static readonly AsyncLocal LastConsumePathway = new(); // saves the context on consume checkpointing only private readonly object _nodeHashUpdateLock = new(); private readonly ConcurrentDictionary _schemaRateLimiters = new(); + private readonly IDiscoveryService _discoveryService; + private readonly DataStreamsExtractorRegistry _registry; private readonly IDisposable _updateSubscription; private readonly bool _isLegacyDsmHeadersEnabled; - private readonly IDiscoveryService _discoveryService; // only saved to be able to unsubscribe private long _nodeHashBase; // note that this actually represents a `ulong` that we have done an unsafe cast for private MutableSettings _previousMutableSettings; private string? _previousContainerTagsHash; private bool _isEnabled; - private bool _isInDefaultState; private IDataStreamsWriter? _writer; public DataStreamsManager( @@ -48,8 +50,10 @@ public DataStreamsManager( _isLegacyDsmHeadersEnabled = tracerSettings.IsDataStreamsLegacyHeadersEnabled; _writer = writer; _discoveryService = discoveryService; - _isInDefaultState = tracerSettings.IsDataStreamsMonitoringInDefaultState; + IsInDefaultState = tracerSettings.IsDataStreamsMonitoringInDefaultState; + _registry = new DataStreamsExtractorRegistry(tracerSettings.DataStreamsTransactionExtractors); + Log.Debug(@"Data Streams extractors loaded: {AsJson}", _registry.AsJson()); _previousMutableSettings = tracerSettings.Manager.InitialMutableSettings; // even though the value will probably get updated by a callback when subscriptions happen just after, // we still need to initialize it to a value from initial settings in case no callback fire @@ -64,9 +68,11 @@ public bool IsEnabled get => Volatile.Read(ref _isEnabled); } - public bool IsInDefaultState + public bool IsInDefaultState { get; } + + public bool IsTransactionTrackingEnabled { - get => Volatile.Read(ref _isInDefaultState); + get => !IsInDefaultState && IsEnabled; } /// Callback for AgentConfiguration updates @@ -159,6 +165,11 @@ public async Task FlushAsync() where TCarrier : IBinaryHeadersCollection => IsEnabled ? DataStreamsContextPropagator.Instance.Extract(headers) : null; + public List? GetExtractorsByType(DataStreamsTransactionExtractor.Type extractorType) + { + return _registry.GetExtractorsByType(extractorType); + } + /// /// Injects a into headers /// @@ -175,6 +186,34 @@ public void InjectPathwayContext(PathwayContext? context, TCarrier hea DataStreamsContextPropagator.Instance.Inject(context.Value, headers, _isLegacyDsmHeadersEnabled); } + public void TrackTransaction(string transactionId, string checkpointName) + { + if (!IsEnabled) + { + return; + } + + var writer = Volatile.Read(ref _writer); + writer?.AddTransaction(new DataStreamsTransactionInfo( + transactionId, + DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(), + checkpointName)); + } + + public void TrackTransaction(byte[] transactionIdBytes, string checkpointName) + { + if (!IsEnabled) + { + return; + } + + var writer = Volatile.Read(ref _writer); + writer?.AddTransaction(new DataStreamsTransactionInfo( + transactionIdBytes, + DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(), + checkpointName)); + } + public void TrackBacklog(string tags, long value) { if (!IsEnabled) diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs index e6ebd333ad2c..7e9b8935ee05 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs @@ -13,6 +13,7 @@ using Datadog.Trace.Configuration; using Datadog.Trace.ContinuousProfiler; using Datadog.Trace.DataStreamsMonitoring.Aggregation; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.DataStreamsMonitoring.Transport; using Datadog.Trace.Logging; using Datadog.Trace.Util; @@ -29,6 +30,7 @@ internal sealed class DataStreamsWriter : IDataStreamsWriter private readonly long _bucketDurationMs; private readonly BoundedConcurrentQueue _buffer = new(queueLimit: 10_000); private readonly BoundedConcurrentQueue _backlogBuffer = new(queueLimit: 10_000); + private readonly BoundedConcurrentQueue _transactionBuffer = new(queueLimit: 10_000); private readonly TimeSpan _waitTimeSpan = TimeSpan.FromMilliseconds(10); private readonly TimeSpan _flushSemaphoreWaitTime = TimeSpan.FromSeconds(1); private readonly DataStreamsAggregator _aggregator; @@ -130,6 +132,24 @@ public void Add(in StatsPoint point) Interlocked.Increment(ref _pointsDropped); } + public void AddTransaction(in DataStreamsTransactionInfo transaction) + { + if (!Volatile.Read(ref _isInitialized)) + { + Initialize(); + } + + if (Volatile.Read(ref _isSupported) != SupportState.Unsupported) + { + if (_transactionBuffer.TryEnqueue(transaction)) + { + return; + } + } + + Interlocked.Increment(ref _pointsDropped); + } + public void AddBacklog(in BacklogPoint point) { if (!Volatile.Read(ref _isInitialized)) @@ -342,6 +362,16 @@ private void ProcessQueueLoop() { _aggregator.AddBacklog(in backlogPoint); } + + while (_transactionBuffer.TryDequeue(out var transactionPoint)) + { + _aggregator.AddTransaction(transactionPoint); + } + + if (_aggregator.ShouldFlushTransactions) + { + RequestFlush(); + } } catch (Exception ex) { diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/IDataStreamsWriter.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/IDataStreamsWriter.cs index e88ee3049aa5..ad146cb7539b 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/IDataStreamsWriter.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/IDataStreamsWriter.cs @@ -6,6 +6,7 @@ #nullable enable using System.Threading.Tasks; using Datadog.Trace.DataStreamsMonitoring.Aggregation; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; namespace Datadog.Trace.DataStreamsMonitoring; @@ -15,6 +16,8 @@ internal interface IDataStreamsWriter void AddBacklog(in BacklogPoint point); + void AddTransaction(in DataStreamsTransactionInfo transaction); + Task FlushAsync(); Task DisposeAsync(); diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/SpanExtensions.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/SpanExtensions.cs index 58d923ba4046..98ebb1c066bc 100644 --- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/SpanExtensions.cs +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/SpanExtensions.cs @@ -9,6 +9,24 @@ namespace Datadog.Trace.DataStreamsMonitoring; internal static class SpanExtensions { + /// + /// Tracks a transaction at the given checkpoint and tags the span with the transaction ID. + /// + /// The span instance + /// The to use + /// The transaction identifier + /// The checkpoint name at which the transaction is being tracked + internal static void TrackTransaction(this Span span, DataStreamsManager? manager, string transactionId, string checkpointName) + { + if (manager is null) + { + return; + } + + span.SetTag("dsm.transaction.id", transactionId); + manager.TrackTransaction(transactionId, checkpointName); + } + /// /// Sets a DataStreams checkpoint and adds pathway tag to the span /// diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsExtractorRegistry.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsExtractorRegistry.cs new file mode 100644 index 000000000000..d8fb47e01583 --- /dev/null +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsExtractorRegistry.cs @@ -0,0 +1,46 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// +#nullable enable + +using System.Collections.Generic; +using Datadog.Trace.Util.Json; + +namespace Datadog.Trace.DataStreamsMonitoring.TransactionTracking; + +internal sealed class DataStreamsExtractorRegistry +{ + private readonly Dictionary> _extractors = new(); + + internal DataStreamsExtractorRegistry(string extractorsJson) + { + if (string.IsNullOrWhiteSpace(extractorsJson)) + { + return; + } + + var deserialized = JsonHelper.DeserializeObject>(extractorsJson); + if (deserialized == null) + { + return; + } + + foreach (var extractor in deserialized) + { + var list = _extractors.GetValueOrDefault(extractor.ExtractorType) ?? new(); + list.Add(extractor); + _extractors[extractor.ExtractorType] = list; + } + } + + internal string AsJson() + { + return JsonHelper.SerializeObject(_extractors); + } + + internal List? GetExtractorsByType(DataStreamsTransactionExtractor.Type extractorType) + { + return _extractors.GetValueOrDefault(extractorType); + } +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionContainer.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionContainer.cs new file mode 100644 index 000000000000..c06f053e5b98 --- /dev/null +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionContainer.cs @@ -0,0 +1,60 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// +#nullable enable + +using System; + +namespace Datadog.Trace.DataStreamsMonitoring.TransactionTracking; + +/// +/// Accumulates serialized transaction bytes for the DSM flush cycle. +/// This class is NOT thread-safe. All access must be serialized by the caller +/// (in practice, via DataStreamsWriter._flushSemaphore). +/// +internal sealed class DataStreamsTransactionContainer +{ + private const int MaxSizeBytes = 512 * 1024; + + private readonly int _initialByteSize; + + private byte[] _data; + private int _size; + + internal DataStreamsTransactionContainer(int initialSizeBytes) + { + _initialByteSize = initialSizeBytes; + _data = new byte[initialSizeBytes]; + } + + internal bool ShouldFlush => _size >= MaxSizeBytes; + + public void Add(DataStreamsTransactionInfo transactionInfo) + { + var byteCount = transactionInfo.GetByteCount(); + + if (_data.Length - _size < byteCount) + { + var resized = new byte[Math.Max(_data.Length * 2, _size + byteCount)]; + Array.Copy(_data, 0, resized, 0, _size); + _data = resized; + } + + transactionInfo.WriteTo(_data, _size); + _size += byteCount; + } + + public int Size() => _size; + + public byte[] GetDataAndReset() + { + // trim zeros + var result = new byte[_size]; + Array.Copy(_data, 0, result, 0, _size); + // reset buffer and position + _data = new byte[_initialByteSize]; + _size = 0; + return result; + } +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionExtractor.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionExtractor.cs new file mode 100644 index 000000000000..0cae89e1f2a2 --- /dev/null +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionExtractor.cs @@ -0,0 +1,58 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// +#nullable enable + +using System.Collections.Generic; +using Datadog.Trace.Vendors.Newtonsoft.Json; + +namespace Datadog.Trace.DataStreamsMonitoring.TransactionTracking; + +internal sealed class DataStreamsTransactionExtractor +{ + private static readonly Dictionary TypeMap = new(System.StringComparer.Ordinal) + { + ["HTTP_OUT_HEADERS"] = Type.HttpOutHeaders, + ["HTTP_IN_HEADERS"] = Type.HttpInHeaders, + ["KAFKA_CONSUME_HEADERS"] = Type.KafkaConsumeHeaders, + ["KAFKA_PRODUCE_HEADERS"] = Type.KafkaProduceHeaders, + }; + + private Type? _cachedType; + + public enum Type + { + Unknown, + + HttpOutHeaders, + + HttpInHeaders, + + KafkaConsumeHeaders, + + KafkaProduceHeaders, + } + + [JsonProperty(PropertyName = "name")] + public string Name { get; private set; } = string.Empty; + + [JsonProperty(PropertyName = "type")] + public string StringType { get; private set; } = string.Empty; + + [JsonProperty(PropertyName = "value")] + public string Value { get; private set; } = string.Empty; + + public Type ExtractorType + { + get + { + if (_cachedType is null) + { + _cachedType = TypeMap.TryGetValue(StringType, out var t) ? t : Type.Unknown; + } + + return _cachedType.Value; + } + } +} diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionInfo.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionInfo.cs new file mode 100644 index 000000000000..9f544560186a --- /dev/null +++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/TransactionTracking/DataStreamsTransactionInfo.cs @@ -0,0 +1,108 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// +#nullable enable + +using System; +using System.Collections.Concurrent; +using System.Text; +using System.Threading; + +namespace Datadog.Trace.DataStreamsMonitoring.TransactionTracking; + +internal readonly struct DataStreamsTransactionInfo +{ + private static readonly ConcurrentDictionary Cache = new(); + private static int _counter; + + private readonly byte[] _idBytes; + private readonly long _timestamp; + private readonly int _checkpointId; + + internal DataStreamsTransactionInfo(string id, long timestamp, string checkpoint) + { + _idBytes = Encoding.UTF8.GetBytes(id); + _timestamp = timestamp; + _checkpointId = Cache.GetOrAdd(checkpoint, Interlocked.Increment(ref _counter)); + } + + internal DataStreamsTransactionInfo(byte[] idBytes, long timestamp, string checkpoint) + { + _idBytes = idBytes; + _timestamp = timestamp; + _checkpointId = Cache.GetOrAdd(checkpoint, Interlocked.Increment(ref _counter)); + } + + internal long TimestampNs { get => _timestamp; } + + internal string TransactionId { get => Encoding.UTF8.GetString(_idBytes); } + + internal static byte[] GetCacheBytes() + { + var result = new byte[512]; + var index = 0; + + foreach (var pair in Cache) + { + var keyBytes = Encoding.UTF8.GetBytes(pair.Key); + // resize the buffer if needed + if (result.Length - index <= keyBytes.Length + 2) + { + var resized = new byte[result.Length * 2]; + Array.Copy(result, 0, resized, 0, result.Length); + result = resized; + } + + result[index] = (byte)pair.Value; + index++; + result[index] = (byte)(keyBytes.Length); + index++; + + Array.Copy(keyBytes, 0, result, index, keyBytes.Length); + index += keyBytes.Length; + } + + var trimmed = new byte[index]; + Array.Copy(result, trimmed, index); + return trimmed; + } + + // ClearCache is for using in tests only — resets both the map and the counter so IDs are deterministic + internal static void ClearCache() + { + Cache.Clear(); + Interlocked.Exchange(ref _counter, 0); + } + + internal int GetByteCount() => _idBytes.Length + 10; + + internal void WriteTo(byte[] buffer, int offset) + { + // up to 1 byte for checkpoint id + buffer[offset] = (byte)_checkpointId; + + // 8 bytes for timestamp + buffer[offset + 1] = (byte)(_timestamp >> 56); + buffer[offset + 2] = (byte)(_timestamp >> 48); + buffer[offset + 3] = (byte)(_timestamp >> 40); + buffer[offset + 4] = (byte)(_timestamp >> 32); + buffer[offset + 5] = (byte)(_timestamp >> 24); + buffer[offset + 6] = (byte)(_timestamp >> 16); + buffer[offset + 7] = (byte)(_timestamp >> 8); + buffer[offset + 8] = (byte)_timestamp; + + // id size, up to 256 bytes + buffer[offset + 9] = (byte)_idBytes.Length; + + // copy the ID + Array.Copy(_idBytes, 0, buffer, offset + 10, _idBytes.Length); + } + + internal byte[] GetBytes() + { + var result = new byte[GetByteCount()]; + WriteTo(result, 0); + return result; + } +} diff --git a/tracer/src/Datadog.Trace/Generated/net461/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs b/tracer/src/Datadog.Trace/Generated/net461/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs index 6d26b13501d8..7e39ebcaca84 100644 --- a/tracer/src/Datadog.Trace/Generated/net461/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs +++ b/tracer/src/Datadog.Trace/Generated/net461/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs @@ -29,5 +29,11 @@ internal static class DataStreamsMonitoring /// /// public const string LegacyHeadersEnabled = "DD_DATA_STREAMS_LEGACY_HEADERS"; + + /// + /// Configuration key for transaction extractors in Data Streams Monitoring. + /// + /// + public const string TransactionExtractors = "DD_DATA_STREAMS_TRANSACTION_EXTRACTORS"; } } diff --git a/tracer/src/Datadog.Trace/Generated/net6.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs b/tracer/src/Datadog.Trace/Generated/net6.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs index 6d26b13501d8..7e39ebcaca84 100644 --- a/tracer/src/Datadog.Trace/Generated/net6.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs +++ b/tracer/src/Datadog.Trace/Generated/net6.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs @@ -29,5 +29,11 @@ internal static class DataStreamsMonitoring /// /// public const string LegacyHeadersEnabled = "DD_DATA_STREAMS_LEGACY_HEADERS"; + + /// + /// Configuration key for transaction extractors in Data Streams Monitoring. + /// + /// + public const string TransactionExtractors = "DD_DATA_STREAMS_TRANSACTION_EXTRACTORS"; } } diff --git a/tracer/src/Datadog.Trace/Generated/netcoreapp3.1/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs b/tracer/src/Datadog.Trace/Generated/netcoreapp3.1/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs index 6d26b13501d8..7e39ebcaca84 100644 --- a/tracer/src/Datadog.Trace/Generated/netcoreapp3.1/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs +++ b/tracer/src/Datadog.Trace/Generated/netcoreapp3.1/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs @@ -29,5 +29,11 @@ internal static class DataStreamsMonitoring /// /// public const string LegacyHeadersEnabled = "DD_DATA_STREAMS_LEGACY_HEADERS"; + + /// + /// Configuration key for transaction extractors in Data Streams Monitoring. + /// + /// + public const string TransactionExtractors = "DD_DATA_STREAMS_TRANSACTION_EXTRACTORS"; } } diff --git a/tracer/src/Datadog.Trace/Generated/netstandard2.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs b/tracer/src/Datadog.Trace/Generated/netstandard2.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs index 6d26b13501d8..7e39ebcaca84 100644 --- a/tracer/src/Datadog.Trace/Generated/netstandard2.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs +++ b/tracer/src/Datadog.Trace/Generated/netstandard2.0/Datadog.Trace.SourceGenerators/ConfigurationKeysGenerator/ConfigurationKeys.DataStreamsMonitoring.g.cs @@ -29,5 +29,11 @@ internal static class DataStreamsMonitoring /// /// public const string LegacyHeadersEnabled = "DD_DATA_STREAMS_LEGACY_HEADERS"; + + /// + /// Configuration key for transaction extractors in Data Streams Monitoring. + /// + /// + public const string TransactionExtractors = "DD_DATA_STREAMS_TRANSACTION_EXTRACTORS"; } } diff --git a/tracer/src/Datadog.Trace/PlatformHelpers/AspNetCoreHttpRequestHandler.cs b/tracer/src/Datadog.Trace/PlatformHelpers/AspNetCoreHttpRequestHandler.cs index 40acefc70362..3f33cce9e60c 100644 --- a/tracer/src/Datadog.Trace/PlatformHelpers/AspNetCoreHttpRequestHandler.cs +++ b/tracer/src/Datadog.Trace/PlatformHelpers/AspNetCoreHttpRequestHandler.cs @@ -16,6 +16,7 @@ using Datadog.Trace.AppSec.Coordinator; using Datadog.Trace.ClrProfiler.AutoInstrumentation.Proxy; using Datadog.Trace.Configuration; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.DiagnosticListeners; using Datadog.Trace.DuckTyping; using Datadog.Trace.ExtensionMethods; @@ -136,6 +137,25 @@ private Scope StartAspNetCorePipelineScope(Tracer tracer, Security security, Ias var scope = tracer.StartActiveInternal(_requestInOperationName, extractedContext.SpanContext, tags: tags, links: extractedContext.Links); scope.Span.DecorateWebServerSpan(resourceName, httpMethod, host, url, userAgent, tags); + var dataStreamsManager = tracer.TracerManager.DataStreamsManager; + if (dataStreamsManager.IsTransactionTrackingEnabled) + { + var extractors = dataStreamsManager.GetExtractorsByType(DataStreamsTransactionExtractor.Type.HttpInHeaders); + if (extractors != null) + { + foreach (var extractor in extractors) + { + if (request.Headers.TryGetValue(extractor.Value, out var headerValues)) + { + foreach (var headerValue in headerValues) + { + dataStreamsManager.TrackTransaction(headerValue, extractor.Name); + } + } + } + } + } + var headerTagsInternal = tracer.CurrentTraceSettings.Settings.HeaderTags; if (headerTagsInternal.Count != 0) { diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsAggregatorTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsAggregatorTests.cs index d3fef2c365cc..2d645100e0bc 100644 --- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsAggregatorTests.cs +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsAggregatorTests.cs @@ -11,6 +11,7 @@ using Datadog.Trace.DataStreamsMonitoring; using Datadog.Trace.DataStreamsMonitoring.Aggregation; using Datadog.Trace.DataStreamsMonitoring.Hashes; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.DataStreamsMonitoring.Utils; using Datadog.Trace.ExtensionMethods; using Datadog.Trace.TestHelpers.TestTracer; @@ -127,6 +128,68 @@ public async Task Aggregator_FlushesStats() AssertBucket(stats, hash: 3, CreateSketch(5), CreateSketch(2)); } + [Fact] + public async Task Serialize_ReturnsTrue_WhenOnlyTransactionsPresent() + { + await using var tracer = TracerHelper.Create(); + var aggregator = new DataStreamsAggregator( + new DataStreamsMessagePackFormatter(tracer.Settings, new ProfilerSettings(ProfilerState.Disabled)), + BucketDurationMs); + + aggregator.AddTransaction(new DataStreamsTransactionInfo("tx-1", DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(), "checkpoint")); + + using var stream = new MemoryStream(); + aggregator.Serialize(stream, long.MaxValue).Should().BeTrue(); + + // second call must return false — transactions were already consumed on the first call + using var stream2 = new MemoryStream(); + aggregator.Serialize(stream2, long.MaxValue).Should().BeFalse(); + } + + [Fact] + public async Task Serialize_ReturnsFalse_WhenEmpty() + { + await using var tracer = TracerHelper.Create(); + var aggregator = new DataStreamsAggregator( + new DataStreamsMessagePackFormatter(tracer.Settings, new ProfilerSettings(ProfilerState.Disabled)), + BucketDurationMs); + + using var stream = new MemoryStream(); + aggregator.Serialize(stream, long.MaxValue).Should().BeFalse(); + } + + [Fact] + public async Task ShouldFlushTransactions_ReturnsFalse_WhenContainerIsSmall() + { + await using var tracer = TracerHelper.Create(); + var aggregator = new DataStreamsAggregator( + new DataStreamsMessagePackFormatter(tracer.Settings, new ProfilerSettings(ProfilerState.Disabled)), + BucketDurationMs); + + aggregator.AddTransaction(new DataStreamsTransactionInfo("id", 1L, "cp")); + aggregator.ShouldFlushTransactions.Should().BeFalse(); + } + + [Fact] + public async Task ShouldFlushTransactions_ReturnsTrue_WhenContainerExceedsThreshold() + { + await using var tracer = TracerHelper.Create(); + var aggregator = new DataStreamsAggregator( + new DataStreamsMessagePackFormatter(tracer.Settings, new ProfilerSettings(ProfilerState.Disabled)), + BucketDurationMs); + + var id = new string('x', 512); + var byteCount = new DataStreamsTransactionInfo(id, 0L, "cp").GetByteCount(); + var count = (512 * 1024 / byteCount) + 1; + + for (var i = 0; i < count; i++) + { + aggregator.AddTransaction(new DataStreamsTransactionInfo(id, (long)i, "cp")); + } + + aggregator.ShouldFlushTransactions.Should().BeTrue(); + } + private static DataStreamsAggregator CreateAggregatorWithData(Tracer tracer, long t1, long t2) { var aggregator = new DataStreamsAggregator( diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsExtractorRegistryTest.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsExtractorRegistryTest.cs new file mode 100644 index 000000000000..55f9c4002ce7 --- /dev/null +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsExtractorRegistryTest.cs @@ -0,0 +1,21 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// +#nullable enable + +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; +using FluentAssertions; +using Xunit; + +namespace Datadog.Trace.Tests.DataStreamsMonitoring; + +public class DataStreamsExtractorRegistryTest +{ + [Fact] + public void DeserializeCorrectly() + { + var registry = new DataStreamsExtractorRegistry("[{\"name\": \"transaction-origin\", \"type\": \"HTTP_OUT_HEADERS\", \"value\": \"transaction-id\"}]"); + registry.AsJson().Should().Be("{\"HttpOutHeaders\":[{\"name\":\"transaction-origin\",\"type\":\"HTTP_OUT_HEADERS\",\"value\":\"transaction-id\",\"ExtractorType\":1}]}"); + } +} diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs index d7e58b90d9bb..a18ce53a0468 100644 --- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs @@ -13,6 +13,7 @@ using Datadog.Trace.DataStreamsMonitoring; using Datadog.Trace.DataStreamsMonitoring.Aggregation; using Datadog.Trace.DataStreamsMonitoring.Hashes; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.ExtensionMethods; using Datadog.Trace.PlatformHelpers; using Datadog.Trace.TestHelpers.FluentAssertionsExtensions; @@ -126,6 +127,15 @@ public void WhenEnabled_TracksBacklog() point.Tags.Should().Be(tags); } + [Fact] + public void WhenEnabled_TracksTransactions() + { + var dsm = GetDataStreamManager(true, out var writer); + dsm.TrackTransaction("transaction-id", "checkpoint"); + + writer.DataStreamsTransactions.Size().Should().BeGreaterThan(0); + } + [Fact] public async Task WhenEnabled_TimeInQueueIsNotUsedForSecondCheckpoint() { @@ -249,6 +259,37 @@ public void WhenEnabled_SetCheckpoint_SetsSpanTags() span.Tags.GetTag("pathway.hash").Should().NotBeNull(); } + [Fact] + public void WhenEnabled_TrackTransaction_AddsTransactionAndTagsSpan() + { + var dsm = GetDataStreamManager(true, out var writer); + var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow); + + span.TrackTransaction(dsm, "tx-abc", "some-checkpoint"); + + writer.DataStreamsTransactions.GetDataAndReset().Should().NotBeEmpty(); + span.Tags.GetTag("dsm.transaction.id").Should().Be("tx-abc"); + } + + [Fact] + public void WhenDisabled_TrackTransaction_DoesNothing() + { + var dsm = GetDataStreamManager(false, out _); + var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow); + + var act = () => span.TrackTransaction(dsm, "tx-abc", "some-checkpoint"); + act.Should().NotThrow(); + } + + [Fact] + public void WhenManagerIsNull_TrackTransaction_DoesNothing() + { + var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow); + + var act = () => span.TrackTransaction(null, "tx-abc", "some-checkpoint"); + act.Should().NotThrow(); + } + [Fact] public async Task WhenEnabled_OneConsumeTwoProduceUsesTwiceConsumePathway() { @@ -358,6 +399,8 @@ internal class DataStreamsWriterMock : IDataStreamsWriter public ConcurrentQueue BacklogPoints { get; } = new(); + public DataStreamsTransactionContainer DataStreamsTransactions { get; } = new(1024); + public int DisposeCount => Volatile.Read(ref _disposeCount); public void Add(in StatsPoint point) @@ -365,6 +408,11 @@ public void Add(in StatsPoint point) Points.Enqueue(point); } + public void AddTransaction(in DataStreamsTransactionInfo transaction) + { + DataStreamsTransactions.Add(transaction); + } + public void AddBacklog(in BacklogPoint point) { BacklogPoints.Enqueue(point); diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsMessagePackFormatterTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsMessagePackFormatterTests.cs index 242d91ee6e1e..782e7badde7c 100644 --- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsMessagePackFormatterTests.cs +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsMessagePackFormatterTests.cs @@ -91,7 +91,7 @@ public void CanRoundTripMessagePackFormat() }; using var ms = new MemoryStream(); - formatter.Serialize(ms, bucketDurationNs: bucketDuration, statsBuckets: buckets, backlogBuckets); + formatter.Serialize(ms, bucketDurationNs: bucketDuration, statsBuckets: buckets, backlogBuckets, System.Array.Empty()); var data = new ArraySegment(ms.GetBuffer()); @@ -192,7 +192,7 @@ public void ProcessTagsGetWritten() var formatter = new DataStreamsMessagePackFormatter(settings, new ProfilerSettings(ProfilerState.Disabled)); using var ms = new MemoryStream(); - formatter.Serialize(ms, bucketDuration, [], []); + formatter.Serialize(ms, bucketDuration, [], [], System.Array.Empty()); var result = MessagePackSerializer.Deserialize(new ArraySegment(ms.GetBuffer())); // content varies depending on how the tests are run, so we cannot really assert on the content. diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionCacheTestCollection.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionCacheTestCollection.cs new file mode 100644 index 000000000000..28c5c241103a --- /dev/null +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionCacheTestCollection.cs @@ -0,0 +1,15 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using Xunit; + +namespace Datadog.Trace.Tests.DataStreamsMonitoring; + +// Tests that assert on specific checkpoint IDs in the static cache must not run in parallel with other +// tests that create DataStreamsTransactionInfo objects — they share a static ConcurrentDictionary + counter. +[CollectionDefinition(nameof(DataStreamsTransactionCacheTestCollection), DisableParallelization = true)] +public class DataStreamsTransactionCacheTestCollection +{ +} diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionContainerTest.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionContainerTest.cs new file mode 100644 index 000000000000..0e2de3119a8f --- /dev/null +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionContainerTest.cs @@ -0,0 +1,93 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System.Linq; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; +using FluentAssertions; +using Xunit; + +namespace Datadog.Trace.Tests.DataStreamsMonitoring; + +[Collection(nameof(DataStreamsTransactionCacheTestCollection))] +public class DataStreamsTransactionContainerTest +{ + [Fact] + public void ZerosAreTrimmedWhenSerialized() + { + DataStreamsTransactionInfo.ClearCache(); + var container = new DataStreamsTransactionContainer(1024); + container.GetDataAndReset().Should().BeEmpty(); + + var transaction = new DataStreamsTransactionInfo("1", 1, "1"); + container.Add(transaction); + var bytes = container.GetDataAndReset(); + bytes.Should().BeEquivalentTo(new byte[] { 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 49 }); + } + + [Fact] + public void Add_WhenRemainingSpaceExactlyEqualsTransactionSize_StoresDataCorrectly() + { + var transaction = new DataStreamsTransactionInfo("id", 1L, "cp"); + var expectedBytes = transaction.GetBytes(); + + // buffer sized to exactly fit one transaction — no slack + var container = new DataStreamsTransactionContainer(expectedBytes.Length); + container.Add(transaction); + container.GetDataAndReset().Should().BeEquivalentTo(expectedBytes); + } + + [Fact] + public void Add_MultipleTransactions_AccumulatesBytes() + { + var t1 = new DataStreamsTransactionInfo("aaa", 10L, "cp1"); + var t2 = new DataStreamsTransactionInfo("bbb", 20L, "cp2"); + var expected = t1.GetBytes().Concat(t2.GetBytes()).ToArray(); + + var container = new DataStreamsTransactionContainer(1024); + container.Add(t1); + container.Add(t2); + container.GetDataAndReset().Should().BeEquivalentTo(expected); + } + + [Fact] + public void ShouldFlush_ReturnsFalse_WhenBelowThreshold() + { + var container = new DataStreamsTransactionContainer(1024); + container.Add(new DataStreamsTransactionInfo("id", 1L, "cp")); + container.ShouldFlush.Should().BeFalse(); + } + + [Fact] + public void ShouldFlush_ReturnsTrue_WhenThresholdExceeded() + { + var id = new string('x', 512); + var byteCount = new DataStreamsTransactionInfo(id, 0L, "cp").GetByteCount(); + var count = (512 * 1024 / byteCount) + 1; + + var container = new DataStreamsTransactionContainer(1024); + for (var i = 0; i < count; i++) + { + container.Add(new DataStreamsTransactionInfo(id, (long)i, "cp")); + } + + container.ShouldFlush.Should().BeTrue(); + } + + [Fact] + public void Add_DoesNotDropData_WhenAboveFlushThreshold() + { + var id = new string('x', 512); + var byteCount = new DataStreamsTransactionInfo(id, 0L, "cp").GetByteCount(); + var count = (512 * 1024 / byteCount) + 10; + + var container = new DataStreamsTransactionContainer(1024); + for (var i = 0; i < count; i++) + { + container.Add(new DataStreamsTransactionInfo(id, (long)i, "cp")); + } + + container.Size().Should().Be(count * byteCount); + } +} diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionExtractorTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionExtractorTests.cs new file mode 100644 index 000000000000..e9168e93c33a --- /dev/null +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionExtractorTests.cs @@ -0,0 +1,39 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// +#nullable enable + +using System.Linq; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; +using FluentAssertions; +using Xunit; + +namespace Datadog.Trace.Tests.DataStreamsMonitoring; + +public class DataStreamsTransactionExtractorTests +{ + [Theory] + [InlineData("HTTP_OUT_HEADERS", 1)] + [InlineData("HTTP_IN_HEADERS", 2)] + [InlineData("KAFKA_CONSUME_HEADERS", 3)] + [InlineData("KAFKA_PRODUCE_HEADERS", 4)] + [InlineData("UNKNOWN_STUFF", 0)] + [InlineData("", 0)] + public void ExtractorType_ReturnsCorrectType_ForTypeString(string stringType, int expectedInt) + { + var expected = (DataStreamsTransactionExtractor.Type)expectedInt; + var json = $"[{{\"name\": \"n\", \"type\": \"{stringType}\", \"value\": \"v\"}}]"; + var registry = new DataStreamsExtractorRegistry(json); + registry.GetExtractorsByType(expected).Should().ContainSingle() + .Which.ExtractorType.Should().Be(expected); + } + + [Fact] + public void ExtractorType_ReturnsSameValue_OnMultipleCalls() + { + var registry = new DataStreamsExtractorRegistry("[{\"name\": \"n\", \"type\": \"HTTP_OUT_HEADERS\", \"value\": \"v\"}]"); + var extractor = registry.GetExtractorsByType(DataStreamsTransactionExtractor.Type.HttpOutHeaders)!.Single(); + extractor.ExtractorType.Should().Be(extractor.ExtractorType); + } +} diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionInfoTest.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionInfoTest.cs new file mode 100644 index 000000000000..9e162226f6da --- /dev/null +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsTransactionInfoTest.cs @@ -0,0 +1,66 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; +using FluentAssertions; +using Xunit; + +namespace Datadog.Trace.Tests.DataStreamsMonitoring; + +[Collection(nameof(DataStreamsTransactionCacheTestCollection))] +public class DataStreamsTransactionInfoTest +{ + [Fact] + public void TransactionInfoSerializesCorrectly() + { + DataStreamsTransactionInfo.ClearCache(); + var transaction = new DataStreamsTransactionInfo("1", 1, "1"); + transaction.TimestampNs.Should().Be(1); + transaction.GetBytes().Should().BeEquivalentTo(new byte[] { 1, 0, 0, 0, 0, 0, 0, 0, 1, 1, 49 }); + } + + [Fact] + public void TransactionInfoCacheSerializesCorrectly() + { + DataStreamsTransactionInfo.ClearCache(); + _ = new DataStreamsTransactionInfo("1", 1, "1"); + var cacheBytes = DataStreamsTransactionInfo.GetCacheBytes(); + cacheBytes.Should().BeEquivalentTo(new byte[] { 1, 1, 49 }); + } + + [Fact] + public void GetBytes_ReturnsSameBytesOnSubsequentCalls() + { + var transaction = new DataStreamsTransactionInfo("tx-abc-123", 9_876_543_210L, "some-checkpoint"); + transaction.GetBytes().Should().BeEquivalentTo(transaction.GetBytes()); + } + + [Fact] + public void WriteTo_WritesSameBytesAsGetBytes() + { + var transaction = new DataStreamsTransactionInfo("tx-id", 42L, "checkpoint"); + var expected = transaction.GetBytes(); + + var buffer = new byte[expected.Length + 4]; // extra padding to catch over-writes + transaction.WriteTo(buffer, 2); + + buffer[0].Should().Be(0); // padding before offset untouched + buffer[1].Should().Be(0); + for (var i = 0; i < expected.Length; i++) + { + buffer[2 + i].Should().Be(expected[i]); + } + + buffer[2 + expected.Length].Should().Be(0); // padding after untouched + buffer[3 + expected.Length].Should().Be(0); + } + + [Fact] + public void GetByteCount_MatchesGetBytesLength() + { + var transaction = new DataStreamsTransactionInfo("hello-world", 12345L, "my-cp"); + transaction.GetByteCount().Should().Be(transaction.GetBytes().Length); + } +} diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsWriterTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsWriterTests.cs index d17450feb4ca..0e5cdba55196 100644 --- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsWriterTests.cs +++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsWriterTests.cs @@ -15,6 +15,7 @@ using Datadog.Trace.DataStreamsMonitoring; using Datadog.Trace.DataStreamsMonitoring.Aggregation; using Datadog.Trace.DataStreamsMonitoring.Hashes; +using Datadog.Trace.DataStreamsMonitoring.TransactionTracking; using Datadog.Trace.DataStreamsMonitoring.Transport; using Datadog.Trace.ExtensionMethods; using Datadog.Trace.TestHelpers.DataStreamsMonitoring; @@ -67,6 +68,44 @@ public async Task DoesNotWriteIfNoStats_OnClose(bool? isSupported) api.Sent.Should().BeEmpty(); } + [Fact] + public async Task WhenSupported_TracksTransactions() + { + var bucketDurationMs = 100; + var api = new StubApi(); + var writer = CreateWriter(api, out var discovery, bucketDurationMs); + TriggerSupportUpdate(discovery, isSupported: true); + + writer.AddTransaction(new DataStreamsTransactionInfo("id", 1, "checkpoint")); + await api.WaitForCount(1, 30_000); + + HasOneOrTwoPoints(api); + await writer.DisposeAsync(); + } + + [Fact] + public async Task WhenSupported_TriggersEarlyFlush_WhenTransactionsExceedThreshold() + { + var bucketDuration = int.MaxValue; // timer will never fire + var api = new StubApi(); + var writer = CreateWriter(api, out var discovery, bucketDuration); + TriggerSupportUpdate(discovery, isSupported: true); + + var id = new string('x', 512); + var byteCount = new DataStreamsTransactionInfo(id, 0L, "cp").GetByteCount(); + var count = (512 * 1024 / byteCount) + 1; + + for (var i = 0; i < count; i++) + { + writer.AddTransaction(new DataStreamsTransactionInfo(id, (long)i, "cp")); + } + + await api.WaitForCount(1, 30_000); + api.Sent.Should().NotBeEmpty(); + + await writer.DisposeAsync(); + } + [Fact] public async Task WhenSupported_WritesAStatsPointAfterDelay() { diff --git a/tracer/test/Datadog.Trace.Tests/Telemetry/config_norm_rules.json b/tracer/test/Datadog.Trace.Tests/Telemetry/config_norm_rules.json index bc808d44bf8d..38c4ae54149b 100644 --- a/tracer/test/Datadog.Trace.Tests/Telemetry/config_norm_rules.json +++ b/tracer/test/Datadog.Trace.Tests/Telemetry/config_norm_rules.json @@ -379,6 +379,7 @@ "DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH": "trace_x_datadog_tags_max_length", "DD_DATA_STREAMS_ENABLED": "data_streams_enabled", "DD_DATA_STREAMS_LEGACY_HEADERS": "data_streams_legacy_headers", + "DD_DATA_STREAMS_TRANSACTION_EXTRACTORS": "data_streams_transaction_extractors", "DD_CIVISIBILITY_ENABLED": "ci_visibility_enabled", "DD_CIVISIBILITY_AGENTLESS_ENABLED": "ci_visibility_agentless_enabled", "DD_CIVISIBILITY_AGENTLESS_URL": "ci_visibility_agentless_url",