diff --git a/tracer/src/Datadog.Trace/Agent/IApiRequest.cs b/tracer/src/Datadog.Trace/Agent/IApiRequest.cs index c041d58dbfdb..89ab8d285480 100644 --- a/tracer/src/Datadog.Trace/Agent/IApiRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/IApiRequest.cs @@ -7,6 +7,7 @@ using System.IO; using System.Threading.Tasks; using Datadog.Trace.Agent.Transports; +using Datadog.Trace.Vendors.Newtonsoft.Json; namespace Datadog.Trace.Agent { @@ -20,6 +21,10 @@ internal interface IApiRequest Task PostAsync(ArraySegment bytes, string contentType, string contentEncoding); + Task PostAsJsonAsync(T payload, MultipartCompression compression); + + Task PostAsJsonAsync(T payload, MultipartCompression compression, JsonSerializerSettings settings); + Task PostAsync(Func writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary); Task PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None); diff --git a/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs b/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs index 0aca217814aa..bafb3e8db686 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs @@ -11,6 +11,8 @@ using System.Threading.Tasks; using Datadog.Trace.Logging; using Datadog.Trace.Util; +using Datadog.Trace.Vendors.Newtonsoft.Json; +using Datadog.Trace.Vendors.Serilog.Events; using static Datadog.Trace.HttpOverStreams.DatadogHttpValues; namespace Datadog.Trace.Agent.Transports @@ -58,6 +60,42 @@ public async Task PostAsync(ArraySegment bytes, string conte return await FinishAndGetResponse().ConfigureAwait(false); } + public Task PostAsJsonAsync(T payload, MultipartCompression compression) + => PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings); + + public async Task PostAsJsonAsync(T payload, MultipartCompression compression, JsonSerializerSettings settings) + { + var contentEncoding = compression == MultipartCompression.GZip ? "gzip" : null; + if (Log.IsEnabled(LogEventLevel.Debug)) + { + Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, contentEncoding ?? "none"); + } + + ResetRequest(method: "POST", contentType: MimeTypes.Json, contentEncoding: contentEncoding); + + using (var reqStream = await _request.GetRequestStreamAsync().ConfigureAwait(false)) + { + // wrap in gzip if requested + using Stream gzip = (compression == MultipartCompression.GZip + ? new GZipStream(reqStream, CompressionMode.Compress, leaveOpen: true) + : null); + var streamToWriteTo = gzip ?? reqStream; + + using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true); + using var jsonWriter = new JsonTextWriter(streamWriter) + { + CloseOutput = false + }; + var serializer = JsonSerializer.Create(settings); + serializer.Serialize(jsonWriter, payload); + await streamWriter.FlushAsync().ConfigureAwait(false); + await streamToWriteTo.FlushAsync().ConfigureAwait(false); + await reqStream.FlushAsync().ConfigureAwait(false); + } + + return await FinishAndGetResponse().ConfigureAwait(false); + } + public async Task PostAsync(Func writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary) { ResetRequest(method: "POST", ContentTypeHelper.GetContentType(contentType, multipartBoundary), contentEncoding); diff --git a/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs b/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs index 9bc5c6bc1443..5d0785172e4d 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs @@ -6,12 +6,16 @@ #if NETCOREAPP using System; using System.IO; +using System.IO.Compression; using System.Net.Http; using System.Net.Http.Headers; using System.Threading.Tasks; using Datadog.Trace.AppSec; using Datadog.Trace.HttpOverStreams; using Datadog.Trace.Logging; +using Datadog.Trace.Util; +using Datadog.Trace.Vendors.Newtonsoft.Json; +using Datadog.Trace.Vendors.Serilog.Events; namespace Datadog.Trace.Agent.Transports { @@ -67,6 +71,50 @@ public async Task PostAsync(ArraySegment bytes, string conte } } + public Task PostAsJsonAsync(T payload, MultipartCompression compression) + => PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings); + + public async Task PostAsJsonAsync(T payload, MultipartCompression compression, JsonSerializerSettings settings) + { + if (Log.IsEnabled(LogEventLevel.Debug)) + { + Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, compression == MultipartCompression.GZip ? "gzip" : "none"); + } + + using var content = new PushStreamContent(stream => WriteAsJson(stream, payload, settings, compression)); + content.Headers.ContentType = new MediaTypeHeaderValue(MimeTypes.Json); + + if (compression == MultipartCompression.GZip) + { + content.Headers.ContentEncoding.Add("gzip"); + } + + _postRequest.Content = content; + + var response = await _client.SendAsync(_postRequest).ConfigureAwait(false); + return new HttpClientResponse(response); + + static async Task WriteAsJson(Stream requestStream, T payload, JsonSerializerSettings serializationSettings, MultipartCompression compression) + { + // wrap in gzip if requested + using Stream gzip = compression == MultipartCompression.GZip + ? new GZipStream(requestStream, CompressionMode.Compress, leaveOpen: true) + : null; + var streamToWriteTo = gzip ?? requestStream; + + using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true); + using var jsonWriter = new JsonTextWriter(streamWriter) + { + CloseOutput = false + }; + var serializer = JsonSerializer.Create(serializationSettings); + serializer.Serialize(jsonWriter, payload); + await streamWriter.FlushAsync().ConfigureAwait(false); + await streamToWriteTo.FlushAsync().ConfigureAwait(false); + await requestStream.FlushAsync().ConfigureAwait(false); + } + } + public async Task PostAsync(Func writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary) { // re-create HttpContent on every retry because some versions of HttpClient always dispose of it, so we can't reuse. diff --git a/tracer/src/Datadog.Trace/Agent/Transports/HttpStreamRequest.cs b/tracer/src/Datadog.Trace/Agent/Transports/HttpStreamRequest.cs index 874dbf077d9d..2893175c5495 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/HttpStreamRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/HttpStreamRequest.cs @@ -5,16 +5,22 @@ using System; using System.IO; +using System.IO.Compression; using System.Net; using System.Threading.Tasks; using Datadog.Trace.HttpOverStreams; using Datadog.Trace.HttpOverStreams.HttpContent; +using Datadog.Trace.Logging; using Datadog.Trace.Util; +using Datadog.Trace.Vendors.Newtonsoft.Json; +using Datadog.Trace.Vendors.Serilog.Events; namespace Datadog.Trace.Agent.Transports { internal sealed class HttpStreamRequest : IApiRequest { + private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(); + private readonly Uri _uri; private readonly DatadogHttpClient _client; private readonly IStreamFactory _streamFactory; @@ -41,6 +47,48 @@ public Task PostAsync(ArraySegment bytes, string contentType public async Task PostAsync(ArraySegment bytes, string contentType, string contentEncoding) => (await SendAsync(WebRequestMethods.Http.Post, contentType, new BufferContent(bytes), contentEncoding, chunkedEncoding: false).ConfigureAwait(false)).Item1; + public Task PostAsJsonAsync(T payload, MultipartCompression compression) + => PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings); + + public async Task PostAsJsonAsync(T payload, MultipartCompression compression, JsonSerializerSettings settings) + { + var contentEncoding = compression == MultipartCompression.GZip ? "gzip" : null; + if (Log.IsEnabled(LogEventLevel.Debug)) + { + Log.Debug("Sending {Type} data as JSON with compression '{Compression}'", typeof(T).FullName, contentEncoding ?? "none"); + } + + var result = await SendAsync( + WebRequestMethods.Http.Post, + contentType: MimeTypes.Json, + content: new HttpOverStreams.HttpContent.PushStreamContent(stream => WriteAsJson(stream, payload, settings, compression)), + contentEncoding: contentEncoding, + chunkedEncoding: true) // must use chunked encoding because push-stream content + .ConfigureAwait(false); + + return result.Item1; + + static async Task WriteAsJson(Stream requestStream, T payload, JsonSerializerSettings serializationSettings, MultipartCompression compression) + { + // wrap in gzip if requested + using Stream gzip = compression == MultipartCompression.GZip + ? new GZipStream(requestStream, CompressionMode.Compress, leaveOpen: true) + : null; + var streamToWriteTo = gzip ?? requestStream; + + using var streamWriter = new StreamWriter(streamToWriteTo, EncodingHelpers.Utf8NoBom, bufferSize: 1024, leaveOpen: true); + using var jsonWriter = new JsonTextWriter(streamWriter) + { + CloseOutput = false + }; + var serializer = JsonSerializer.Create(serializationSettings); + serializer.Serialize(jsonWriter, payload); + await streamWriter.FlushAsync().ConfigureAwait(false); + await streamToWriteTo.FlushAsync().ConfigureAwait(false); + await requestStream.FlushAsync().ConfigureAwait(false); + } + } + public async Task PostAsync(Func writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary) => (await SendAsync(WebRequestMethods.Http.Post, contentType, new HttpOverStreams.HttpContent.PushStreamContent(writeToRequestStream), contentEncoding, chunkedEncoding: true, multipartBoundary).ConfigureAwait(false)).Item1; diff --git a/tracer/src/Datadog.Trace/Agent/Transports/SerializationHelpers.cs b/tracer/src/Datadog.Trace/Agent/Transports/SerializationHelpers.cs new file mode 100644 index 000000000000..a534ae459b0e --- /dev/null +++ b/tracer/src/Datadog.Trace/Agent/Transports/SerializationHelpers.cs @@ -0,0 +1,23 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +using Datadog.Trace.Vendors.Newtonsoft.Json; +using Datadog.Trace.Vendors.Newtonsoft.Json.Serialization; + +namespace Datadog.Trace.Agent.Transports; + +internal static class SerializationHelpers +{ + public static readonly JsonSerializerSettings DefaultJsonSettings = new() + { + NullValueHandling = NullValueHandling.Ignore, + ContractResolver = new DefaultContractResolver + { + NamingStrategy = new SnakeCaseNamingStrategy(), + } + }; +} diff --git a/tracer/src/Datadog.Trace/RemoteConfigurationManagement/Transport/RemoteConfigurationApi.cs b/tracer/src/Datadog.Trace/RemoteConfigurationManagement/Transport/RemoteConfigurationApi.cs index b2616f9ac89b..c7a700a5a53f 100644 --- a/tracer/src/Datadog.Trace/RemoteConfigurationManagement/Transport/RemoteConfigurationApi.cs +++ b/tracer/src/Datadog.Trace/RemoteConfigurationManagement/Transport/RemoteConfigurationApi.cs @@ -60,11 +60,6 @@ public static RemoteConfigurationApi Create(IApiRequestFactory apiRequestFactory var uri = _apiRequestFactory.GetEndpoint(configEndpoint); var apiRequest = _apiRequestFactory.Create(uri); - var requestContent = JsonConvert.SerializeObject(request); - Log.Debug("Sending Remote Configuration Request: {Content}", requestContent); - var bytes = Encoding.UTF8.GetBytes(requestContent); - var payload = new ArraySegment(bytes); - if (_containerId != null) { apiRequest.AddHeader(AgentHttpHeaderNames.ContainerId, _containerId); @@ -75,7 +70,7 @@ public static RemoteConfigurationApi Create(IApiRequestFactory apiRequestFactory apiRequest.AddHeader(AgentHttpHeaderNames.EntityId, _entityId); } - using var apiResponse = await apiRequest.PostAsync(payload, MimeTypes.Json).ConfigureAwait(false); + using var apiResponse = await apiRequest.PostAsJsonAsync(request, MultipartCompression.None).ConfigureAwait(false); var isRcmDisabled = apiResponse.StatusCode == 404; if (isRcmDisabled) { diff --git a/tracer/src/Datadog.Trace/Telemetry/Transports/JsonTelemetryTransport.cs b/tracer/src/Datadog.Trace/Telemetry/Transports/JsonTelemetryTransport.cs index 530219ebf150..f30219245352 100644 --- a/tracer/src/Datadog.Trace/Telemetry/Transports/JsonTelemetryTransport.cs +++ b/tracer/src/Datadog.Trace/Telemetry/Transports/JsonTelemetryTransport.cs @@ -6,16 +6,12 @@ #nullable enable using System; -using System.Diagnostics.CodeAnalysis; -using System.IO; -using System.IO.Compression; using System.Net; -using System.Text; using System.Threading.Tasks; using Datadog.Trace.Agent; +using Datadog.Trace.Agent.Transports; using Datadog.Trace.Logging; using Datadog.Trace.PlatformHelpers; -using Datadog.Trace.SourceGenerators; using Datadog.Trace.Telemetry.Metrics; using Datadog.Trace.Util.Http; using Datadog.Trace.Vendors.Newtonsoft.Json; @@ -34,7 +30,6 @@ internal abstract class JsonTelemetryTransport : ITelemetryTransport private readonly string? _entityId; private readonly bool _enableDebug; private readonly bool _telemetryGzipCompressionEnabled; - private readonly string _telemetryCompressionMethod; protected JsonTelemetryTransport(IApiRequestFactory requestFactory, bool enableDebug, string telemetryCompressionMethod) { @@ -44,7 +39,6 @@ protected JsonTelemetryTransport(IApiRequestFactory requestFactory, bool enableD _containerId = ContainerMetadata.GetContainerId(); _entityId = ContainerMetadata.GetEntityId(); _telemetryGzipCompressionEnabled = telemetryCompressionMethod.Equals("gzip", StringComparison.OrdinalIgnoreCase); - _telemetryCompressionMethod = _telemetryGzipCompressionEnabled ? "gzip" : "uncompressed"; } protected string GetEndpointInfo() => _requestFactory.Info(_endpoint); @@ -55,17 +49,6 @@ public async Task PushTelemetry(TelemetryData data) try { - byte[] bytes; - - if (_telemetryGzipCompressionEnabled) - { - bytes = SerializeTelemetryWithGzip(data); - } - else - { - bytes = Encoding.UTF8.GetBytes(SerializeTelemetry(data)); - } - var request = _requestFactory.Create(_endpoint); request.AddHeader(TelemetryConstants.ApiVersionHeader, data.ApiVersion); request.AddHeader(TelemetryConstants.RequestTypeHeader, data.RequestType); @@ -87,11 +70,11 @@ public async Task PushTelemetry(TelemetryData data) TelemetryFactory.Metrics.RecordCountTelemetryApiRequests(endpointMetricTag); - using var response = await request.PostAsync(new ArraySegment(bytes), "application/json", _telemetryGzipCompressionEnabled ? "gzip" : null).ConfigureAwait(false); + using var response = await request.PostAsJsonAsync(data, _telemetryGzipCompressionEnabled ? MultipartCompression.GZip : MultipartCompression.None, SerializerSettings).ConfigureAwait(false); TelemetryFactory.Metrics.RecordCountTelemetryApiResponses(endpointMetricTag, response.GetTelemetryStatusCodeMetricTag()); if (response.StatusCode is >= 200 and < 300) { - Log.Debug("Telemetry sent successfully. Compression {Compression}", _telemetryCompressionMethod); + Log.Debug("Telemetry sent successfully. CompressionEnabled {Compression}", _telemetryGzipCompressionEnabled); return TelemetryPushResult.Success; } @@ -99,23 +82,23 @@ public async Task PushTelemetry(TelemetryData data) if (response.StatusCode == 404) { - Log.Debug("Error sending telemetry: 404. Disabling further telemetry, as endpoint '{Endpoint}' not found. Compression {Compression}", GetEndpointInfo(), _telemetryCompressionMethod); + Log.Debug("Error sending telemetry: 404. Disabling further telemetry, as endpoint '{Endpoint}' not found. CompressionEnabled {Compression}", GetEndpointInfo(), _telemetryGzipCompressionEnabled); return TelemetryPushResult.FatalError; } - Log.Debug("Error sending telemetry to '{Endpoint}' {StatusCode} . Compression {Compression}", GetEndpointInfo(), response.StatusCode, _telemetryCompressionMethod); + Log.Debug("Error sending telemetry to '{Endpoint}' {StatusCode} . CompressionEnabled {Compression}", GetEndpointInfo(), response.StatusCode, _telemetryGzipCompressionEnabled); return TelemetryPushResult.TransientFailure; } catch (Exception ex) when (IsFatalException(ex)) { - Log.Information(ex, "Error sending telemetry data, unable to communicate with '{Endpoint}'. Compression {Compression}", GetEndpointInfo(), _telemetryCompressionMethod); + Log.Information(ex, "Error sending telemetry data, unable to communicate with '{Endpoint}'. CompressionEnabled {Compression}", GetEndpointInfo(), _telemetryGzipCompressionEnabled); var tag = ex is TimeoutException ? MetricTags.ApiError.Timeout : MetricTags.ApiError.NetworkError; TelemetryFactory.Metrics.RecordCountTelemetryApiErrors(endpointMetricTag, tag); return TelemetryPushResult.FatalError; } catch (Exception ex) { - Log.Information(ex, "Error sending telemetry data to '{Endpoint}'. Compression {Compression}", GetEndpointInfo(), _telemetryCompressionMethod); + Log.Information(ex, "Error sending telemetry data to '{Endpoint}'. CompressionEnabled {Compression}", GetEndpointInfo(), _telemetryGzipCompressionEnabled); var tag = ex is TimeoutException ? MetricTags.ApiError.Timeout : MetricTags.ApiError.NetworkError; TelemetryFactory.Metrics.RecordCountTelemetryApiErrors(endpointMetricTag, tag); return TelemetryPushResult.TransientFailure; @@ -124,26 +107,8 @@ public async Task PushTelemetry(TelemetryData data) public abstract string GetTransportInfo(); - [TestingAndPrivateOnly] - internal static string SerializeTelemetry(T data) => JsonConvert.SerializeObject(data, Formatting.None, SerializerSettings); - protected abstract MetricTags.TelemetryEndpoint GetEndpointMetricTag(); - internal static byte[] SerializeTelemetryWithGzip(T data) - { - using var memStream = new MemoryStream(); - using (var zipStream = new GZipStream(memStream, CompressionMode.Compress, true)) - { - using var streamWriter = new StreamWriter(zipStream); - using var jsonWriter = new JsonTextWriter(streamWriter); - var serializer = new JsonSerializer { NullValueHandling = NullValueHandling.Ignore, ContractResolver = new DefaultContractResolver { NamingStrategy = new SnakeCaseNamingStrategy(), }, Formatting = Formatting.None }; - - serializer.Serialize(jsonWriter, data); - } - - return memStream.ToArray(); - } - private static bool IsFatalException(Exception ex) { return ex.IsSocketException() diff --git a/tracer/test/Datadog.Trace.TestHelpers/TransportHelpers/TestApiRequest.cs b/tracer/test/Datadog.Trace.TestHelpers/TransportHelpers/TestApiRequest.cs index ac4c7799269a..945c18fa9f9a 100644 --- a/tracer/test/Datadog.Trace.TestHelpers/TransportHelpers/TestApiRequest.cs +++ b/tracer/test/Datadog.Trace.TestHelpers/TransportHelpers/TestApiRequest.cs @@ -10,6 +10,7 @@ using System.Threading.Tasks; using Datadog.Trace.Agent; using Datadog.Trace.Agent.Transports; +using Datadog.Trace.Vendors.Newtonsoft.Json; namespace Datadog.Trace.TestHelpers.TransportHelpers; @@ -64,6 +65,18 @@ public virtual Task PostAsync(ArraySegment bytes, string con return Task.FromResult((IApiResponse)response); } + public virtual Task PostAsJsonAsync(T payload, MultipartCompression compression) + => PostAsJsonAsync(payload, compression, SerializationHelpers.DefaultJsonSettings); + + public virtual Task PostAsJsonAsync(T payload, MultipartCompression compression, JsonSerializerSettings settings) + { + var response = new TestApiResponse(_statusCode, _responseContent, _responseContentType); + Responses.Add(response); + ContentType = MimeTypes.Json; + + return Task.FromResult((IApiResponse)response); + } + public async Task PostAsync(Func writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary) { using (var ms = new MemoryStream()) diff --git a/tracer/test/Datadog.Trace.Tests/Agent/Transports/ApiRequestTests.cs b/tracer/test/Datadog.Trace.Tests/Agent/Transports/ApiRequestTests.cs new file mode 100644 index 000000000000..24871b284715 --- /dev/null +++ b/tracer/test/Datadog.Trace.Tests/Agent/Transports/ApiRequestTests.cs @@ -0,0 +1,251 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Datadog.Trace.Agent; +using Datadog.Trace.Agent.StreamFactories; +using Datadog.Trace.Agent.Transports; +using Datadog.Trace.HttpOverStreams; +using Datadog.Trace.Telemetry; +using Datadog.Trace.TestHelpers; +using Datadog.Trace.Util; +using Datadog.Trace.Vendors.Newtonsoft.Json; +using Datadog.Trace.Vendors.Newtonsoft.Json.Serialization; +using FluentAssertions; +using VerifyXunit; +using Xunit; +using Xunit.Abstractions; + +namespace Datadog.Trace.Tests.Agent.Transports; + +[Collection(nameof(WebRequestCollection))] +[UsesVerify] +public class ApiRequestTests +{ + // Matches SerializationHelpers.DefaultSettings + private static readonly JsonSerializerSettings DefaultSettings = new() { NullValueHandling = NullValueHandling.Ignore, ContractResolver = new DefaultContractResolver { NamingStrategy = new SnakeCaseNamingStrategy(), } }; + + private static readonly Uri Localhost = new Uri("http://localhost"); + private readonly ITestOutputHelper _output; + + public ApiRequestTests(ITestOutputHelper output) + { + _output = output; + VerifyHelper.InitializeGlobalSettings(); + } + + [Theory] + [CombinatorialData] + public async Task ApiWebRequest(bool useGzip) + { + using var agent = MockTracerAgent.Create(_output); + var url = new Uri($"http://localhost:{agent.Port}/"); + var factory = new ApiWebRequestFactory(url, AgentHttpHeaderNames.DefaultHeaders); + await RunTest(agent, () => factory.Create(url), useGzip); + } + +#if NETCOREAPP3_1_OR_GREATER + + [Theory] + [CombinatorialData] + public async Task HttpClientRequest(bool useGzip) + { + using var agent = MockTracerAgent.Create(_output); + var url = new Uri($"http://localhost:{agent.Port}/"); + var factory = new HttpClientRequestFactory(url, AgentHttpHeaderNames.DefaultHeaders); + await RunTest(agent, () => factory.Create(url), useGzip); + } + + [Theory] + [CombinatorialData] + public async Task HttpStreamRequest_UDS(bool useGzip) + { + using var agent = MockTracerAgent.Create(_output, new UnixDomainSocketConfig(Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()), null)); + var factory = new HttpStreamRequestFactory( + new UnixDomainSocketStreamFactory(agent.TracesUdsPath), + new DatadogHttpClient(new TraceAgentHttpHeaderHelper()), + Localhost); + await RunTest(agent, () => factory.Create(Localhost), useGzip); + } +#endif + +#if NET6_0_OR_GREATER + [Theory] + [CombinatorialData] + public async Task HttpClientRequest_UDS(bool useGzip) + { + using var agent = MockTracerAgent.Create(_output, new UnixDomainSocketConfig(Path.Combine(Path.GetTempPath(), Path.GetRandomFileName()), null)); + var factory = new SocketHandlerRequestFactory( + new UnixDomainSocketStreamFactory(agent.TracesUdsPath), + AgentHttpHeaderNames.DefaultHeaders, + Localhost); + await RunTest(agent, () => factory.Create(Localhost), useGzip); + } +#endif + + private async Task RunTest(MockTracerAgent agent, Func createRequest, bool useGzip) + { + agent.ShouldDeserializeTraces = false; + byte[] requestBody = null; + agent.RequestReceived += (_, args) => + { + requestBody = args.Value.ReadStreamBody(); + }; + + var request = createRequest(); + var compression = useGzip ? MultipartCompression.GZip : MultipartCompression.None; + var payload = GetData(); + await request.PostAsJsonAsync(payload, compression); + + // payload should be the same as if we had serialized directly + // We have to use the vendored NewtonsoftJson here to ensure it reads all the attributes etc correctly + var expectedPayload = EncodingHelpers.Utf8NoBom.GetBytes(JsonConvert.SerializeObject(payload, DefaultSettings)); + requestBody.Should().NotBeNull().And.Equal(expectedPayload, "serialized request body was '{0}' but expected '{1}'", EncodingHelpers.Utf8NoBom.GetString(requestBody), EncodingHelpers.Utf8NoBom.GetString(expectedPayload)); + } + + private TelemetryData GetData() => + new TelemetryData( + requestType: TelemetryRequestTypes.GenerateMetrics, + runtimeId: "20338dfd-f700-4e5c-b3f6-0d470f054ae8", + seqId: 5672, + tracerTime: 1628099086, + application: new ApplicationTelemetryData( + serviceName: "myapp", + env: "prod", + serviceVersion: "1.2.3", + tracerVersion: "0.33.1", + languageName: "node.js", + languageVersion: "14.16.1", + runtimeName: "dotnet", + runtimeVersion: "7.0.3", + commitSha: "testCommitSha", + repositoryUrl: "testRepositoryUrl", + processTags: "entrypoint.basedir:Users,entrypoint.workdir:Downloads"), + host: new HostTelemetryData( + hostname: "i-09ecf74c319c49be8", + os: "GNU/Linux", + architecture: "x86_64") + { + OsVersion = "ubuntu 18.04.5 LTS (Bionic Beaver)", + KernelName = "Linux", + KernelRelease = "5.4.0-1037-gcp", + KernelVersion = "#40~18.04.1-Ubuntu SMP Fri Feb 5 15:41:35 UTC 2021" + }, + payload: new GenerateMetricsPayload( + new MetricData[] + { + new( + "tracer_init_time", + new MetricSeries() + { + new(1575317847, 2241), + new(1575317947, 2352), + }, + common: true, + type: MetricTypeConstants.Count) + { + Tags = new[] + { + "org_id: 2", + "environment:test" + } + }, + new( + "app_sec_initialization_time", + new MetricSeries() + { + new(1575317447, 254), + new(1575317547, 643), + }, + common: false, + type: MetricTypeConstants.Gauge) + { + Namespace = MetricNamespaceConstants.ASM, + Interval = 60, + }, + })); +} + + + // #endif + // + // [Theory] + // [MemberData(nameof(GetTestData))] + // [Trait("Category", "LinuxUnsupported")] + // public async Task HttpStreamRequest_NamedPipes_MultipartTest(bool useStream, bool useGzip) + // { + // if (!EnvironmentTools.IsWindows()) + // { + // // Can't use WindowsNamedPipes on non-Windows + // return; + // } + // + // // named pipes is notoriously flaky + // var attemptsRemaining = 1; + // while (true) + // { + // try + // { + // attemptsRemaining--; + // await RunNamedPipesTest(); + // return; + // } + // catch (Exception ex) when (attemptsRemaining > 0 && ex is not SkipException) + // { + // } + // } + // + // async Task RunNamedPipesTest() + // { + // using var agent = MockTracerAgent.Create(_output, new WindowsPipesConfig($"trace-{Guid.NewGuid()}", null)); + // var factory = new HttpStreamRequestFactory( + // new NamedPipeClientStreamFactory(agent.TracesWindowsPipeName, timeoutMs: 100), + // new DatadogHttpClient(new TraceAgentHttpHeaderHelper()), + // Localhost); + // await RunTest(agent, () => factory.Create(Localhost), useStream, useGzip, nameof(ApiWebRequest_MultipartTest)); + // } + // } + // + // [Theory] + // [MemberData(nameof(GetTestData))] + // [Trait("Category", "LinuxUnsupported")] + // public async Task HttpStreamRequest_NamedPipes_VerificationTest(bool useStream, bool useGzip) + // { + // if (!EnvironmentTools.IsWindows()) + // { + // // Can't use WindowsNamedPipes on non-Windows + // return; + // } + // + // // named pipes is notoriously flaky + // var attemptsRemaining = 1; + // while (true) + // { + // try + // { + // attemptsRemaining--; + // await RunNamedPipesTest(); + // return; + // } + // catch (Exception ex) when (attemptsRemaining > 0 && ex is not SkipException) + // { + // } + // } + // + // async Task RunNamedPipesTest() + // { + // using var agent = MockTracerAgent.Create(_output, new WindowsPipesConfig($"trace-{Guid.NewGuid()}", null)); + // var factory = new HttpStreamRequestFactory( + // new NamedPipeClientStreamFactory(agent.TracesWindowsPipeName, timeoutMs: 100), + // new DatadogHttpClient(new TraceAgentHttpHeaderHelper()), + // Localhost); + // await RunValidationTest(agent, () => factory.Create(Localhost), useStream, useGzip, nameof(ApiWebRequest_ValidationTest)); + // } + // } diff --git a/tracer/test/Datadog.Trace.Tests/Telemetry/Transports/JsonTelemetryTransportTests.cs b/tracer/test/Datadog.Trace.Tests/Telemetry/Transports/JsonTelemetryTransportTests.cs index 61043ed5440b..bb4684ff5fc4 100644 --- a/tracer/test/Datadog.Trace.Tests/Telemetry/Transports/JsonTelemetryTransportTests.cs +++ b/tracer/test/Datadog.Trace.Tests/Telemetry/Transports/JsonTelemetryTransportTests.cs @@ -9,6 +9,7 @@ using Datadog.Trace.Telemetry; using Datadog.Trace.Telemetry.Transports; using Datadog.Trace.TestHelpers.FluentAssertionsExtensions.Json; +using Datadog.Trace.Vendors.Newtonsoft.Json; using Datadog.Trace.Vendors.Newtonsoft.Json.Linq; using FluentAssertions; using Xunit; @@ -74,7 +75,7 @@ public void SerializedAppStartedShouldProduceJsonWithExpectedFormat() NamingSchemaVersion = "1" }; - var serialized = JsonTelemetryTransport.SerializeTelemetry(data); + var serialized = SerializeTelemetry(data); serialized.Should().NotBeNullOrEmpty(); var actualJson = JToken.Parse(serialized); @@ -146,7 +147,7 @@ public void SerializedMetricsTelemetryShouldProduceJsonWithExpectedFormat() }, })); - var serialized = JsonTelemetryTransport.SerializeTelemetry(data); + var serialized = SerializeTelemetry(data); serialized.Should().NotBeNullOrEmpty(); var actualJson = JToken.Parse(serialized); @@ -213,7 +214,7 @@ public void SerializedDistributionMetricsTelemetryShouldProduceJsonWithExpectedF }, })); - var serialized = JsonTelemetryTransport.SerializeTelemetry(data); + var serialized = SerializeTelemetry(data); serialized.Should().NotBeNullOrEmpty(); var actualJson = JToken.Parse(serialized); @@ -284,7 +285,7 @@ public void SerializedMessageBatchShouldProduceJsonWithExpectedFormat() NamingSchemaVersion = "1" }; - var serialized = JsonTelemetryTransport.SerializeTelemetry(data); + var serialized = SerializeTelemetry(data); serialized.Should().NotBeNullOrEmpty(); var actualJson = JToken.Parse(serialized); @@ -310,5 +311,7 @@ private static string GetSampleTelemetryData(string filename) using var streamReader = new StreamReader(stream); return streamReader.ReadToEnd(); } + + private static string SerializeTelemetry(T data) => JsonConvert.SerializeObject(data, Formatting.None, JsonTelemetryTransport.SerializerSettings); } } diff --git a/tracer/test/benchmarks/Benchmarks.Trace/AgentWriterBenchmark.cs b/tracer/test/benchmarks/Benchmarks.Trace/AgentWriterBenchmark.cs index 071abd941b86..cb385963aea5 100644 --- a/tracer/test/benchmarks/Benchmarks.Trace/AgentWriterBenchmark.cs +++ b/tracer/test/benchmarks/Benchmarks.Trace/AgentWriterBenchmark.cs @@ -11,6 +11,7 @@ using Datadog.Trace.DogStatsd; using Datadog.Trace.Tagging; using Datadog.Trace.Util; +using Datadog.Trace.Vendors.Newtonsoft.Json; namespace Benchmarks.Trace { @@ -159,6 +160,12 @@ public async Task PostAsync(ArraySegment traces, string cont return new FakeApiResponse(); } + public Task PostAsJsonAsync(T payload, MultipartCompression compression) + => throw new NotImplementedException(); + + public Task PostAsJsonAsync(T payload, MultipartCompression compression, JsonSerializerSettings settings) + => throw new NotImplementedException(); + public async Task PostAsync(Func writeToRequestStream, string contentType, string contentEncoding, string multipartBoundary) { using (var requestStream = Stream.Null)