Skip to content

[feature request] Add PartialActivityProcessor to provide additional visibility to very long-running processes #2704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#nullable enable
Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions
OpenTelemetry.Extensions.Trace.PartialActivityProcessor
OpenTelemetry.Extensions.Trace.PartialActivityProcessor.PartialActivityProcessor(OpenTelemetry.BaseExporter<OpenTelemetry.Logs.LogRecord!>! logExporter) -> void
OpenTelemetry.Logs.LogToActivityEventConversionOptions
OpenTelemetry.Logs.LogToActivityEventConversionOptions.Filter.get -> System.Func<OpenTelemetry.Logs.LogRecord!, bool>?
OpenTelemetry.Logs.LogToActivityEventConversionOptions.Filter.set -> void
Expand All @@ -12,6 +14,10 @@ OpenTelemetry.RateLimitingSampler
OpenTelemetry.RateLimitingSampler.RateLimitingSampler(int maxTracesPerSecond) -> void
OpenTelemetry.Trace.BaggageActivityProcessor
OpenTelemetry.Trace.TracerProviderBuilderExtensions
override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.Dispose(bool disposing) -> void
override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.OnEnd(System.Diagnostics.Activity! data) -> void
override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.OnShutdown(int timeoutMilliseconds) -> bool
override OpenTelemetry.Extensions.Trace.PartialActivityProcessor.OnStart(System.Diagnostics.Activity! data) -> void
override OpenTelemetry.RateLimitingSampler.ShouldSample(in OpenTelemetry.Trace.SamplingParameters samplingParameters) -> OpenTelemetry.Trace.SamplingResult
override OpenTelemetry.Trace.BaggageActivityProcessor.OnStart(System.Diagnostics.Activity! data) -> void
static Microsoft.Extensions.Logging.OpenTelemetryLoggingExtensions.AddBaggageProcessor(this OpenTelemetry.Logs.LoggerProviderBuilder! builder) -> OpenTelemetry.Logs.LoggerProviderBuilder!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

<ItemGroup>
<PackageReference Include="OpenTelemetry" Version="$(OpenTelemetryCoreLatestVersion)" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="$(OpenTelemetryCoreLatestVersion)" />
</ItemGroup>

</Project>
188 changes: 188 additions & 0 deletions src/OpenTelemetry.Extensions/Trace/PartialActivityProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using System.Diagnostics;
using System.Reflection;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Logs;

namespace OpenTelemetry.Extensions.Trace;

/// <summary>
/// Activity processor that exports LogRecord on Activity start, during it's lifetime as a heartbeat and on end.
/// </summary>
/// <remarks>
/// Add this processor *before* exporter related Activity processors.
/// </remarks>
public class PartialActivityProcessor : BaseProcessor<Activity>
{
private static MethodInfo writeTraceDataMethod = null!;
private static ConstructorInfo logRecordConstructor = null!;
private static object sdkLimitOptions = null!;
private readonly ManualResetEvent shutdownTrigger;
private readonly BaseExporter<LogRecord> logExporter;

/// <summary>
/// Initializes a new instance of the <see cref="PartialActivityProcessor"/> class.
/// </summary>
/// <param name="logExporter">Log exporter to be used.</param>
public PartialActivityProcessor(BaseExporter<LogRecord> logExporter)
{
this.logExporter = logExporter ?? throw new ArgumentNullException(nameof(logExporter));

this.shutdownTrigger = new ManualResetEvent(false);

// Access OpenTelemetry internals as soon as possible to fail fast rather than waiting for the first heartbeat
AccessOpenTelemetryInternals(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pellared while at it, do you know of any good practice in open telemetry, where this can be rewritten without any hacks or ugly practices?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I am not a maintainer/approver here 😉
CC @open-telemetry/dotnet-approvers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think it is a good idea to rely on any internals. If your intent to send a signal at Activity start/end, accept a ILoggerFactory/iLogger, and then use that to emit the log, just like how end users would do.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea about accessing internals comes from serializing span into proto base64 encoded representation to body of log record so that partial collector (there should be soon a pr to collector contrib) can reuse span data. this is a broader approach.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we really need is just a way to serialise all the Span content into a log message so that if the Span never completes normally (e.g. due to a process crash) we can rehydrate it in the component that has been recording these partial span events (including all the original attributes, etc.), mark it as finished in an error state and send it back in to our OTEL pipeline so it's not lost.

Using the serialiser from OpenTelemetry.Exporter.OpenTelemetryProtocol seems the obvious way to do that without missing something, but that code's not on the public API.

FWIW I agree that accessing this via reflection is far from ideal (versioning issues worry me). Does anyone know why this isn't public in the upstream exporter?

Copy link
Member

@pellared pellared May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serializing to OTLP would be coupling the processor to certain exporter implementation. We should simply encode the span as a complex log attribute named span.state. The the OTLP exporter would be responsible to serialize it accordingly.

PS. I have no idea how OTel .NET SDK handles complex log attributes. Referring to @open-telemetry/dotnet-approvers so that they can you out if necessary.

out writeTraceDataMethod,
out logRecordConstructor,
out sdkLimitOptions);
}

/// <inheritdoc />
public override void OnStart(Activity data)
{
if (data == null)
{
return;
}

var logRecord = GetLogRecord(data, GetStartLogRecordAttributes());
this.logExporter.Export(new Batch<LogRecord>(logRecord));
}

/// <inheritdoc />
public override void OnEnd(Activity data)
{
if (data == null)
{
return;
}

var logRecord = GetLogRecord(data, GetEndLogRecordAttributes());
this.logExporter.Export(new Batch<LogRecord>(logRecord));
}

/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
try
{
this.shutdownTrigger.Set();
}
catch (ObjectDisposedException)
{
return false;
}

switch (timeoutMilliseconds)
{
case Timeout.Infinite:
return this.logExporter.Shutdown();
case 0:
return this.logExporter.Shutdown(0);
}

var sw = Stopwatch.StartNew();
var timeout = timeoutMilliseconds - sw.ElapsedMilliseconds;
return this.logExporter.Shutdown((int)Math.Max(timeout, 0));
}

