Skip to content

Commit 97f623c

Browse files
authored
Extend versioning's replay tester (temporalio#8124)
## What changed? - Make the replay tester, used by the versioning entity workflows, more thorough. - The versioning entity workflows continue-as-new (CAN) on every state change. Previously, the replay tester simply fetched the last execution of a versioning entity workflow. As a result, we were skipping over many non-determinism errors (NDEs) that could occur if someone changed our workflows. To fix this, the replay tester now: 1. Fetches all the workflow run IDs present in the CAN chain, and then tests, for each individual run, whether code changes have caused NDEs. 2. It is important to remember that while we fetch all the workflows in the CAN chain, the SDK's replay tester does not test the chaining itself. In other words, it tests the code path from the input that is given at the start of each workflow execution. Thus, there is a chance that a code change does not add new events to any single workflow run. However, such a code change can increase the overall number of runs of the entity workflows, and the replay tester will now see this and treat it as a cause for an NDE. 3. Neatly creates a folder for that run and saves the required workflow histories there. 4. *Bonus*: I have also made the change to store the raw JSON file of each workflow history. This should make debugging a whole lot simpler. It now looks like: <img width="1064" height="861" alt="image" src="https://github.com/user-attachments/assets/67daf5c5-04c3-4236-86a0-f81470661d72" /> ## Why? - Robustness. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks - None
1 parent 9ba6533 commit 97f623c

File tree

24 files changed

+255
-19
lines changed

24 files changed

+255
-19
lines changed

service/worker/workerdeployment/replaytester/generate_history.sh

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,111 @@
1515
deploymentName="foo"
version="1.0"

# Expected workflow counts. Override these through the environment when a
# change is expected to generate more workflows, which will be the case when a
# breaking change to these workflows is introduced.
# The values are written to expected_counts.txt further down, and the replay
# tester reads them back to check that a change has not accidentally created
# additional executions.
EXPECTED_DEPLOYMENT_WORKFLOWS=${EXPECTED_DEPLOYMENT_WORKFLOWS:-8}
EXPECTED_VERSION_WORKFLOWS=${EXPECTED_VERSION_WORKFLOWS:-10}

# Checks that an expected-count override is a non-negative decimal integer.
# Arguments: $1 - variable name (used in the error message only)
#            $2 - value to check
# Returns:   0 when the value is made of digits only, 1 otherwise (after
#            printing a diagnostic on stderr)
validate_expected_count() {
  case "$2" in
    '' | *[!0-9]*)
      echo "❌ $1 must be a non-negative integer, got: '$2'" >&2
      return 1
      ;;
  esac
  return 0
}

# Fail early: the replay tester parses these values as integers, so a typo here
# would otherwise only surface when the generated test data is consumed.
validate_expected_count EXPECTED_DEPLOYMENT_WORKFLOWS "$EXPECTED_DEPLOYMENT_WORKFLOWS" || exit 1
validate_expected_count EXPECTED_VERSION_WORKFLOWS "$EXPECTED_VERSION_WORKFLOWS" || exit 1

echo "📋 Expected workflow counts:"
echo " Deployment workflows: $EXPECTED_DEPLOYMENT_WORKFLOWS"
echo " Version workflows: $EXPECTED_VERSION_WORKFLOWS"
echo " (Override with EXPECTED_DEPLOYMENT_WORKFLOWS=X EXPECTED_VERSION_WORKFLOWS=Y if you expect different counts)"
echo ""
29+
1830
# Create the default namespace
temporal operator namespace create default

# Run the worker which shall start the deployment entity workflows....
echo "Running the Go program..."
# Stop here if the worker program fails: downloading histories after a failed
# run would only produce incomplete or empty test data.
if ! go run "$(dirname "$0")/worker/worker.go"; then
  echo "❌ worker program failed; not downloading workflow histories" >&2
  exit 1
fi

