Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions tracer/build/supported_calltargets.g.json
Original file line number Diff line number Diff line change
Expand Up @@ -6657,6 +6657,32 @@
"IsAdoNetIntegration": false,
"InstrumentationCategory": 1
},
{
"IntegrationName": "DatadogTraceManual",
"AssemblyName": "Datadog.Trace.Manual",
"TargetTypeName": "Datadog.Trace.DataStreams",
"TargetMethodName": "TrackTransaction",
"TargetReturnType": "System.Void",
"TargetParameterTypes": [
"Datadog.Trace.ISpan",
"System.String",
"System.String"
],
"MinimumVersion": {
"Item1": 3,
"Item2": 0,
"Item3": 0
},
"MaximumVersion": {
"Item1": 3,
"Item2": 65535,
"Item3": 65535
},
"InstrumentationTypeName": "Datadog.Trace.ClrProfiler.AutoInstrumentation.ManualInstrumentation.DataStreams.DataStreamsTrackTransactionIntegration",
"IntegrationKind": 0,
"IsAdoNetIntegration": false,
"InstrumentationCategory": 1
},
{
"IntegrationName": "DatadogTraceManual",
"AssemblyName": "Datadog.Trace.Manual",
Expand Down
33 changes: 33 additions & 0 deletions tracer/src/Datadog.Trace.Manual/DataStreams.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// <copyright file="DataStreams.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

using System.Runtime.CompilerServices;
using Datadog.Trace.SourceGenerators;

#nullable enable

namespace Datadog.Trace
{
/// <summary>
/// Provides manual Data Streams Monitoring operations for cases where auto-instrumentation
/// cannot propagate transaction tracking automatically.
/// </summary>
public static class DataStreams
{
/// <summary>
/// Records a Data Streams Monitoring transaction checkpoint on the given span.
/// Sets the <c>dsm.transaction.id</c> tag on the span and sends the transaction
/// to the Data Streams backend (if DSM is enabled).
/// </summary>
/// <param name="span">The active span representing the current operation.</param>
/// <param name="transactionId">A stable identifier for the transaction being tracked (e.g. a message ID or trace ID).</param>
/// <param name="checkpointName">The logical name of the checkpoint (e.g. "kafka-produce", "http-send").</param>
[Instrumented]
[MethodImpl(MethodImplOptions.NoInlining)]
public static void TrackTransaction(ISpan span, string transactionId, string checkpointName)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Threading;
using Datadog.Trace.ClrProfiler.CallTarget;
using Datadog.Trace.Configuration;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.Propagators;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.Http.HttpClient
Expand Down Expand Up @@ -53,6 +54,25 @@ public static CallTargetState OnMethodBegin<TTarget, TRequest>(TTarget instance,
tracer.TracerManager.Telemetry.IntegrationGeneratedSpan(implementationIntegrationId ?? integrationId);
return new CallTargetState(scope);
}

var dataStreamsManager = tracer.TracerManager.DataStreamsManager;
if (dataStreamsManager.IsEnabled)
{
var extractors = dataStreamsManager.GetExtractorsByType(DataStreamsTransactionExtractor.Type.HttpOutHeaders);
if (extractors != null)
{
foreach (var extractor in extractors)
{
if (headers.TryGetValues(extractor.Value, out var headerValues))
{
foreach (var headerValue in headerValues)
{
dataStreamsManager.TrackTransaction(headerValue, extractor.Name);
}
}
}
}
}
}

return CallTargetState.GetDefault();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.Threading;
using Datadog.Trace.Configuration.Schema;
using Datadog.Trace.DataStreamsMonitoring;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.DataStreamsMonitoring.Utils;
using Datadog.Trace.DuckTyping;
using Datadog.Trace.Logging;
Expand Down Expand Up @@ -288,6 +289,8 @@ private static long GetMessageSize<T>(T message)
var base64PathwayContext = Convert.ToBase64String(BitConverter.GetBytes(span.Context.PathwayContext.Value.Hash.Value));
message.Headers.Add(DataStreamsPropagationHeaders.TemporaryBase64PathwayContext, Encoding.UTF8.GetBytes(base64PathwayContext));
}

ApplyDataStreamsExtractors(dataStreamsManager, DataStreamsTransactionExtractor.Type.KafkaConsumeHeaders, message);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -327,6 +330,30 @@ internal static void CloseConsumerScope(Tracer tracer)
}
}

