Skip to content

Commit 62b8da8

Browse files
committed
otel-update-with-start
1 parent ff939d7 commit 62b8da8

4 files changed

Lines changed: 148 additions & 0 deletions

File tree

temporal-opentracing/src/main/java/io/temporal/opentracing/SpanOperationType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
public enum SpanOperationType {
44
START_WORKFLOW("StartWorkflow"),
55
SIGNAL_WITH_START_WORKFLOW("SignalWithStartWorkflow"),
6+
UPDATE_WITH_START_WORKFLOW("UpdateWithStartWorkflow"),
67
RUN_WORKFLOW("RunWorkflow"),
78
START_CHILD_WORKFLOW("StartChildWorkflow"),
89
START_CONTINUE_AS_NEW_WORKFLOW("StartContinueAsNewWorkflow"),

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/ActionTypeAndNameSpanBuilderProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ protected Map<String, String> getSpanTags(SpanCreationContext context) {
5454
SpanOperationType operationType = context.getSpanOperationType();
5555
switch (operationType) {
5656
case START_WORKFLOW:
57+
case UPDATE_WITH_START_WORKFLOW:
5758
case SIGNAL_WITH_START_WORKFLOW:
5859
return ImmutableMap.of(StandardTagNames.WORKFLOW_ID, context.getWorkflowId());
5960
case START_CHILD_WORKFLOW:

temporal-opentracing/src/main/java/io/temporal/opentracing/internal/OpenTracingWorkflowClientCallsInterceptor.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,24 @@ public WorkflowSignalWithStartOutput signalWithStart(WorkflowSignalWithStartInpu
7878
}
7979
}
8080

81+
@Override
82+
public WorkflowUpdateWithStartOutput updateWithStart(WorkflowUpdateWithStartInput input) {
83+
WorkflowStartInput workflowStartInput = input.getWorkflowStartInput();
84+
Span workflowStartSpan =
85+
contextAccessor.writeSpanContextToHeader(
86+
() ->
87+
createWorkflowStartSpanBuilder(
88+
workflowStartInput, SpanOperationType.UPDATE_WITH_START_WORKFLOW)
89+
.start(),
90+
workflowStartInput.getHeader(),
91+
tracer);
92+
try (Scope ignored = tracer.scopeManager().activate(workflowStartSpan)) {
93+
return super.updateWithStart(input);
94+
} finally {
95+
workflowStartSpan.finish();
96+
}
97+
}
98+
8199
@Override
82100
public <R> QueryOutput<R> query(QueryInput<R> input) {
83101
Span workflowQuerySpan =
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package io.temporal.opentracing;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import io.opentracing.Scope;
6+
import io.opentracing.Span;
7+
import io.opentracing.mock.MockSpan;
8+
import io.opentracing.mock.MockTracer;
9+
import io.opentracing.util.ThreadLocalScopeManager;
10+
import io.temporal.api.enums.v1.WorkflowIdConflictPolicy;
11+
import io.temporal.client.*;
12+
import io.temporal.testing.internal.SDKTestWorkflowRule;
13+
import io.temporal.worker.WorkerFactoryOptions;
14+
import io.temporal.workflow.UpdateMethod;
15+
import io.temporal.workflow.WorkflowInterface;
16+
import io.temporal.workflow.WorkflowMethod;
17+
import java.util.List;
18+
import org.junit.After;
19+
import org.junit.Rule;
20+
import org.junit.Test;
21+
22+
public class UpdateWithStartTest {
23+
24+
private static final MockTracer mockTracer =
25+
new MockTracer(new ThreadLocalScopeManager(), MockTracer.Propagator.TEXT_MAP);
26+
27+
private final OpenTracingOptions OT_OPTIONS =
28+
OpenTracingOptions.newBuilder().setTracer(mockTracer).build();
29+
30+
@Rule
31+
public SDKTestWorkflowRule testWorkflowRule =
32+
SDKTestWorkflowRule.newBuilder()
33+
.setWorkflowClientOptions(
34+
WorkflowClientOptions.newBuilder()
35+
.setInterceptors(new OpenTracingClientInterceptor(OT_OPTIONS))
36+
.validateAndBuildWithDefaults())
37+
.setWorkerFactoryOptions(
38+
WorkerFactoryOptions.newBuilder()
39+
.setWorkerInterceptors(new OpenTracingWorkerInterceptor(OT_OPTIONS))
40+
.validateAndBuildWithDefaults())
41+
.setWorkflowTypes(WorkflowImpl.class)
42+
.build();
43+
44+
@After
45+
public void tearDown() {
46+
mockTracer.reset();
47+
}
48+
49+
@WorkflowInterface
50+
public interface TestWorkflow {
51+
@WorkflowMethod
52+
String run(String input);
53+
54+
@UpdateMethod
55+
void update(String update);
56+
}
57+
58+
public static class WorkflowImpl implements TestWorkflow {
59+
60+
private String update;
61+
62+
@Override
63+
public String run(String input) {
64+
return update;
65+
}
66+
67+
@Override
68+
public void update(String update) {
69+
this.update = update;
70+
}
71+
}
72+
73+
@Test
74+
public void updateWithStart() {
75+
WorkflowClient client = testWorkflowRule.getWorkflowClient();
76+
TestWorkflow workflow =
77+
client.newWorkflowStub(
78+
TestWorkflow.class,
79+
WorkflowOptions.newBuilder()
80+
.setTaskQueue(testWorkflowRule.getTaskQueue())
81+
.setWorkflowIdConflictPolicy(
82+
WorkflowIdConflictPolicy.WORKFLOW_ID_CONFLICT_POLICY_TERMINATE_EXISTING)
83+
.validateBuildWithDefaults());
84+
85+
Span span = mockTracer.buildSpan("ClientFunction").start();
86+
87+
try (Scope scope = mockTracer.scopeManager().activate(span)) {
88+
WorkflowClient.executeUpdateWithStart(
89+
workflow::update,
90+
"input",
91+
UpdateOptions.<Void>newBuilder().build(),
92+
new WithStartWorkflowOperation<>(workflow::run, "updateInput"));
93+
} finally {
94+
span.finish();
95+
}
96+
97+
// wait for the workflow completion
98+
WorkflowStub.fromTyped(workflow).getResult(String.class);
99+
100+
OpenTracingSpansHelper spansHelper = new OpenTracingSpansHelper(mockTracer.finishedSpans());
101+
102+
MockSpan clientSpan = spansHelper.getSpanByOperationName("ClientFunction");
103+
104+
MockSpan workflowStartSpan = spansHelper.getByParentSpan(clientSpan).get(0);
105+
assertEquals(clientSpan.context().spanId(), workflowStartSpan.parentId());
106+
assertEquals("UpdateWithStartWorkflow:TestWorkflow", workflowStartSpan.operationName());
107+
108+
if (SDKTestWorkflowRule.useExternalService) {
109+
List<MockSpan> workflowSpans = spansHelper.getByParentSpan(workflowStartSpan);
110+
assertEquals(2, workflowSpans.size());
111+
112+
MockSpan workflowUpdateSpan = workflowSpans.get(0);
113+
assertEquals(workflowStartSpan.context().spanId(), workflowUpdateSpan.parentId());
114+
assertEquals("HandleUpdate:update", workflowUpdateSpan.operationName());
115+
116+
MockSpan workflowRunSpan = workflowSpans.get(1);
117+
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
118+
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
119+
} else {
120+
List<MockSpan> workflowRunSpans = spansHelper.getByParentSpan(workflowStartSpan);
121+
assertEquals(1, workflowRunSpans.size());
122+
123+
MockSpan workflowRunSpan = workflowRunSpans.get(0);
124+
assertEquals(workflowStartSpan.context().spanId(), workflowRunSpan.parentId());
125+
assertEquals("RunWorkflow:TestWorkflow", workflowRunSpan.operationName());
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)