Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using Datadog.Trace.ClrProfiler.CallTarget;
using Datadog.Trace.Configuration;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.Propagators;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Http.HttpClient
Expand Down Expand Up @@ -53,6 +54,25 @@ public static CallTargetState OnMethodBegin<TTarget, TRequest>(TTarget instance,
tracer.TracerManager.Telemetry.IntegrationGeneratedSpan(implementationIntegrationId ?? integrationId);
return new CallTargetState(scope);
}

var dataStreamsManager = tracer.TracerManager.DataStreamsManager;
if (dataStreamsManager.IsEnabled)
{
var extractors = dataStreamsManager.GetExtractorsByType(DataStreamsTransactionExtractor.Type.HttpOutHeaders);
if (extractors != null)
{
foreach (var extractor in extractors)
{
if (headers.TryGetValues(extractor.Value, out var headerValues))
{
foreach (var headerValue in headerValues)
{
dataStreamsManager.TrackTransaction(headerValue, extractor.Name);
}
}
}
}
}
}

return CallTargetState.GetDefault();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading;
using Datadog.Trace.Configuration.Schema;
using Datadog.Trace.DataStreamsMonitoring;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.DataStreamsMonitoring.Utils;
using Datadog.Trace.DuckTyping;
using Datadog.Trace.Logging;
Expand Down Expand Up @@ -288,6 +289,8 @@ private static long GetMessageSize<T>(T message)
var base64PathwayContext = Convert.ToBase64String(BitConverter.GetBytes(span.Context.PathwayContext.Value.Hash.Value));
message.Headers.Add(DataStreamsPropagationHeaders.TemporaryBase64PathwayContext, Encoding.UTF8.GetBytes(base64PathwayContext));
}

ApplyDataStreamsExtractors(dataStreamsManager, DataStreamsTransactionExtractor.Type.KafkaConsumeHeaders, message);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -327,6 +330,30 @@ internal static void CloseConsumerScope(Tracer tracer)
}
}

/// <summary>
/// Runs the configured Data Streams transaction extractors of the given type against
/// the headers of a Kafka message, reporting each matched header value as a transaction.
/// </summary>
/// <typeparam name="TMessage">Duck-typed Kafka message type.</typeparam>
/// <param name="dataStreamsManager">The data streams manager used to resolve extractors and track transactions.</param>
/// <param name="extractorType">Which extractor group to apply (e.g. produce vs consume headers).</param>
/// <param name="message">The message whose headers are inspected; ignored when null or without a backing instance.</param>
internal static void ApplyDataStreamsExtractors<TMessage>(
    DataStreamsManager dataStreamsManager,
    DataStreamsTransactionExtractor.Type extractorType,
    TMessage? message)
    where TMessage : IMessage
{
    if (message?.Instance == null)
    {
        return;
    }

    var matchingExtractors = dataStreamsManager.GetExtractorsByType(extractorType);
    if (matchingExtractors == null || message.Headers == null)
    {
        return;
    }

    foreach (var currentExtractor in matchingExtractors)
    {
        // Only the last occurrence of the header is considered, mirroring Kafka header semantics.
        if (message.Headers.TryGetLastBytes(currentExtractor.Value, out var rawTransactionId))
        {
            dataStreamsManager.TrackTransaction(Encoding.UTF8.GetString(rawTransactionId), currentExtractor.Name);
        }
    }
}

/// <summary>
/// Try to inject the prop
/// </summary>
Expand Down Expand Up @@ -392,6 +419,7 @@ internal static void TryInjectHeaders<TTopicPartitionMarker, TMessage>(
}

dataStreamsManager.InjectPathwayContext(span.Context.PathwayContext, adapter);
ApplyDataStreamsExtractors(dataStreamsManager, DataStreamsTransactionExtractor.Type.KafkaProduceHeaders, message);
}
}
catch (Exception ex)
Expand Down
9 changes: 9 additions & 0 deletions tracer/src/Datadog.Trace/Configuration/TracerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,10 @@ not null when string.Equals(value, "otlp", StringComparison.OrdinalIgnoreCase) =
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.Enabled)
.AsBool() == null;

DataStreamsTransactionExtractors = config
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.TransactionExtractors)
.AsString(string.Empty);

// no legacy headers if we are in "enabled by default" state
IsDataStreamsLegacyHeadersEnabled = config
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled)
Expand Down Expand Up @@ -1203,6 +1207,11 @@ not null when string.Equals(value, "otlp", StringComparison.OrdinalIgnoreCase) =
/// </summary>
internal bool IsDataStreamsMonitoringInDefaultState { get; }

