Skip to content

Commit 6df06a2

Browse files
authored
Fixed query thread leak (#222)
* Fixed query thread leak * Updated version to 2.1.2
1 parent 44296d2 commit 6df06a2

File tree

12 files changed

+255
-129
lines changed

12 files changed

+255
-129
lines changed

CHANGELOG.md

+9-1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,12 @@
1010
- Metrics and Logging support in client.
1111
- Side effects, mutable side effects, random uuid and workflow getVersion support.
1212
- Activity heartbeat throttling.
13-
- Deterministic retry of failed operation.
13+
- Deterministic retry of failed operation.
14+
15+
## v2.1.2
16+
- Requires minimum server release v0.4.0
17+
- Introduced WorkerFactory and FactoryOptions
18+
- Added sticky workflow execution, which caches a workflow object between decisions. It is enabled by default;
19+
to disable it, use the FactoryOptions.disableStickyExecution property.
20+
- Updated Thrift to expose new types of service exceptions: ServiceBusyError, DomainNotActiveError, LimitExceededError
21+
- Added metric for corrupted signal as well as metrics related to caching and evictions.

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ googleJavaFormat {
3737
}
3838

3939
group = 'com.uber.cadence'
40-
version = '2.1.0'
40+
version = '2.1.2'
4141

4242
description = """Uber Cadence Java Client"""
4343

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,12 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask)
115115
}
116116

