Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tracer/src/Datadog.Trace/Agent/IApiRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.IO;
using System.Threading.Tasks;
using Datadog.Trace.Agent.Transports;
using Datadog.Trace.Vendors.Newtonsoft.Json;

namespace Datadog.Trace.Agent
{
Expand All @@ -20,6 +21,10 @@ internal interface IApiRequest

Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType, string contentEncoding);

Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression);

Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings);

Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary);

Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None);
Expand Down
38 changes: 38 additions & 0 deletions tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
using System.Threading.Tasks;
using Datadog.Trace.Logging;
using Datadog.Trace.Util;
using Datadog.Trace.Vendors.Newtonsoft.Json;
using Datadog.Trace.Vendors.Serilog.Events;
using static Datadog.Trace.HttpOverStreams.DatadogHttpValues;

namespace Datadog.Trace.Agent.Transports
Expand Down Expand Up @@ -58,6 +60,42 @@ public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string conte
return await FinishAndGetResponse().ConfigureAwait(false);
}

public Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression)
=> PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings);

public async Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings)
{
var contentEncoding = compression == MultipartCompression.GZip ? "gzip" : null;
if (Log.IsEnabled(LogEventLevel.Debug))
{
Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, contentEncoding ?? "none");
}

ResetRequest(method: "POST", contentType: MimeTypes.Json, contentEncoding: contentEncoding);

using (var reqStream = await _request.GetRequestStreamAsync().ConfigureAwait(false))
{
// wrap in gzip if requested
using Stream gzip = (compression == MultipartCompression.GZip
? new GZipStream(reqStream, CompressionMode.Compress, leaveOpen: true)
: null);
var streamToWriteTo = gzip ?? reqStream;

using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true);
using var jsonWriter = new JsonTextWriter(streamWriter)
{
CloseOutput = false
};
var serializer = JsonSerializer.Create(settings);
serializer.Serialize(jsonWriter, payload);
await streamWriter.FlushAsync().ConfigureAwait(false);
await streamToWriteTo.FlushAsync().ConfigureAwait(false);
await reqStream.FlushAsync().ConfigureAwait(false);
}

return await FinishAndGetResponse().ConfigureAwait(false);
}

public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
{
ResetRequest(method: "POST", ContentTypeHelper.GetContentType(contentType, multipartBoundary), contentEncoding);
Expand Down
48 changes: 48 additions & 0 deletions tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
#if NETCOREAPP
using System;
using System.IO;
using System.IO.Compression;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Datadog.Trace.AppSec;
using Datadog.Trace.HttpOverStreams;
using Datadog.Trace.Logging;
using Datadog.Trace.Util;
using Datadog.Trace.Vendors.Newtonsoft.Json;
using Datadog.Trace.Vendors.Serilog.Events;

namespace Datadog.Trace.Agent.Transports
{
Expand Down Expand Up @@ -67,6 +71,50 @@ public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string conte
}
}

public Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression)
=> PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings);

public async Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings)
{
if (Log.IsEnabled(LogEventLevel.Debug))
{
Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, compression == MultipartCompression.GZip ? "gzip" : "none");
}

using var content = new PushStreamContent(stream => WriteAsJson(stream, payload, settings, compression));
content.Headers.ContentType = new MediaTypeHeaderValue(MimeTypes.Json);

if (compression == MultipartCompression.GZip)
{
content.Headers.ContentEncoding.Add("gzip");
}

_postRequest.Content = content;

var response = await _client.SendAsync(_postRequest).ConfigureAwait(false);
return new HttpClientResponse(response);

static async Task WriteAsJson(Stream requestStream, T payload, JsonSerializerSettings serializationSettings, MultipartCompression compression)
{
// wrap in gzip if requested
using Stream gzip = compression == MultipartCompression.GZip
? new GZipStream(requestStream, CompressionMode.Compress, leaveOpen: true)
: null;
var streamToWriteTo = gzip ?? requestStream;

using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true);
using var jsonWriter = new JsonTextWriter(streamWriter)
{
CloseOutput = false
};
var serializer = JsonSerializer.Create(serializationSettings);
serializer.Serialize(jsonWriter, payload);
await streamWriter.FlushAsync().ConfigureAwait(false);
await streamToWriteTo.FlushAsync().ConfigureAwait(false);
await requestStream.FlushAsync().ConfigureAwait(false);
}
}

