Skip to content

Commit f7b299b

Browse files
Making dynamic poller configurable. (#762)
Using dynamic poller and making it configurable.
1 parent c46c85c commit f7b299b

File tree

7 files changed

+294
-19
lines changed

7 files changed

+294
-19
lines changed

src/main/java/com/uber/cadence/internal/worker/Poller.java

+24-16
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@
2020
import com.uber.cadence.internal.common.BackoffThrottler;
2121
import com.uber.cadence.internal.common.InternalUtils;
2222
import com.uber.cadence.internal.metrics.MetricsType;
23+
import com.uber.cadence.internal.worker.autoscaler.AutoScaler;
24+
import com.uber.cadence.internal.worker.autoscaler.AutoScalerFactory;
2325
import com.uber.m3.tally.Scope;
2426
import java.util.Objects;
2527
import java.util.concurrent.ArrayBlockingQueue;
2628
import java.util.concurrent.CountDownLatch;
27-
import java.util.concurrent.Semaphore;
2829
import java.util.concurrent.ThreadPoolExecutor;
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicReference;
@@ -69,6 +70,8 @@ interface ThrowingRunnable {
6970
log.error("Failure in thread " + t.getName(), e);
7071
};
7172

73+
private final AutoScaler pollerAutoScaler;
74+
7275
public Poller(
7376
String identity,
7477
PollTask<T> pollTask,
@@ -86,6 +89,7 @@ public Poller(
8689
this.taskExecutor = taskExecutor;
8790
this.pollerOptions = pollerOptions;
8891
this.metricsScope = metricsScope;
92+
this.pollerAutoScaler = AutoScalerFactory.getInstance().createAutoScaler(pollerOptions);
8993
}
9094

9195
@Override
@@ -124,6 +128,8 @@ public void start() {
124128
pollExecutor.execute(new PollLoopTask(new PollExecutionTask()));
125129
metricsScope.counter(MetricsType.POLLER_START_COUNTER).inc(1);
126130
}
131+
132+
pollerAutoScaler.start();
127133
}
128134

129135
@Override
@@ -155,6 +161,7 @@ public void shutdown() {
155161
} catch (InterruptedException e) {
156162
}
157163
taskExecutor.shutdown();
164+
pollerAutoScaler.stop();
158165
}
159166

