Skip to content

Commit d5b5e3c

Browse files
authored
Introduce experimental flag and add logic for capacity based polling (#664)
1 parent 0109025 commit d5b5e3c

File tree

6 files changed

+64
-4
lines changed

6 files changed

+64
-4
lines changed

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ compileTestJava {
108108
// Generation version.properties for value to be included into the request header
109109
task createProperties(dependsOn: processResources) {
110110
doLast {
111-
def subdir = new File('$buildDir/resources/main/com/uber/cadence/')
111+
def subdir = new File("$buildDir/resources/main/com/uber/cadence/")
112112
if( !subdir.exists() ) {
113113
subdir.mkdirs()
114114
}
115-
new File('$buildDir/resources/main/com/uber/cadence/version.properties').withWriter { w ->
115+
new File("$buildDir/resources/main/com/uber/cadence/version.properties").withWriter { w ->
116116
Properties p = new Properties()
117117
p['cadence-client-version'] = project.version.toString()
118118
p.store w, null

src/main/java/com/uber/cadence/internal/worker/PollDecisionTaskDispatcher.java

+5
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ public void process(PollForDecisionTaskResponse t) {
8282
}
8383
}
8484

85+
@Override
86+
public boolean hasCapacity() {
87+
return true;
88+
}
89+
8590
public void subscribe(String taskList, Consumer<PollForDecisionTaskResponse> consumer) {
8691
subscribers.put(taskList, consumer);
8792
}

src/main/java/com/uber/cadence/internal/worker/PollTaskExecutor.java

+5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public void process(T task) {
8181
});
8282
}
8383

84+
@Override
85+
public boolean hasCapacity() {
86+
return taskExecutor.getActiveCount() < taskExecutor.getPoolSize();
87+
}
88+
8489
@Override
8590
public boolean isShutdown() {
8691
return taskExecutor.isShutdown();

src/main/java/com/uber/cadence/internal/worker/Poller.java

+26
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ public void run() {
256256
}
257257

258258
private class PollExecutionTask implements Poller.ThrowingRunnable {
259+
private static final int EXECUTOR_CAPACITY_CHECK_INTERVAL_MS = 100;
260+
private static final int EXECUTOR_CAPACITY_CHECK_OFFSET_MS = 10;
259261
private Semaphore pollSemaphore;
260262

261263
PollExecutionTask() {
@@ -272,7 +274,31 @@ public void run() throws Exception {
272274
}
273275
taskExecutor.process(task);
274276
} finally {
277+
releasePollSemaphore();
278+
}
279+
}
280+
281+
private void releasePollSemaphore() {
282+
if (!pollerOptions.getPollOnlyIfExecutorHasCapacity()) {
275283
pollSemaphore.release();
284+
} else {
285+
while (true) {
286+
// sleep to avoid racing condition
287+
try {
288+
Thread.sleep(EXECUTOR_CAPACITY_CHECK_OFFSET_MS);
289+
} catch (InterruptedException ignored) {
290+
}
291+
if (taskExecutor.hasCapacity()) {
292+
pollSemaphore.release();
293+
break;
294+
} else {
295+
// sleep to avoid busy loop
296+
try {
297+
Thread.sleep(EXECUTOR_CAPACITY_CHECK_INTERVAL_MS);
298+
} catch (InterruptedException ignored) {
299+
}
300+
}
301+
}
276302
}
277303
}
278304
}

src/main/java/com/uber/cadence/internal/worker/PollerOptions.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public static final class Builder {
6060

6161
private String pollThreadNamePrefix;
6262

63+
private Boolean pollOnlyIfExecutorHasCapacity = Boolean.FALSE;
64+
6365
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
6466

6567
private Builder() {}
@@ -75,6 +77,7 @@ private Builder(PollerOptions o) {
7577
this.pollBackoffMaximumInterval = o.getPollBackoffMaximumInterval();
7678
this.pollThreadCount = o.getPollThreadCount();
7779
this.pollThreadNamePrefix = o.getPollThreadNamePrefix();
80+
this.pollOnlyIfExecutorHasCapacity = o.getPollOnlyIfExecutorHasCapacity();
7881
this.uncaughtExceptionHandler = o.getUncaughtExceptionHandler();
7982
}
8083

@@ -133,6 +136,15 @@ public Builder setPollThreadNamePrefix(String pollThreadNamePrefix) {
133136
return this;
134137
}
135138

139+
/**
140+
* The poller will check task executor's remaining capacity before polling tasks.
141+
* This is to prevent task to get started but not being able to execute in time.
142+
*/
143+
public Builder setPollOnlyIfExecutorHasCapacity(boolean pollOnlyIfExecutorHasCapacity) {
144+
this.pollOnlyIfExecutorHasCapacity = pollOnlyIfExecutorHasCapacity;
145+
return this;
146+
}
147+
136148
public PollerOptions build() {
137149
if (uncaughtExceptionHandler == null) {
138150
uncaughtExceptionHandler = (t, e) -> log.error("uncaught exception", e);
@@ -145,7 +157,8 @@ public PollerOptions build() {
145157
pollBackoffMaximumInterval,
146158
pollThreadCount,
147159
uncaughtExceptionHandler,
148-
pollThreadNamePrefix);
160+
pollThreadNamePrefix,
161+
pollOnlyIfExecutorHasCapacity);
149162
}
150163
}
151164

@@ -165,6 +178,8 @@ public PollerOptions build() {
165178

166179
private final String pollThreadNamePrefix;
167180

181+
private final Boolean pollOnlyIfExecutorHasCapacity;
182+
168183
private PollerOptions(
169184
int maximumPollRateIntervalMilliseconds,
170185
double maximumPollRatePerSecond,
@@ -173,7 +188,8 @@ private PollerOptions(
173188
Duration pollBackoffMaximumInterval,
174189
int pollThreadCount,
175190
Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
176-
String pollThreadNamePrefix) {
191+
String pollThreadNamePrefix,
192+
boolean pollOnlyIfExecutorHasCapacity) {
177193
this.maximumPollRateIntervalMilliseconds = maximumPollRateIntervalMilliseconds;
178194
this.maximumPollRatePerSecond = maximumPollRatePerSecond;
179195
this.pollBackoffCoefficient = pollBackoffCoefficient;
@@ -182,6 +198,7 @@ private PollerOptions(
182198
this.pollThreadCount = pollThreadCount;
183199
this.uncaughtExceptionHandler = uncaughtExceptionHandler;
184200
this.pollThreadNamePrefix = pollThreadNamePrefix;
201+
this.pollOnlyIfExecutorHasCapacity = pollOnlyIfExecutorHasCapacity;
185202
}
186203

187204
public int getMaximumPollRateIntervalMilliseconds() {
@@ -216,6 +233,10 @@ public String getPollThreadNamePrefix() {
216233
return pollThreadNamePrefix;
217234
}
218235

236+
public Boolean getPollOnlyIfExecutorHasCapacity() {
237+
return pollOnlyIfExecutorHasCapacity;
238+
}
239+
219240
@Override
220241
public String toString() {
221242
return "PollerOptions{"
@@ -233,6 +254,8 @@ public String toString() {
233254
+ pollThreadCount
234255
+ ", pollThreadNamePrefix='"
235256
+ pollThreadNamePrefix
257+
+ ", pollOnlyIfExecutorHasCapacity='"
258+
+ pollOnlyIfExecutorHasCapacity
236259
+ '\''
237260
+ '}';
238261
}

src/main/java/com/uber/cadence/internal/worker/TaskExecutor.java

+1
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@
1919

2020
interface TaskExecutor<T> {
2121
void process(T task);
22+
boolean hasCapacity();
2223
}

0 commit comments

Comments
 (0)