public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
{
// re-create HttpContent on every retry because some versions of HttpClient always dispose of it, so we can't reuse.
Expand Down
48 changes: 48 additions & 0 deletions tracer/src/Datadog.Trace/Agent/Transports/HttpStreamRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,22 @@

using System;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Threading.Tasks;
using Datadog.Trace.HttpOverStreams;
using Datadog.Trace.HttpOverStreams.HttpContent;
using Datadog.Trace.Logging;
using Datadog.Trace.Util;
using Datadog.Trace.Vendors.Newtonsoft.Json;
using Datadog.Trace.Vendors.Serilog.Events;

namespace Datadog.Trace.Agent.Transports
{
internal sealed class HttpStreamRequest : IApiRequest
{
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<HttpStreamRequest>();

private readonly Uri _uri;
private readonly DatadogHttpClient _client;
private readonly IStreamFactory _streamFactory;
Expand All @@ -41,6 +47,48 @@ public Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType
public async Task<IApiResponse> PostAsync(ArraySegment<byte> bytes, string contentType, string contentEncoding)
=> (await SendAsync(WebRequestMethods.Http.Post, contentType, new BufferContent(bytes), contentEncoding, chunkedEncoding: false).ConfigureAwait(false)).Item1;

public Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression)
=> PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings);

public async Task<IApiResponse> PostAsJsonAsync<T>(T payload, MultipartCompression compression, JsonSerializerSettings settings)
{
var contentEncoding = compression == MultipartCompression.GZip ? "gzip" : null;
if (Log.IsEnabled(LogEventLevel.Debug))
{
Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, contentEncoding ?? "none");
}

var result = await SendAsync(
WebRequestMethods.Http.Post,
contentType: MimeTypes.Json,
content: new HttpOverStreams.HttpContent.PushStreamContent(stream => WriteAsJson(stream, payload, settings, compression)),
contentEncoding: contentEncoding,
chunkedEncoding: true) // must use chunked encoding because push-stream content
.ConfigureAwait(false);

return result.Item1;

static async Task WriteAsJson(Stream requestStream, T payload, JsonSerializerSettings serializationSettings, MultipartCompression compression)
{
// wrap in gzip if requested
using Stream gzip = compression == MultipartCompression.GZip
? new GZipStream(requestStream, CompressionMode.Compress, leaveOpen: true)
: null;
var streamToWriteTo = gzip ?? requestStream;

using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true);
using var jsonWriter = new JsonTextWriter(streamWriter)
{
CloseOutput = false
};
var serializer = JsonSerializer.Create(serializationSettings);
serializer.Serialize(jsonWriter, payload);
await streamWriter.FlushAsync().ConfigureAwait(false);
await streamToWriteTo.FlushAsync().ConfigureAwait(false);
await requestStream.FlushAsync().ConfigureAwait(false);
}
}

public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary)
=> (await SendAsync(WebRequestMethods.Http.Post, contentType, new HttpOverStreams.HttpContent.PushStreamContent(writeToRequestStream), contentEncoding, chunkedEncoding: true, multipartBoundary).ConfigureAwait(false)).Item1;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// <copyright file="SerializationHelpers.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

using Datadog.Trace.Vendors.Newtonsoft.Json;
using Datadog.Trace.Vendors.Newtonsoft.Json.Serialization;

namespace Datadog.Trace.Agent.Transports;

