diff --git a/src/OpenTelemetry.Extensions/.publicApi/PublicAPI.Unshipped.txt b/src/OpenTelemetry.Extensions/.publicApi/PublicAPI.Unshipped.txt index 064129dc23..0a58d75ab3 100644 --- a/src/OpenTelemetry.Extensions/.publicApi/PublicAPI.Unshipped.txt +++ b/src/OpenTelemetry.Extensions/.publicApi/PublicAPI.Unshipped.txt @@ -1,5 +1,7 @@ #nullable enable Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions +OpenTelemetry.Extensions.Trace.PartialActivityProcessor +OpenTelemetry.Extensions.Trace.PartialActivityProcessor.PartialActivityProcessor(OpenTelemetry.BaseExporter! logExporter) -> void OpenTelemetry.Logs.LogToActivityEventConversionOptions OpenTelemetry.Logs.LogToActivityEventConversionOptions.Filter.get -> System.Func? OpenTelemetry.Logs.LogToActivityEventConversionOptions.Filter.set -> void @@ -12,6 +14,10 @@ OpenTelemetry.RateLimitingSampler OpenTelemetry.RateLimitingSampler.RateLimitingSampler(int maxTracesPerSecond) -> void OpenTelemetry.Trace.BaggageActivityProcessor OpenTelemetry.Trace.TracerProviderBuilderExtensions +override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.Dispose(bool disposing) -> void +override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.OnEnd(System.Diagnostics.Activity! data) -> void +override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.OnShutdown(int timeoutMilliseconds) -> bool +override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.OnStart(System.Diagnostics.Activity! data) -> void override OpenTelemetry.RateLimitingSampler.ShouldSample(in OpenTelemetry.Trace.SamplingParameters samplingParameters) -> OpenTelemetry.Trace.SamplingResult override OpenTelemetry.Trace.BaggageActivityProcessor.OnStart(System.Diagnostics.Activity! data) -> void static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.AddBaggageProcessor(this OpenTelemetry.Logs.LoggerProviderBuilder! builder) -> OpenTelemetry.Logs.LoggerProviderBuilder! diff --git a/src/OpenTelemetry.Extensions/OpenTelemetry.Extensions.csproj b/src/OpenTelemetry.Extensions/OpenTelemetry.Extensions.csproj index 74b07e5117..3747791636 100644 --- a/src/OpenTelemetry.Extensions/OpenTelemetry.Extensions.csproj +++ b/src/OpenTelemetry.Extensions/OpenTelemetry.Extensions.csproj @@ -21,6 +21,7 @@ + diff --git a/src/OpenTelemetry.Extensions/Trace/PartialActivityProcessor.cs b/src/OpenTelemetry.Extensions/Trace/PartialActivityProcessor.cs new file mode 100644 index 0000000000..8d375941cd --- /dev/null +++ b/src/OpenTelemetry.Extensions/Trace/PartialActivityProcessor.cs @@ -0,0 +1,188 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +using System.Diagnostics; +using System.Reflection; +using Microsoft.Extensions.Logging; +using OpenTelemetry.Logs; + +namespace OpenTelemetry.Extensions.Trace; + +/// +/// Activity processor that exports LogRecord on Activity start, during it's lifetime as a heartbeat and on end. +/// +/// +/// Add this processor *before* exporter related Activity processors. +/// +public class PartialActivityProcessor : BaseProcessor +{ + private static MethodInfo writeTraceDataMethod = null!; + private static ConstructorInfo logRecordConstructor = null!; + private static object sdkLimitOptions = null!; + private readonly ManualResetEvent shutdownTrigger; + private readonly BaseExporter logExporter; + + /// + /// Initializes a new instance of the class. + /// + /// Log exporter to be used. + public PartialActivityProcessor(BaseExporter logExporter) + { + this.logExporter = logExporter ?? throw new ArgumentNullException(nameof(logExporter)); + + this.shutdownTrigger = new ManualResetEvent(false); + + // Access OpenTelemetry internals as soon as possible to fail fast rather than waiting for the first heartbeat + AccessOpenTelemetryInternals( + out writeTraceDataMethod, + out logRecordConstructor, + out sdkLimitOptions); + } + + /// + public override void OnStart(Activity data) + { + if (data == null) + { + return; + } + + var logRecord = GetLogRecord(data, GetStartLogRecordAttributes()); + this.logExporter.Export(new Batch(logRecord)); + } + + /// + public override void OnEnd(Activity data) + { + if (data == null) + { + return; + } + + var logRecord = GetLogRecord(data, GetEndLogRecordAttributes()); + this.logExporter.Export(new Batch(logRecord)); + } + + /// + protected override bool OnShutdown(int timeoutMilliseconds) + { + try + { + this.shutdownTrigger.Set(); + } + catch (ObjectDisposedException) + { + return false; + } + + switch (timeoutMilliseconds) + { + case Timeout.Infinite: + return this.logExporter.Shutdown(); + case 0: + return this.logExporter.Shutdown(0); + } + + var sw = Stopwatch.StartNew(); + var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds; + return this.logExporter.Shutdown((int)Math.Max(timeout, 0)); + } + + /// + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + this.shutdownTrigger.Dispose(); + } + + private static LogRecord GetLogRecord( + Activity data, + List> logRecordAttributesToBeAdded) + { + var buffer = new byte[750000]; + + var result = writeTraceDataMethod.Invoke( + null, + [buffer, 0, sdkLimitOptions, null!, new Batch(data)]); + var writePosition = result as int? ?? 0; // Use a default value if null + + var logRecord = (LogRecord)logRecordConstructor.Invoke(null); + logRecord.Timestamp = DateTime.UtcNow; + logRecord.TraceId = data.TraceId; + logRecord.SpanId = data.SpanId; + logRecord.TraceFlags = ActivityTraceFlags.None; + logRecord.Body = Convert.ToBase64String(buffer, 0, writePosition); + logRecord.LogLevel = LogLevel.Information; + + var logRecordAttributes = GetLogRecordAttributes(); + logRecordAttributes.AddRange(logRecordAttributesToBeAdded); + logRecord.Attributes = logRecordAttributes; + + return logRecord; + } + + private static void AccessOpenTelemetryInternals( + out MethodInfo writeTraceDataMethodParam, + out ConstructorInfo logRecordConstructorParam, + out object sdkLimitOptionsParam) + { + var sdkLimitOptionsType = Type.GetType( + "OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.SdkLimitOptions, OpenTelemetry.Exporter.OpenTelemetryProtocol", + true); + + if (sdkLimitOptionsType == null) + { + throw new InvalidOperationException("Failed to get the type 'SdkLimitOptions'."); + } + + sdkLimitOptionsParam = Activator.CreateInstance(sdkLimitOptionsType, nonPublic: true) ?? + throw new InvalidOperationException( + "Failed to create an instance of 'SdkLimitOptions'."); + + var protobufOtlpTraceSerializerType = Type.GetType( + "OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer.ProtobufOtlpTraceSerializer, OpenTelemetry.Exporter.OpenTelemetryProtocol", + true); + + if (protobufOtlpTraceSerializerType == null) + { + throw new InvalidOperationException( + "Failed to get the type 'ProtobufOtlpTraceSerializer'."); + } + + writeTraceDataMethodParam = + protobufOtlpTraceSerializerType.GetMethod( + "WriteTraceData", + BindingFlags.NonPublic | BindingFlags.Static) ?? + throw new InvalidOperationException("Failed to get the method 'WriteTraceData'."); + + var logRecordType = Type.GetType("OpenTelemetry.Logs.LogRecord, OpenTelemetry", true); + + if (logRecordType == null) + { + throw new InvalidOperationException("Failed to get the type 'LogRecord'."); + } + + logRecordConstructorParam = logRecordType.GetConstructor( + BindingFlags.NonPublic | BindingFlags.Instance, + null, + Type.EmptyTypes, + null) ?? + throw new InvalidOperationException( + "Failed to get the constructor of 'LogRecord'."); + } + + private static List> GetLogRecordAttributes() => + [ + new("telemetry.logs.cluster", "partial"), + new("telemetry.logs.project", "span"), + ]; + + private static List> GetEndLogRecordAttributes() => + [ + new("span.state", "ended") + ]; + + private static List> GetStartLogRecordAttributes() => + [ + new("span.state", "started") + ]; +} diff --git a/test/OpenTelemetry.Extensions.Tests/Trace/PartialActivityProcessorTests.cs b/test/OpenTelemetry.Extensions.Tests/Trace/PartialActivityProcessorTests.cs new file mode 100644 index 0000000000..4b98bb297d --- /dev/null +++ b/test/OpenTelemetry.Extensions.Tests/Trace/PartialActivityProcessorTests.cs @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +using System.Diagnostics; +using OpenTelemetry.Exporter; +using OpenTelemetry.Extensions.Trace; +using OpenTelemetry.Logs; +using Xunit; + +namespace OpenTelemetry.Extensions.Tests.Trace; + +public class PartialActivityProcessorTests +{ + private readonly List exportedLogs = []; + private readonly PartialActivityProcessor processor; + + public PartialActivityProcessorTests() + { + InMemoryExporter + logExporter = new InMemoryExporter(this.exportedLogs); + this.processor = new PartialActivityProcessor(logExporter); + } + + [Fact] + public void Constructor_ShouldInitializeFields() => Assert.NotNull(this.processor); + + [Fact] + public void OnStart_ShouldExportStartLog() + { + var activity = new Activity("TestActivity"); + + this.processor.OnStart(activity); + + Assert.Single(this.exportedLogs); + } + + [Fact] + public void OnEnd_ShouldExportEndLog() + { + var activity = new Activity("TestActivity"); + + this.processor.OnStart(activity); + + this.processor.OnEnd(activity); + + Assert.Equal(2, this.exportedLogs.Count); + } +}