internal static void ApplyDataStreamsExtractors<TMessage>(
DataStreamsManager dataStreamsManager,
DataStreamsTransactionExtractor.Type extractorType,
TMessage? message)
where TMessage : IMessage
{
if (message?.Instance == null)
{
return;
}

var extractors = dataStreamsManager.GetExtractorsByType(extractorType);
if (extractors != null && message.Headers != null)
{
foreach (var extractor in extractors)
{
if (message.Headers.TryGetLastBytes(extractor.Value, out var transactionId))
{
dataStreamsManager.TrackTransaction(Encoding.UTF8.GetString(transactionId), extractor.Name);
}
}
}
}

/// <summary>
/// Try to inject the prop
/// </summary>
Expand Down Expand Up @@ -392,6 +419,7 @@ internal static void TryInjectHeaders<TTopicPartitionMarker, TMessage>(
}

dataStreamsManager.InjectPathwayContext(span.Context.PathwayContext, adapter);
ApplyDataStreamsExtractors(dataStreamsManager, DataStreamsTransactionExtractor.Type.KafkaProduceHeaders, message);
}
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// <copyright file="DataStreamsTrackTransactionIntegration.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using System.ComponentModel;
using Datadog.Trace.ClrProfiler.AutoInstrumentation.ManualInstrumentation.Proxies;
using Datadog.Trace.ClrProfiler.CallTarget;
using Datadog.Trace.DuckTyping;
using Datadog.Trace.Telemetry;
using Datadog.Trace.Telemetry.Metrics;

namespace Datadog.Trace.ClrProfiler.AutoInstrumentation.ManualInstrumentation.DataStreams;

/// <summary>
/// Instrumentation for <c>Datadog.Trace.DataStreams.TrackTransaction</c>
/// </summary>
[InstrumentMethod(
AssemblyName = "Datadog.Trace.Manual",
TypeName = "Datadog.Trace.DataStreams",
MethodName = "TrackTransaction",
ReturnTypeName = ClrNames.Void,
ParameterTypeNames = new[] { "Datadog.Trace.ISpan", "System.String", "System.String" },
MinimumVersion = ManualInstrumentationConstants.MinVersion,
MaximumVersion = ManualInstrumentationConstants.MaxVersion,
IntegrationName = ManualInstrumentationConstants.IntegrationName)]
[Browsable(false)]
[EditorBrowsable(EditorBrowsableState.Never)]
public sealed class DataStreamsTrackTransactionIntegration
{
internal static CallTargetState OnMethodBegin<TTarget, TSpan>(TSpan span, string transactionId, string checkpointName)
{
TelemetryFactory.Metrics.Record(PublicApiUsage.DataStreams_TrackTransaction);
Invoke(Datadog.Trace.Tracer.Instance, span, transactionId, checkpointName);
return CallTargetState.GetDefault();
}

/// <summary>
/// Separated from <see cref="OnMethodBegin{TTarget,TSpan}"/> so tests can call it directly
/// without going through the CallTarget machinery.
/// </summary>
/// <typeparam name="TSpan">
/// Typically a duck-typed <c>Datadog.Trace.ISpan</c> proxy from <c>Datadog.Trace.Manual</c>,
/// a real <see cref="Span"/>, or null (bad usage — transaction is still tracked, tag is skipped).
/// </typeparam>
internal static void Invoke<TSpan>(Datadog.Trace.Tracer tracer, TSpan span, string transactionId, string checkpointName)
{
var dsm = tracer.TracerManager.DataStreamsManager;
if (dsm is null || !dsm.IsEnabled)
{
return;
}

if (span is IDuckType { Instance: Span s })
{
s.SetTag("dsm.transaction.id", transactionId);
}
else if (span is Span autoSpan)
{
autoSpan.SetTag("dsm.transaction.id", transactionId);
}
else if (span is null)
{
// bad usage, but catering to it just in case
}
else
{
span.DuckCast<ISpanSetTagProxy>()!.SetTag("dsm.transaction.id", transactionId);
}

dsm.TrackTransaction(transactionId, checkpointName);
}
}
9 changes: 9 additions & 0 deletions tracer/src/Datadog.Trace/Configuration/TracerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,10 @@ not null when string.Equals(value, "otlp", StringComparison.OrdinalIgnoreCase) =
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.Enabled)
.AsBool() == null;

DataStreamsTransactionExtractors = config
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.TransactionExtractors)
.AsString(string.Empty);

