Skip to content

Commit a0f2c83

Browse files
authored
Enable open tracing propagation in workflow lifecycles (#876)
How it works: Context Propagation in Cadence (Customer). On the start-workflow API call, the trace span with its context is written into the workflow start event attributes, which are persisted on the Cadence server side. On workflow start in the client, this span is referenced and activated when the workflow is executed. On scheduling child workflows and activities (including local activities), the span is written into the child workflow's workflow start event attributes and the activity's schedule activity event attributes. On processing activities/child workflows, the persisted span is referenced and activated again. Sample Spans (note: Poll and Respond API spans are omitted here): {traceId:1, spanId:2, parentId:0, operationName:"cadence-RegisterDomain"} {traceId:1, spanId:3, parentId:2, operationName:"Test Started"} {traceId:1, spanId:18, parentId:3, operationName:"cadence-StartWorkflowExecution"} {traceId:1, spanId:19, parentId:18, operationName:"cadence-GetWorkflowExecutionHistory"} {traceId:1, spanId:21, parentId:18, operationName:"cadence-ExecuteWorkflow"} {traceId:1, spanId:24, parentId:21, operationName:"cadence-ExecuteActivity"} {traceId:1, spanId:25, parentId:24, operationName:"cadence-RespondActivityTaskCompleted"} {traceId:1, spanId:31, parentId:21, operationName:"cadence-ExecuteWorkflow"} {traceId:1, spanId:32, parentId:31, operationName:"cadence-ExecuteLocalActivity"} What changed? Added a Propagator entity with tracing extract/inject logic; added trace activation logic in the activity and workflow executors; added trace activation on the service client (TChannel + gRPC). Why? To improve observability. How did you test it? Integration test.
1 parent ac96636 commit a0f2c83

16 files changed

+964
-12
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ dependencies {
8181
testCompile group: 'junit', name: 'junit', version: '4.12'
8282
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
8383
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
84+
testImplementation 'io.opentracing:opentracing-mock:0.33.0'
8485
}
8586

8687
license {
@@ -364,4 +365,3 @@ jacocoTestReport {
364365
}))
365366
}
366367
}
367-

src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java

+84-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package com.uber.cadence.internal.compatibility.proto.serviceclient;
1717

1818
import com.google.common.base.Strings;
19+
import com.google.protobuf.ByteString;
20+
import com.uber.cadence.api.v1.*;
1921
import com.uber.cadence.api.v1.DomainAPIGrpc;
2022
import com.uber.cadence.api.v1.MetaAPIGrpc;
2123
import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub;
@@ -30,6 +32,7 @@
3032
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIBlockingStub;
3133
import com.uber.cadence.api.v1.WorkflowAPIGrpc.WorkflowAPIFutureStub;
3234
import com.uber.cadence.internal.Version;
35+
import com.uber.cadence.internal.tracing.TracingPropagator;
3336
import com.uber.cadence.serviceclient.ClientOptions;
3437
import io.grpc.CallOptions;
3538
import io.grpc.Channel;
@@ -48,6 +51,11 @@
4851
import io.opentelemetry.context.Context;
4952
import io.opentelemetry.context.propagation.TextMapPropagator;
5053
import io.opentelemetry.context.propagation.TextMapSetter;
54+
import io.opentracing.Span;
55+
import io.opentracing.Tracer;
56+
import java.util.HashMap;
57+
import java.util.Map;
58+
import java.util.Objects;
5159
import java.util.concurrent.TimeUnit;
5260
import java.util.concurrent.atomic.AtomicBoolean;
5361
import org.slf4j.Logger;
@@ -116,7 +124,8 @@ final class GrpcServiceStubs implements IGrpcServiceStubs {
116124
channel,
117125
deadlineInterceptor,
118126
MetadataUtils.newAttachHeadersInterceptor(headers),
119-
newOpenTelemetryInterceptor());
127+
newOpenTelemetryInterceptor(),
128+
newOpenTracingInterceptor(options.getTracer()));
120129
if (log.isTraceEnabled()) {
121130
interceptedChannel = ClientInterceptors.intercept(interceptedChannel, tracingInterceptor);
122131
}
@@ -162,6 +171,80 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
162171
};
163172
}
164173

