Skip to content

Commit 6317ecb

Browse files
committed
Replayer scenario when frequency is changed
1 parent 680fdce commit 6317ecb

File tree

3 files changed

+182
-8
lines changed

3 files changed

+182
-8
lines changed

src/test/java/com/uber/cadence/samples/replaytests/HelloActivityReplayTest.java

-8
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.uber.cadence.samples.replaytests;
1919

2020
import com.uber.cadence.samples.hello.HelloActivity;
21-
import com.uber.cadence.samples.hello.HelloPeriodic;
2221
import com.uber.cadence.testing.WorkflowReplayer;
2322
import org.junit.Test;
2423

@@ -38,11 +37,4 @@ public void testReplay() throws Exception {
3837
WorkflowReplayer.replayWorkflowExecutionFromResource(
3938
"replaytests/HelloActivity.json", HelloActivity.GreetingWorkflowImpl.class);
4039
}
41-
42-
// continue-as-new case for replayer tests
43-
@Test
44-
public void testReplay_continueAsNew() throws Exception {
45-
WorkflowReplayer.replayWorkflowExecutionFromResource(
46-
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class);
47-
}
4840
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.uber.cadence.samples.replaytests;
2+
3+
import com.uber.cadence.samples.hello.HelloPeriodic;
4+
import com.uber.cadence.testing.WorkflowReplayer;
5+
import org.junit.Test;
6+
7+
public class HelloPeriodicReplayTest {
8+
9+
// continue-as-new case for replayer tests: Passing
10+
@Test
11+
public void testReplay_continueAsNew() throws Exception {
12+
WorkflowReplayer.replayWorkflowExecutionFromResource(
13+
"replaytests/HelloPeriodic.json", HelloPeriodic.GreetingWorkflowImpl.class);
14+
}
15+
16+
// Continue as new case: change in sleep timer compared to original workflow definition. It should
17+
// fail. BUT it is currently passing.
18+
@Test
19+
public void testReplay_continueAsNew_timerChange() throws Exception {
20+
WorkflowReplayer.replayWorkflowExecutionFromResource(
21+
"replaytests/HelloPeriodic.json",
22+
HelloPeriodic_sleepTimerChange.GreetingWorkflowImpl.class);
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.samples.replaytests;
19+
20+
import static com.uber.cadence.samples.common.SampleConstants.DOMAIN;
21+
22+
import com.google.common.base.Throwables;
23+
import com.uber.cadence.WorkflowExecution;
24+
import com.uber.cadence.WorkflowIdReusePolicy;
25+
import com.uber.cadence.activity.Activity;
26+
import com.uber.cadence.activity.ActivityOptions;
27+
import com.uber.cadence.client.DuplicateWorkflowException;
28+
import com.uber.cadence.client.WorkflowClient;
29+
import com.uber.cadence.client.WorkflowClientOptions;
30+
import com.uber.cadence.client.WorkflowException;
31+
import com.uber.cadence.client.WorkflowStub;
32+
import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter;
33+
import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs;
34+
import com.uber.cadence.worker.Worker;
35+
import com.uber.cadence.worker.WorkerFactory;
36+
import com.uber.cadence.workflow.Workflow;
37+
import com.uber.cadence.workflow.WorkflowMethod;
38+
import java.time.Duration;
39+
import java.util.Optional;
40+
41+
public class HelloPeriodic_sleepTimerChange {
42+
43+
static final String TASK_LIST = "HelloPeriodic";
44+
static final String PERIODIC_WORKFLOW_ID = "HelloPeriodic";
45+
46+
public interface GreetingWorkflow {
47+
@WorkflowMethod(
48+
// At most one instance.
49+
workflowId = PERIODIC_WORKFLOW_ID,
50+
// To allow starting workflow with the same ID after the previous one has terminated.
51+
workflowIdReusePolicy = WorkflowIdReusePolicy.AllowDuplicate,
52+
// Adjust this value to the maximum time workflow is expected to run.
53+
// It usually depends on the number of repetitions and interval between them.
54+
executionStartToCloseTimeoutSeconds = 300,
55+
taskList = TASK_LIST
56+
)
57+
void greetPeriodically(String name, Duration delay);
58+
}
59+
60+
public interface GreetingActivities {
61+
void greet(String greeting);
62+
}
63+
64+
public static class GreetingWorkflowImpl implements GreetingWorkflow {
65+
66+
/**
67+
* This value is so low just to make the example interesting to watch. In real life you would
68+
* use something like 100 or a value that matches a business cycle. For example if it runs once
69+
* an hour 24 would make sense.
70+
*/
71+
private final int CONTINUE_AS_NEW_FREQUENCEY = 1000;
72+
73+
private final GreetingActivities activities =
74+
Workflow.newActivityStub(
75+
GreetingActivities.class,
76+
new ActivityOptions.Builder()
77+
.setScheduleToCloseTimeout(Duration.ofSeconds(10))
78+
.build());
79+
80+
/**
81+
* Stub used to terminate this workflow run and create the next one with the same ID atomically.
82+
*/
83+
private final GreetingWorkflow continueAsNew =
84+
Workflow.newContinueAsNewStub(GreetingWorkflow.class);
85+
86+
@Override
87+
public void greetPeriodically(String name, Duration delay) {
88+
// Loop the predefined number of times then continue this workflow as new.
89+
// This is needed to periodically truncate the history size.
90+
for (int i = 0; i < CONTINUE_AS_NEW_FREQUENCEY; i++) {
91+
activities.greet("Hello " + name + "!");
92+
Workflow.sleep(delay);
93+
}
94+
// Current workflow run stops executing after this call.
95+
continueAsNew.greetPeriodically(name, delay);
96+
// unreachable line
97+
}
98+
}
99+
100+
static class GreetingActivitiesImpl implements GreetingActivities {
101+
@Override
102+
public void greet(String greeting) {
103+
System.out.println("From " + Activity.getWorkflowExecution() + ": " + greeting);
104+
}
105+
}
106+
107+
public static void main(String[] args) throws InterruptedException {
108+
// Get a new client
109+
// NOTE: to set a different options, you can do like this:
110+
// ClientOptions.newBuilder().setRpcTimeout(5 * 1000).build();
111+
WorkflowClient workflowClient =
112+
WorkflowClient.newInstance(
113+
new Thrift2ProtoAdapter(IGrpcServiceStubs.newInstance()),
114+
WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build());
115+
// Get worker to poll the task list.
116+
WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
117+
Worker worker = factory.newWorker(TASK_LIST);
118+
// Workflows are stateful. So you need a type to create instances.
119+
worker.registerWorkflowImplementationTypes(GreetingWorkflowImpl.class);
120+
// Activities are stateless and thread safe. So a shared instance is used.
121+
worker.registerActivitiesImplementations(new GreetingActivitiesImpl());
122+
// Start listening to the workflow and activity task lists.
123+
factory.start();
124+
125+
// Start a workflow execution. Usually this is done from another program.
126+
// To ensure that this daemon type workflow is always running try to start it periodically
127+
// ignoring the duplicated exception.
128+
// It is only to protect from application level failures.
129+
// Failures of a workflow worker don't lead to workflow failures.
130+
WorkflowExecution execution = null;
131+
while (true) {
132+
// Print reason of failure of the previous run, before restarting.
133+
if (execution != null) {
134+
WorkflowStub workflow = workflowClient.newUntypedWorkflowStub(execution, Optional.empty());
135+
try {
136+
workflow.getResult(Void.class); //
137+
} catch (WorkflowException e) {
138+
System.out.println("Previous instance failed:\n" + Throwables.getStackTraceAsString(e));
139+
}
140+
}
141+
// New stub instance should be created for each new workflow start.
142+
GreetingWorkflow workflow = workflowClient.newWorkflowStub(GreetingWorkflow.class);
143+
try {
144+
execution =
145+
WorkflowClient.start(workflow::greetPeriodically, "World", Duration.ofSeconds(3));
146+
System.out.println("Started " + execution);
147+
} catch (DuplicateWorkflowException e) {
148+
System.out.println("Still running as " + e.getExecution());
149+
} catch (Throwable e) {
150+
e.printStackTrace();
151+
System.exit(1);
152+
}
153+
// This value is so low just for the sample purpose. In production workflow
154+
// it is usually much higher.
155+
Thread.sleep(10000);
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)