diff --git a/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs b/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs index dfbbb855..846c0662 100644 --- a/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs +++ b/src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs @@ -231,6 +231,21 @@ protected virtual IDictionary HeadersFromContext( return ret; } + /// + /// Create tag collection for the given standalone activity ID. + /// + /// Standalone activity ID. + /// Tags. + protected virtual IEnumerable> CreateStandaloneActivityTags( + string activityId) + { + if (Options.TagNameActivityId is string name) + { + return new KeyValuePair[] { new(name, activityId) }; + } + return Enumerable.Empty>(); + } + private static void RecordExceptionWithStatus(Activity? activity, Exception exception) { // If the exception is a benign exception, we do not consider the status an error. Note, @@ -394,6 +409,112 @@ public override async Task> StartWorkflowUpdateAsy } } + public override async Task> StartActivityAsync( + StartActivityInput input) + { + using (var activity = ClientSource.StartActivity( + $"StartActivity:{input.Activity}", + kind: ActivityKind.Client, + parentContext: default, + tags: input.Options.Id is string id ? root.CreateStandaloneActivityTags(id) : null)) + { + if (HeadersFromContext(input.Headers) is Dictionary headers) + { + input = input with { Headers = headers }; + } + try + { + return await base.StartActivityAsync(input).ConfigureAwait(false); + } + catch (Exception e) + { + RecordExceptionWithStatus(activity, e); + throw; + } + } + } + + public override async Task DescribeActivityAsync( + DescribeActivityInput input) + { + using (var activity = ClientSource.StartActivity( + $"DescribeActivity:{input.Id}", + kind: ActivityKind.Client, + parentContext: default, + tags: root.CreateStandaloneActivityTags(input.Id))) + { + try + { + return await base.DescribeActivityAsync(input).ConfigureAwait(false); + } + catch (Exception e) + { + RecordExceptionWithStatus(activity, e); + throw; + } + } + } + + public override async Task CancelActivityAsync(CancelActivityInput input) + { + using (var activity = ClientSource.StartActivity( + $"CancelActivity:{input.Id}", + kind: ActivityKind.Client, + parentContext: default, + tags: root.CreateStandaloneActivityTags(input.Id))) + { + try + { + await base.CancelActivityAsync(input).ConfigureAwait(false); + } + catch (Exception e) + { + RecordExceptionWithStatus(activity, e); + throw; + } + } + } + + public override async Task TerminateActivityAsync(TerminateActivityInput input) + { + using (var activity = ClientSource.StartActivity( + $"TerminateActivity:{input.Id}", + kind: ActivityKind.Client, + parentContext: default, + tags: root.CreateStandaloneActivityTags(input.Id))) + { + try + { + await base.TerminateActivityAsync(input).ConfigureAwait(false); + } + catch (Exception e) + { + RecordExceptionWithStatus(activity, e); + throw; + } + } + } + + public override async Task CountActivitiesAsync( + CountActivitiesInput input) + { + using (var activity = ClientSource.StartActivity( + "CountActivities", + kind: ActivityKind.Client, + parentContext: default)) + { + try + { + return await base.CountActivitiesAsync(input).ConfigureAwait(false); + } + catch (Exception e) + { + RecordExceptionWithStatus(activity, e); + throw; + } + } + } + /// /// Serialize current context to headers if one exists. /// diff --git a/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs b/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs index 6471af79..931b3c0a 100644 --- a/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs +++ b/tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs @@ -797,6 +797,43 @@ public async Task TracingInterceptor_Nexus_HasTracing() "RunWorkflow:TracingWorkflow")); } + [Fact] + public async Task TracingInterceptor_StandaloneActivity_HasProperSpans() + { + var cancelId = $"act-cancel-{Guid.NewGuid()}"; + var terminateId = $"act-terminate-{Guid.NewGuid()}"; + var spans = await WithTracingWorkerAsync(async (client, _) => + { + var taskQueue = $"standalone-tq-{Guid.NewGuid()}"; + + var cancelHandle = await client.StartActivityAsync( + "StandaloneActivity", + Array.Empty(), + new(cancelId, taskQueue) { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) }); + await cancelHandle.DescribeAsync(); + await cancelHandle.CancelAsync(); + + var terminateHandle = await client.StartActivityAsync( + "StandaloneActivity", + Array.Empty(), + new(terminateId, taskQueue) { ScheduleToCloseTimeout = TimeSpan.FromMinutes(5) }); + await terminateHandle.TerminateAsync(); + + await client.CountActivitiesAsync($"TaskQueue = '{taskQueue}'"); + }); + + var cancelTags = new[] { ActivityAssertion.TagEqual("temporalActivityID", cancelId) }; + var terminateTags = new[] { ActivityAssertion.TagEqual("temporalActivityID", terminateId) }; + AssertActivities( + spans, + new("StartActivity:StandaloneActivity", Parent: null, Tags: cancelTags), + new("DescribeActivity:" + cancelId, Parent: null, Tags: cancelTags), + new("CancelActivity:" + cancelId, Parent: null, Tags: cancelTags), + new("StartActivity:StandaloneActivity", Parent: null, Tags: terminateTags), + new("TerminateActivity:" + terminateId, Parent: null, Tags: terminateTags), + ActivityAssertion.NameAndParent("CountActivities", null)); + } + private static void AssertActivities( IReadOnlyCollection activities, params ActivityAssertion[] assertions) { @@ -886,7 +923,7 @@ private async Task> WithTracingWorkerAsync( var activities = new List(); // Setup provider - using var tracerProvider = global::OpenTelemetry.Sdk.CreateTracerProviderBuilder(). + using var _ = global::OpenTelemetry.Sdk.CreateTracerProviderBuilder(). AddSource( TracingInterceptor.ClientSource.Name, TracingInterceptor.WorkflowsSource.Name,