Skip to content

Commit 794dc95

Browse files
authored
Fix memory leak caused by incorrect context deactivation (#896)
* Fix memory leak caused by incorrect context deactivation
* Fix tracer initialization and formatting
* Correct span and scope in gRPC
* Fix test
1 parent 639fec9 commit 794dc95

File tree

7 files changed: +103 -99 lines changed

src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java

+18 -15
Original file line number | Diff line number | Diff line change
@@ -35,23 +35,13 @@
3535
import com.uber.cadence.internal.tracing.TracingPropagator;
3636
import com.uber.cadence.serviceclient.ClientOptions;
3737
import com.uber.cadence.serviceclient.auth.IAuthorizationProvider;
38-
import io.grpc.CallOptions;
39-
import io.grpc.Channel;
40-
import io.grpc.ClientCall;
41-
import io.grpc.ClientInterceptor;
42-
import io.grpc.ClientInterceptors;
43-
import io.grpc.Deadline;
44-
import io.grpc.ForwardingClientCall;
45-
import io.grpc.ForwardingClientCallListener;
46-
import io.grpc.ManagedChannel;
47-
import io.grpc.ManagedChannelBuilder;
48-
import io.grpc.Metadata;
49-
import io.grpc.MethodDescriptor;
38+
import io.grpc.*;
5039
import io.grpc.stub.MetadataUtils;
5140
import io.opentelemetry.api.GlobalOpenTelemetry;
5241
import io.opentelemetry.context.Context;
5342
import io.opentelemetry.context.propagation.TextMapPropagator;
5443
import io.opentelemetry.context.propagation.TextMapSetter;
44+
import io.opentracing.Scope;
5545
import io.opentracing.Span;
5646
import io.opentracing.Tracer;
5747
import java.nio.charset.StandardCharsets;
@@ -228,10 +218,23 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
228218
@Override
229219
public void start(Listener<RespT> responseListener, Metadata headers) {
230220
Span span =
231-
tracingPropagator.activateSpanByServiceMethod(
221+
tracingPropagator.spanByServiceMethod(
232222
String.format(OPERATIONFORMAT, method.getBareMethodName()));
233-
super.start(responseListener, headers);
234-
span.finish();
223+
Scope scope = tracer.activateSpan(span);
224+
super.start(
225+
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
226+
responseListener) {
227+
@Override
228+
public void onClose(Status status, Metadata trailers) {
229+
try {
230+
super.onClose(status, trailers);
231+
} finally {
232+
span.finish();
233+
scope.close();
234+
}
235+
}
236+
},
237+
headers);
235238
}
236239

237240
@SuppressWarnings("unchecked")

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

+4 -2
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,7 @@ final class SyncDecisionContext implements WorkflowInterceptor {
9595
private final byte[] lastCompletionResult;
9696
private final WorkflowImplementationOptions workflowImplementationOptions;
9797
private final TracingPropagator tracingPropagator;
98+
private final Tracer tracer;
9899

99100
public SyncDecisionContext(
100101
DecisionContext context,
@@ -133,6 +134,7 @@ public SyncDecisionContext(
133134
this.lastCompletionResult = lastCompletionResult;
134135
this.workflowImplementationOptions = workflowImplementationOptions;
135136
this.tracingPropagator = new TracingPropagator(tracer);
137+
this.tracer = tracer;
136138
}
137139

138140
/**
@@ -154,8 +156,8 @@ public WorkflowInterceptor getWorkflowInterceptor() {
154156
@Override
155157
public byte[] executeWorkflow(
156158
SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) {
157-
Span span = tracingPropagator.activateSpanForExecuteWorkflow(context);
158-
try {
159+
Span span = tracingPropagator.spanForExecuteWorkflow(context);
160+
try (io.opentracing.Scope scope = tracer.activateSpan(span)) {
159161
return workflowDefinition.execute(input.getInput());
160162
} finally {
161163
span.finish();

src/main/java/com/uber/cadence/internal/tracing/TracingPropagator.java

+28 -40
Original file line number | Diff line number | Diff line change
@@ -53,57 +53,45 @@ public TracingPropagator(Tracer tracer) {
5353
this.tracer = tracer;
5454
}
5555

56-
public Span activateSpanByServiceMethod(String serviceMethod) {
57-
Span span = tracer.buildSpan(serviceMethod).asChildOf(tracer.activeSpan()).start();
58-
tracer.activateSpan(span);
59-
return span;
56+
public Span spanByServiceMethod(String serviceMethod) {
57+
return tracer.buildSpan(serviceMethod).asChildOf(tracer.activeSpan()).start();
6058
}
6159

62-
public Span activateSpanForExecuteWorkflow(DecisionContext context) {
60+
public Span spanForExecuteWorkflow(DecisionContext context) {
6361
WorkflowExecutionStartedEventAttributes attributes =
6462
context.getWorkflowExecutionStartedEventAttributes();
6563
SpanContext parent = extract(attributes.getHeader());
6664

67-
Span span =
68-
tracer
69-
.buildSpan(EXECUTE_WORKFLOW)
70-
.addReference(
71-
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
72-
.withTag(TAG_WORKFLOW_TYPE, context.getWorkflowType().getName())
73-
.withTag(TAG_WORKFLOW_ID, context.getWorkflowId())
74-
.withTag(TAG_WORKFLOW_RUN_ID, context.getRunId())
75-
.start();
76-
tracer.activateSpan(span);
77-
return span;
65+
return tracer
66+
.buildSpan(EXECUTE_WORKFLOW)
67+
.addReference(
68+
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
69+
.withTag(TAG_WORKFLOW_TYPE, context.getWorkflowType().getName())
70+
.withTag(TAG_WORKFLOW_ID, context.getWorkflowId())
71+
.withTag(TAG_WORKFLOW_RUN_ID, context.getRunId())
72+
.start();
7873
}
7974

80-
public Span activateSpanForExecuteActivity(PollForActivityTaskResponse task) {
75+
public Span spanForExecuteActivity(PollForActivityTaskResponse task) {
8176
SpanContext parent = extract(task.getHeader());
82-
Span span =
83-
tracer
84-
.buildSpan(EXECUTE_ACTIVITY)
85-
.addReference(
86-
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
87-
.withTag(
88-
TAG_WORKFLOW_TYPE,
89-
task.isSetWorkflowType() ? task.getWorkflowType().getName() : "null")
90-
.withTag(
91-
TAG_WORKFLOW_ID,
92-
task.isSetWorkflowExecution()
93-
? task.getWorkflowExecution().getWorkflowId()
94-
: "null")
95-
.withTag(
96-
TAG_WORKFLOW_RUN_ID,
97-
task.isSetWorkflowExecution() ? task.getWorkflowExecution().getRunId() : "null")
98-
.withTag(
99-
TAG_ACTIVITY_TYPE,
100-
task.isSetActivityType() ? task.getActivityType().getName() : "null")
101-
.start();
102-
tracer.activateSpan(span);
103-
return span;
77+
return tracer
78+
.buildSpan(EXECUTE_ACTIVITY)
79+
.addReference(
80+
References.FOLLOWS_FROM, parent != NoopSpan.INSTANCE.context() ? parent : null)
81+
.withTag(
82+
TAG_WORKFLOW_TYPE, task.isSetWorkflowType() ? task.getWorkflowType().getName() : "null")
83+
.withTag(
84+
TAG_WORKFLOW_ID,
85+
task.isSetWorkflowExecution() ? task.getWorkflowExecution().getWorkflowId() : "null")
86+
.withTag(
87+
TAG_WORKFLOW_RUN_ID,
88+
task.isSetWorkflowExecution() ? task.getWorkflowExecution().getRunId() : "null")
89+
.withTag(
90+
TAG_ACTIVITY_TYPE, task.isSetActivityType() ? task.getActivityType().getName() : "null")
91+
.start();
10492
}
10593

106-
public Span activateSpanForExecuteLocalActivity(Task task) {
94+
public Span spanForExecuteLocalActivity(Task task) {
10795
ExecuteLocalActivityParameters params = task.getExecuteLocalActivityParameters();
10896

10997
// retrieve spancontext from params

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

+5 -3
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737
import com.uber.m3.util.Duration;
3838
import com.uber.m3.util.ImmutableMap;
3939
import io.opentracing.Span;
40+
import io.opentracing.Tracer;
4041
import java.nio.charset.StandardCharsets;
4142
import java.util.HashMap;
4243
import java.util.Map;
@@ -53,6 +54,7 @@ public class ActivityWorker extends SuspendableWorkerBase {
5354
private final IWorkflowService service;
5455
private final String domain;
5556
private final String taskList;
57+
private final Tracer tracer;
5658
private final TracingPropagator spanFactory;
5759

5860
public ActivityWorker(
@@ -75,6 +77,7 @@ public ActivityWorker(
7577
this.domain = Objects.requireNonNull(domain);
7678
this.taskList = Objects.requireNonNull(taskList);
7779
this.handler = handler;
80+
this.tracer = options.getTracer();
7881
this.spanFactory = new TracingPropagator(options.getTracer());
7982

8083
PollerOptions pollerOptions = options.getPollerOptions();
@@ -142,9 +145,8 @@ public void handle(PollForActivityTaskResponse task) throws Exception {
142145
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());
143146

144147
propagateContext(task);
145-
Span span = spanFactory.activateSpanForExecuteActivity(task);
146-
147-
try {
148+
Span span = spanFactory.spanForExecuteActivity(task);
149+
try (io.opentracing.Scope scope = tracer.activateSpan(span)) {
148150
Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
149151
ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false);
150152
sw.stop();

src/main/java/com/uber/cadence/internal/worker/LocalActivityWorker.java

+39 -35
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@
3232
import com.uber.m3.tally.Stopwatch;
3333
import com.uber.m3.util.ImmutableMap;
3434
import io.opentracing.Span;
35+
import io.opentracing.Tracer;
3536
import java.time.Duration;
3637
import java.util.Map;
3738
import java.util.Objects;
@@ -50,6 +51,7 @@ public final class LocalActivityWorker extends SuspendableWorkerBase {
5051
private final SingleWorkerOptions options;
5152
private final LocalActivityPollTask laPollTask;
5253
private final TracingPropagator spanFactory;
54+
private final Tracer tracer;
5355

5456
public LocalActivityWorker(
5557
String domain, String taskList, SingleWorkerOptions options, ActivityTaskHandler handler) {
@@ -58,6 +60,7 @@ public LocalActivityWorker(
5860
this.handler = handler;
5961
this.laPollTask = new LocalActivityPollTask();
6062
this.spanFactory = new TracingPropagator(options.getTracer());
63+
this.tracer = options.getTracer();
6164

6265
PollerOptions pollerOptions = options.getPollerOptions();
6366
if (pollerOptions.getPollThreadNamePrefix() == null) {
@@ -129,42 +132,43 @@ public void handle(Task task) throws Exception {
129132
propagateContext(task.params);
130133

131134
// start and activate span for local activities
132-
Span span = spanFactory.activateSpanForExecuteLocalActivity(task);
133-
134-
task.taskStartTime = System.currentTimeMillis();
135-
ActivityTaskHandler.Result result = handleLocalActivity(task);
136-
137-
LocalActivityMarkerData.Builder markerBuilder = new LocalActivityMarkerData.Builder();
138-
markerBuilder.setActivityId(task.params.getActivityId());
139-
markerBuilder.setActivityType(task.params.getActivityType());
140-
long replayTimeMillis =
141-
task.currentTimeMillis.getAsLong()
142-
+ (System.currentTimeMillis() - task.replayTimeUpdatedAtMillis.getAsLong());
143-
markerBuilder.setReplayTimeMillis(replayTimeMillis);
144-
145-
if (result.getTaskCompleted() != null) {
146-
markerBuilder.setResult(result.getTaskCompleted().getResult());
147-
} else if (result.getTaskFailedResult() != null) {
148-
markerBuilder.setTaskFailedRequest(result.getTaskFailedResult().getTaskFailedRequest());
149-
markerBuilder.setAttempt(result.getAttempt());
150-
markerBuilder.setBackoff(result.getBackoff());
151-
} else {
152-
markerBuilder.setTaskCancelledRequest(result.getTaskCancelled());
135+
Span span = spanFactory.spanForExecuteLocalActivity(task);
136+
try (io.opentracing.Scope scope = tracer.activateSpan(span)) {
137+
task.taskStartTime = System.currentTimeMillis();
138+
ActivityTaskHandler.Result result = handleLocalActivity(task);
139+
140+
LocalActivityMarkerData.Builder markerBuilder = new LocalActivityMarkerData.Builder();
141+
markerBuilder.setActivityId(task.params.getActivityId());
142+
markerBuilder.setActivityType(task.params.getActivityType());
143+
long replayTimeMillis =
144+
task.currentTimeMillis.getAsLong()
145+
+ (System.currentTimeMillis() - task.replayTimeUpdatedAtMillis.getAsLong());
146+
markerBuilder.setReplayTimeMillis(replayTimeMillis);
147+
148+
if (result.getTaskCompleted() != null) {
149+
markerBuilder.setResult(result.getTaskCompleted().getResult());
150+
} else if (result.getTaskFailedResult() != null) {
151+
markerBuilder.setTaskFailedRequest(result.getTaskFailedResult().getTaskFailedRequest());
152+
markerBuilder.setAttempt(result.getAttempt());
153+
markerBuilder.setBackoff(result.getBackoff());
154+
} else {
155+
markerBuilder.setTaskCancelledRequest(result.getTaskCancelled());
156+
}
157+
158+
LocalActivityMarkerData marker = markerBuilder.build();
159+
160+
HistoryEvent event = new HistoryEvent();
161+
event.setEventType(EventType.MarkerRecorded);
162+
MarkerRecordedEventAttributes attributes =
163+
new MarkerRecordedEventAttributes()
164+
.setMarkerName(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)
165+
.setHeader(marker.getHeader(options.getDataConverter()))
166+
.setDetails(marker.getResult());
167+
event.setMarkerRecordedEventAttributes(attributes);
168+
task.eventConsumer.accept(event);
169+
} finally {
170+
span.finish();
153171
}
154-
155-
LocalActivityMarkerData marker = markerBuilder.build();
156-
157-
HistoryEvent event = new HistoryEvent();
158-
event.setEventType(EventType.MarkerRecorded);
159-
MarkerRecordedEventAttributes attributes =
160-
new MarkerRecordedEventAttributes()
161-
.setMarkerName(ClockDecisionContext.LOCAL_ACTIVITY_MARKER_NAME)
162-
.setHeader(marker.getHeader(options.getDataConverter()))
163-
.setDetails(marker.getResult());
164-
event.setMarkerRecordedEventAttributes(attributes);
165-
task.eventConsumer.accept(event);
166-
167-
span.finish();
168172
}
169173

170174
@Override

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

+7 -2
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@
4949
import io.opentelemetry.context.propagation.TextMapPropagator;
5050
import io.opentelemetry.context.propagation.TextMapSetter;
5151
import io.opentracing.Span;
52+
import io.opentracing.Tracer;
5253
import java.net.InetAddress;
5354
import java.net.InetSocketAddress;
5455
import java.net.UnknownHostException;
@@ -74,6 +75,7 @@ public class WorkflowServiceTChannel implements IWorkflowService {
7475
private final Map<String, String> thriftHeaders;
7576
private final TChannel tChannel;
7677
private final TracingPropagator tracingPropagator;
78+
private final Tracer tracer;
7779
private final SubChannel subChannel;
7880

7981
/**
@@ -86,6 +88,7 @@ public WorkflowServiceTChannel(ClientOptions options) {
8688
this.thriftHeaders = getThriftHeaders(options);
8789
this.tChannel = new TChannel.Builder(options.getClientAppName()).build();
8890
this.tracingPropagator = new TracingPropagator(options.getTracer());
91+
this.tracer = options.getTracer();
8992

9093
InetAddress address;
9194
try {
@@ -126,6 +129,7 @@ public WorkflowServiceTChannel(SubChannel subChannel, ClientOptions options) {
126129
this.tChannel = null;
127130
this.subChannel = subChannel;
128131
this.tracingPropagator = new TracingPropagator(options.getTracer());
132+
this.tracer = options.getTracer();
129133
}
130134

131135
private static Map<String, String> getThriftHeaders(ClientOptions options) {
@@ -326,14 +330,15 @@ private <T> T measureRemoteCall(String scopeName, RemoteCall<T> call) throws TEx
326330

327331
private <T> T measureRemoteCallWithTags(
328332
String scopeName, RemoteCall<T> call, Map<String, String> tags) throws TException {
329-
Span span = tracingPropagator.activateSpanByServiceMethod(scopeName);
330333
Scope scope = options.getMetricsScope().subScope(scopeName);
331334
if (tags != null) {
332335
scope = scope.tagged(tags);
333336
}
334337
scope.counter(MetricsType.CADENCE_REQUEST).inc(1);
335338
Stopwatch sw = scope.timer(MetricsType.CADENCE_LATENCY).start();
336-
try {
339+
340+
Span span = tracingPropagator.spanByServiceMethod(scopeName);
341+
try (io.opentracing.Scope tracingScope = tracer.activateSpan(span)) {
337342
T resp = call.apply();
338343
sw.stop();
339344
return resp;

src/test/java/com/uber/cadence/internal/tracing/StartWorkflowTest.java

+2 -2
Original file line number | Diff line number | Diff line change
@@ -278,7 +278,7 @@ private void testStartWorkflowHelper(
278278
}
279279

280280
// assert workflow spans
281-
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(1);
281+
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(0);
282282
assertEquals(spanExecuteWF.operationName(), "cadence-ExecuteWorkflow");
283283
assertSpanReferences(spanExecuteWF, "follows_from", spanStartWorkflow);
284284

@@ -382,7 +382,7 @@ private void testSignalWithStartWorkflowHelper(
382382
// assert workflow spans
383383
List<MockSpan> workflowSpans =
384384
getSpansByTraceID(spans, spanStartWorkflow.context().toTraceId());
385-
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(1);
385+
MockSpan spanExecuteWF = getLinkedSpans(spans, spanStartWorkflow.context()).get(0);
386386
assertEquals(spanExecuteWF.operationName(), "cadence-ExecuteWorkflow");
387387
assertSpanReferences(spanExecuteWF, "follows_from", spanStartWorkflow);
388388

0 commit comments

Comments
 (0)