Skip to content

Commit 9beadba

Browse files
authored
Introduce Workerfactory (#188)
* Introduce Worker.Factory for worker creation * Change Worker constructors, start and shutdown method to private
1 parent 5d9cfad commit 9beadba

File tree

9 files changed

+270
-133
lines changed

9 files changed

+270
-133
lines changed

build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ dependencies {
5050
compile group: 'org.apache.thrift', name: 'libthrift', version: '0.9.3'
5151
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.2'
5252
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.1'
53+
compile group: 'com.google.guava', name: 'guava', version: '25.1-jre'
5354
testCompile group: 'junit', name: 'junit', version: '4.12'
5455
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
5556
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

+9-14
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,6 @@
7878
import com.uber.cadence.worker.WorkerOptions;
7979
import java.lang.reflect.Type;
8080
import java.time.Duration;
81-
import java.util.ArrayList;
82-
import java.util.Collections;
83-
import java.util.List;
8481
import java.util.Optional;
8582
import java.util.concurrent.CompletableFuture;
8683
import java.util.concurrent.ExecutionException;
@@ -98,7 +95,7 @@ public final class TestWorkflowEnvironmentInternal implements TestWorkflowEnviro
9895

9996
private final TestEnvironmentOptions testEnvironmentOptions;
10097
private final WorkflowServiceWrapper service;
101-
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
98+
private final Worker.Factory workerFactory;
10299

103100
public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
104101
if (options == null) {
@@ -108,6 +105,7 @@ public TestWorkflowEnvironmentInternal(TestEnvironmentOptions options) {
108105
}
109106
service = new WorkflowServiceWrapper();
110107
service.lockTimeSkipping("TestWorkflowEnvironmentInternal constructor");
108+
workerFactory = new Worker.Factory(service, options.getDomain());
111109
}
112110

113111
@Override
@@ -127,9 +125,7 @@ public Worker newWorker(
127125
builder.setDataConverter(testEnvironmentOptions.getDataConverter());
128126
}
129127
builder = overrideOptions.apply(builder);
130-
Worker result =
131-
new Worker(service, testEnvironmentOptions.getDomain(), taskList, builder.build());
132-
workers.add(result);
128+
Worker result = workerFactory.newWorker(taskList, builder.build());
133129
return result;
134130
}
135131

@@ -191,16 +187,15 @@ public String getDiagnostics() {
191187

192188
@Override
193189
public void close() {
194-
for (Worker w : workers) {
195-
if (w.isStarted()) {
196-
w.shutdown(Duration.ofMillis(10));
197-
} else {
198-
log.warn("Worker was created, but never started for taskList: " + w.getTaskList());
199-
}
200-
}
190+
workerFactory.shutdown(Duration.ofMillis(10));
201191
service.close();
202192
}
203193

194+
@Override
195+
public void start() {
196+
workerFactory.start();
197+
}
198+
204199
private static class WorkflowServiceWrapper implements IWorkflowService {
205200

206201
private final TestWorkflowService impl;

src/main/java/com/uber/cadence/testing/TestWorkflowEnvironment.java

+3
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,7 @@ Worker newWorker(
176176
* service.
177177
*/
178178
void close();
179+
180+
/** Starts all the workers created by the test environment instance */
181+
void start();
179182
}

src/main/java/com/uber/cadence/worker/Worker.java

+134-94
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package com.uber.cadence.worker;
1919

2020
import com.google.common.annotations.VisibleForTesting;
21+
import com.google.common.base.MoreObjects;
22+
import com.google.common.base.Preconditions;
23+
import com.google.common.base.Strings;
2124
import com.uber.cadence.WorkflowExecution;
2225
import com.uber.cadence.client.WorkflowClient;
2326
import com.uber.cadence.converter.DataConverter;
@@ -33,6 +36,8 @@
3336
import com.uber.m3.util.ImmutableMap;
3437
import java.lang.reflect.Type;
3538
import java.time.Duration;
39+
import java.util.ArrayList;
40+
import java.util.List;
3641
import java.util.Map;
3742
import java.util.Objects;
3843
import java.util.concurrent.TimeUnit;
@@ -49,95 +54,44 @@ public final class Worker {
4954
private final SyncWorkflowWorker workflowWorker;
5055
private final SyncActivityWorker activityWorker;
5156
private final AtomicBoolean started = new AtomicBoolean();
57+
private final AtomicBoolean closed = new AtomicBoolean();
5258

5359
/**
54-
* Creates worker that connects to the local instance of the Cadence Service that listens on a
55-
* default port (7933).
56-
*
57-
* @param domain domain that worker uses to poll.
58-
* @param taskList task list name worker uses to poll. It uses this name for both decision and
59-
* activity task list polls.
60-
*/
61-
public Worker(String domain, String taskList) {
62-
this(domain, taskList, null);
63-
}
64-
65-
/**
66-
* Creates worker that connects to the local instance of the Cadence Service that listens on a
67-
* default port (7933).
60+
* Creates worker that connects to an instance of the Cadence Service.
6861
*
62+
* @param service client to the Cadence Service endpoint.
6963
* @param domain domain that worker uses to poll.
7064
* @param taskList task list name worker uses to poll. It uses this name for both decision and
7165
* activity task list polls.
7266
* @param options Options (like {@link DataConverter} override) for configuring worker.
7367
*/
74-
public Worker(String domain, String taskList, WorkerOptions options) {
75-
this(new WorkflowServiceTChannel(), domain, taskList, options);
76-
}
68+
private Worker(IWorkflowService service, String domain, String taskList, WorkerOptions options) {
69+
Objects.requireNonNull(service, "service should not be null");
70+
Preconditions.checkArgument(
71+
!Strings.isNullOrEmpty(domain), "domain should not be an empty string");
72+
Preconditions.checkArgument(
73+
!Strings.isNullOrEmpty(taskList), "taskList should not be an empty string");
7774

78-
/**
79-
* Creates worker that connects to an instance of the Cadence Service.
80-
*
81-
* @param host of the Cadence Service endpoint
82-
* @param port of the Cadence Service endpoint
83-
* @param domain domain that worker uses to poll.
84-
* @param taskList task list name worker uses to poll. It uses this name for both decision and
85-
* activity task list polls.
86-
*/
87-
public Worker(String host, int port, String domain, String taskList) {
88-
this(new WorkflowServiceTChannel(host, port), domain, taskList, null);
89-
}
75+
this.taskList = taskList;
76+
this.options = MoreObjects.firstNonNull(options, new Builder().build());
9077

91-
/**
92-
* Creates worker that connects to an instance of the Cadence Service.
93-
*
94-
* @param host of the Cadence Service endpoint
95-
* @param port of the Cadence Service endpoint
96-
* @param domain domain that worker uses to poll.
97-
* @param taskList task list name worker uses to poll. It uses this name for both decision and
98-
* activity task list polls.
99-
* @param options Options (like {@link DataConverter} override) for configuring worker.
100-
*/
101-
public Worker(String host, int port, String domain, String taskList, WorkerOptions options) {
102-
this(new WorkflowServiceTChannel(host, port), domain, taskList, options);
103-
}
78+
SingleWorkerOptions activityOptions = toActivityOptions(this.options, domain, taskList);
79+
activityWorker =
80+
this.options.isDisableActivityWorker()
81+
? null
82+
: new SyncActivityWorker(service, domain, taskList, activityOptions);
10483

105-
/**
106-
* Creates worker that connects to an instance of the Cadence Service.
107-
*
108-
* @param service client to the Cadence Service endpoint.
109-
* @param domain domain that worker uses to poll.
110-
* @param taskList task list name worker uses to poll. It uses this name for both decision and
111-
* activity task list polls.
112-
* @param options Options (like {@link DataConverter} override) for configuring worker.
113-
*/
114-
public Worker(IWorkflowService service, String domain, String taskList, WorkerOptions options) {
115-
Objects.requireNonNull(service, "service");
116-
Objects.requireNonNull(domain, "domain");
117-
this.taskList = Objects.requireNonNull(taskList, "taskList");
118-
if (options == null) {
119-
options = new Builder().build();
120-
}
121-
this.options = options;
122-
SingleWorkerOptions activityOptions = toActivityOptions(options, domain, taskList);
123-
if (!options.isDisableActivityWorker()) {
124-
activityWorker = new SyncActivityWorker(service, domain, taskList, activityOptions);
125-
} else {
126-
activityWorker = null;
127-
}
128-
SingleWorkerOptions workflowOptions = toWorkflowOptions(options, domain, taskList);
129-
if (!options.isDisableWorkflowWorker()) {
130-
workflowWorker =
131-
new SyncWorkflowWorker(
132-
service,
133-
domain,
134-
taskList,
135-
options.getInterceptorFactory(),
136-
workflowOptions,
137-
options.getMaxWorkflowThreads());
138-
} else {
139-
workflowWorker = null;
140-
}
84+
SingleWorkerOptions workflowOptions = toWorkflowOptions(this.options, domain, taskList);
85+
workflowWorker =
86+
this.options.isDisableWorkflowWorker()
87+
? null
88+
: new SyncWorkflowWorker(
89+
service,
90+
domain,
91+
taskList,
92+
this.options.getInterceptorFactory(),
93+
workflowOptions,
94+
this.options.getMaxWorkflowThreads());
14195
}
14296

14397
private SingleWorkerOptions toActivityOptions(
@@ -190,10 +144,13 @@ private SingleWorkerOptions toWorkflowOptions(
190144
* workflows are stateful and a new instance is created for each workflow execution.
191145
*/
192146
public void registerWorkflowImplementationTypes(Class<?>... workflowImplementationClasses) {
193-
if (workflowWorker == null) {
194-
throw new IllegalStateException("disableWorkflowWorker is set in worker options");
195-
}
196-
checkNotStarted();
147+
Preconditions.checkState(
148+
workflowWorker != null,
149+
"registerWorkflowImplementationTypes is not allowed when disableWorkflowWorker is set in worker options");
150+
Preconditions.checkState(
151+
!started.get(),
152+
"registerWorkflowImplementationTypes is not allowed after worker has started");
153+
197154
workflowWorker.setWorkflowImplementationTypes(workflowImplementationClasses);
198155
}
199156

@@ -234,20 +191,17 @@ public <R> void addWorkflowImplementationFactory(Class<R> workflowInterface, Fun
234191
* <p>
235192
*/
236193
public void registerActivitiesImplementations(Object... activityImplementations) {
237-
if (activityWorker == null) {
238-
throw new IllegalStateException("disableActivityWorker is set in worker options");
239-
}
240-
checkNotStarted();
241-
activityWorker.setActivitiesImplementation(activityImplementations);
242-
}
194+
Preconditions.checkState(
195+
activityWorker != null,
196+
"registerActivitiesImplementations is not allowed when disableWorkflowWorker is set in worker options");
197+
Preconditions.checkState(
198+
!started.get(),
199+
"registerActivitiesImplementations is not allowed after worker has started");
243200

244-
private void checkNotStarted() {
245-
if (started.get()) {
246-
throw new IllegalStateException("already started");
247-
}
201+
activityWorker.setActivitiesImplementation(activityImplementations);
248202
}
249203

250-
public void start() {
204+
private void start() {
251205
if (!started.compareAndSet(false, true)) {
252206
return;
253207
}
@@ -263,10 +217,14 @@ public boolean isStarted() {
263217
return started.get();
264218
}
265219

220+
public boolean isClosed() {
221+
return closed.get();
222+
}
223+
266224
/**
267225
* Shutdown a worker, waiting for activities to complete execution up to the specified timeout.
268226
*/
269-
public void shutdown(Duration timeout) {
227+
private void shutdown(Duration timeout) {
270228
try {
271229
long time = System.currentTimeMillis();
272230
if (activityWorker != null) {
@@ -276,6 +234,7 @@ public void shutdown(Duration timeout) {
276234
long left = timeout.toMillis() - (System.currentTimeMillis() - time);
277235
workflowWorker.shutdownAndAwaitTermination(left, TimeUnit.MILLISECONDS);
278236
}
237+
closed.set(true);
279238
} catch (InterruptedException e) {
280239
throw new RuntimeException(e);
281240
}
@@ -344,4 +303,85 @@ public <R> R queryWorkflowExecution(
344303
public String getTaskList() {
345304
return taskList;
346305
}
306+
307+
public static final class Factory {
308+
309+
private final List<Worker> workers = new ArrayList<>();
310+
private final IWorkflowService workflowService;
311+
private final String domain;
312+
private State state = State.Initial;
313+
314+
private final String statusErrorMessage =
315+
"attempted to %s while in %s state. Acceptable States: %s";
316+
317+
public Factory(String domain) {
318+
this(new WorkflowServiceTChannel(), domain);
319+
}
320+
321+
public Factory(String host, int port, String domain) {
322+
this(new WorkflowServiceTChannel(host, port), domain);
323+
}
324+
325+
public Factory(IWorkflowService workflowService, String domain) {
326+
Objects.requireNonNull(workflowService, "workflowService should not be null");
327+
Preconditions.checkArgument(!Strings.isNullOrEmpty(domain), "domain should not be an empty string");
328+
329+
this.workflowService = workflowService;
330+
this.domain = domain;
331+
}
332+
333+
public Worker newWorker(String taskList) {
334+
return newWorker(taskList, null);
335+
}
336+
337+
public Worker newWorker(String taskList, WorkerOptions options) {
338+
Preconditions.checkArgument(!Strings.isNullOrEmpty(taskList), "taskList should not be an empty string");
339+
340+
synchronized (this) {
341+
Preconditions.checkState(
342+
state == State.Initial,
343+
String.format(
344+
statusErrorMessage, "create new worker", state.name(), State.Initial.name()));
345+
Worker worker = new Worker(workflowService, domain, taskList, options);
346+
workers.add(worker);
347+
return worker;
348+
}
349+
}
350+
351+
public void start() {
352+
synchronized (this) {
353+
Preconditions.checkState(
354+
state == State.Initial || state == State.Started,
355+
String.format(
356+
statusErrorMessage,
357+
"start WorkerFactory",
358+
state.name(),
359+
String.format("%s, %s", State.Initial.name(), State.Initial.name())));
360+
if (state == State.Started) {
361+
return;
362+
}
363+
state = State.Started;
364+
365+
for (Worker worker : workers) {
366+
worker.start();
367+
}
368+
}
369+
}
370+
371+
public void shutdown(Duration timeout) {
372+
synchronized (this) {
373+
state = State.Shutdown;
374+
375+
for (Worker worker : workers) {
376+
worker.shutdown(timeout);
377+
}
378+
}
379+
}
380+
381+
enum State {
382+
Initial,
383+
Started,
384+
Shutdown
385+
}
386+
}
347387
}

0 commit comments

Comments
 (0)