/// <inheritdoc />
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
this.shutdownTrigger.Dispose();
}

private static LogRecord GetLogRecord(
Activity data,
List<KeyValuePair<string, object?>> logRecordAttributesToBeAdded)
{
var buffer = new byte[750000];

var result = writeTraceDataMethod.Invoke(
null,
[buffer, 0, sdkLimitOptions, null!, new Batch<Activity>(data)]);
var writePosition = result as int? ?? 0; // Use a default value if null

var logRecord = (LogRecord)logRecordConstructor.Invoke(null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also set the Severity and EventId.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about INFO for Severity?

About EventId, it get a little complicated. In docs it says that it should be only set when used through ILogger - and this is not the case. From where/what would it value be derived from?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about INFO for Severity?

Works for me.

From where/what would it value be derived from?

This should be something which defines the name of the event. Maybe simply span.state.

Example: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/gen-ai-events.md

logRecord.Timestamp = DateTime.UtcNow;
logRecord.TraceId = data.TraceId;
logRecord.SpanId = data.SpanId;
logRecord.TraceFlags = ActivityTraceFlags.None;
logRecord.Body = Convert.ToBase64String(buffer, 0, writePosition);
logRecord.LogLevel = LogLevel.Information;

var logRecordAttributes = GetLogRecordAttributes();
logRecordAttributes.AddRange(logRecordAttributesToBeAdded);
logRecord.Attributes = logRecordAttributes;

return logRecord;
}

private static void AccessOpenTelemetryInternals(
out MethodInfo writeTraceDataMethodParam,
out ConstructorInfo logRecordConstructorParam,
out object sdkLimitOptionsParam)
{
var sdkLimitOptionsType = Type.GetType(
"OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.SdkLimitOptions, OpenTelemetry.Exporter.OpenTelemetryProtocol",
true);

if (sdkLimitOptionsType == null)
{
throw new InvalidOperationException("Failed to get the type 'SdkLimitOptions'.");
}

sdkLimitOptionsParam = Activator.CreateInstance(sdkLimitOptionsType, nonPublic: true) ??
throw new InvalidOperationException(
"Failed to create an instance of 'SdkLimitOptions'.");

var protobufOtlpTraceSerializerType = Type.GetType(
"OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer.ProtobufOtlpTraceSerializer, OpenTelemetry.Exporter.OpenTelemetryProtocol",
true);

if (protobufOtlpTraceSerializerType == null)
{
throw new InvalidOperationException(
"Failed to get the type 'ProtobufOtlpTraceSerializer'.");
}

writeTraceDataMethodParam =
protobufOtlpTraceSerializerType.GetMethod(
"WriteTraceData",
BindingFlags.NonPublic | BindingFlags.Static) ??
throw new InvalidOperationException("Failed to get the method 'WriteTraceData'.");

var logRecordType = Type.GetType("OpenTelemetry.Logs.LogRecord, OpenTelemetry", true);

if (logRecordType == null)
{
throw new InvalidOperationException("Failed to get the type 'LogRecord'.");
}

logRecordConstructorParam = logRecordType.GetConstructor(
BindingFlags.NonPublic | BindingFlags.Instance,
null,
Type.EmptyTypes,
null) ??
throw new InvalidOperationException(
"Failed to get the constructor of 'LogRecord'.");
}

private static List<KeyValuePair<string, object?>> GetLogRecordAttributes() =>
[
new("telemetry.logs.cluster", "partial"),
new("telemetry.logs.project", "span"),
];

private static List<KeyValuePair<string, object?>> GetEndLogRecordAttributes() =>
[
new("span.state", "ended")
];

private static List<KeyValuePair<string, object?>> GetStartLogRecordAttributes() =>
[
new("span.state", "started")
];
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using OpenTelemetry.Exporter;
using OpenTelemetry.Extensions.Trace;
using OpenTelemetry.Logs;
using Xunit;

namespace OpenTelemetry.Extensions.Tests.Trace;

public class PartialActivityProcessorTests
{
private readonly List<LogRecord> exportedLogs = [];
private readonly PartialActivityProcessor processor;

public PartialActivityProcessorTests()
{
InMemoryExporter<LogRecord>
logExporter = new InMemoryExporter<LogRecord>(this.exportedLogs);
this.processor = new PartialActivityProcessor(logExporter);
}

[Fact]
public void Constructor_ShouldInitializeFields() => Assert.NotNull(this.processor);

[Fact]
public void OnStart_ShouldExportStartLog()
{
var activity = new Activity("TestActivity");

this.processor.OnStart(activity);

Assert.Single(this.exportedLogs);
}

[Fact]
public void OnEnd_ShouldExportEndLog()
{
var activity = new Activity("TestActivity");

this.processor.OnStart(activity);

this.processor.OnEnd(activity);

Assert.Equal(2, this.exportedLogs.Count);
}
}