Skip to content

Commit 0ecd0e7

Browse files
committed
feat(otel): add otel tracing for standalone activities.
1 parent 8158715 commit 0ecd0e7

6 files changed

Lines changed: 85 additions & 18 deletions

File tree

contrib/opentelemetry/tracing_interceptor_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@ import (
1414
"go.opentelemetry.io/otel/sdk/trace/tracetest"
1515
"go.opentelemetry.io/otel/trace"
1616

17+
"go.temporal.io/sdk/client"
1718
"go.temporal.io/sdk/contrib/opentelemetry"
1819
"go.temporal.io/sdk/interceptor"
20+
"go.temporal.io/sdk/internal"
1921
"go.temporal.io/sdk/internal/interceptortest"
2022
"go.temporal.io/sdk/temporal"
2123
"go.temporal.io/sdk/testsuite"
@@ -35,6 +37,34 @@ func TestSpanPropagation(t *testing.T) {
3537
interceptortest.AssertSpanPropagation(t, testTracer)
3638
}
3739

40+
func TestStandaloneActivitySpanCreation(t *testing.T) {
41+
rec := tracetest.NewSpanRecorder()
42+
tracer, err := opentelemetry.NewTracer(opentelemetry.TracerOptions{
43+
Tracer: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)).Tracer(""),
44+
})
45+
require.NoError(t, err)
46+
47+
ctx := internal.NewHeaderContext(context.Background())
48+
outbound := interceptor.NewTracingInterceptor(tracer).InterceptClient(&testNoopClientOutbound{})
49+
_, _ = outbound.ExecuteActivity(ctx, &interceptor.ClientExecuteActivityInput{
50+
ActivityType: "test-saa",
51+
Options: &client.StartActivityOptions{ID: "test-saa-123"},
52+
})
53+
54+
spans := rec.Ended()
55+
require.Len(t, spans, 1)
56+
assert.Equal(t, "StartActivity:test-saa", spans[0].Name())
57+
58+
var foundActivityID bool
59+
for _, attr := range spans[0].Attributes() {
60+
if string(attr.Key) == "temporalActivityID" {
61+
foundActivityID = true
62+
assert.Equal(t, "test-saa-123", attr.Value.AsString())
63+
}
64+
}
65+
require.True(t, foundActivityID, "expected activity ID span attribute")
66+
}
67+
3868
type testTracer struct {
3969
interceptor.Tracer
4070
rec *tracetest.SpanRecorder
@@ -235,3 +265,11 @@ func TestSpanFromWorkflowContextNoOpSpan(t *testing.T) {
235265
require.True(t, env.IsWorkflowCompleted())
236266
require.NoError(t, env.GetWorkflowError())
237267
}
268+
269+
type testNoopClientOutbound struct {
270+
interceptor.ClientOutboundInterceptorBase
271+
}
272+
273+
func (n *testNoopClientOutbound) ExecuteActivity(_ context.Context, _ *interceptor.ClientExecuteActivityInput) (client.ActivityHandle, error) {
274+
return nil, nil
275+
}

interceptor/interceptor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ type NexusCancelOperationInput = internal.NexusCancelOperationInput
302302
//
303303
// This returns a non-nil map only for contexts inside
304304
// ActivityInboundInterceptor.ExecuteActivity,
305+
// ClientOutboundInterceptor.ExecuteActivity,
305306
// ClientOutboundInterceptor.ExecuteWorkflow, and
306307
// ClientOutboundInterceptor.SignalWithStartWorkflow.
307308
func Header(ctx context.Context) map[string]*commonpb.Payload {

interceptor/tracing_interceptor.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,28 @@ func (t *tracingClientOutboundInterceptor) UpdateWithStartWorkflow(
394394
return val, err
395395
}
396396

397+
func (t *tracingClientOutboundInterceptor) ExecuteActivity(
398+
ctx context.Context,
399+
in *ClientExecuteActivityInput,
400+
) (client.ActivityHandle, error) {
401+
span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{
402+
Operation: "StartActivity",
403+
Name: in.ActivityType,
404+
Tags: map[string]string{activityIDTagKey: in.Options.ID},
405+
ToHeader: true,
406+
Time: time.Now(),
407+
}, t.root.headerReader(ctx), t.root.headerWriter(ctx))
408+
if err != nil {
409+
return nil, err
410+
}
411+
var finishOpts TracerFinishSpanOptions
412+
defer span.Finish(&finishOpts)
413+
414+
handle, err := t.Next.ExecuteActivity(ctx, in)
415+
finishOpts.Error = err
416+
return handle, err
417+
}
418+
397419
type tracingActivityOutboundInterceptor struct {
398420
ActivityOutboundInterceptorBase
399421
root *tracingInterceptor

internal/cmd/tools/doclink/doclink.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,25 @@ func processInternal(cfg config, file *os.File, pairs map[string]map[string]stri
390390
trimmedNextLine = nextLine
391391
}
392392

393-
// Check for new doc links to add
393+
// Track whether nextLine is inside a function or interface block.
394+
// We update this first so the first line inside a block is not treated
395+
// as a top-level definition.
396+
if strings.HasPrefix(trimmedLine, "func ") {
397+
funcSpaces = indentSize
398+
inFunc = true
399+
} else if inFunc && trimmedLine == "}" && funcSpaces == indentSize {
400+
funcSpaces = -1
401+
inFunc = false
402+
}
403+
if strings.HasSuffix(trimmedLine, "interface {") {
404+
interfaceSpaces = indentSize
405+
inInterface = true
406+
} else if inInterface && trimmedLine == "}" && interfaceSpaces == indentSize {
407+
interfaceSpaces = -1
408+
inInterface = false
409+
}
410+
411+
// Check for new doc links to add on top-level definitions only.
394412
if !inFunc && !inInterface && isValidDefinition(trimmedNextLine, &inGroup, &inStruct) {
395413
// Find the "Exposed As" line in the doc comment
396414
var existingDoclink string
@@ -451,23 +469,6 @@ func processInternal(cfg config, file *os.File, pairs map[string]map[string]stri
451469
}
452470
}
453471

454-
// update inFunc after we actually check for doclinks to allow us to check
455-
// a function's definition, without checking anything inside the function
456-
if strings.HasPrefix(trimmedLine, "func ") {
457-
funcSpaces = indentSize
458-
inFunc = true
459-
} else if inFunc && trimmedLine == "}" && funcSpaces == indentSize {
460-
funcSpaces = -1
461-
inFunc = false
462-
}
463-
if strings.HasSuffix(trimmedLine, "interface {") {
464-
interfaceSpaces = indentSize
465-
inInterface = true
466-
} else if inInterface && trimmedLine == "}" && interfaceSpaces == indentSize {
467-
interfaceSpaces = -1
468-
inInterface = false
469-
}
470-
471472
newFile += line + "\n"
472473
}
473474

internal/interceptor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,7 @@ type ClientOutboundInterceptor interface {
419419
DescribeWorkflow(context.Context, *ClientDescribeWorkflowInput) (*ClientDescribeWorkflowOutput, error)
420420

421421
// ExecuteActivity intercepts client.Client.ExecuteActivity.
422+
// interceptor.Header will return a non-nil map for this context.
422423
//
423424
// NOTE: Experimental
424425
ExecuteActivity(context.Context, *ClientExecuteActivityInput) (ClientActivityHandle, error)

internal/interceptor_header.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ func contextWithNewHeader(ctx context.Context) context.Context {
2323
return context.WithValue(ctx, headerKey{}, map[string]*commonpb.Payload{})
2424
}
2525

26+
func NewHeaderContext(ctx context.Context) context.Context {
27+
return contextWithNewHeader(ctx)
28+
}
29+
2630
func contextWithoutHeader(ctx context.Context) context.Context {
2731
return context.WithValue(ctx, headerKey{}, nil)
2832
}

0 commit comments

Comments
 (0)