Skip to content

Commit 8592f7c

Browse files
authored
Store retryOptions through mutableSideEffect in WorkflowRetryerInternal
This records the retryOptions in the workflow history, so they can be changed while a workflow is running without breaking determinism.
1 parent 50c887a commit 8592f7c

File tree

3 files changed

+206
-23
lines changed

3 files changed

+206
-23
lines changed

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ public void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters p
506506
@Override
507507
public Optional<byte[]> mutableSideEffect(
508508
String id, DataConverter converter, Func1<Optional<byte[]>, Optional<byte[]>> func) {
509-
throw new UnsupportedOperationException("not implemented");
509+
return func.apply(Optional.empty());
510510
}
511511

512512
@Override
@@ -552,7 +552,7 @@ public boolean getEnableLoggingInReplay() {
552552

553553
@Override
554554
public UUID randomUUID() {
555-
throw new UnsupportedOperationException("not implemented");
555+
return UUID.randomUUID();
556556
}
557557
}
558558
}

src/main/java/com/uber/cadence/internal/sync/WorkflowRetryerInternal.java

+34-21
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static void retry(RetryOptions options, Functions.Proc proc) {
5454
public static <R> R validateOptionsAndRetry(RetryOptions options, Functions.Func<R> func) {
5555
return retry(RetryOptions.merge(null, options), func);
5656
}
57+
5758
/**
5859
* Retry function synchronously.
5960
*
@@ -64,19 +65,24 @@ public static <R> R validateOptionsAndRetry(RetryOptions options, Functions.Func
6465
public static <R> R retry(RetryOptions options, Functions.Func<R> func) {
6566
options.validate();
6667
int attempt = 1;
67-
long startTime = Workflow.currentTimeMillis();
68+
long startTime = WorkflowInternal.currentTimeMillis();
69+
// Records retry options in the history allowing changing them without breaking determinism.
70+
String retryId = WorkflowInternal.randomUUID().toString();
71+
RetryOptions retryOptions =
72+
WorkflowInternal.mutableSideEffect(
73+
retryId, RetryOptions.class, Object::equals, () -> options);
6874
while (true) {
69-
long nextSleepTime = calculateSleepTime(attempt, options);
75+
long nextSleepTime = calculateSleepTime(attempt, retryOptions);
7076
try {
7177
return func.apply();
7278
} catch (Exception e) {
73-
long elapsed = Workflow.currentTimeMillis() - startTime;
74-
if (shouldRethrow(e, options, attempt, elapsed, nextSleepTime)) {
75-
throw Workflow.wrap(e);
79+
long elapsed = WorkflowInternal.currentTimeMillis() - startTime;
80+
if (shouldRethrow(e, retryOptions, attempt, elapsed, nextSleepTime)) {
81+
throw WorkflowInternal.wrap(e);
7682
}
7783
}
7884
attempt++;
79-
Workflow.sleep(nextSleepTime);
85+
WorkflowInternal.sleep(Duration.ofMillis(nextSleepTime));
8086
}
8187
}
8288

@@ -88,14 +94,23 @@ public static <R> R retry(RetryOptions options, Functions.Func<R> func) {
8894
* @return result promise to the result or failure if retries stopped according to options.
8995
*/
9096
public static <R> Promise<R> retryAsync(RetryOptions options, Functions.Func<Promise<R>> func) {
91-
long startTime = Workflow.currentTimeMillis();
92-
return retryAsync(options, func, startTime, 1);
97+
String retryId = WorkflowInternal.randomUUID().toString();
98+
long startTime = WorkflowInternal.currentTimeMillis();
99+
return retryAsync(retryId, options, func, startTime, 1);
93100
}
94101

95102
private static <R> Promise<R> retryAsync(
96-
RetryOptions options, Functions.Func<Promise<R>> func, long startTime, long attempt) {
103+
String retryId,
104+
RetryOptions options,
105+
Functions.Func<Promise<R>> func,
106+
long startTime,
107+
long attempt) {
97108
options.validate();
98-
CompletablePromise<R> funcResult = Workflow.newPromise();
109+
RetryOptions retryOptions =
110+
WorkflowInternal.mutableSideEffect(
111+
retryId, RetryOptions.class, Object::equals, () -> options);
112+
113+
CompletablePromise<R> funcResult = WorkflowInternal.newCompletablePromise();
99114
try {
100115
funcResult.completeFrom(func.apply());
101116
} catch (RuntimeException e) {
@@ -105,17 +120,18 @@ private static <R> Promise<R> retryAsync(
105120
.handle(
106121
(r, e) -> {
107122
if (e == null) {
108-
return Workflow.newPromise(r);
123+
return WorkflowInternal.newPromise(r);
109124
}
110-
long elapsed = Workflow.currentTimeMillis() - startTime;
111-
long sleepTime = calculateSleepTime(attempt, options);
112-
if (shouldRethrow(e, options, attempt, elapsed, sleepTime)) {
125+
long elapsed = WorkflowInternal.currentTimeMillis() - startTime;
126+
long sleepTime = calculateSleepTime(attempt, retryOptions);
127+
if (shouldRethrow(e, retryOptions, attempt, elapsed, sleepTime)) {
113128
throw e;
114129
}
115130
// newTimer runs in a separate thread, so it performs trampolining eliminating tail
116131
// recursion.
117-
return Workflow.newTimer(Duration.ofMillis(sleepTime))
118-
.thenCompose((nil) -> retryAsync(options, func, startTime, attempt + 1));
132+
return WorkflowInternal.newTimer(Duration.ofMillis(sleepTime))
133+
.thenCompose(
134+
(nil) -> retryAsync(retryId, retryOptions, func, startTime, attempt + 1));
119135
})
120136
.thenCompose((r) -> r);
121137
}
@@ -137,12 +153,9 @@ private static boolean shouldRethrow(
137153
return true;
138154
}
139155
Duration expiration = options.getExpiration();
140-
if (expiration != null
156+
return expiration != null
141157
&& elapsed + sleepTime >= expiration.toMillis()
142-
&& (attempt > options.getMinimumAttempts())) {
143-
return true;
144-
}
145-
return false;
158+
&& (attempt > options.getMinimumAttempts());
146159
}
147160

148161
private static long calculateSleepTime(long attempt, RetryOptions options) {

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

+170
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,56 @@ public void testActivityRetry() {
378378
assertEquals(activitiesImpl.toString(), 3, activitiesImpl.invocations.size());
379379
}
380380

381+
public static class TestActivityRetryOptionsChange implements TestWorkflow1 {
382+
383+
@Override
384+
public String execute(String taskList) {
385+
ActivityOptions.Builder options =
386+
new ActivityOptions.Builder()
387+
.setTaskList(taskList)
388+
.setHeartbeatTimeout(Duration.ofSeconds(5))
389+
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
390+
.setScheduleToStartTimeout(Duration.ofSeconds(5))
391+
.setStartToCloseTimeout(Duration.ofSeconds(10));
392+
RetryOptions retryOptions;
393+
if (Workflow.isReplaying()) {
394+
retryOptions =
395+
new RetryOptions.Builder()
396+
.setMinimumAttempts(1)
397+
.setMaximumInterval(Duration.ofSeconds(1))
398+
.setInitialInterval(Duration.ofSeconds(1))
399+
.setMaximumAttempts(3)
400+
.build();
401+
} else {
402+
retryOptions =
403+
new RetryOptions.Builder()
404+
.setMinimumAttempts(2)
405+
.setMaximumInterval(Duration.ofSeconds(1))
406+
.setInitialInterval(Duration.ofSeconds(1))
407+
.setMaximumAttempts(2)
408+
.build();
409+
}
410+
TestActivities activities = Workflow.newActivityStub(TestActivities.class, options.build());
411+
Workflow.retry(retryOptions, () -> activities.throwIO());
412+
return "ignored";
413+
}
414+
}
415+
416+
@Test
417+
public void testActivityRetryOptionsChange() {
418+
startWorkerFor(TestActivityRetryOptionsChange.class);
419+
TestWorkflow1 workflowStub =
420+
workflowClient.newWorkflowStub(
421+
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
422+
try {
423+
workflowStub.execute(taskList);
424+
fail("unreachable");
425+
} catch (WorkflowException e) {
426+
assertTrue(e.getCause().getCause() instanceof IOException);
427+
}
428+
assertEquals(activitiesImpl.toString(), 2, activitiesImpl.invocations.size());
429+
}
430+
381431
public static class TestUntypedActivityRetry implements TestWorkflow1 {
382432

383433
@Override
@@ -490,6 +540,57 @@ public void testAsyncActivityRetry() {
490540
assertEquals(activitiesImpl.toString(), 3, activitiesImpl.invocations.size());
491541
}
492542

543+
public static class TestAsyncActivityRetryOptionsChange implements TestWorkflow1 {
544+
545+
private TestActivities activities;
546+
547+
@Override
548+
public String execute(String taskList) {
549+
ActivityOptions.Builder options =
550+
new ActivityOptions.Builder()
551+
.setTaskList(taskList)
552+
.setHeartbeatTimeout(Duration.ofSeconds(5))
553+
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
554+
.setScheduleToStartTimeout(Duration.ofSeconds(5))
555+
.setStartToCloseTimeout(Duration.ofSeconds(10));
556+
if (Workflow.isReplaying()) {
557+
options.setRetryOptions(
558+
new RetryOptions.Builder()
559+
.setMinimumAttempts(1)
560+
.setMaximumInterval(Duration.ofSeconds(1))
561+
.setInitialInterval(Duration.ofSeconds(1))
562+
.setMaximumAttempts(3)
563+
.build());
564+
} else {
565+
options.setRetryOptions(
566+
new RetryOptions.Builder()
567+
.setMinimumAttempts(2)
568+
.setMaximumInterval(Duration.ofSeconds(1))
569+
.setInitialInterval(Duration.ofSeconds(1))
570+
.setMaximumAttempts(2)
571+
.build());
572+
}
573+
this.activities = Workflow.newActivityStub(TestActivities.class, options.build());
574+
Async.procedure(activities::throwIO).get();
575+
return "ignored";
576+
}
577+
}
578+
579+
@Test
580+
public void testAsyncActivityRetryOptionsChange() {
581+
startWorkerFor(TestAsyncActivityRetryOptionsChange.class);
582+
TestWorkflow1 workflowStub =
583+
workflowClient.newWorkflowStub(
584+
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
585+
try {
586+
workflowStub.execute(taskList);
587+
fail("unreachable");
588+
} catch (WorkflowException e) {
589+
assertTrue(e.getCause().getCause() instanceof IOException);
590+
}
591+
assertEquals(activitiesImpl.toString(), 2, activitiesImpl.invocations.size());
592+
}
593+
493594
public static class TestHeartbeatTimeoutDetails implements TestWorkflow1 {
494595

495596
@Override
@@ -1470,6 +1571,75 @@ public void testAsyncRetry() {
14701571
assertTrue(trace.get(2).startsWith("retry at "));
14711572
}
14721573

1574+
public static class TestAsyncRetryOptionsChangeWorkflow implements TestWorkflow2 {
1575+
1576+
private final List<String> trace = new ArrayList<>();
1577+
1578+
@Override
1579+
public String execute(boolean useExternalService) {
1580+
RetryOptions retryOptions;
1581+
if (Workflow.isReplaying()) {
1582+
retryOptions =
1583+
new RetryOptions.Builder()
1584+
.setMinimumAttempts(1)
1585+
.setMaximumInterval(Duration.ofSeconds(1))
1586+
.setInitialInterval(Duration.ofSeconds(1))
1587+
.setMaximumAttempts(3)
1588+
.build();
1589+
} else {
1590+
retryOptions =
1591+
new RetryOptions.Builder()
1592+
.setMinimumAttempts(2)
1593+
.setMaximumInterval(Duration.ofSeconds(1))
1594+
.setInitialInterval(Duration.ofSeconds(1))
1595+
.setMaximumAttempts(2)
1596+
.build();
1597+
}
1598+
1599+
trace.clear(); // clear because of replay
1600+
trace.add("started");
1601+
Async.retry(
1602+
retryOptions,
1603+
() -> {
1604+
trace.add("retry at " + Workflow.currentTimeMillis());
1605+
return Workflow.newFailedPromise(new IllegalThreadStateException("simulated"));
1606+
})
1607+
.get();
1608+
trace.add("beforeSleep");
1609+
Workflow.sleep(60000);
1610+
trace.add("done");
1611+
return "";
1612+
}
1613+
1614+
@Override
1615+
public List<String> getTrace() {
1616+
return trace;
1617+
}
1618+
}
1619+
1620+
/** @see DeterministicRunnerTest#testRetry() */
1621+
@Test
1622+
public void testAsyncRetryOptionsChange() {
1623+
startWorkerFor(TestAsyncRetryOptionsChangeWorkflow.class);
1624+
TestWorkflow2 client =
1625+
workflowClient.newWorkflowStub(
1626+
TestWorkflow2.class, newWorkflowOptionsBuilder(taskList).build());
1627+
String result = null;
1628+
try {
1629+
result = client.execute(useExternalService);
1630+
fail("unreachable");
1631+
} catch (WorkflowException e) {
1632+
assertTrue(e.getCause() instanceof IllegalThreadStateException);
1633+
assertEquals("simulated", e.getCause().getMessage());
1634+
}
1635+
assertNull(result);
1636+
List<String> trace = client.getTrace();
1637+
assertEquals(trace.toString(), 3, trace.size());
1638+
assertEquals("started", trace.get(0));
1639+
assertTrue(trace.get(1).startsWith("retry at "));
1640+
assertTrue(trace.get(2).startsWith("retry at "));
1641+
}
1642+
14731643
public interface TestExceptionPropagation {
14741644

14751645
@WorkflowMethod

0 commit comments

Comments
 (0)