// no legacy headers if we are in "enbaled by default" state
IsDataStreamsLegacyHeadersEnabled = config
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled)
Expand Down Expand Up @@ -1203,6 +1207,11 @@ not null when string.Equals(value, "otlp", StringComparison.OrdinalIgnoreCase) =
/// </summary>
internal bool IsDataStreamsMonitoringInDefaultState { get; }

/// <summary>
/// Gets a raw value for DSM extractors
/// </summary>
internal string DataStreamsTransactionExtractors { get; }

/// <summary>
/// Gets a value indicating whether data streams schema extraction is enabled or not.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,15 @@ supportedConfigurations:
Configuration key for enabling legacy binary headers in Data Streams Monitoring.
false by default if DSM is in default state, true otherwise
<see cref="Datadog.Trace.Configuration.TracerSettings.IsDataStreamsLegacyHeadersEnabled"/>
DD_DATA_STREAMS_TRANSACTION_EXTRACTORS:
- implementation: A
type: array
default: null
product: DataStreamsMonitoring
const_name: TransactionExtractors
documentation: |-
Configuration key for transaction extractors in Data Streams Monitoring.
<see cref="Datadog.Trace.Configuration.TracerSettings.DataStreamsTransactionExtractors"/>
DD_DBM_PROPAGATION_MODE:
- implementation: A
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.IO;
using Datadog.Trace.DataStreamsMonitoring.TransactionTracking;
using Datadog.Trace.DataStreamsMonitoring.Utils;
using Datadog.Trace.SourceGenerators;

Expand All @@ -17,7 +18,7 @@ namespace Datadog.Trace.DataStreamsMonitoring.Aggregation;
/// Aggregates multiple <see cref="StatsPoint"/>s into their correct buckets
/// Note that this class is *not* thread safe
/// </summary>
internal sealed class DataStreamsAggregator
internal sealed class DataStreamsAggregator(DataStreamsMessagePackFormatter formatter, int bucketDurationMs)
{
// The inner dictionary is constrained in size by the number of unique hashes seen by the app
// Unique hashes are unique paths from origin to here, which could be unbounded if there are loops
Expand All @@ -30,18 +31,14 @@ internal sealed class DataStreamsAggregator

private readonly Dictionary<long, Dictionary<string, BacklogBucket>> _backlogBuckets = new();

private readonly DataStreamsMessagePackFormatter _formatter;
private readonly DataStreamsTransactionContainer _dataStreamsTransactionContainer = new(1024);

private readonly DataStreamsMessagePackFormatter _formatter = formatter;
private readonly DDSketchPool _sketchPool = new();
private readonly long _bucketDurationInNs;
private readonly long _bucketDurationInNs = ((long)bucketDurationMs) * 1_000_000;
private List<SerializableStatsBucket>? _statsToWrite;
private List<SerializableBacklogBucket>? _backlogsToWrite;

public DataStreamsAggregator(DataStreamsMessagePackFormatter formatter, int bucketDurationMs)
{
_formatter = formatter;
_bucketDurationInNs = ((long)bucketDurationMs) * 1_000_000;
}

/// <summary>
/// Add the stats point to the aggregated stats
/// </summary>
Expand Down Expand Up @@ -70,6 +67,11 @@ public void AddBacklog(in BacklogPoint point)
}
}

public void AddTransaction(in DataStreamsTransactionInfo transaction)
{
_dataStreamsTransactionContainer.Add(transaction);
}

/// <summary>
/// Serialize the aggregated results using message pack
/// </summary>
Expand All @@ -80,9 +82,13 @@ public bool Serialize(Stream stream, long maxBucketFlushTimeNs)
{
var statsToAdd = Export(maxBucketFlushTimeNs) ?? new();
var backlogsToAdd = ExportBacklogs(maxBucketFlushTimeNs) ?? new();
if (statsToAdd.Count > 0 || backlogsToAdd.Count > 0)

// issue 11: drain the container once so we hold the bytes before the condition check,
// avoiding a second Size() lock acquisition and a potential race between check and drain.
var transactionData = _dataStreamsTransactionContainer.GetDataAndReset();
if (statsToAdd.Count > 0 || backlogsToAdd.Count > 0 || transactionData.Length > 0)
{
_formatter.Serialize(stream, _bucketDurationInNs, statsToAdd, backlogsToAdd);
_formatter.Serialize(stream, _bucketDurationInNs, statsToAdd, backlogsToAdd, transactionData);
Clear(statsToAdd, backlogsToAdd);

return true;
Expand Down
Loading
Loading