Skip to content

Commit 373ada0

Browse files
authored
Fix retryer in activity and workflow worker (#250)
* Do not retry certain errors on respond activity/workflow completion and failure * Rebase, bump up version and update changelog
1 parent f067000 commit 373ada0

File tree

6 files changed

+107
-11
lines changed

6 files changed

+107
-11
lines changed

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## v2.3.0
4+
- Added cron schedule support.
5+
- Fix infinite retryer in activity and workflow worker due to non-retryable error.
6+
- Fixed hanging on testEnv.close when testEnv was not started.
7+
- Fix for NPE when method has base type return type like int.
8+
- Fixed JsonDataConverter to correctly report non serializable exceptions.
9+
310
## v2.2.0
411
- Added support for workflow and activity server side retries.
512
- Clean worker shutdown. Replaced Worker shutdown(Duration) with Worker shutdown, shutdownNow and awaitTermination.

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ googleJavaFormat {
3737
}
3838

3939
group = 'com.uber.cadence'
40-
version = '2.2.0'
40+
version = '2.3.0'
4141

4242
description = """Uber Cadence Java Client"""
4343

src/main/java/com/uber/cadence/common/RetryOptions.java

+25
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,31 @@ public RetryOptions merge(RetryOptions o) {
8383
.validateBuildWithDefaults();
8484
}
8585

86+
@SafeVarargs
87+
public final RetryOptions addDoNotRetry(Class<? extends Throwable>... doNotRetry) {
88+
if (doNotRetry == null) {
89+
return this;
90+
}
91+
92+
double backoffCoefficient = getBackoffCoefficient();
93+
if (backoffCoefficient == 0) {
94+
backoffCoefficient = DEFAULT_BACKOFF_COEFFICIENT;
95+
}
96+
97+
RetryOptions.Builder builder =
98+
new RetryOptions.Builder()
99+
.setInitialInterval(getInitialInterval())
100+
.setExpiration(getExpiration())
101+
.setMaximumInterval(getMaximumInterval())
102+
.setBackoffCoefficient(backoffCoefficient)
103+
.setDoNotRetry(merge(getDoNotRetry(), Arrays.asList(doNotRetry)));
104+
105+
if (getMaximumAttempts() > 0) {
106+
builder.setMaximumAttempts(getMaximumAttempts());
107+
}
108+
return builder.validateBuildWithDefaults();
109+
}
110+
86111
public static final class Builder {
87112

88113
private Duration initialInterval;

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

+23-8
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,7 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20-
import com.uber.cadence.PollForActivityTaskResponse;
21-
import com.uber.cadence.RespondActivityTaskCanceledRequest;
22-
import com.uber.cadence.RespondActivityTaskCompletedRequest;
23-
import com.uber.cadence.RespondActivityTaskFailedRequest;
24-
import com.uber.cadence.WorkflowExecution;
20+
import com.uber.cadence.*;
2521
import com.uber.cadence.common.RetryOptions;
2622
import com.uber.cadence.internal.common.Retryer;
2723
import com.uber.cadence.internal.logging.LoggerTag;
@@ -224,15 +220,27 @@ private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Res
224220
RetryOptions ro = response.getRequestRetryOptions();
225221
RespondActivityTaskCompletedRequest taskCompleted = response.getTaskCompleted();
226222
if (taskCompleted != null) {
227-
ro = options.getReportCompletionRetryOptions().merge(ro);
223+
ro =
224+
options
225+
.getReportCompletionRetryOptions()
226+
.merge(ro)
227+
.addDoNotRetry(
228+
BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class);
228229
taskCompleted.setTaskToken(task.getTaskToken());
229230
taskCompleted.setIdentity(options.getIdentity());
230231
Retryer.retry(ro, () -> service.RespondActivityTaskCompleted(taskCompleted));
231232
options.getMetricsScope().counter(MetricsType.ACTIVITY_TASK_COMPLETED_COUNTER).inc(1);
232233
} else {
233234
RespondActivityTaskFailedRequest taskFailed = response.getTaskFailed();
234235
if (taskFailed != null) {
235-
ro = options.getReportFailureRetryOptions().merge(ro);
236+
ro =
237+
options
238+
.getReportFailureRetryOptions()
239+
.merge(ro)
240+
.addDoNotRetry(
241+
BadRequestError.class,
242+
EntityNotExistsError.class,
243+
DomainNotActiveError.class);
236244
taskFailed.setTaskToken(task.getTaskToken());
237245
taskFailed.setIdentity(options.getIdentity());
238246
Retryer.retry(ro, () -> service.RespondActivityTaskFailed(taskFailed));
@@ -242,7 +250,14 @@ private void sendReply(PollForActivityTaskResponse task, ActivityTaskHandler.Res
242250
if (taskCancelled != null) {
243251
taskCancelled.setTaskToken(task.getTaskToken());
244252
taskCancelled.setIdentity(options.getIdentity());
245-
ro = options.getReportFailureRetryOptions().merge(ro);
253+
ro =
254+
options
255+
.getReportFailureRetryOptions()
256+
.merge(ro)
257+
.addDoNotRetry(
258+
BadRequestError.class,
259+
EntityNotExistsError.class,
260+
DomainNotActiveError.class);
246261
Retryer.retry(ro, () -> service.RespondActivityTaskCanceled(taskCancelled));
247262
options.getMetricsScope().counter(MetricsType.ACTIVITY_TASK_CANCELED_COUNTER).inc(1);
248263
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,26 @@ private void sendReply(
270270
RetryOptions ro = response.getRequestRetryOptions();
271271
RespondDecisionTaskCompletedRequest taskCompleted = response.getTaskCompleted();
272272
if (taskCompleted != null) {
273-
ro = options.getReportCompletionRetryOptions().merge(ro);
273+
ro =
274+
options
275+
.getReportCompletionRetryOptions()
276+
.merge(ro)
277+
.addDoNotRetry(
278+
BadRequestError.class, EntityNotExistsError.class, DomainNotActiveError.class);
274279
taskCompleted.setIdentity(options.getIdentity());
275280
taskCompleted.setTaskToken(taskToken);
276281
Retryer.retry(ro, () -> service.RespondDecisionTaskCompleted(taskCompleted));
277282
} else {
278283
RespondDecisionTaskFailedRequest taskFailed = response.getTaskFailed();
279284
if (taskFailed != null) {
280-
ro = options.getReportFailureRetryOptions().merge(ro);
285+
ro =
286+
options
287+
.getReportFailureRetryOptions()
288+
.merge(ro)
289+
.addDoNotRetry(
290+
BadRequestError.class,
291+
EntityNotExistsError.class,
292+
DomainNotActiveError.class);
281293
taskFailed.setIdentity(options.getIdentity());
282294
taskFailed.setTaskToken(taskToken);
283295
Retryer.retry(ro, () -> service.RespondDecisionTaskFailed(taskFailed));

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -4075,6 +4075,43 @@ public void testLargeHistory() {
40754075
assertEquals("done", result);
40764076
}
40774077

4078+
public interface DecisionTimeoutWorkflow {
4079+
@WorkflowMethod(executionStartToCloseTimeoutSeconds = 10000)
4080+
String execute(String testName) throws InterruptedException;
4081+
}
4082+
4083+
public static class DecisionTimeoutWorkflowImpl implements DecisionTimeoutWorkflow {
4084+
4085+
@Override
4086+
public String execute(String testName) throws InterruptedException {
4087+
4088+
AtomicInteger count = retryCount.get(testName);
4089+
if (count == null) {
4090+
count = new AtomicInteger();
4091+
retryCount.put(testName, count);
4092+
Thread.sleep(2000);
4093+
}
4094+
4095+
return "some result";
4096+
}
4097+
}
4098+
4099+
@Test
4100+
public void testDecisionTimeoutWorkflow() throws InterruptedException {
4101+
startWorkerFor(DecisionTimeoutWorkflowImpl.class);
4102+
4103+
WorkflowOptions options =
4104+
new WorkflowOptions.Builder()
4105+
.setTaskList(taskList)
4106+
.setTaskStartToCloseTimeout(Duration.ofSeconds(1))
4107+
.build();
4108+
4109+
DecisionTimeoutWorkflow stub =
4110+
workflowClient.newWorkflowStub(DecisionTimeoutWorkflow.class, options);
4111+
String result = stub.execute(testName.getMethodName());
4112+
Assert.assertEquals("some result", result);
4113+
}
4114+
40784115
private static class FilteredTrace {
40794116

40804117
private final List<String> impl = Collections.synchronizedList(new ArrayList<>());

0 commit comments

Comments
 (0)