Skip to content

Commit 5d9cfad

Browse files
authored
Thread leaks2 (#186)
* Fixed core sizes of thread pools
* Increased WorkflowTest[TestService] timeout to not have failures on travis
* Increased WorkflowTest[TestService] timeout to not have failures on travis
* Added thread dump after the fixed timeout
* Updated thread dump delay
* Fixed race condition in TestWorkflowMutableState.query
* Disabled excessive unit test logging
1 parent 56d6703 commit 5d9cfad

File tree

7 files changed

+36
-45
lines changed

7 files changed

+36
-45
lines changed

build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -215,3 +215,5 @@ uploadArchives {
215215
}
216216
}
217217
}
218+
219+
//logging.captureStandardOutput LogLevel.INFO

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

+15-9
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,7 @@
4545
import java.util.Random;
4646
import java.util.Set;
4747
import java.util.UUID;
48-
import java.util.concurrent.ExecutorService;
49-
import java.util.concurrent.SynchronousQueue;
50-
import java.util.concurrent.ThreadPoolExecutor;
51-
import java.util.concurrent.TimeUnit;
48+
import java.util.concurrent.*;
5249
import java.util.concurrent.locks.Lock;
5350
import java.util.concurrent.locks.ReentrantLock;
5451
import java.util.function.BiConsumer;
@@ -121,11 +118,20 @@ static void setCurrentThreadInternal(WorkflowThread coroutine) {
121118
}
122119