160167
@Override
@@ -258,38 +265,39 @@ public void run() {
258265
private class PollExecutionTask implements Poller.ThrowingRunnable {
259266
private static final int EXECUTOR_CAPACITY_CHECK_INTERVAL_MS = 100;
260267
private static final int EXECUTOR_CAPACITY_CHECK_OFFSET_MS = 10;
261-
private Semaphore pollSemaphore;
262268

263-
PollExecutionTask() {
264-
this.pollSemaphore = new Semaphore(pollerOptions.getPollThreadCount());
265-
}
269+
PollExecutionTask() {}
266270

267271
@Override
268272
public void run() throws Exception {
269273
try {
270-
pollSemaphore.acquire();
271-
T task = pollTask.poll();
272-
if (task == null) {
273-
return;
274+
pollerAutoScaler.acquire();
275+
try {
276+
T task = pollTask.poll();
277+
if (task == null) {
278+
pollerAutoScaler.increaseNoopPollCount();
279+
return;
280+
}
281+
282+
pollerAutoScaler.increaseActionablePollCount();
283+
taskExecutor.process(task);
284+
} finally {
285+
checkIfTaskHasExecutorHasCapacity();
274286
}
275-
taskExecutor.process(task);
276287
} finally {
277-
releasePollSemaphore();
288+
pollerAutoScaler.release();
278289
}
279290
}
280291

281-
private void releasePollSemaphore() {
282-
if (!pollerOptions.getPollOnlyIfExecutorHasCapacity()) {
283-
pollSemaphore.release();
284-
} else {
292+
private void checkIfTaskHasExecutorHasCapacity() {
293+
if (pollerOptions.getPollOnlyIfExecutorHasCapacity()) {
285294
while (true) {
286295
// sleep to avoid racing condition
287296
try {
288297
Thread.sleep(EXECUTOR_CAPACITY_CHECK_OFFSET_MS);
289298
} catch (InterruptedException ignored) {
290299
}
291300
if (taskExecutor.hasCapacity()) {
292-
pollSemaphore.release();
293301
break;
294302
} else {
295303
// sleep to avoid busy loop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker;
17+
18+
import java.time.Duration;
19+
import java.util.Objects;
20+
21+
public class PollerAutoScalerOptions {
22+
23+
private Duration pollerScalingInterval;
24+
private int minConcurrentPollers;
25+
private float targetPollerUtilisation;
26+
27+
private PollerAutoScalerOptions() {}
28+
29+
public static class Builder {
30+
31+
private Duration pollerScalingInterval = Duration.ofMinutes(1);
32+
private int minConcurrentPollers = 1;
33+
private float targetPollerUtilisation = 0.6f;
34+
35+
private Builder() {}
36+
37+
public static Builder newBuilder() {
38+
return new Builder();
39+
}
40+
41+
public Builder setPollerScalingInterval(Duration duration) {
42+
this.pollerScalingInterval = duration;
43+
return this;
44+
}
45+
46+
public Builder setMinConcurrentPollers(int minConcurrentPollers) {
47+
this.minConcurrentPollers = minConcurrentPollers;
48+
return this;
49+
}
50+
51+
public Builder setTargetPollerUtilisation(float targetPollerUtilisation) {
52+
this.targetPollerUtilisation = targetPollerUtilisation;
53+
return this;
54+
}
55+
56+
public PollerAutoScalerOptions build() {
57+
PollerAutoScalerOptions pollerAutoScalerOptions = new PollerAutoScalerOptions();
58+
pollerAutoScalerOptions.pollerScalingInterval = this.pollerScalingInterval;
59+
pollerAutoScalerOptions.minConcurrentPollers = this.minConcurrentPollers;
60+
pollerAutoScalerOptions.targetPollerUtilisation = this.targetPollerUtilisation;
61+
return pollerAutoScalerOptions;
62+
}
63+
}
64+
65+
public Duration getPollerScalingInterval() {
66+
return pollerScalingInterval;
67+
}
68+
69+
public int getMinConcurrentPollers() {
70+
return minConcurrentPollers;
71+
}
72+
73+
public float getTargetPollerUtilisation() {
74+
return targetPollerUtilisation;
75+
}
76+
77+
@Override
78+
public boolean equals(Object o) {
79+
if (this == o) return true;
80+
if (o == null || getClass() != o.getClass()) return false;
81+
PollerAutoScalerOptions that = (PollerAutoScalerOptions) o;
82+
return minConcurrentPollers == that.minConcurrentPollers
83+
&& Float.compare(that.targetPollerUtilisation, targetPollerUtilisation) == 0
84+
&& Objects.equals(pollerScalingInterval, that.pollerScalingInterval);
85+
}
86+
87+
@Override
88+
public int hashCode() {
89+
return Objects.hash(pollerScalingInterval, minConcurrentPollers, targetPollerUtilisation);
90+
}
91+
92+
@Override
93+
public String toString() {
94+
return "PollerAutoScalerOptions{"
95+
+ "pollerScalingInterval="
96+
+ pollerScalingInterval
97+
+ ", minConcurrentPollers="
98+
+ minConcurrentPollers
99+
+ ", targetPollerUtilisation="
100+
+ targetPollerUtilisation
101+
+ '}';
102+
}
103+
}

src/main/java/com/uber/cadence/internal/worker/PollerOptions.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public static final class Builder {
6464

6565
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
6666

67+
private PollerAutoScalerOptions pollerAutoScalerOptions;
68+
6769
private Builder() {}
6870

6971
private Builder(PollerOptions o) {
@@ -79,6 +81,7 @@ private Builder(PollerOptions o) {
7981
this.pollThreadNamePrefix = o.getPollThreadNamePrefix();
8082
this.pollOnlyIfExecutorHasCapacity = o.getPollOnlyIfExecutorHasCapacity();
8183
this.uncaughtExceptionHandler = o.getUncaughtExceptionHandler();
84+
this.pollerAutoScalerOptions = o.getPollerAutoScalerOptions();
8285
}
8386

8487
/** Defines interval for measuring poll rate. Larger the interval more spiky can be the load. */
@@ -145,6 +148,11 @@ public Builder setPollOnlyIfExecutorHasCapacity(boolean pollOnlyIfExecutorHasCap
145148
return this;
146149
}
147150

151+
public Builder setPollerAutoScalerOptions(PollerAutoScalerOptions pollerAutoScalerOptions) {
152+
this.pollerAutoScalerOptions = pollerAutoScalerOptions;
153+
return this;
154+
}
155+
148156
public PollerOptions build() {
149157
if (uncaughtExceptionHandler == null) {
150158
uncaughtExceptionHandler = (t, e) -> log.error("uncaught exception", e);
@@ -158,7 +166,8 @@ public PollerOptions build() {
158166
pollThreadCount,
159167
uncaughtExceptionHandler,
160168
pollThreadNamePrefix,
161-
pollOnlyIfExecutorHasCapacity);
169+
pollOnlyIfExecutorHasCapacity,
170+
pollerAutoScalerOptions);
162171
}
163172
}
164173

@@ -180,6 +189,8 @@ public PollerOptions build() {
180189

181190
private final Boolean pollOnlyIfExecutorHasCapacity;
182191

192+
private final PollerAutoScalerOptions pollerAutoScalerOptions;
193+
183194
private PollerOptions(
184195
int maximumPollRateIntervalMilliseconds,
185196
double maximumPollRatePerSecond,
@@ -189,7 +200,8 @@ private PollerOptions(
189200
int pollThreadCount,
190201
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
191202
String pollThreadNamePrefix,
192-
boolean pollOnlyIfExecutorHasCapacity) {
203+
boolean pollOnlyIfExecutorHasCapacity,
204+
PollerAutoScalerOptions pollerAutoScalerOptions) {
193205
this.maximumPollRateIntervalMilliseconds = maximumPollRateIntervalMilliseconds;
194206
this.maximumPollRatePerSecond = maximumPollRatePerSecond;
195207
this.pollBackoffCoefficient = pollBackoffCoefficient;
@@ -199,6 +211,7 @@ private PollerOptions(
199211
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
200212
this.pollThreadNamePrefix = pollThreadNamePrefix;
201213
this.pollOnlyIfExecutorHasCapacity = pollOnlyIfExecutorHasCapacity;
214+
this.pollerAutoScalerOptions = pollerAutoScalerOptions;
202215
}
203216

204217
public int getMaximumPollRateIntervalMilliseconds() {
@@ -237,6 +250,10 @@ public Boolean getPollOnlyIfExecutorHasCapacity() {
237250
return pollOnlyIfExecutorHasCapacity;
238251
}
239252

253+
public PollerAutoScalerOptions getPollerAutoScalerOptions() {
254+
return pollerAutoScalerOptions;
255+
}
256+
240257
@Override
241258
public String toString() {
242259
return "PollerOptions{"
@@ -256,6 +273,8 @@ public String toString() {
256273
+ pollThreadNamePrefix
257274
+ ", pollOnlyIfExecutorHasCapacity='"
258275
+ pollOnlyIfExecutorHasCapacity
276+
+ ", pollerAutoScalerOptions='"
277+
+ pollerAutoScalerOptions
259278
+ '\''
260279
+ '}';
261280
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
/**
19+
* Poller autoscaler interface for acquiring and releasing locks. In order to control number of
20+
* concurrent operations.
21+
*/
22+
public interface AutoScaler {
23+
24+
void start();
25+
26+
void stop();
27+
28+
/**
29+
* Reduce the number of available locks. Intended to be blocking operation until lock is acquired.
30+
*/
31+
void acquire() throws InterruptedException;
32+
33+
/**
34+
* Releases lock into the autoscaler pool. Release should be always called in same process,
35+
* failing to do so is considered a usage error.
36+
*/
37+
void release();
38+
39+
void increaseNoopPollCount();
40+
41+
void increaseActionablePollCount();
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
5+
* use this file except in compliance with the License. A copy of the License is
6+
* located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed on
11+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package com.uber.cadence.internal.worker.autoscaler;
17+
18+
import com.uber.cadence.internal.worker.PollerAutoScalerOptions;
19+
import com.uber.cadence.internal.worker.PollerOptions;
20+
21+
public class AutoScalerFactory {
22+
23+
private static final AutoScalerFactory INSTANCE = new AutoScalerFactory();
24+
25+
private AutoScalerFactory() {}
26+
27+
public AutoScaler createAutoScaler(PollerOptions pollerOptions) {
28+
if (pollerOptions == null || pollerOptions.getPollerAutoScalerOptions() == null) {
29+
return new NoopAutoScaler();
30+
}
31+
32+
PollerAutoScalerOptions autoScalerOptions = pollerOptions.getPollerAutoScalerOptions();
33+
return new PollerAutoScaler(
34+
autoScalerOptions.getPollerScalingInterval(),
35+
new PollerUsageEstimator(),
36+
new Recommender(
37+
autoScalerOptions.getTargetPollerUtilisation(),
38+
pollerOptions.getPollThreadCount(),
39+
autoScalerOptions.getMinConcurrentPollers()));
40+
}
41+
42+
public static AutoScalerFactory getInstance() {
43+
return INSTANCE;
44+
}
45+
}

0 commit comments

Comments
 (0)