# Seconds to wait before querying visibility; defaults to 5 and can be
# overridden through the environment.
VISIBILITY_WAIT_SECONDS=${VISIBILITY_WAIT_SECONDS:-5}
echo "Waiting ${VISIBILITY_WAIT_SECONDS} seconds for all workflows to show up in visibility..."
sleep "$VISIBILITY_WAIT_SECONDS"
39+
40+
# Downloads the history of every run returned by the worker-deployment
# visibility query and stores each one, gzip-compressed, in run_dir as
# replay_<workflow_name>_run_<run_id>.json.gz.
#
# Arguments:
#   $1 - workflow ID passed to `temporal workflow show -w`
#   $2 - short workflow name used in the output file names
#   $3 - workflow type used to filter the visibility query
#   $4 - existing directory that receives the downloaded histories
# Outputs:  progress on stdout, failures on stderr
# Returns:  0 when every listed run was downloaded (or no run was found),
#           1 when listing fails or at least one download fails
download_workflow_chain() {
  local workflow_id=$1
  local workflow_name=$2
  local workflow_type=$3
  local run_dir=$4
  local list_json run_ids run_id out_file history
  local run_count=0 run_index=0 attempt=0 failed=0
  local -a ids=()

  echo "📥 Downloading all executions for: $workflow_id"

  # Use the working query method with TemporalNamespaceDivision.
  # NOTE(review): the query filters by workflow type only, not by $workflow_id,
  # so it presumably relies on a single workflow ID per type existing on the
  # server -- confirm.
  echo " Getting the chain of CAN runs for this workflow using the TemporalNamespaceDivision query..."
  if ! list_json=$(temporal workflow list \
    --query "TemporalNamespaceDivision = \"TemporalWorkerDeployment\" AND WorkflowType = \"$workflow_type\"" \
    --output json); then
    echo " ❌ Failed to list executions for $workflow_id" >&2
    return 1
  fi
  if ! run_ids=$(printf '%s\n' "$list_json" | jq -r '.[] | .execution.runId'); then
    echo " ❌ Failed to extract run IDs for $workflow_id" >&2
    return 1
  fi

  # Collect the non-empty run IDs line by line instead of relying on word
  # splitting of an unquoted expansion.
  while IFS= read -r run_id; do
    if [[ -n "$run_id" ]]; then
      ids+=("$run_id")
    fi
  done <<< "$run_ids"

  run_count=${#ids[@]}
  echo " Found $run_count executions"

  if (( run_count == 0 )); then
    echo " No executions found for $workflow_id"
    return 0
  fi

  # Download each execution
  for run_id in "${ids[@]}"; do
    # Plain assignments are used for the counters: `((x++))` returns a non-zero
    # status when x is 0, which would abort the script under `set -e`.
    attempt=$((attempt + 1))
    echo " Downloading run $attempt/$run_count: $run_id"

    out_file="$run_dir/replay_${workflow_name}_run_${run_id}.json.gz"
    # Fetch first, compress second, so that a failed fetch is detected instead
    # of being hidden behind the exit status of gzip.
    if history=$(temporal workflow show \
      -w "$workflow_id" \
      -r "$run_id" \
      --output json) \
      && printf '%s\n' "$history" | gzip -9c > "$out_file"; then
      run_index=$((run_index + 1))
    else
      echo " ❌ Failed to download run $run_id of $workflow_id" >&2
      # Do not leave an empty or truncated archive behind; it would be picked
      # up as a replay history.
      rm -f -- "$out_file"
      failed=$((failed + 1))
    fi
  done

  echo " ✅ Downloaded $run_index executions for $workflow_name"

  if (( failed > 0 )); then
    echo " ❌ $failed of $run_count executions could not be downloaded for $workflow_name" >&2
    return 1
  fi
  return 0
}
90+
91+
# Every invocation writes into its own directory, named after the current
# Unix timestamp.
now=$(date +%s)
run_dir="$(dirname "$0")/testdata/run_${now}"
mkdir -p "$run_dir"

printf '%s\n' "📁 Creating run directory: $run_dir"

# Fetch the histories of both entity workflow types into the run directory.
download_workflow_chain \
  "temporal-sys-worker-deployment:$deploymentName" \
  "worker_deployment_wf" \
  "temporal-sys-worker-deployment-workflow" \
  "$run_dir"
download_workflow_chain \
  "temporal-sys-worker-deployment-version:$deploymentName:$version" \
  "worker_deployment_version_wf" \
  "temporal-sys-worker-deployment-version-workflow" \
  "$run_dir"

printf '\n'
printf '%s\n' "🎉 Complete! All workflow execution histories downloaded to $run_dir"
printf '\n'
printf '%s\n' "📊 Summary for this run:"
printf '%s\n' " 📂 Run directory: $run_dir"

# Prints how many entries below directory $1 have a name matching pattern $2.
count_history_files() {
  find "$1" -name "$2" 2>/dev/null | wc -l | tr -d ' '
}

# Tally the downloaded histories per workflow type.
deployment_files=$(count_history_files "$run_dir" "replay_worker_deployment_wf_*.json.gz")
version_files=$(count_history_files "$run_dir" "replay_worker_deployment_version_wf_*.json.gz")

printf '%s\n' " Worker Deployment workflows: $deployment_files executions"
printf '%s\n' " Worker Version workflows: $version_files executions"

# Record expected and actual counts next to the histories; the replay tester
# reads the EXPECTED_* entries from this file.
{
  printf '%s\n' "# Expected workflow counts for replay testing"
  printf '%s\n' "# Generated by generate_history.sh on $(date)"
  printf '%s\n' "EXPECTED_DEPLOYMENT_WORKFLOWS=$EXPECTED_DEPLOYMENT_WORKFLOWS"
  printf '%s\n' "EXPECTED_VERSION_WORKFLOWS=$EXPECTED_VERSION_WORKFLOWS"
  printf '%s\n' "ACTUAL_DEPLOYMENT_WORKFLOWS=$deployment_files"
  printf '%s\n' "ACTUAL_VERSION_WORKFLOWS=$version_files"
} > "$run_dir/expected_counts.txt"

printf '%s\n' " 📝 Expected counts saved to: $run_dir/expected_counts.txt"
Lines changed: 150 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package replaytester
22

33
import (
4+
"bufio"
45
"compress/gzip"
56
"fmt"
67
"os"
78
"path/filepath"
9+
"strconv"
10+
"strings"
811
"testing"
12+
"time"
913

1014
"github.com/stretchr/testify/require"
1115
"go.temporal.io/sdk/client"
@@ -19,33 +23,164 @@ import (
1923
// TestReplays tests workflow logic backwards compatibility from previous versions.
2024
func TestReplays(t *testing.T) {
2125
replayer := worker.NewWorkflowReplayer()
26+
27+
// Create version workflow wrapper to match production registration
28+
versionWorkflow := func(ctx workflow.Context, args *deploymentspb.WorkerDeploymentVersionWorkflowArgs) error {
29+
refreshIntervalGetter := func() any {
30+
return 5 * time.Minute // default value for testing
31+
}
32+
visibilityGracePeriodGetter := func() any {
33+
return 3 * time.Minute // default value for testing
34+
}
35+
return workerdeployment.VersionWorkflow(ctx, refreshIntervalGetter, visibilityGracePeriodGetter, args)
36+
}
37+
38+
// Create deployment workflow wrapper to match production registration
2239
deploymentWorkflow := func(ctx workflow.Context, args *deploymentspb.WorkerDeploymentWorkflowArgs) error {
2340
maxVersionsGetter := func() int {
2441
return 100
2542
}
2643
return workerdeployment.Workflow(ctx, maxVersionsGetter, args)
2744
}
28-
replayer.RegisterWorkflowWithOptions(workerdeployment.VersionWorkflow, workflow.RegisterOptions{Name: workerdeployment.WorkerDeploymentVersionWorkflowType})
45+
46+
replayer.RegisterWorkflowWithOptions(versionWorkflow, workflow.RegisterOptions{Name: workerdeployment.WorkerDeploymentVersionWorkflowType})
2947
replayer.RegisterWorkflowWithOptions(deploymentWorkflow, workflow.RegisterOptions{Name: workerdeployment.WorkerDeploymentWorkflowType})
3048

31-
files, err := filepath.Glob("testdata/replay_*.json.gz")
49+
logger := log.NewSdkLogger(log.NewTestLogger())
50+
51+
// Test all run directories (default behavior for comprehensive replay testing)
52+
testAllRunDirectories(t, replayer, logger)
53+
54+
}
55+
56+
// testAllRunDirectories tests all directories prepended with "run_" since they contain replay test data
57+
func testAllRunDirectories(t *testing.T, replayer worker.WorkflowReplayer, logger *log.SdkLogger) {
58+
runDirs, err := filepath.Glob("testdata/run_*")
59+
require.NoError(t, err)
60+
61+
if len(runDirs) == 0 {
62+
t.Skip("No run directories found. Run generate_history.sh first.")
63+
}
64+
65+
fmt.Printf("Testing %d run directories\n", len(runDirs))
66+
67+
for _, runDir := range runDirs {
68+
t.Run(filepath.Base(runDir), func(t *testing.T) {
69+
fmt.Printf("Testing run: %s\n", runDir)
70+
testRunDirectory(t, replayer, logger, runDir)
71+
})
72+
}
73+
}
74+
75+
// testRunDirectory tests all workflow histories in a specific run directory
76+
func testRunDirectory(t *testing.T, replayer worker.WorkflowReplayer, logger *log.SdkLogger, runDir string) {
77+
files, err := filepath.Glob(filepath.Join(runDir, "replay_*.json.gz"))
3278
require.NoError(t, err)
3379

34-
fmt.Println("Number of files to replay:", len(files))
80+
fmt.Printf(" Found %d workflow histories to replay\n", len(files))
3581

36-
logger := log.NewSdkLogger(log.NewTestLogger())
82+
// Validate that workflow counts match expected values
83+
validateWorkflowCounts(t, files, runDir)
3784

85+
// Validate that histories replay successfully
3886
for _, filename := range files {
39-
logger.Info("Replaying", "file", filename)
40-
f, err := os.Open(filename)
41-
require.NoError(t, err)
42-
r, err := gzip.NewReader(f)
43-
require.NoError(t, err)
44-
history, err := client.HistoryFromJSON(r, client.HistoryJSONOptions{})
45-
require.NoError(t, err)
46-
err = replayer.ReplayWorkflowHistory(logger, history)
47-
require.NoError(t, err)
48-
_ = r.Close()
49-
_ = f.Close()
87+
t.Run(filepath.Base(filename), func(t *testing.T) {
88+
replayWorkflowHistory(t, replayer, logger, filename)
89+
})
90+
}
91+
}
92+
93+
// replayWorkflowHistory replays a single workflow history file and validates it
94+
func replayWorkflowHistory(t *testing.T, replayer worker.WorkflowReplayer, logger *log.SdkLogger, filename string) {
95+
logger.Info("Replaying", "file", filename)
96+
f, err := os.Open(filename)
97+
require.NoError(t, err)
98+
r, err := gzip.NewReader(f)
99+
require.NoError(t, err)
100+
history, err := client.HistoryFromJSON(r, client.HistoryJSONOptions{})
101+
require.NoError(t, err)
102+
err = replayer.ReplayWorkflowHistory(logger, history)
103+
require.NoError(t, err)
104+
_ = r.Close()
105+
_ = f.Close()
106+
}
107+
108+
// readExpectedCounts reads expected workflow counts from the expected_counts.txt file
109+
func readExpectedCounts(runDir string) (deploymentCount, versionCount int, err error) {
110+
expectedCountsFile := filepath.Join(runDir, "expected_counts.txt")
111+
112+
file, err := os.Open(expectedCountsFile)
113+
if err != nil {
114+
// File doesn't exist - this might be an older test data directory
115+
return 0, 0, fmt.Errorf("expected_counts.txt not found in %s: %w", runDir, err)
116+
}
117+
defer file.Close()
118+
119+
scanner := bufio.NewScanner(file)
120+
for scanner.Scan() {
121+
line := strings.TrimSpace(scanner.Text())
122+
if strings.HasPrefix(line, "#") || line == "" {
123+
continue // Skip comments and empty lines
124+
}
125+
126+
if strings.HasPrefix(line, "EXPECTED_DEPLOYMENT_WORKFLOWS=") {
127+
value := strings.TrimPrefix(line, "EXPECTED_DEPLOYMENT_WORKFLOWS=")
128+
deploymentCount, err = strconv.Atoi(value)
129+
if err != nil {
130+
return 0, 0, fmt.Errorf("invalid deployment count: %w", err)
131+
}
132+
} else if strings.HasPrefix(line, "EXPECTED_VERSION_WORKFLOWS=") {
133+
value := strings.TrimPrefix(line, "EXPECTED_VERSION_WORKFLOWS=")
134+
versionCount, err = strconv.Atoi(value)
135+
if err != nil {
136+
return 0, 0, fmt.Errorf("invalid version count: %w", err)
137+
}
138+
}
139+
}
140+
141+
return deploymentCount, versionCount, scanner.Err()
142+
}
143+
144+
// validateWorkflowCounts ensures the number of deployment and version workflows matches expectations
145+
func validateWorkflowCounts(t *testing.T, files []string, runDir string) {
146+
// Read expected counts from file
147+
expectedDeploymentCount, expectedVersionCount, err := readExpectedCounts(runDir)
148+
if err != nil {
149+
// For backwards compatibility, skip validation if expected_counts.txt doesn't exist
150+
fmt.Printf(" ⚠️ Skipping workflow count validation since expected_counts.txt doesn't exist for this test data directory; this is expected for older test data directories: %v\n", err)
151+
return
152+
}
153+
154+
// Count actual workflows
155+
actualDeploymentCount := 0
156+
actualVersionCount := 0
157+
158+
for _, file := range files {
159+
filename := filepath.Base(file)
160+
// Only count .gz files since those are the ones used for replay testing
161+
if !strings.HasSuffix(filename, ".json.gz") {
162+
continue
163+
}
164+
165+
if strings.Contains(filename, "replay_worker_deployment_wf_run_") {
166+
actualDeploymentCount++
167+
} else if strings.Contains(filename, "replay_worker_deployment_version_wf_run_") {
168+
actualVersionCount++
169+
}
50170
}
171+
172+
fmt.Printf(" Workflow counts - Expected: Deployment=%d, Version=%d | Actual: Deployment=%d, Version=%d\n",
173+
expectedDeploymentCount, expectedVersionCount, actualDeploymentCount, actualVersionCount)
174+
175+
require.Equal(t, expectedDeploymentCount, actualDeploymentCount,
176+
"Deployment workflow count mismatch in %s. Expected %d, got %d. "+
177+
"This could mean your changes caused additional workflow executions. "+
178+
"If this is expected, regenerate test data with: EXPECTED_DEPLOYMENT_WORKFLOWS=%d ./generate_history.sh",
179+
runDir, expectedDeploymentCount, actualDeploymentCount, actualDeploymentCount)
180+
181+
require.Equal(t, expectedVersionCount, actualVersionCount,
182+
"Version workflow count mismatch in %s. Expected %d, got %d. "+
183+
"This could mean your changes caused additional workflow executions. "+
184+
"If this is expected, regenerate test data with: EXPECTED_VERSION_WORKFLOWS=%d ./generate_history.sh",
185+
runDir, expectedVersionCount, actualVersionCount, actualVersionCount)
51186
}

service/worker/workerdeployment/replaytester/testdata/replay_deployment-workflow_initial.json.gz renamed to service/worker/workerdeployment/replaytester/testdata/run_1753913370/replay_deployment-workflow_initial.json.gz

File renamed without changes.

service/worker/workerdeployment/replaytester/testdata/replay_deployment-workflow_maprange.json.gz renamed to service/worker/workerdeployment/replaytester/testdata/run_1753913370/replay_deployment-workflow_maprange.json.gz

File renamed without changes.

service/worker/workerdeployment/replaytester/testdata/replay_worker_deployment_wf_1748123053.json.gz renamed to service/worker/workerdeployment/replaytester/testdata/run_1753913370/replay_worker_deployment_wf_1748123053.json.gz

File renamed without changes.
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Expected workflow counts for replay testing
2+
# Generated by generate_history.sh on Thu 31 Jul 2025 14:44:51 EDT
3+
EXPECTED_DEPLOYMENT_WORKFLOWS=8
4+
EXPECTED_VERSION_WORKFLOWS=10
5+
ACTUAL_DEPLOYMENT_WORKFLOWS=8
6+
ACTUAL_VERSION_WORKFLOWS=10

0 commit comments

Comments
 (0)