/// <summary>
/// Gets a raw value for DSM extractors
/// </summary>
internal string DataStreamsTransactionExtractors { get; }

/// <summary>
/// Gets a value indicating whether data streams schema extraction is enabled or not.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,15 @@ supportedConfigurations:
Configuration key for enabling legacy binary headers in Data Streams Monitoring.
false by default if DSM is in default state, true otherwise
<see cref="Datadog.Trace.Configuration.TracerSettings.IsDataStreamsLegacyHeadersEnabled"/>
DD_DATA_STREAMS_TRANSACTION_EXTRACTORS:
- implementation: A
type: array
default: null
product: DataStreamsMonitoring
const_name: TransactionExtractors
documentation: |-
Configuration key for transaction extractors in Data Streams Monitoring.
<see cref="Datadog.Trace.Configuration.TracerSettings.DataStreamsTransactionExtractors"/>
DD_DBM_PROPAGATION_MODE:
- implementation: A
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.DataStreamsMonitoring.Utils;
using Datadog.Trace.SourceGenerators;

Expand All @@ -17,7 +18,7 @@ namespace Datadog.Trace.DataStreamsMonitoring.Aggregation;
/// Aggregates multiple <see cref="StatsPoint"/>s into their correct buckets
/// Note that this class is *not* thread safe
/// </summary>
internal sealed class DataStreamsAggregator
internal sealed class DataStreamsAggregator(DataStreamsMessagePackFormatter formatter, int bucketDurationMs)
{
// The inner dictionary is constrained in size by the number of unique hashes seen by the app
// Unique hashes are unique paths from origin to here, which could be unbounded if there are loops
Expand All @@ -30,18 +31,14 @@ internal sealed class DataStreamsAggregator

private readonly Dictionary<long, Dictionary<string, BacklogBucket>> _backlogBuckets = new();

private readonly DataStreamsMessagePackFormatter _formatter;
private readonly DataStreamsTransactionContainer _dataStreamsTransactionContainer = new(1024);

private readonly DataStreamsMessagePackFormatter _formatter = formatter;
private readonly DDSketchPool _sketchPool = new();
private readonly long _bucketDurationInNs;
private readonly long _bucketDurationInNs = ((long)bucketDurationMs) * 1_000_000;
private List<SerializableStatsBucket>? _statsToWrite;
private List<SerializableBacklogBucket>? _backlogsToWrite;

public DataStreamsAggregator(DataStreamsMessagePackFormatter formatter, int bucketDurationMs)
{
_formatter = formatter;
_bucketDurationInNs = ((long)bucketDurationMs) * 1_000_000;
}

/// <summary>
/// Add the stats point to the aggregated stats
/// </summary>
Expand Down Expand Up @@ -70,6 +67,11 @@ public void AddBacklog(in BacklogPoint point)
}
}

/// <summary>
/// Records a transaction observation into the in-memory transaction container,
/// to be drained and serialized on the next flush.
/// </summary>
/// <param name="transaction">The transaction info to record.</param>
public void AddTransaction(in DataStreamsTransactionInfo transaction)
    => _dataStreamsTransactionContainer.Add(transaction);

