Skip to content

Commit 3ee76a7

Browse files
authored
Fix thread leak on non-deterministic error (#365)
* Also throw error on rejected execution exception
1 parent abb9cd4 commit 3ee76a7

File tree

5 files changed

+73
-3
lines changed

5 files changed

+73
-3
lines changed

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

+7
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@ private Result processDecision(PollForDecisionTaskResponse decisionTask) throws
168168
}
169169
return createCompletedRequest(decisionTask, result);
170170
} catch (Throwable e) {
171+
// Note that the decider might not be in the cache even when sticky is on. In that case we
172+
// need to close the decider explicitly.
173+
// For items in the cache, the invalidation callback will try to close again, which should be ok.
174+
if (decider != null) {
175+
decider.close();
176+
}
177+
171178
if (stickyTaskListName != null) {
172179
cache.invalidate(decisionTask.getWorkflowExecution().getRunId());
173180
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.sync;
19+
20+
public class WorkflowRejectedExecutionError extends Error {
21+
22+
WorkflowRejectedExecutionError(Throwable cause) {
23+
super(cause);
24+
}
25+
}

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,13 @@ public void start() {
256256
cache.evictAnyNotInProcessing(
257257
this.runner.getDecisionContext().getContext().getRunId());
258258
if (!evicted) {
259-
throw e;
259+
// Note that we need to throw an Error here, not an Exception. Otherwise it will be
260+
// translated to a workflow execution exception and, instead of failing the
261+
// decision we will be failing the workflow.
262+
throw new WorkflowRejectedExecutionError(e);
260263
}
261264
} else {
262-
throw e;
265+
throw new WorkflowRejectedExecutionError(e);
263266
}
264267
}
265268
}

src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -809,4 +809,28 @@ public void close() {
809809
runner.close();
810810
}
811811
}
812+
813+
@Test
814+
public void testRejectedExecutionError() {
815+
ThreadPoolExecutor threadPool =
816+
new ThreadPoolExecutor(0, 1, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
817+
818+
DeterministicRunner d =
819+
new DeterministicRunnerImpl(
820+
threadPool,
821+
null,
822+
System::currentTimeMillis,
823+
() -> {
824+
Promise<Void> async = Async.procedure(() -> status = "started");
825+
async.get();
826+
});
827+
828+
assertEquals("initial", status);
829+
830+
try {
831+
d.runUntilAllBlocked();
832+
} catch (Throwable t) {
833+
assertTrue(t instanceof WorkflowRejectedExecutionError);
834+
}
835+
}
812836
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.io.IOException;
7676
import java.io.InputStream;
7777
import java.lang.management.ManagementFactory;
78+
import java.lang.management.ThreadInfo;
7879
import java.lang.reflect.Type;
7980
import java.text.SimpleDateFormat;
8081
import java.time.Duration;
@@ -4176,7 +4177,7 @@ public void testNonDeterministicWorkflowPolicyBlockWorkflow() {
41764177
startWorkerFor(DeterminismFailingWorkflowImpl.class);
41774178
WorkflowOptions options =
41784179
new WorkflowOptions.Builder()
4179-
.setExecutionStartToCloseTimeout(Duration.ofSeconds(1))
4180+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(10))
41804181
.setTaskStartToCloseTimeout(Duration.ofSeconds(1))
41814182
.setTaskList(taskList)
41824183
.build();
@@ -4188,6 +4189,16 @@ public void testNonDeterministicWorkflowPolicyBlockWorkflow() {
41884189
} catch (WorkflowTimedOutException e) {
41894190
// expected to time out as the workflow is going to get blocked.
41904191
}
4192+
4193+
int workflowRootThreads = 0;
4194+
ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
4195+
for (ThreadInfo thread : threads) {
4196+
if (thread.getThreadName().contains("workflow-root")) {
4197+
workflowRootThreads++;
4198+
}
4199+
}
4200+
4201+
assertTrue("workflow threads might leak", workflowRootThreads < 10);
41914202
}
41924203

41934204
@Test

0 commit comments

Comments
 (0)