Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (b *builder) integrationTest() error {
if *devServerFlag {
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
CachedDownload: testsuite.CachedDownload{
Version: "v1.6.2-server-1.31.0-151.6",
Version: "v1.7.0",
},
ClientOptions: &client.Options{
HostPort: "127.0.0.1:7233",
Expand Down
20 changes: 20 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,24 @@ const (
//
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorAutoUpgrade]
ContinueAsNewVersioningBehaviorAutoUpgrade = 1

// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
// regardless of the workflow's Target Version.
//
// After the first workflow task completes, the workflow will use whatever Versioning Behavior it is annotated with. If
// there is no Ramping Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
//
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
// may be the Current Version instead of the Ramping Version.
//
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
//
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorUseRampingVersion]
ContinueAsNewVersioningBehaviorUseRampingVersion = 2
)

// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.
Expand Down Expand Up @@ -3176,6 +3194,8 @@ func continueAsNewVersioningBehaviorToProto(t ContinueAsNewVersioningBehavior) e
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED
case ContinueAsNewVersioningBehaviorAutoUpgrade:
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
case ContinueAsNewVersioningBehaviorUseRampingVersion:
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION
default:
panic("unknown continue-as-new versioning behavior type")
}
Expand Down
84 changes: 84 additions & 0 deletions test/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,3 +1454,87 @@ func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithVersionUpgrade() {
ts.NoError(wfHandle.Get(ctx, &result))
ts.Equal("v2.0", result)
}

func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithRampingVersion() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
}
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

deploymentName := "deploy-test-" + uuid.NewString()
v1 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "1.0",
}
v2 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "2.0",
}

// Keep current at 1.0 and ramp 0% to 2.0. A continued run with
// UseRampingVersion should still start on 2.0.
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v1,
},
})
worker1.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV1, workflow.RegisterOptions{
Name: "ContinueAsNewWithRampingVersion",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})
ts.NoError(worker1.Start())
defer worker1.Stop()

worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v2,
},
})
worker2.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV2, workflow.RegisterOptions{
Name: "ContinueAsNewWithRampingVersion",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})
ts.NoError(worker2.Start())
defer worker2.Stop()

dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
ts.waitForWorkerDeployment(ctx, dHandle)
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
ts.NoError(err)

ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1)
response2, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
BuildID: v1.BuildID,
ConflictToken: response1.ConflictToken,
})
ts.NoError(err)
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, "")

wfHandle, err := ts.client.ExecuteWorkflow(
ctx,
ts.startWorkflowOptions("test-continueasnew-with-ramping-version"),
"ContinueAsNewWithRampingVersion",
0,
)
ts.NoError(err)
ts.waitForWorkflowRunningOnVersion(ctx, wfHandle, v1.BuildID)

ts.waitForWorkerDeploymentVersion(ctx, dHandle, v2)
// Set ramping percentage to 0%, ramping should still occur due to CaN.
_, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
BuildID: v2.BuildID,
ConflictToken: response2.ConflictToken,
Percentage: float32(0.0),
})
ts.NoError(err)
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, v2.BuildID)

ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "continue-as-new", nil))

var result string
ts.NoError(wfHandle.Get(ctx, &result))
ts.Equal("v2.0", result)
}
27 changes: 27 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,33 @@ func (w *Workflows) ContinueAsNewWithVersionUpgradeV2(
return "v2.0", nil
}

func (w *Workflows) ContinueAsNewWithRampingVersionV1(
ctx workflow.Context,
attempt int,
) (string, error) {
if attempt > 0 {
return "v1.0", nil
}

workflow.GetSignalChannel(ctx, "continue-as-new").Receive(ctx, nil)

return "", workflow.NewContinueAsNewErrorWithOptions(
ctx,
workflow.ContinueAsNewErrorOptions{
InitialVersioningBehavior: workflow.ContinueAsNewVersioningBehaviorUseRampingVersion,
},
"ContinueAsNewWithRampingVersion",
attempt+1,
)
}

func (w *Workflows) ContinueAsNewWithRampingVersionV2(
ctx workflow.Context,
attempt int,
) (string, error) {
return "v2.0", nil
}

func (w *Workflows) ContinueAsNewWithChildWF(
ctx workflow.Context,
iterations int,
Expand Down
15 changes: 15 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ const (
// Note that if the previous workflow had a Pinned override, that override will be inherited by the new workflow
// run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new command.
ContinueAsNewVersioningBehaviorAutoUpgrade = internal.ContinueAsNewVersioningBehaviorAutoUpgrade

// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
// regardless of the workflow's Target Version. After the first workflow task completes, the workflow will use whatever
// Versioning Behavior it is annotated with. If there is no Ramping Version by the time that the first workflow task is
// dispatched, it will be sent to the Current Version.
//
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
// may be the Current Version instead of the Ramping Version.
//
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
ContinueAsNewVersioningBehaviorUseRampingVersion = internal.ContinueAsNewVersioningBehaviorUseRampingVersion
)

// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.
Expand Down
Loading