Skip to content

Commit 132f356

Browse files
Extract BuilID from Version (temporalio#1881)
1 parent a80e6d5 commit 132f356

File tree

2 files changed

+115
-0
lines changed

2 files changed

+115
-0
lines changed

Diff for: internal/internal_task_handlers.go

+4
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,10 @@ OrderEvents:
500500
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
501501
bidStr := event.GetWorkflowTaskCompletedEventAttributes().
502502
GetWorkerVersion().GetBuildId()
503+
version := event.GetWorkflowTaskCompletedEventAttributes().GetWorkerDeploymentVersion()
504+
if splitVersion := strings.SplitN(version, ".", 2); len(splitVersion) == 2 {
505+
bidStr = splitVersion[1]
506+
}
503507
taskEvents.buildID = &bidStr
504508
} else if isPreloadMarkerEvent(event) {
505509
taskEvents.markers = append(taskEvents.markers, event)

Diff for: test/worker_deployment_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ import (
3434
"github.com/google/uuid"
3535
"github.com/stretchr/testify/require"
3636
"github.com/stretchr/testify/suite"
37+
"go.temporal.io/api/common/v1"
3738
enumspb "go.temporal.io/api/enums/v1"
39+
"go.temporal.io/api/workflowservice/v1"
3840

3941
"go.temporal.io/sdk/client"
4042
"go.temporal.io/sdk/worker"
@@ -133,6 +135,115 @@ func (ts *WorkerDeploymentTestSuite) runWorkflowAndCheckV1(ctx context.Context,
133135
return IsWorkerVersionOne(result)
134136
}
135137

138+
func (ts *WorkerDeploymentTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
139+
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
140+
ts.T().Skip("temporal server 1.27+ required")
141+
}
142+
143+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
144+
defer cancel()
145+
146+
deploymentName := "deploy-test-" + uuid.NewString()
147+
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
148+
DeploymentOptions: worker.DeploymentOptions{
149+
UseVersioning: true,
150+
Version: deploymentName + ".1.0",
151+
},
152+
})
153+
worker1.RegisterWorkflowWithOptions(ts.workflows.BuildIDWorkflow, workflow.RegisterOptions{
154+
Name: "BuildIDWorkflow",
155+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
156+
})
157+
worker1.RegisterActivity(ts.activities)
158+
159+
ts.NoError(worker1.Start())
160+
161+
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
162+
163+
ts.waitForWorkerDeployment(ctx, dHandle)
164+
165+
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
166+
ts.NoError(err)
167+
168+
ts.waitForWorkerDeploymentVersion(ctx, dHandle, deploymentName+".1.0")
169+
170+
response2, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
171+
Version: deploymentName + ".1.0",
172+
ConflictToken: response1.ConflictToken,
173+
})
174+
ts.NoError(err)
175+
176+
// start workflow1 with 1.0, BuildIDWorkflow, auto-upgrade
177+
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf-1"), "BuildIDWorkflow")
178+
ts.NoError(err)
179+
180+
ts.waitForWorkflowRunning(ctx, wfHandle)
181+
182+
// Query to see that the build ID is 1.0
183+
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
184+
var lastBuildID string
185+
ts.NoError(err)
186+
ts.NoError(res.Get(&lastBuildID))
187+
ts.Equal("1.0", lastBuildID)
188+
189+
// Make sure we've got to the activity
190+
ts.Eventually(func() bool {
191+
var didRun bool
192+
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "activity-ran", nil)
193+
ts.NoError(err)
194+
ts.NoError(res.Get(&didRun))
195+
return didRun
196+
}, time.Second*10, time.Millisecond*100)
197+
worker1.Stop()
198+
199+
worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{
200+
DeploymentOptions: worker.DeploymentOptions{
201+
UseVersioning: true,
202+
Version: deploymentName + ".2.0",
203+
},
204+
})
205+
worker2.RegisterWorkflowWithOptions(ts.workflows.BuildIDWorkflow, workflow.RegisterOptions{
206+
Name: "BuildIDWorkflow",
207+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
208+
})
209+
worker2.RegisterActivity(ts.activities)
210+
211+
ts.NoError(worker2.Start())
212+
defer worker2.Stop()
213+
214+
ts.waitForWorkerDeploymentVersion(ctx, dHandle, deploymentName+".2.0")
215+
216+
_, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
217+
Version: deploymentName + ".2.0",
218+
ConflictToken: response2.ConflictToken,
219+
})
220+
ts.NoError(err)
221+
222+
_, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{
223+
Namespace: ts.config.Namespace,
224+
Execution: &common.WorkflowExecution{
225+
WorkflowId: wfHandle.GetID(),
226+
},
227+
})
228+
ts.NoError(err)
229+
230+
// The current task, with the new worker, should still be 1.0 since no new tasks have happened
231+
enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
232+
ts.NoError(err)
233+
ts.NoError(enval.Get(&lastBuildID))
234+
ts.Equal("1.0", lastBuildID)
235+
236+
// finish the workflow under 1.1
237+
ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "finish", ""))
238+
ts.NoError(wfHandle.Get(ctx, nil))
239+
240+
// Post completion it should have the value of the last task
241+
enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
242+
ts.NoError(err)
243+
ts.NoError(enval.Get(&lastBuildID))
244+
ts.Equal("2.0", lastBuildID)
245+
}
246+
136247
func (ts *WorkerDeploymentTestSuite) TestPinnedBehaviorThreeWorkers() {
137248
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
138249
ts.T().Skip("temporal server 1.27+ required")

0 commit comments

Comments
 (0)