Skip to content

Commit 0a9b597

Browse files
authored
Refactor poller: Decouple Polling from Decision Task Execution (#189)
1 parent 9beadba commit 0a9b597

File tree

7 files changed

+405
-336
lines changed

7 files changed

+405
-336
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.worker;
19+
20+
import com.uber.cadence.*;
21+
import com.uber.cadence.internal.metrics.MetricsType;
22+
import com.uber.cadence.serviceclient.IWorkflowService;
23+
import com.uber.m3.tally.Stopwatch;
24+
import org.apache.thrift.TException;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
final class ActivityPollTask
29+
implements Poller.PollTask<ActivityWorker.MeasurableActivityTask> {
30+
31+
private final IWorkflowService service;
32+
private final String domain;
33+
private final String taskList;
34+
private final SingleWorkerOptions options;
35+
private static final Logger log = LoggerFactory.getLogger(ActivityPollTask.class);
36+
37+
public ActivityPollTask(
38+
IWorkflowService service, String domain, String taskList, SingleWorkerOptions options) {
39+
40+
this.service = service;
41+
this.domain = domain;
42+
this.taskList = taskList;
43+
this.options = options;
44+
}
45+
46+
@Override
47+
public ActivityWorker.MeasurableActivityTask poll() throws TException {
48+
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_COUNTER).inc(1);
49+
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_POLL_LATENCY).start();
50+
Stopwatch e2eSW = options.getMetricsScope().timer(MetricsType.ACTIVITY_E2E_LATENCY).start();
51+
52+
PollForActivityTaskRequest pollRequest = new PollForActivityTaskRequest();
53+
pollRequest.setDomain(domain);
54+
pollRequest.setIdentity(options.getIdentity());
55+
pollRequest.setTaskList(new TaskList().setName(taskList));
56+
if (log.isDebugEnabled()) {
57+
log.debug("poll request begin: " + pollRequest);
58+
}
59+
PollForActivityTaskResponse result;
60+
try {
61+
result = service.PollForActivityTask(pollRequest);
62+
} catch (InternalServiceError | ServiceBusyError e) {
63+
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_TRANSIENT_FAILED_COUNTER).inc(1);
64+
throw e;
65+
} catch (TException e) {
66+
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_FAILED_COUNTER).inc(1);
67+
throw e;
68+
}
69+
70+
if (result == null || result.getTaskToken() == null) {
71+
if (log.isDebugEnabled()) {
72+
log.debug("poll request returned no task");
73+
}
74+
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1);
75+
return null;
76+
}
77+
78+
if (log.isTraceEnabled()) {
79+
log.trace("poll request returned " + result);
80+
}
81+
82+
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_SUCCEED_COUNTER).inc(1);
83+
sw.stop();
84+
return new ActivityWorker.MeasurableActivityTask(result, e2eSW);
85+
}
86+
}

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

+9-57
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,10 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20-
import com.uber.cadence.InternalServiceError;
21-
import com.uber.cadence.PollForActivityTaskRequest;
2220
import com.uber.cadence.PollForActivityTaskResponse;
2321
import com.uber.cadence.RespondActivityTaskCanceledRequest;
2422
import com.uber.cadence.RespondActivityTaskCompletedRequest;
2523
import com.uber.cadence.RespondActivityTaskFailedRequest;
26-
import com.uber.cadence.ServiceBusyError;
27-
import com.uber.cadence.TaskList;
2824
import com.uber.cadence.WorkflowExecution;
2925
import com.uber.cadence.common.RetryOptions;
3026
import com.uber.cadence.internal.common.Retryer;
@@ -88,10 +84,13 @@ public void start() {
8884
+ "\", type=\"activity\"")
8985
.build();
9086
}
91-
Poller.ThrowingRunnable pollTask =
92-
new PollTask<>(service, domain, taskList, options, new TaskHandlerImpl(handler));
9387
poller =
94-
new Poller(pollerOptions, options.getIdentity(), pollTask, options.getMetricsScope());
88+
new Poller<>(
89+
options.getIdentity(),
90+
new ActivityPollTask(service, domain, taskList, options),
91+
new PollTaskExecutor<>(domain, taskList, options, new TaskHandlerImpl(handler)),
92+
pollerOptions,
93+
options.getMetricsScope());
9594
poller.start();
9695
options.getMetricsScope().counter(MetricsType.WORKER_START_COUNTER).inc(1);
9796
}
@@ -150,7 +149,7 @@ public void resumePolling() {
150149
}
151150
}
152151

153-
private static class MeasurableActivityTask {
152+
static class MeasurableActivityTask {
154153
PollForActivityTaskResponse task;
155154
Stopwatch sw;
156155

@@ -164,7 +163,7 @@ void markDone() {
164163
}
165164
}
166165

167-
private class TaskHandlerImpl implements PollTask.TaskHandler<MeasurableActivityTask> {
166+
private class TaskHandlerImpl implements PollTaskExecutor.TaskHandler<MeasurableActivityTask> {
168167

169168
final ActivityTaskHandler handler;
170169

@@ -173,9 +172,7 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
173172
}
174173

175174
@Override
176-
public void handle(
177-
IWorkflowService service, String domain, String taskList, MeasurableActivityTask task)
178-
throws Exception {
175+
public void handle(MeasurableActivityTask task) throws Exception {
179176
options
180177
.getMetricsScope()
181178
.timer(MetricsType.TASK_LIST_QUEUE_LATENCY)
@@ -215,51 +212,6 @@ public void handle(
215212
}
216213
}
217214

218-
@Override
219-
public MeasurableActivityTask poll(IWorkflowService service, String domain, String taskList)
220-
throws TException {
221-
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_COUNTER).inc(1);
222-
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_POLL_LATENCY).start();
223-
Stopwatch e2eSW = options.getMetricsScope().timer(MetricsType.ACTIVITY_E2E_LATENCY).start();
224-
225-
PollForActivityTaskRequest pollRequest = new PollForActivityTaskRequest();
226-
pollRequest.setDomain(domain);
227-
pollRequest.setIdentity(options.getIdentity());
228-
pollRequest.setTaskList(new TaskList().setName(taskList));
229-
if (log.isDebugEnabled()) {
230-
log.debug("poll request begin: " + pollRequest);
231-
}
232-
PollForActivityTaskResponse result;
233-
try {
234-
result = service.PollForActivityTask(pollRequest);
235-
} catch (InternalServiceError | ServiceBusyError e) {
236-
options
237-
.getMetricsScope()
238-
.counter(MetricsType.ACTIVITY_POLL_TRANSIENT_FAILED_COUNTER)
239-
.inc(1);
240-
throw e;
241-
} catch (TException e) {
242-
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_FAILED_COUNTER).inc(1);
243-
throw e;
244-
}
245-
246-
if (result == null || result.getTaskToken() == null) {
247-
if (log.isDebugEnabled()) {
248-
log.debug("poll request returned no task");
249-
}
250-
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1);
251-
return null;
252-
}
253-
254-
if (log.isTraceEnabled()) {
255-
log.trace("poll request returned " + result);
256-
}
257-
258-
options.getMetricsScope().counter(MetricsType.ACTIVITY_POLL_SUCCEED_COUNTER).inc(1);
259-
sw.stop();
260-
return new MeasurableActivityTask(result, e2eSW);
261-
}
262-
263215
@Override
264216
public Throwable wrapFailure(MeasurableActivityTask task, Throwable failure) {
265217
WorkflowExecution execution = task.task.getWorkflowExecution();

src/main/java/com/uber/cadence/internal/worker/PollTask.java

-126
This file was deleted.

0 commit comments

Comments
 (0)