174+
private ClientInterceptor newOpenTracingInterceptor(Tracer tracer) {
175+
return new ClientInterceptor() {
176+
private final TracingPropagator tracingPropagator = new TracingPropagator(tracer);
177+
private final String OPERATIONFORMAT = "cadence-%s";
178+
179+
@Override
180+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
181+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
182+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
183+
next.newCall(method, callOptions)) {
184+
185+
@Override
186+
public void start(Listener<RespT> responseListener, Metadata headers) {
187+
Span span =
188+
tracingPropagator.activateSpanByServiceMethod(
189+
String.format(OPERATIONFORMAT, method.getBareMethodName()));
190+
super.start(responseListener, headers);
191+
span.finish();
192+
}
193+
194+
@Override
195+
public void sendMessage(ReqT message) {
196+
if (Objects.equals(method.getBareMethodName(), "StartWorkflowExecution")
197+
&& message instanceof StartWorkflowExecutionRequest) {
198+
StartWorkflowExecutionRequest request = (StartWorkflowExecutionRequest) message;
199+
Map<String, byte[]> headers = new HashMap<>();
200+
tracingPropagator.inject(headers);
201+
Header.Builder headerBuilder = request.getHeader().toBuilder();
202+
headers.forEach(
203+
(k, v) -> {
204+
headerBuilder.putFields(
205+
k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
206+
});
207+
208+
// cast should not throw error as we are using the builder
209+
message =
210+
(ReqT)
211+
((StartWorkflowExecutionRequest) message)
212+
.toBuilder()
213+
.setHeader(headerBuilder.build())
214+
.build();
215+
}
216+
if (Objects.equals(method.getBareMethodName(), "SignalWithStartWorkflowExecution")
217+
&& message instanceof SignalWithStartWorkflowExecutionRequest) {
218+
SignalWithStartWorkflowExecutionRequest request =
219+
(SignalWithStartWorkflowExecutionRequest) message;
220+
Map<String, byte[]> headers = new HashMap<>();
221+
tracingPropagator.inject(headers);
222+
Header.Builder headerBuilder = request.getStartRequest().getHeader().toBuilder();
223+
headers.forEach(
224+
(k, v) -> {
225+
headerBuilder.putFields(
226+
k, Payload.newBuilder().setData(ByteString.copyFrom(v)).build());
227+
});
228+
229+
// cast should not throw error as we are using the builder
230+
message =
231+
(ReqT)
232+
((SignalWithStartWorkflowExecutionRequest) message)
233+
.toBuilder()
234+
.setStartRequest(
235+
request
236+
.getStartRequest()
237+
.toBuilder()
238+
.setHeader(headerBuilder.build()))
239+
.build();
240+
}
241+
super.sendMessage(message);
242+
}
243+
};
244+
}
245+
};
246+
}
247+
165248
private ClientInterceptor newTracingInterceptor() {
166249
return new ClientInterceptor() {
167250

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.uber.cadence.workflow.WorkflowInfo;
4343
import com.uber.cadence.workflow.WorkflowInterceptor;
4444
import com.uber.cadence.workflow.WorkflowMethod;
45+
import io.opentracing.Tracer;
4546
import java.lang.reflect.InvocationTargetException;
4647
import java.lang.reflect.Method;
4748
import java.util.Collections;
@@ -61,6 +62,7 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
6162
LoggerFactory.getLogger(POJOWorkflowImplementationFactory.class);
6263
private static final byte[] EMPTY_BLOB = {};
6364
private final Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory;
65+
private final Tracer tracer;
6466

6567
private DataConverter dataConverter;
6668
private List<ContextPropagator> contextPropagators;
@@ -83,12 +85,14 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
8385
ExecutorService threadPool,
8486
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
8587
DeciderCache cache,
86-
List<ContextPropagator> contextPropagators) {
88+
List<ContextPropagator> contextPropagators,
89+
Tracer tracer) {
8790
this.dataConverter = Objects.requireNonNull(dataConverter);
8891
this.threadPool = Objects.requireNonNull(threadPool);
8992
this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
9093
this.cache = cache;
9194
this.contextPropagators = contextPropagators;
95+
this.tracer = tracer;
9296
}
9397

9498
void setWorkflowImplementationTypes(
@@ -216,7 +220,8 @@ public ReplayWorkflow getWorkflow(WorkflowType workflowType) {
216220
threadPool,
217221
interceptorFactory,
218222
cache,
219-
contextPropagators);
223+
contextPropagators,
224+
tracer);
220225
}
221226