internal static class SerializationHelpers
{
public static readonly JsonSerializerSettings DefaultJsonSettings = new()
{
NullValueHandling = NullValueHandling.Ignore,
ContractResolver = new DefaultContractResolver
{
NamingStrategy = new SnakeCaseNamingStrategy(),
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ public static RemoteConfigurationApi Create(IApiRequestFactory apiRequestFactory
var uri = _apiRequestFactory.GetEndpoint(configEndpoint);
var apiRequest = _apiRequestFactory.Create(uri);

var requestContent = JsonConvert.SerializeObject(request);
Log.Debug("Sending Remote Configuration Request: {Content}", requestContent);
var bytes = Encoding.UTF8.GetBytes(requestContent);
var payload = new ArraySegment<byte>(bytes);

if (_containerId != null)
{
apiRequest.AddHeader(AgentHttpHeaderNames.ContainerId, _containerId);
Expand All @@ -75,7 +70,7 @@ public static RemoteConfigurationApi Create(IApiRequestFactory apiRequestFactory
apiRequest.AddHeader(AgentHttpHeaderNames.EntityId, _entityId);
}

using var apiResponse = await apiRequest.PostAsync(payload, MimeTypes.Json).ConfigureAwait(false);
using var apiResponse = await apiRequest.PostAsJsonAsync(request, MultipartCompression.None).ConfigureAwait(false);
var isRcmDisabled = apiResponse.StatusCode == 404;
if (isRcmDisabled)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@
#nullable enable

using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using Datadog.Trace.Agent;
using Datadog.Trace.Agent.Transports;
using Datadog.Trace.Logging;
using Datadog.Trace.PlatformHelpers;
using Datadog.Trace.SourceGenerators;
using Datadog.Trace.Telemetry.Metrics;
using Datadog.Trace.Util.Http;
using Datadog.Trace.Vendors.Newtonsoft.Json;
Expand All @@ -34,7 +30,6 @@ internal abstract class JsonTelemetryTransport : ITelemetryTransport
private readonly string? _entityId;
private readonly bool _enableDebug;
private readonly bool _telemetryGzipCompressionEnabled;
private readonly string _telemetryCompressionMethod;

protected JsonTelemetryTransport(IApiRequestFactory requestFactory, bool enableDebug, string telemetryCompressionMethod)
{
Expand All @@ -44,7 +39,6 @@ protected JsonTelemetryTransport(IApiRequestFactory requestFactory, bool enableD
_containerId = ContainerMetadata.GetContainerId();
_entityId = ContainerMetadata.GetEntityId();
_telemetryGzipCompressionEnabled = telemetryCompressionMethod.Equals("gzip", StringComparison.OrdinalIgnoreCase);
_telemetryCompressionMethod = _telemetryGzipCompressionEnabled ? "gzip" : "uncompressed";
}

protected string GetEndpointInfo() => _requestFactory.Info(_endpoint);
Expand All @@ -55,17 +49,6 @@ public async Task<TelemetryPushResult> PushTelemetry(TelemetryData data)

try
{
byte[] bytes;

if (_telemetryGzipCompressionEnabled)
{
bytes = SerializeTelemetryWithGzip(data);
}
else
{
bytes = Encoding.UTF8.GetBytes(SerializeTelemetry(data));
}

var request = _requestFactory.Create(_endpoint);
request.AddHeader(TelemetryConstants.ApiVersionHeader, data.ApiVersion);
request.AddHeader(TelemetryConstants.RequestTypeHeader, data.RequestType);
Expand All @@ -87,35 +70,35 @@ public async Task<TelemetryPushResult> PushTelemetry(TelemetryData data)

TelemetryFactory.Metrics.RecordCountTelemetryApiRequests(endpointMetricTag);

using var response = await request.PostAsync(new ArraySegment<byte>(bytes), "application/json", _telemetryGzipCompressionEnabled ? "gzip" : null).ConfigureAwait(false);
using var response = await request.PostAsJsonAsync(data, _telemetryGzipCompressionEnabled ? MultipartCompression.GZip : MultipartCompression.None, SerializerSettings).ConfigureAwait(false);
TelemetryFactory.Metrics.RecordCountTelemetryApiResponses(endpointMetricTag, response.GetTelemetryStatusCodeMetricTag());
if (response.StatusCode is >= 200 and < 300)
{
Log.Debug("Telemetry sent successfully. Compression {Compression}", _telemetryCompressionMethod);
Log.Debug("Telemetry sent successfully. CompressionEnabled {Compression}", _telemetryGzipCompressionEnabled);
return TelemetryPushResult.Success;
}

TelemetryFactory.Metrics.RecordCountTelemetryApiErrors(endpointMetricTag, MetricTags.ApiError.StatusCode);

if (response.StatusCode == 404)
{
Log.Debug("Error sending telemetry: 404. Disabling further telemetry, as endpoint '{Endpoint}' not found. Compression {Compression}", GetEndpointInfo(), _telemetryCompressionMethod);
Log.Debug("Error sending telemetry: 404. Disabling further telemetry, as endpoint '{Endpoint}' not found. CompressionEnabled {Compression}", GetEndpointInfo(), _telemetryGzipCompressionEnabled);
return TelemetryPushResult.FatalError;
}

Log.Debug<string, int, string>("Error sending telemetry to '{Endpoint}' {StatusCode} . Compression {Compression}", GetEndpointInfo(), response.StatusCode, _telemetryCompressionMethod);
Log.Debug("Error sending telemetry to '{Endpoint}' {StatusCode} . CompressionEnabled {Compression}", GetEndpointInfo(), response.StatusCode, _telemetryGzipCompressionEnabled);
return TelemetryPushResult.TransientFailure;
}
catch (Exception ex) when (IsFatalException(ex))
{
Log.Information(ex, "Error sending telemetry data, unable to communicate with '{Endpoint}'. Compression {Compression}", GetEndpointInfo(), _telemetryCompressionMethod);
Log.Information(ex, "Error sending telemetry data, unable to communicate with '{Endpoint}'. CompressionEnabled {Compression}", GetEndpointInfo(), _telemetryGzipCompressionEnabled);
var tag = ex is TimeoutException ? MetricTags.ApiError.Timeout : MetricTags.ApiError.NetworkError;
TelemetryFactory.Metrics.RecordCountTelemetryApiErrors(endpointMetricTag, tag);
return TelemetryPushResult.FatalError;
}
catch (Exception ex)
{
Log.Information(ex, "Error sending telemetry data to '{Endpoint}'. Compression {Compression}", GetEndpointInfo(), _telemetryCompressionMethod);
Log.Information(ex, "Error sending telemetry data to '{Endpoint}'. CompressionEnabled {Compression}", GetEndpointInfo(), _telemetryGzipCompressionEnabled);
var tag = ex is TimeoutException ? MetricTags.ApiError.Timeout : MetricTags.ApiError.NetworkError;
TelemetryFactory.Metrics.RecordCountTelemetryApiErrors(endpointMetricTag, tag);
return TelemetryPushResult.TransientFailure;
Expand All @@ -124,26 +107,8 @@ public async Task<TelemetryPushResult> PushTelemetry(TelemetryData data)

public abstract string GetTransportInfo();

[TestingAndPrivateOnly]
internal static string SerializeTelemetry<T>(T data) => JsonConvert.SerializeObject(data, Formatting.None, SerializerSettings);

protected abstract MetricTags.TelemetryEndpoint GetEndpointMetricTag();

internal static byte[] SerializeTelemetryWithGzip<T>(T data)
{
using var memStream = new MemoryStream();
using (var zipStream = new GZipStream(memStream, CompressionMode.Compress, true))
{
using var streamWriter = new StreamWriter(zipStream);
using var jsonWriter = new JsonTextWriter(streamWriter);
var serializer = new JsonSerializer { NullValueHandling = NullValueHandling.Ignore, ContractResolver = new DefaultContractResolver { NamingStrategy = new SnakeCaseNamingStrategy(), }, Formatting = Formatting.None };

serializer.Serialize(jsonWriter, data);
}

return memStream.ToArray();
}

private static bool IsFatalException(Exception ex)
{
return ex.IsSocketException()
Expand Down
Loading
Loading