/// <summary>
/// Serialize the aggregated results using message pack
/// </summary>
Expand All @@ -80,9 +82,13 @@ public bool Serialize(Stream stream, long maxBucketFlushTimeNs)
{
var statsToAdd = Export(maxBucketFlushTimeNs) ?? new();
var backlogsToAdd = ExportBacklogs(maxBucketFlushTimeNs) ?? new();
if (statsToAdd.Count > 0 || backlogsToAdd.Count > 0)

// issue 11: drain the container once so we hold the bytes before the condition check,
// avoiding a second Size() lock acquisition and a potential race between check and drain.
var transactionData = _dataStreamsTransactionContainer.GetDataAndReset();
if (statsToAdd.Count > 0 || backlogsToAdd.Count > 0 || transactionData.Length > 0)
{
_formatter.Serialize(stream, _bucketDurationInNs, statsToAdd, backlogsToAdd);
_formatter.Serialize(stream, _bucketDurationInNs, statsToAdd, backlogsToAdd, transactionData);
Clear(statsToAdd, backlogsToAdd);

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading;
using Datadog.Trace.Configuration;
using Datadog.Trace.ContinuousProfiler;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.Vendors.Datadog.Sketches;
using Datadog.Trace.Vendors.MessagePack;

Expand Down Expand Up @@ -53,6 +54,9 @@ internal sealed class DataStreamsMessagePackFormatter
private readonly byte[] _processTagsBytes = StringEncoding.UTF8.GetBytes("ProcessTags");
private readonly byte[] _isInDefaultStateBytes = StringEncoding.UTF8.GetBytes("IsInDefaultState");

private readonly byte[] _transactions = StringEncoding.UTF8.GetBytes("Transactions");
private readonly byte[] _transactionCheckpointIds = StringEncoding.UTF8.GetBytes("TransactionCheckpointIds");

private byte[] _environmentValueBytes;
private byte[] _serviceValueBytes;
private ProcessTags? _processTags;
Expand Down Expand Up @@ -116,11 +120,16 @@ private static long GetProductsMask(TracerSettings tracerSettings, ProfilerSetti
return productsMask;
}

public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStatsBucket> statsBuckets, List<SerializableBacklogBucket> backlogsBuckets)
public int Serialize(
Stream stream,
long bucketDurationNs,
List<SerializableStatsBucket> statsBuckets,
List<SerializableBacklogBucket> backlogsBuckets,
byte[] transactionData)
{
var hasTransactions = transactionData.Length > 0;
var withProcessTags = _writeProcessTags && _processTags?.TagsList.Count > 0;
var processTags = _writeProcessTags ? _processTags?.TagsList : null;

var bytesWritten = 0;

// Should be in sync with Java
Expand All @@ -146,11 +155,24 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _tracerVersionValueBytes);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _statsBytes);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, statsBuckets.Count + backlogsBuckets.Count);
bytesWritten += MessagePackBinary.WriteArrayHeader(stream, statsBuckets.Count + backlogsBuckets.Count + (hasTransactions ? 1 : 0));

if (hasTransactions)
{
var currentTs = DateTimeOffset.UtcNow.ToUnixTimeNanoseconds();
var bucketStartTime = currentTs - (currentTs % bucketDurationNs);
bytesWritten += WriteBucketsHeader(stream, bucketStartTime, bucketDurationNs, 0, 0, true);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _transactions);
bytesWritten += MessagePackBinary.WriteBytes(stream, transactionData);

bytesWritten += MessagePackBinary.WriteStringBytes(stream, _transactionCheckpointIds);
bytesWritten += MessagePackBinary.WriteBytes(stream, DataStreamsTransactionInfo.GetCacheBytes());
}

