Skip to content

Commit c9c3c25

Browse files
Add replay test case for problematic continue-as-new case
1 parent a9f22e3 commit c9c3c25

File tree

3 files changed

+126
-0
lines changed

3 files changed

+126
-0
lines changed

test/replaytests/continue_as_new.json

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
[
2+
{
3+
"eventId": 1,
4+
"timestamp": 1699856700704442400,
5+
"eventType": "WorkflowExecutionStarted",
6+
"version": 4,
7+
"taskId": 882931375,
8+
"workflowExecutionStartedEventAttributes": {
9+
"workflowType": {
10+
"name": "fx.SimpleSignalWorkflow"
11+
},
12+
"taskList": {
13+
"name": "fx-worker"
14+
},
15+
"executionStartToCloseTimeoutSeconds": 600,
16+
"taskStartToCloseTimeoutSeconds": 10,
17+
"continuedExecutionRunId": "a664f402-bfe9-4739-945c-9cbc637548f1",
18+
"initiator": "CronSchedule",
19+
"continuedFailureReason": "cadenceInternal:Timeout START_TO_CLOSE",
20+
"originalExecutionRunId": "d0baf930-6a83-4740-b773-71aaa696eed1",
21+
"firstExecutionRunId": "e85fa1b9-8899-40ce-8af9-7e0f93ed7ae5",
22+
"firstScheduleTimeNano": "2023-05-22T15:45:26.535595761-07:00",
23+
"cronSchedule": "* * * * *",
24+
"firstDecisionTaskBackoffSeconds": 60,
25+
"PartitionConfig": {
26+
"isolation-group": "dca11"
27+
}
28+
}
29+
},
30+
{
31+
"eventId": 2,
32+
"timestamp": 1699856760713586608,
33+
"eventType": "DecisionTaskScheduled",
34+
"version": 4,
35+
"taskId": 882931383,
36+
"decisionTaskScheduledEventAttributes": {
37+
"taskList": {
38+
"name": "fx-worker"
39+
},
40+
"startToCloseTimeoutSeconds": 10
41+
}
42+
},
43+
{
44+
"eventId": 3,
45+
"timestamp": 1699856760741837021,
46+
"eventType": "DecisionTaskStarted",
47+
"version": 4,
48+
"taskId": 882931387,
49+
"decisionTaskStartedEventAttributes": {
50+
"scheduledEventId": 2,
51+
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
52+
"requestId": "bb0ee926-13d1-4af4-9f9c-51433333ad04"
53+
}
54+
},
55+
{
56+
"eventId": 4,
57+
"timestamp": 1699856760773459755,
58+
"eventType": "DecisionTaskCompleted",
59+
"version": 4,
60+
"taskId": 882931391,
61+
"decisionTaskCompletedEventAttributes": {
62+
"scheduledEventId": 2,
63+
"startedEventId": 3,
64+
"identity": "202@dca50-7q@fx-worker@db443597-5124-483a-b1a5-4b1ff35a0ed4",
65+
"binaryChecksum": "uDeploy:dc3e318b30a49e8bb88f462a50fe3a01dd210a3a"
66+
}
67+
},
68+
{
69+
"eventId": 5,
70+
"timestamp": 1699857360713649962,
71+
"eventType": "WorkflowExecutionContinuedAsNew",
72+
"version": 4,
73+
"taskId": 882931394,
74+
"workflowExecutionContinuedAsNewEventAttributes": {
75+
"newExecutionRunId": "06c2468c-2d2d-44f7-ac7a-ff3c383f6e90",
76+
"workflowType": {
77+
"name": "fx.SimpleSignalWorkflow"
78+
},
79+
"taskList": {
80+
"name": "fx-worker"
81+
},
82+
"executionStartToCloseTimeoutSeconds": 600,
83+
"taskStartToCloseTimeoutSeconds": 10,
84+
"decisionTaskCompletedEventId": -23,
85+
"backoffStartIntervalInSeconds": 60,
86+
"initiator": "CronSchedule",
87+
"failureReason": "cadenceInternal:Timeout START_TO_CLOSE"
88+
}
89+
}
90+
]
+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package replaytests
2+
3+
import (
4+
"go.uber.org/cadence/workflow"
5+
"go.uber.org/zap"
6+
)
7+
8+
// ContinueAsNewWorkflow is a sample Cadence workflows that can receive a signal
9+
func ContinueAsNewWorkflow(ctx workflow.Context) error {
10+
selector := workflow.NewSelector(ctx)
11+
var signalResult string
12+
signalName := "helloWorldSignal"
13+
for {
14+
signalChan := workflow.GetSignalChannel(ctx, signalName)
15+
selector.AddReceive(signalChan, func(c workflow.Channel, more bool) {
16+
c.Receive(ctx, &signalResult)
17+
workflow.GetLogger(ctx).Info("Received age signalResult from signal!", zap.String("signal", signalName), zap.String("value", signalResult))
18+
})
19+
workflow.GetLogger(ctx).Info("Waiting for signal on channel.. " + signalName)
20+
// Wait for signal
21+
selector.Select(ctx)
22+
if signalResult == "kill" {
23+
return nil
24+
}
25+
}
26+
}

test/replaytests/replay_test.go

+10
Original file line numberDiff line numberDiff line change
@@ -181,3 +181,13 @@ func TestSimpleParallelWorkflowWithMissingActivityCall(t *testing.T) {
181181
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "branch2.json")
182182
assert.ErrorContains(t, err, "nondeterministic workflow: missing replay decision")
183183
}
184+
185+
// Runs a history which ends with WorkflowExecutionContinuedAsNew. Replay fails because of the additional checks done
186+
// for continue as new case by replayWorkflowHistory().
187+
// This should not have any error because it's a valid continue as new case.
188+
func TestContinueAsNew(t *testing.T) {
189+
replayer := worker.NewWorkflowReplayer()
190+
replayer.RegisterWorkflowWithOptions(ContinueAsNewWorkflow, workflow.RegisterOptions{Name: "fx.SimpleSignalWorkflow"})
191+
err := replayer.ReplayWorkflowHistoryFromJSONFile(zaptest.NewLogger(t), "continue_as_new.json")
192+
assert.ErrorContains(t, err, "replay workflow doesn't return the same result as the last event")
193+
}

0 commit comments

Comments
 (0)