117117
private Result processDecision(PollForDecisionTaskResponse decisionTask) throws Throwable {
118-
Decider decider =
119-
stickyTaskListName == null
120-
? createDecider(decisionTask)
121-
: cache.getOrCreate(decisionTask, this::createDecider);
118+
Decider decider = null;
122119
try {
120+
decider =
121+
stickyTaskListName == null
122+
? createDecider(decisionTask)
123+
: cache.getOrCreate(decisionTask, this::createDecider);
123124
List<Decision> decisions = decider.decide(decisionTask);
124125
if (log.isTraceEnabled()) {
125126
WorkflowExecution execution = decisionTask.getWorkflowExecution();
@@ -146,13 +147,13 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
146147
+ " new decisions");
147148
}
148149
return createCompletedRequest(decisionTask, decisions);
149-
} catch (Exception e) {
150+
} catch (Throwable e) {
150151
if (stickyTaskListName != null) {
151152
cache.invalidate(decisionTask);
152153
}
153154
throw e;
154155
} finally {
155-
if (stickyTaskListName == null) {
156+
if (stickyTaskListName == null && decider != null) {
156157
decider.close();
157158
}
158159
}
@@ -161,8 +162,9 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
161162
private Result processQuery(PollForDecisionTaskResponse decisionTask) {
162163
RespondQueryTaskCompletedRequest queryCompletedRequest = new RespondQueryTaskCompletedRequest();
163164
queryCompletedRequest.setTaskToken(decisionTask.getTaskToken());
165+
Decider decider = null;
164166
try {
165-
Decider decider =
167+
decider =
166168
stickyTaskListName == null
167169
? createDecider(decisionTask)
168170
: cache.getOrCreate(decisionTask, this::createDecider);
@@ -176,6 +178,10 @@ private Result processQuery(PollForDecisionTaskResponse decisionTask) {
176178
e.printStackTrace(pw);
177179
queryCompletedRequest.setErrorMessage(sw.toString());
178180
queryCompletedRequest.setCompletedType(QueryTaskCompletedType.FAILED);
181+
} finally {
182+
if (stickyTaskListName == null && decider != null) {
183+
decider.close();
184+
}
179185
}
180186
return new Result(null, null, queryCompletedRequest, null);
181187
}

src/main/java/com/uber/cadence/internal/sync/WorkflowInvocationHandler.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -206,13 +206,15 @@ private Object queryWorkflow(Method method, QueryMethod queryMethod, Object[] ar
206206
private Object startWorkflow(Method method, Object[] args) {
207207
Optional<WorkflowOptions> options = untyped.getOptions();
208208
if (untyped.getExecution() == null
209-
|| options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
209+
|| (options.isPresent()
210+
&& options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate)) {
210211
try {
211212
untyped.start(args);
212213
} catch (DuplicateWorkflowException e) {
213214
// We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for
214215
// result.
215-
if (options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
216+
if (options.isPresent()
217+
&& options.get().getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
216218
throw e;
217219
}
218220
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public void start() {
228228

229229
getDecisionContext()
230230
.getMetricsScope()
231-
.gauge(MetricsType.STICKY_CACHE_SIZE)
231+
.gauge(MetricsType.WORKFLOW_ACTIVE_THREAD_COUNT)
232232
.update(((ThreadPoolExecutor) threadPool).getActiveCount());
233233

234234
try {

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

+30-10
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,16 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20-
import com.uber.cadence.GetWorkflowExecutionHistoryResponse;
21-
import com.uber.cadence.PollForDecisionTaskResponse;
22-
import com.uber.cadence.RespondDecisionTaskCompletedRequest;
23-
import com.uber.cadence.RespondDecisionTaskFailedRequest;
24-
import com.uber.cadence.RespondQueryTaskCompletedRequest;
25-
import com.uber.cadence.WorkflowExecution;
26-
import com.uber.cadence.WorkflowQuery;
20+
import com.uber.cadence.*;
2721
import com.uber.cadence.common.RetryOptions;
2822
import com.uber.cadence.internal.common.Retryer;
2923
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
3024
import com.uber.cadence.internal.logging.LoggerTag;
3125
import com.uber.cadence.internal.metrics.MetricsType;
3226
import com.uber.cadence.serviceclient.IWorkflowService;
3327
import com.uber.m3.tally.Stopwatch;
28+
import java.util.Arrays;
29+
import java.util.List;
3430
import java.util.Objects;
3531
import java.util.concurrent.TimeUnit;
3632
import java.util.function.Consumer;
@@ -112,17 +108,41 @@ public byte[] queryWorkflowExecution(WorkflowExecution execution, String queryTy
112108
task.setWorkflowExecution(execution);
113109
task.setStartedEventId(Long.MAX_VALUE);
114110
task.setPreviousStartedEventId(Long.MAX_VALUE);
115-
task.setWorkflowType(task.getWorkflowType());
116111
WorkflowQuery query = new WorkflowQuery();
117112
query.setQueryType(queryType).setQueryArgs(args);
118113
task.setQuery(query);
119-
GetWorkflowExecutionHistoryResponse history =
114+
GetWorkflowExecutionHistoryResponse historyResponse =
120115
WorkflowExecutionUtils.getHistoryPage(null, service, domain, execution);
121-
task.setHistory(history.getHistory());
116+
History history = historyResponse.getHistory();
122117

118+
List<HistoryEvent> events = history.getEvents();
119+
if (events == null || events.isEmpty()) {
120+
throw new IllegalStateException("Empty history for " + execution);
121+
}
122+
HistoryEvent startedEvent = events.get(0);
123+
WorkflowExecutionStartedEventAttributes started =
124+
startedEvent.getWorkflowExecutionStartedEventAttributes();
125+
if (started == null) {
126+
throw new IllegalStateException(
127+
"First event of the history is not WorkflowExecutionStarted: " + startedEvent);
128+
}
129+
WorkflowType workflowType = started.getWorkflowType();
130+
task.setWorkflowType(workflowType);
131+
task.setHistory(history);
123132
DecisionTaskHandler.Result result = handler.handleDecisionTask(task);
124133
if (result.getQueryCompleted() != null) {
125134
RespondQueryTaskCompletedRequest r = result.getQueryCompleted();
135+
if (r.getErrorMessage() != null) {
136+
throw new RuntimeException(
137+
"query failure for "
138+
+ execution
139+
+ ", queryType="
140+
+ queryType
141+
+ ", args="
142+
+ Arrays.toString(args)
143+
+ ", error="
144+
+ r.getErrorMessage());
145+
}
126146
return r.getQueryResult();
127147
}
128148

src/main/java/com/uber/cadence/testing/TestEnvironmentOptions.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public TestEnvironmentOptions build() {
9393
}
9494

9595
if (factoryOptions == null) {
96-
factoryOptions = new Worker.FactoryOptions.Builder().build();
96+
factoryOptions =
97+
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build();
9798
}
9899

99100
return new TestEnvironmentOptions(

src/main/java/com/uber/cadence/worker/Worker.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -595,8 +595,7 @@ enum State {
595595

596596
public static class FactoryOptions {
597597
public static class Builder {
598-
// TODO: Enable by default as soon the service is released
599-
private boolean disableStickyExecution = true;
598+
private boolean disableStickyExecution;
600599
private int stickyDecisionScheduleToStartTimeoutInSeconds = 5;
601600
private int cacheMaximumSize = 600;
602601
private int maxWorkflowThreadCount = 600;

src/test/java/com/uber/cadence/worker/StickyWorkerTest.java

+57-25
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import com.uber.cadence.internal.metrics.MetricsTag;
3535
import com.uber.cadence.internal.metrics.MetricsType;
3636
import com.uber.cadence.internal.replay.DeciderCache;
37+
import com.uber.cadence.serviceclient.IWorkflowService;
38+
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
3739
import com.uber.cadence.testing.TestEnvironmentOptions;
3840
import com.uber.cadence.testing.TestWorkflowEnvironment;
3941
import com.uber.cadence.workflow.Async;
@@ -56,24 +58,19 @@
5658
import java.util.Map;
5759
import java.util.Queue;
5860
import java.util.Random;
59-
import org.junit.Assert;
60-
import org.junit.Ignore;
61-
import org.junit.Rule;
62-
import org.junit.Test;
61+
import org.junit.*;
6362
import org.junit.rules.TestName;
63+
import org.junit.runner.RunWith;
6464
import org.junit.runners.Parameterized;
6565
import org.slf4j.Logger;
6666
import org.slf4j.LoggerFactory;
6767

68-
// @RunWith(Parameterized.class)
69-
@Ignore
68+
@RunWith(Parameterized.class)
7069
public class StickyWorkerTest {
7170
public static final String DOMAIN = "UnitTest";
7271

73-
// TODO: Enable for docker as soon as the server commit
74-
// a36c84991664571636d37a3826b282ddbdbd2402 is released
75-
private static final boolean skipDockerService = true;
76-
// Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE"));
72+
private static final boolean skipDockerService =
73+
Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE"));
7774

7875
@Parameterized.Parameter public boolean useExternalService;
7976

@@ -91,6 +88,15 @@ public static Object[] data() {
9188

9289
@Rule public TestName testName = new TestName();
9390

91+
private IWorkflowService service;
92+
93+
@Before
94+
public void setUp() {
95+
if (testType.equals("Docker") && service == null) {
96+
service = new WorkflowServiceTChannel();
97+
}
98+
}
99+
94100
@Test
95101
public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception {
96102
// Arrange
@@ -104,7 +110,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedSignals() throws Exception
104110

105111
TestEnvironmentWrapper wrapper =
106112
new TestEnvironmentWrapper(
107-
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
113+
new Worker.FactoryOptions.Builder()
114+
.setDisableStickyExecution(false)
115+
.setMetricScope(scope)
116+
.build());
108117
Worker.Factory factory = wrapper.getWorkerFactory();
109118
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
110119
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
@@ -159,7 +168,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedActivities() throws Except
159168

160169
TestEnvironmentWrapper wrapper =
161170
new TestEnvironmentWrapper(
162-
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
171+
new Worker.FactoryOptions.Builder()
172+
.setDisableStickyExecution(false)
173+
.setMetricScope(scope)
174+
.build());
163175
Worker.Factory factory = wrapper.getWorkerFactory();
164176
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
165177
worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class);
@@ -214,7 +226,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedChildWorkflows() throws Ex
214226

215227
TestEnvironmentWrapper wrapper =
216228
new TestEnvironmentWrapper(
217-
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
229+
new Worker.FactoryOptions.Builder()
230+
.setDisableStickyExecution(false)
231+
.setMetricScope(scope)
232+
.build());
218233
Worker.Factory factory = wrapper.getWorkerFactory();
219234
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
220235
worker.registerWorkflowImplementationTypes(
@@ -262,7 +277,10 @@ public void whenStickyIsEnabledThenTheWorkflowIsCachedMutableSideEffect() throws
262277

263278
TestEnvironmentWrapper wrapper =
264279
new TestEnvironmentWrapper(
265-
new Worker.FactoryOptions.Builder().setMetricScope(scope).build());
280+
new Worker.FactoryOptions.Builder()
281+
.setDisableStickyExecution(false)
282+
.setMetricScope(scope)
283+
.build());
266284
Worker.Factory factory = wrapper.getWorkerFactory();
267285
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
268286
worker.registerWorkflowImplementationTypes(TestMutableSideEffectWorkflowImpl.class);
@@ -343,7 +361,8 @@ public void whenCacheIsEvictedTheWorkerCanRecover() throws Exception {
343361
// Arrange
344362
String taskListName = "evictedStickyTest";
345363
TestEnvironmentWrapper wrapper =
346-
new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build());
364+
new TestEnvironmentWrapper(
365+
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build());
347366
Worker.Factory factory = wrapper.getWorkerFactory();
348367
Worker worker = factory.newWorker(taskListName);
349368
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
@@ -382,7 +401,8 @@ public void workflowsCanBeQueried() throws Exception {
382401
// Arrange
383402
String taskListName = "queryStickyTest";
384403
TestEnvironmentWrapper wrapper =
385-
new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build());
404+
new TestEnvironmentWrapper(
405+
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build());
386406
Worker.Factory factory = wrapper.getWorkerFactory();
387407
Worker worker = factory.newWorker(taskListName);
388408
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
@@ -422,7 +442,8 @@ public void workflowsCanBeQueriedAfterEviction() throws Exception {
422442
// Arrange
423443
String taskListName = "queryEvictionStickyTest";
424444
TestEnvironmentWrapper wrapper =
425-
new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().build());
445+
new TestEnvironmentWrapper(
446+
new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build());
426447
Worker.Factory factory = wrapper.getWorkerFactory();
427448
Worker worker = factory.newWorker(taskListName);
428449
worker.registerWorkflowImplementationTypes(GreetingSignalWorkflowImpl.class);
@@ -467,25 +488,36 @@ private class TestEnvironmentWrapper {
467488

468489
public TestEnvironmentWrapper(Worker.FactoryOptions options) {
469490
if (options == null) {
470-
options = new Worker.FactoryOptions.Builder().build();
491+
options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build();
492+
}
493+
if (useExternalService) {
494+
factory = new Worker.Factory(service, DOMAIN, options);
495+
} else {
496+
TestEnvironmentOptions testOptions =
497+
new TestEnvironmentOptions.Builder()
498+
.setDomain(DOMAIN)
499+
.setFactoryOptions(options)
500+
.build();
501+
testEnv = TestWorkflowEnvironment.newInstance(testOptions);
471502
}
472-
factory = new Worker.Factory(DOMAIN, options);
473-
TestEnvironmentOptions testOptions =
474-
new TestEnvironmentOptions.Builder().setDomain(DOMAIN).setFactoryOptions(options).build();
475-
testEnv = TestWorkflowEnvironment.newInstance(testOptions);
476503
}
477504

478505
private Worker.Factory getWorkerFactory() {
479506
return useExternalService ? factory : testEnv.getWorkerFactory();
480507
}
481508

482509
private WorkflowClient getWorkflowClient() {
483-
return useExternalService ? WorkflowClient.newInstance(DOMAIN) : testEnv.newWorkflowClient();
510+
return useExternalService
511+
? WorkflowClient.newInstance(service, DOMAIN)
512+
: testEnv.newWorkflowClient();
484513
}
485514

486515
private void close() {
487-
factory.shutdown(Duration.ofSeconds(1));
488-
testEnv.close();
516+
if (useExternalService) {
517+
factory.shutdown(Duration.ofSeconds(1));
518+
} else {
519+
testEnv.close();
520+
}
489521
}
490522
}
491523

src/test/java/com/uber/cadence/worker/WorkerStressTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private class TestEnvironmentWrapper {
171171

172172
public TestEnvironmentWrapper(Worker.FactoryOptions options) {
173173
if (options == null) {
174-
options = new Worker.FactoryOptions.Builder().build();
174+
options = new Worker.FactoryOptions.Builder().setDisableStickyExecution(false).build();
175175
}
176176
factory = new Worker.Factory(DOMAIN, options);
177177
TestEnvironmentOptions testOptions =

0 commit comments

Comments
 (0)