Skip to content

Commit b077ec1

Browse files
authored
Fix getResult hang after termination using test service (#700)
Issue #628
1 parent a00ddff commit b077ec1

7 files changed

Lines changed: 98 additions & 21 deletions

File tree

temporal-sdk/src/test/java/io/temporal/workflow/TerminatedWorkflowQueryTest.java renamed to temporal-sdk/src/test/java/io/temporal/workflow/TerminatedWorkflowTest.java

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919

2020
package io.temporal.workflow;
2121

22-
import io.temporal.client.WorkflowClientOptions;
23-
import io.temporal.client.WorkflowFailedException;
24-
import io.temporal.client.WorkflowOptions;
22+
import static org.junit.Assert.*;
23+
24+
import io.temporal.client.*;
25+
import io.temporal.failure.TerminatedFailure;
2526
import io.temporal.testing.internal.SDKTestOptions;
2627
import io.temporal.testing.internal.SDKTestWorkflowRule;
2728
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
@@ -30,18 +31,22 @@
3031
import java.time.Duration;
3132
import java.util.ArrayList;
3233
import java.util.List;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3336
import org.junit.Assert;
3437
import org.junit.Rule;
3538
import org.junit.Test;
3639

37-
public class TerminatedWorkflowQueryTest {
38-
40+
/**
41+
* Tests verifying the correct behavior of the SDK if the workflow is in unsuccessful final states
42+
*/
43+
public class TerminatedWorkflowTest {
3944
private final TestActivitiesImpl activitiesImpl = new TestActivitiesImpl();
4045

4146
@Rule
4247
public SDKTestWorkflowRule testWorkflowRule =
4348
SDKTestWorkflowRule.newBuilder()
44-
.setWorkflowTypes(TestTraceWorkflowImpl.class)
49+
.setWorkflowTypes(TraceTimingOutWorkflowImpl.class)
4550
.setActivityImplementations(activitiesImpl)
4651
.setWorkflowClientOptions(WorkflowClientOptions.newBuilder().build())
4752
.build();
@@ -53,24 +58,50 @@ public void testShouldReturnQueryResultAfterWorkflowTimeout() {
5358
.toBuilder()
5459
.setWorkflowRunTimeout(Duration.ofSeconds(1))
5560
.build();
56-
TestTraceWorkflow client =
61+
TestTraceWorkflow workflow =
5762
testWorkflowRule.getWorkflowClient().newWorkflowStub(TestTraceWorkflow.class, options);
5863

5964
Assert.assertThrows(
6065
"Workflow should throw because of timeout",
6166
WorkflowFailedException.class,
62-
() -> client.execute(SDKTestWorkflowRule.useExternalService));
67+
workflow::execute);
6368

64-
Assert.assertEquals(1, client.getTrace().size());
65-
Assert.assertEquals("started", client.getTrace().get(0));
69+
Assert.assertEquals(1, workflow.getTrace().size());
70+
Assert.assertEquals("started", workflow.getTrace().get(0));
6671
}
6772

68-
public static class TestTraceWorkflowImpl implements TestTraceWorkflow {
73+
@Test
74+
public void getResultShouldThrowAfterTerminationOfWorkflow() {
75+
WorkflowOptions options =
76+
WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build();
77+
78+
WorkflowStub workflow =
79+
testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub("execute", options);
80+
81+
workflow.start();
82+
83+
workflow.terminate("testing");
84+
85+
WorkflowFailedException exception = null;
86+
try {
87+
workflow.getResult(1000, TimeUnit.MILLISECONDS, String.class);
88+
fail("getResult should throw WorkflowFailedException because the workflow was terminated");
89+
} catch (WorkflowFailedException e) {
90+
// This is expected
91+
exception = e;
92+
} catch (TimeoutException e) {
93+
fail(
94+
"getResult shouldn't wait all 5 seconds till the end of the workflow because it was already terminated");
95+
}
96+
assertNotNull(exception);
97+
assertTrue(exception.getCause() instanceof TerminatedFailure);
98+
}
6999

100+
public static class TraceTimingOutWorkflowImpl implements TestTraceWorkflow {
70101
private final List<String> trace = new ArrayList<>();
71102

72103
@Override
73-
public String execute(boolean useExternalService) {
104+
public String execute() {
74105
VariousTestActivities localActivities =
75106
Workflow.newLocalActivityStub(
76107
VariousTestActivities.class, SDKTestOptions.newLocalActivityOptions());

temporal-sdk/src/test/java/io/temporal/workflow/TimerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void testTimer() {
5252
}
5353
TestTraceWorkflow client =
5454
testWorkflowRule.getWorkflowClient().newWorkflowStub(TestTraceWorkflow.class, options);
55-
String result = client.execute(SDKTestWorkflowRule.useExternalService);
55+
String result = client.execute();
5656
Assert.assertEquals("testTimer", result);
5757
if (testWorkflowRule.isUseExternalService()) {
5858
testWorkflowRule
@@ -88,9 +88,10 @@ public void testTimer() {
8888
public static class TestTimerWorkflowImpl implements TestTraceWorkflow {
8989

9090
@Override
91-
public String execute(boolean useExternalService) {
91+
public String execute() {
9292
Promise<Void> timer1;
9393
Promise<Void> timer2;
94+
boolean useExternalService = SDKTestWorkflowRule.useExternalService;
9495
Duration timeout1 = useExternalService ? Duration.ofMillis(700) : Duration.ofSeconds(700);
9596
Duration timeout2 = useExternalService ? Duration.ofMillis(1300) : Duration.ofSeconds(1300);
9697
timer1 = Workflow.newTimer(timeout1);

temporal-sdk/src/test/java/io/temporal/workflow/activityTests/AsyncRetryOptionsChangeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void testAsyncRetryOptionsChange() {
5555
testWorkflowRule.newWorkflowStubTimeoutOptions(TestTraceWorkflow.class);
5656
String result = null;
5757
try {
58-
result = client.execute(SDKTestWorkflowRule.useExternalService);
58+
result = client.execute();
5959
Assert.fail("unreachable");
6060
} catch (WorkflowException e) {
6161
Assert.assertTrue(e.getCause() instanceof ApplicationFailure);
@@ -79,7 +79,7 @@ public static class TestAsyncRetryOptionsChangeWorkflow implements TestTraceWork
7979
private final List<String> trace = new ArrayList<>();
8080

8181
@Override
82-
public String execute(boolean useExternalService) {
82+
public String execute() {
8383
RetryOptions retryOptions;
8484
if (Workflow.isReplaying()) {
8585
retryOptions =

temporal-sdk/src/test/java/io/temporal/workflow/activityTests/AsyncRetryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void testAsyncRetry() {
4848
testWorkflowRule.newWorkflowStubTimeoutOptions(TestTraceWorkflow.class);
4949
String result = null;
5050
try {
51-
result = client.execute(SDKTestWorkflowRule.useExternalService);
51+
result = client.execute();
5252
Assert.fail("unreachable");
5353
} catch (WorkflowException e) {
5454
Assert.assertTrue(e.getCause() instanceof ApplicationFailure);
@@ -76,7 +76,7 @@ public static class TestAsyncRetryWorkflowImpl implements TestTraceWorkflow {
7676
private final List<String> trace = new ArrayList<>();
7777

7878
@Override
79-
public String execute(boolean useExternalService) {
79+
public String execute() {
8080
trace.clear(); // clear because of replay
8181
trace.add("started");
8282
Async.retry(

temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ public interface ITestNamedChild {
114114
@WorkflowInterface
115115
public interface TestTraceWorkflow {
116116

117-
@WorkflowMethod(name = "testActivity")
118-
String execute(boolean useExternalService);
117+
@WorkflowMethod(name = "execute")
118+
String execute();
119119

120120
@QueryMethod(name = "getTrace")
121121
List<String> getTrace();
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.internal.testservice;
21+
22+
public class StateUtils {
23+
/** @return true if the workflow was completed not by workflow task completion result */
24+
public static boolean isWorkflowExecutionForcefullyCompleted(StateMachines.State state) {
25+
switch (state) {
26+
case TERMINATED:
27+
case TIMED_OUT:
28+
return true;
29+
default:
30+
return false;
31+
}
32+
}
33+
}

temporal-testing/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,23 @@ private void update(boolean completeWorkflowTaskUpdate, UpdateProcedure updater,
324324

325325
RequestContext ctx = new RequestContext(clock, this, nextEventId);
326326
updater.apply(ctx);
327-
if (concurrentWorkflowTask && workflow.getState() != State.TIMED_OUT) {
327+
328+
if (StateUtils.isWorkflowExecutionForcefullyCompleted(workflow.getState())) {
329+
// if we completed the workflow "externally", not through the result of workflow task
330+
// (timed out or got a termination request) -
331+
// we don't buffer the events and don't wait till the finish of the workflow task
332+
// in-progress even if there is one,
333+
// but instead we apply them to the history immediately.
334+
nextEventId = ctx.commitChanges(store);
335+
} else if (concurrentWorkflowTask) {
336+
// if there is a concurrent workflow task in progress and the workflow wasn't terminated and
337+
// considered timed out,
338+
// we buffer the events and wait till the finish of the workflow task
328339
workflowTaskStateMachine.getData().bufferedEvents.add(ctx);
329340
ctx.fireCallbacks(0);
330341
store.applyTimersAndLocks(ctx);
331342
} else {
343+
// if there is no concurrent workflow task in progress - apply events to the history
332344
nextEventId = ctx.commitChanges(store);
333345
}
334346
} catch (StatusRuntimeException e) {

0 commit comments

Comments
 (0)