123120
DeterministicRunnerImpl(Supplier<Long> clock, Runnable root) {
124-
this(
125-
new ThreadPoolExecutor(0, 1000, 1, TimeUnit.MINUTES, new SynchronousQueue<>()),
126-
newDummySyncDecisionContext(),
127-
clock,
128-
root);
121+
this(getDefaultThreadPool(), newDummySyncDecisionContext(), clock, root);
122+
}
123+
124+
private static ThreadPoolExecutor getDefaultThreadPool() {
125+
ThreadPoolExecutor result =
126+
new ThreadPoolExecutor(0, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
127+
result.setThreadFactory(
128+
new ThreadFactory() {
129+
@Override
130+
public Thread newThread(Runnable r) {
131+
return new Thread(r, "deterministic runner thread");
132+
}
133+
});
134+
return result;
129135
}
130136

131137
DeterministicRunnerImpl(

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.SynchronousQueue;
3131
import java.util.concurrent.ThreadPoolExecutor;
3232
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
3334
import java.util.function.Function;
3435

3536
/** Workflow worker that supports POJO workflow implementations. */
@@ -38,6 +39,7 @@ public class SyncWorkflowWorker {
3839
private final WorkflowWorker worker;
3940
private final POJOWorkflowImplementationFactory factory;
4041
private final SingleWorkerOptions options;
42+
private final AtomicInteger workflowThreadCounter = new AtomicInteger();
4143

4244
public SyncWorkflowWorker(
4345
IWorkflowService service,
@@ -48,11 +50,9 @@ public SyncWorkflowWorker(
4850
int workflowThreadPoolSize) {
4951
ThreadPoolExecutor workflowThreadPool =
5052
new ThreadPoolExecutor(
51-
workflowThreadPoolSize,
52-
workflowThreadPoolSize,
53-
10,
54-
TimeUnit.SECONDS,
55-
new SynchronousQueue<>());
53+
0, workflowThreadPoolSize, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
54+
workflowThreadPool.setThreadFactory(
55+
r -> new Thread(r, "workflow-thread-" + workflowThreadCounter.incrementAndGet()));
5656
factory =
5757
new POJOWorkflowImplementationFactory(
5858
options.getDataConverter(),

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1173,9 +1173,9 @@ public QueryWorkflowResponse query(QueryWorkflowRequest queryRequest) throws TEx
11731173
.setQuery(queryRequest.getQuery());
11741174
TaskListId taskListId =
11751175
new TaskListId(queryRequest.getDomain(), startRequest.getTaskList().getName());
1176-
store.sendQueryTask(executionId, taskListId, task);
11771176
CompletableFuture<QueryWorkflowResponse> result = new CompletableFuture<>();
11781177
queries.put(queryId.getQueryId(), result);
1178+
store.sendQueryTask(executionId, taskListId, task);
11791179
try {
11801180
return result.get();
11811181
} catch (InterruptedException e) {

src/main/java/com/uber/cadence/internal/worker/PollTask.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,9 @@ public interface Poller<T> {
6565
this.handler = handler;
6666
taskExecutor =
6767
new ThreadPoolExecutor(
68+
0,
6869
options.getTaskExecutorThreadPoolSize(),
69-
options.getTaskExecutorThreadPoolSize(),
70-
10,
70+
1,
7171
TimeUnit.SECONDS,
7272
new SynchronousQueue<>());
7373
taskExecutor.setThreadFactory(

src/main/java/com/uber/cadence/internal/worker/Poller.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -180,22 +180,17 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
180180
// not started yet.
181181
return true;
182182
}
183-
boolean result = pollExecutor.awaitTermination(timeout, unit);
184-
log.info("awaitTermination done");
185-
return result;
183+
return pollExecutor.awaitTermination(timeout, unit);
186184
}
187185

188186
@Override
189187
public boolean shutdownAndAwaitTermination(long timeout, TimeUnit unit)
190188
throws InterruptedException {
191-
log.info("shutdownAndAwaitTermination poller=" + this.options.getPollThreadNamePrefix());
192189
if (!isStarted()) {
193190
return true;
194191
}
195192
pollExecutor.shutdownNow();
196-
boolean result = pollExecutor.awaitTermination(timeout, unit);
197-
log.info("shutdownAndAwaitTermination done");
198-
return result;
193+
return pollExecutor.awaitTermination(timeout, unit);
199194
}
200195

201196
@Override

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

+9-21
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import static org.junit.Assert.assertTrue;
2626
import static org.junit.Assert.fail;
2727

28-
import com.googlecode.junittoolbox.ParallelParameterized;
2928
import com.uber.cadence.SignalExternalWorkflowExecutionFailedCause;
3029
import com.uber.cadence.TimeoutType;
3130
import com.uber.cadence.WorkflowExecution;
@@ -60,20 +59,7 @@
6059
import java.io.IOException;
6160
import java.lang.reflect.Type;
6261
import java.time.Duration;
63-
import java.util.ArrayDeque;
64-
import java.util.ArrayList;
65-
import java.util.Arrays;
66-
import java.util.Collections;
67-
import java.util.HashMap;
68-
import java.util.HashSet;
69-
import java.util.List;
70-
import java.util.Map;
71-
import java.util.Objects;
72-
import java.util.Optional;
73-
import java.util.Queue;
74-
import java.util.Random;
75-
import java.util.Set;
76-
import java.util.UUID;
62+
import java.util.*;
7763
import java.util.concurrent.CancellationException;
7864
import java.util.concurrent.CompletableFuture;
7965
import java.util.concurrent.ExecutionException;
@@ -99,12 +85,13 @@
9985
import org.junit.rules.Timeout;
10086
import org.junit.runner.Description;
10187
import org.junit.runner.RunWith;
88+
import org.junit.runners.Parameterized;
10289
import org.junit.runners.Parameterized.Parameter;
10390
import org.junit.runners.Parameterized.Parameters;
10491
import org.slf4j.Logger;
10592
import org.slf4j.LoggerFactory;
10693

107-
@RunWith(ParallelParameterized.class)
94+
@RunWith(Parameterized.class)
10895
public class WorkflowTest {
10996

11097
/**
@@ -132,7 +119,7 @@ public static Object[] data() {
132119

133120
@Rule
134121
public Timeout globalTimeout =
135-
Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : (skipDockerService ? 3 : 20));
122+
Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : (skipDockerService ? 5 : 20));
136123

137124
@Rule
138125
public TestWatcher watchman =
@@ -251,7 +238,7 @@ public void setUp() {
251238

252239
@After
253240
public void tearDown() throws Throwable {
254-
worker.shutdown(Duration.ofMillis(1));
241+
worker.shutdown(Duration.ofMinutes(1));
255242
activitiesImpl.close();
256243
if (testEnvironment != null) {
257244
testEnvironment.close();
@@ -2581,14 +2568,15 @@ private static class TestActivitiesImpl implements TestActivities {
25812568
final List<String> invocations = Collections.synchronizedList(new ArrayList<>());
25822569
final List<String> procResult = Collections.synchronizedList(new ArrayList<>());
25832570
private final ThreadPoolExecutor executor =
2584-
new ThreadPoolExecutor(2, 100, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
2571+
new ThreadPoolExecutor(0, 100, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
25852572

25862573
private TestActivitiesImpl(ActivityCompletionClient completionClient) {
25872574
this.completionClient = completionClient;
25882575
}
25892576

2590-
void close() {
2591-
executor.shutdown();
2577+
void close() throws InterruptedException {
2578+
executor.shutdownNow();
2579+
executor.awaitTermination(1, TimeUnit.MINUTES);
25922580
}
25932581

25942582
void assertInvocations(String... expected) {

0 commit comments

Comments
 (0)