222227
@Override

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
4141
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
4242
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
43+
import com.uber.cadence.internal.tracing.TracingPropagator;
4344
import com.uber.cadence.worker.WorkflowImplementationOptions;
4445
import com.uber.cadence.workflow.ActivityException;
4546
import com.uber.cadence.workflow.ActivityFailureException;
@@ -58,6 +59,9 @@
5859
import com.uber.cadence.workflow.Workflow;
5960
import com.uber.cadence.workflow.WorkflowInterceptor;
6061
import com.uber.m3.tally.Scope;
62+
import io.opentracing.Span;
63+
import io.opentracing.Tracer;
64+
import io.opentracing.noop.NoopTracerFactory;
6165
import java.lang.reflect.Type;
6266
import java.time.Duration;
6367
import java.util.HashMap;
@@ -90,6 +94,7 @@ final class SyncDecisionContext implements WorkflowInterceptor {
9094
private final Map<String, Functions.Func1<byte[], byte[]>> queryCallbacks = new HashMap<>();
9195
private final byte[] lastCompletionResult;
9296
private final WorkflowImplementationOptions workflowImplementationOptions;
97+
private final TracingPropagator tracingPropagator;
9398

9499
public SyncDecisionContext(
95100
DecisionContext context,
@@ -98,6 +103,24 @@ public SyncDecisionContext(
98103
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
99104
byte[] lastCompletionResult,
100105
WorkflowImplementationOptions workflowImplementationOptions) {
106+
this(
107+
context,
108+
converter,
109+
contextPropagators,
110+
interceptorFactory,
111+
lastCompletionResult,
112+
workflowImplementationOptions,
113+
NoopTracerFactory.create());
114+
}
115+
116+
public SyncDecisionContext(
117+
DecisionContext context,
118+
DataConverter converter,
119+
List<ContextPropagator> contextPropagators,
120+
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
121+
byte[] lastCompletionResult,
122+
WorkflowImplementationOptions workflowImplementationOptions,
123+
Tracer tracer) {
101124
this.context = context;
102125
this.converter = converter;
103126
this.contextPropagators = contextPropagators;
@@ -109,6 +132,7 @@ public SyncDecisionContext(
109132
this.headInterceptor = interceptor;
110133
this.lastCompletionResult = lastCompletionResult;
111134
this.workflowImplementationOptions = workflowImplementationOptions;
135+
this.tracingPropagator = new TracingPropagator(tracer);
112136
}
113137

114138
/**
@@ -130,9 +154,18 @@ public WorkflowInterceptor getWorkflowInterceptor() {
130154
@Override
131155
public byte[] executeWorkflow(
132156
SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) {
133-
return workflowDefinition.execute(input.getInput());
157+
Span span = tracingPropagator.activateSpanForExecuteWorkflow(context);
158+
try {
159+
return workflowDefinition.execute(input.getInput());
160+
} finally {
161+
span.finish();
162+
}
134163
}
135164

165+
/**
166+
* Schedule an activity task for the provided activity name and input. The activity task is not
167+
* necessarily executed in the same thread.
168+
*/
136169
@Override
137170
public <T> Promise<T> executeActivity(
138171
String activityName,
@@ -466,6 +499,8 @@ private Map<String, byte[]> extractContextsAndConvertToBytes(
466499
for (ContextPropagator propagator : contextPropagators) {
467500
result.putAll(propagator.serializeContext(propagator.getCurrentContext()));
468501
}
502+
// inject trace span context
503+
tracingPropagator.inject(result);
469504
return result;
470505
}
471506

src/main/java/com/uber/cadence/internal/sync/SyncWorkflow.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import com.uber.cadence.internal.worker.WorkflowExecutionException;
3333
import com.uber.cadence.worker.WorkflowImplementationOptions;
3434
import com.uber.cadence.workflow.WorkflowInterceptor;
35+
import io.opentracing.Tracer;
3536
import java.util.List;
3637
import java.util.Objects;
3738
import java.util.concurrent.ExecutorService;
@@ -49,6 +50,7 @@ class SyncWorkflow implements ReplayWorkflow {
4950
private final SyncWorkflowDefinition workflow;
5051
WorkflowImplementationOptions workflowImplementationOptions;
5152
private final Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory;
53+
private final Tracer tracer;
5254
private DeciderCache cache;
5355
private WorkflowRunnable workflowProc;
5456
private DeterministicRunner runner;
@@ -60,7 +62,8 @@ public SyncWorkflow(
6062
ExecutorService threadPool,
6163
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
6264
DeciderCache cache,
63-
List<ContextPropagator> contextPropagators) {
65+
List<ContextPropagator> contextPropagators,
66+
Tracer tracer) {
6467
this.workflow = Objects.requireNonNull(workflow);
6568
this.workflowImplementationOptions =
6669
workflowImplementationOptions == null
@@ -71,6 +74,7 @@ public SyncWorkflow(
7174
this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
7275
this.cache = cache;
7376
this.contextPropagators = contextPropagators;
77+
this.tracer = tracer;
7478
}
7579

7680
@Override
@@ -97,7 +101,8 @@ public void start(HistoryEvent event, DecisionContext context) {
97101
contextPropagators,
98102
interceptorFactory,
99103
event.getWorkflowExecutionStartedEventAttributes().getLastCompletionResult(),
100-
workflowImplementationOptions);
104+
workflowImplementationOptions,
105+
tracer);
101106

102107
workflowProc =
103108
new WorkflowRunnable(

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public SyncWorkflowWorker(
8080
workflowThreadPool,
8181
interceptorFactory,
8282
cache,
83-
workflowOptions.getContextPropagators());
83+
workflowOptions.getContextPropagators(),
84+
workflowOptions.getTracer());
8485

8586
laTaskHandler =
8687
new POJOActivityTaskHandler(

0 commit comments

Comments
 (0)