foreach (var backlogBucket in backlogsBuckets)
{
bytesWritten += WriteBucketsHeader(stream, backlogBucket.BucketStartTimeNs, bucketDurationNs, 0, backlogBucket.Bucket.Values.Count);
bytesWritten += WriteBucketsHeader(stream, backlogBucket.BucketStartTimeNs, bucketDurationNs, 0, backlogBucket.Bucket.Values.Count, false);

foreach (var point in backlogBucket.Bucket.Values)
{
Expand All @@ -171,7 +193,7 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat

foreach (var statsBucket in statsBuckets)
{
bytesWritten += WriteBucketsHeader(stream, statsBucket.BucketStartTimeNs, bucketDurationNs, statsBucket.Bucket.Values.Count, 0);
bytesWritten += WriteBucketsHeader(stream, statsBucket.BucketStartTimeNs, bucketDurationNs, statsBucket.Bucket.Values.Count, 0, false);

var timestampTypeBytes = statsBucket.TimestampType == TimestampType.Current
? _currentTimestampTypeBytes
Expand Down Expand Up @@ -250,12 +272,13 @@ private static int SerializeSketch(Stream stream, DDSketch sketch)
return size + 5; // 5 headers
}

private int WriteBucketsHeader(Stream stream, long bucketStartTimeNs, long bucketDurationNs, int statsBucketCount, int backlogBucketCount)
private int WriteBucketsHeader(Stream stream, long bucketStartTimeNs, long bucketDurationNs, int statsBucketCount, int backlogBucketCount, bool hasTransactions)
{
int bytesWritten = 0;
int count = 2;
count += statsBucketCount > 0 ? 1 : 0;
count += backlogBucketCount > 0 ? 1 : 0;
count += hasTransactions ? 2 : 0;

// 2-4 entries per StatsBucket (Backlogs and Stats are both optional):
// https://github.com/DataDog/data-streams-go/blob/60ba06aec619850aef8ed0b9b1f0f5e310438362/datastreams/payload.go#L48
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Agent.DiscoveryService;
using Datadog.Trace.Configuration;
using Datadog.Trace.ContinuousProfiler;
using Datadog.Trace.DataStreamsMonitoring.Aggregation;
using Datadog.Trace.DataStreamsMonitoring.Hashes;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.Headers;
using Datadog.Trace.Logging;
using Datadog.Trace.PlatformHelpers;
Expand All @@ -29,9 +31,10 @@ internal sealed class DataStreamsManager
private static readonly AsyncLocal<PathwayContext?> LastConsumePathway = new(); // saves the context on consume checkpointing only
private readonly object _nodeHashUpdateLock = new();
private readonly ConcurrentDictionary<string, RateLimiter> _schemaRateLimiters = new();
private readonly IDiscoveryService _discoveryService;
private readonly DataStreamsExtractorRegistry _registry;
private readonly IDisposable _updateSubscription;
private readonly bool _isLegacyDsmHeadersEnabled;
private readonly IDiscoveryService _discoveryService; // only saved to be able to unsubscribe
private long _nodeHashBase; // note that this actually represents a `ulong` that we have done an unsafe cast for
private MutableSettings _previousMutableSettings;
private string? _previousContainerTagsHash;
Expand All @@ -49,6 +52,10 @@ public DataStreamsManager(
_writer = writer;
_discoveryService = discoveryService;
_isInDefaultState = tracerSettings.IsDataStreamsMonitoringInDefaultState;
_registry = new DataStreamsExtractorRegistry(tracerSettings.DataStreamsTransactionExtractors);

Log.Debug(@"### Initializing DataStreamsManager with extractors (raw) {DataStreamsTransactionExtractors}", tracerSettings.DataStreamsTransactionExtractors);
Log.Debug(@"### Extractors loaded (parsed): {AsJson}", _registry.AsJson());

_previousMutableSettings = tracerSettings.Manager.InitialMutableSettings;
// even though the value will probably get updated by a callback when subscriptions happen just after,
Expand Down Expand Up @@ -159,6 +166,11 @@ public async Task FlushAsync()
where TCarrier : IBinaryHeadersCollection
=> IsEnabled ? DataStreamsContextPropagator.Instance.Extract(headers) : null;

/// <summary>
/// Returns the configured transaction extractors for the given extractor type,
/// or null when none are configured for that type.
/// </summary>
/// <param name="extractorType">The extractor group to look up.</param>
/// <returns>The matching extractors, or null if none exist.</returns>
public List<DataStreamsTransactionExtractor>? GetExtractorsByType(DataStreamsTransactionExtractor.Type extractorType)
    => _registry.GetExtractorsByType(extractorType);

/// <summary>
/// Injects a <see cref="PathwayContext"/> into headers
/// </summary>
Expand All @@ -175,6 +187,21 @@ public void InjectPathwayContext<TCarrier>(PathwayContext? context, TCarrier hea
DataStreamsContextPropagator.Instance.Inject(context.Value, headers, _isLegacyDsmHeadersEnabled);
}

/// <summary>
/// Records a transaction checkpoint observation for Data Streams Monitoring.
/// No-ops when data streams monitoring is disabled.
/// </summary>
/// <param name="transactionId">The transaction identifier extracted from the carrier (e.g. a header value).</param>
/// <param name="checkpointName">The name of the extractor/checkpoint that observed the transaction.</param>
public void TrackTransaction(string transactionId, string checkpointName)
{
    if (!IsEnabled)
    {
        return;
    }

    // Log after the enabled-check so a disabled manager produces no log noise;
    // leftover "### " development prefix removed from the message template.
    Log.Debug("Tracking transaction {TransactionId} at checkpoint {CheckpointName}", transactionId, checkpointName);

    // NOTE(review): _writer is read via Volatile and null-checked — presumably it can be
    // swapped/nulled during shutdown elsewhere in this class; confirm against DisposeAsync.
    var writer = Volatile.Read(ref _writer);
    writer?.AddTransaction(new DataStreamsTransactionInfo(
        transactionId,
        DateTimeOffset.UtcNow.ToUnixTimeNanoseconds(),
        checkpointName));
}

public void TrackBacklog(string tags, long value)
{
if (!IsEnabled)
Expand Down
Loading
Loading