Skip to content

Commit e52b86f

Browse files
committed
feat(otel): add otel tracing for standalone activities.
1 parent fe2f2e3 commit e52b86f

2 files changed

Lines changed: 151 additions & 0 deletions

File tree

src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,21 @@ protected virtual IDictionary<string, string> HeadersFromContext(
232232
return ret;
233233
}
234234

235+
/// <summary>
236+
/// Create tag collection for the given standalone activity ID.
237+
/// </summary>
238+
/// <param name="activityId">Standalone activity ID.</param>
239+
/// <returns>Tags.</returns>
240+
protected virtual IEnumerable<KeyValuePair<string, object?>> CreateStandaloneActivityTags(
241+
string activityId)
242+
{
243+
if (Options.TagNameActivityId is string name)
244+
{
245+
return new KeyValuePair<string, object?>[] { new(name, activityId) };
246+
}
247+
return Enumerable.Empty<KeyValuePair<string, object?>>();
248+
}
249+
235250
private static void RecordExceptionWithStatus(Activity? activity, Exception exception)
236251
{
237252
// If the exception is a benign exception, we do not consider the status an error. Note,
@@ -395,6 +410,112 @@ public override async Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsy
395410
}
396411
}
397412

413+
public override async Task<ActivityHandle<TResult>> StartActivityAsync<TResult>(
414+
StartActivityInput input)
415+
{
416+
using (var activity = ClientSource.StartActivity(
417+
$"StartActivity:{input.Activity}",
418+
kind: ActivityKind.Client,
419+
parentContext: default,
420+
tags: input.Options.Id is string id ? root.CreateStandaloneActivityTags(id) : null))
421+
{
422+
if (HeadersFromContext(input.Headers) is Dictionary<string, Payload> headers)
423+
{
424+
input = input with { Headers = headers };
425+
}
426+
try
427+
{
428+
return await base.StartActivityAsync<TResult>(input).ConfigureAwait(false);
429+
}
430+
catch (Exception e)
431+
{
432+
RecordExceptionWithStatus(activity, e);
433+
throw;
434+
}
435+
}
436+
}
437+
438+
public override async Task<ActivityExecutionDescription> DescribeActivityAsync(
439+
DescribeActivityInput input)
440+
{
441+
using (var activity = ClientSource.StartActivity(
442+
$"DescribeActivity:{input.Id}",
443+
kind: ActivityKind.Client,
444+
parentContext: default,
445+
tags: root.CreateStandaloneActivityTags(input.Id)))
446+
{
447+
try
448+
{
449+
return await base.DescribeActivityAsync(input).ConfigureAwait(false);
450+
}
451+
catch (Exception e)
452+
{
453+
RecordExceptionWithStatus(activity, e);
454+
throw;
455+
}
456+
}
457+
}
458+
459+
public override async Task CancelActivityAsync(CancelActivityInput input)
460+
{
461+
using (var activity = ClientSource.StartActivity(
462+
$"CancelActivity:{input.Id}",
463+
kind: ActivityKind.Client,
464+
parentContext: default,
465+
tags: root.CreateStandaloneActivityTags(input.Id)))
466+
{
467+
try
468+
{
469+
await base.CancelActivityAsync(input).ConfigureAwait(false);
470+
}
471+
catch (Exception e)
472+
{
473+
RecordExceptionWithStatus(activity, e);
474+
throw;
475+
}
476+
}
477+
}
478+
479+
public override async Task TerminateActivityAsync(TerminateActivityInput input)
480+
{
481+
using (var activity = ClientSource.StartActivity(
482+
$"TerminateActivity:{input.Id}",
483+
kind: ActivityKind.Client,
484+
parentContext: default,
485+
tags: root.CreateStandaloneActivityTags(input.Id)))
486+
{
487+
try
488+
{
489+
await base.TerminateActivityAsync(input).ConfigureAwait(false);
490+
}
491+
catch (Exception e)
492+
{
493+
RecordExceptionWithStatus(activity, e);
494+
throw;
495+
}
496+
}
497+
}
498+
499+
public override async Task<ActivityExecutionCount> CountActivitiesAsync(
500+
CountActivitiesInput input)
501+
{
502+
using (var activity = ClientSource.StartActivity(
503+
"CountActivities",
504+
kind: ActivityKind.Client,
505+
parentContext: default))
506+
{
507+
try
508+
{
509+
return await base.CountActivitiesAsync(input).ConfigureAwait(false);
510+
}
511+
catch (Exception e)
512+
{
513+
RecordExceptionWithStatus(activity, e);
514+
throw;
515+
}
516+
}
517+
}
518+
398519
/// <summary>
399520
/// Serialize current context to headers if one exists.
400521
/// </summary>

tests/Temporalio.Tests/Extensions/OpenTelemetry/TracingInterceptorTests.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,36 @@ public async Task TracingInterceptor_Nexus_HasTracing()
797797
"RunWorkflow:TracingWorkflow"));
798798
}
799799

800+
[Fact]
801+
public async Task TracingInterceptor_StandaloneActivity_HasProperSpans()
802+
{
803+
var activityId = $"act-{Guid.NewGuid()}";
804+
var spans = await WithTracingWorkerAsync(async (client, _) =>
805+
{
806+
var taskQueue = $"standalone-tq-{Guid.NewGuid()}";
807+
var handle = await client.StartActivityAsync(
808+
"StandaloneActivity",
809+
Array.Empty<object?>(),
810+
new(activityId, taskQueue)
811+
{
812+
ScheduleToCloseTimeout = TimeSpan.FromMinutes(5),
813+
});
814+
await handle.DescribeAsync();
815+
await handle.CancelAsync();
816+
await handle.TerminateAsync();
817+
await client.CountActivitiesAsync($"ActivityId = '{activityId}'");
818+
});
819+
820+
var activityTags = new[] { ActivityAssertion.TagEqual("temporalActivityID", activityId) };
821+
AssertActivities(
822+
spans,
823+
new("StartActivity:StandaloneActivity", Parent: null, Tags: activityTags),
824+
new("DescribeActivity:" + activityId, Parent: null, Tags: activityTags),
825+
new("CancelActivity:" + activityId, Parent: null, Tags: activityTags),
826+
new("TerminateActivity:" + activityId, Parent: null, Tags: activityTags),
827+
new(Name: "CountActivities", Parent: null, IgnoreTags: true));
828+
}
829+
800830
private static void AssertActivities(
801831
IReadOnlyCollection<Activity> activities, params ActivityAssertion[] assertions)
802832
{

0 commit comments

Comments
 (0)