diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index 297593e60..46525de79 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -121,7 +121,7 @@ func (b *builder) integrationTest() error { if *devServerFlag { devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{ CachedDownload: testsuite.CachedDownload{ - Version: "v1.6.2-server-1.31.0-151.6", + Version: "v1.7.0", }, ClientOptions: &client.Options{ HostPort: "127.0.0.1:7233", diff --git a/internal/workflow.go b/internal/workflow.go index 691d2841c..bc85279bb 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -93,6 +93,24 @@ const ( // // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorAutoUpgrade] ContinueAsNewVersioningBehaviorAutoUpgrade = 1 + + // ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time, + // regardless of the workflow's Target Version. + // + // After the first workflow task completes, the workflow will use whatever Versioning Behavior it is annotated with. If + // there is no Ramping Version by the time that the first workflow task is dispatched, it will be sent to the Current Version. + // + // It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because + // this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow + // is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which + // may be the Current Version instead of the Ramping Version. + // + // Note that if the workflow being continued has a Pinned override, that override will be inherited by the + // new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new + // command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions. + // + // Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorUseRampingVersion] + ContinueAsNewVersioningBehaviorUseRampingVersion = 2 ) // ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time. @@ -3176,6 +3194,8 @@ func continueAsNewVersioningBehaviorToProto(t ContinueAsNewVersioningBehavior) e return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED case ContinueAsNewVersioningBehaviorAutoUpgrade: return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE + case ContinueAsNewVersioningBehaviorUseRampingVersion: + return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION default: panic("unknown continue-as-new versioning behavior type") } diff --git a/test/nexus_test.go b/test/nexus_test.go index f167320f4..df1a1d6bc 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -1937,11 +1937,7 @@ func TestAsyncOperationCompletionCustomFailureConverter(t *testing.T) { var appErr *temporal.ApplicationError require.ErrorAs(t, err, &appErr) require.Equal(t, "async failure", appErr.Message()) - if os.Getenv("DISABLE_NEW_NEXUS_ERROR_FORMAT_TESTS") != "" { - require.Equal(t, "NexusFailure", appErr.Type()) - } else { - require.Equal(t, "", appErr.Type()) - } + require.Equal(t, "NexusFailure", appErr.Type()) var details nexus.Failure require.NoError(t, appErr.Details(&details)) require.Equal(t, "custom", details.Metadata["type"]) diff --git a/test/worker_deployment_test.go b/test/worker_deployment_test.go index a8879f929..e9653eb1d 100644 --- a/test/worker_deployment_test.go +++ b/test/worker_deployment_test.go @@ -1454,3 +1454,87 @@ func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithVersionUpgrade() { ts.NoError(wfHandle.Get(ctx, &result)) ts.Equal("v2.0", result) } + +func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithRampingVersion() { + if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" { + ts.T().Skip("temporal server 1.27+ required") + } + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + deploymentName := "deploy-test-" + uuid.NewString() + v1 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "1.0", + } + v2 := worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildID: "2.0", + } + + // Keep current at 1.0 and ramp 0% to 2.0. A continued run with + // UseRampingVersion should still start on 2.0. + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v1, + }, + }) + worker1.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV1, workflow.RegisterOptions{ + Name: "ContinueAsNewWithRampingVersion", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + ts.NoError(worker1.Start()) + defer worker1.Stop() + + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: v2, + }, + }) + worker2.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV2, workflow.RegisterOptions{ + Name: "ContinueAsNewWithRampingVersion", + VersioningBehavior: workflow.VersioningBehaviorPinned, + }) + ts.NoError(worker2.Start()) + defer worker2.Stop() + + dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName) + ts.waitForWorkerDeployment(ctx, dHandle) + response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{}) + ts.NoError(err) + + ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1) + response2, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: v1.BuildID, + ConflictToken: response1.ConflictToken, + }) + ts.NoError(err) + ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, "") + + wfHandle, err := ts.client.ExecuteWorkflow( + ctx, + ts.startWorkflowOptions("test-continueasnew-with-ramping-version"), + "ContinueAsNewWithRampingVersion", + 0, + ) + ts.NoError(err) + ts.waitForWorkflowRunningOnVersion(ctx, wfHandle, v1.BuildID) + + ts.waitForWorkerDeploymentVersion(ctx, dHandle, v2) + // Set ramping percentage to 0%, ramping should still occur due to CaN. + _, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{ + BuildID: v2.BuildID, + ConflictToken: response2.ConflictToken, + Percentage: float32(0.0), + }) + ts.NoError(err) + ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, v2.BuildID) + + ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "continue-as-new", nil)) + + var result string + ts.NoError(wfHandle.Get(ctx, &result)) + ts.Equal("v2.0", result) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index 2962bb7d7..c7747152a 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -635,6 +635,33 @@ func (w *Workflows) ContinueAsNewWithVersionUpgradeV2( return "v2.0", nil } +func (w *Workflows) ContinueAsNewWithRampingVersionV1( + ctx workflow.Context, + attempt int, +) (string, error) { + if attempt > 0 { + return "v1.0", nil + } + + workflow.GetSignalChannel(ctx, "continue-as-new").Receive(ctx, nil) + + return "", workflow.NewContinueAsNewErrorWithOptions( + ctx, + workflow.ContinueAsNewErrorOptions{ + InitialVersioningBehavior: workflow.ContinueAsNewVersioningBehaviorUseRampingVersion, + }, + "ContinueAsNewWithRampingVersion", + attempt+1, + ) +} + +func (w *Workflows) ContinueAsNewWithRampingVersionV2( + ctx workflow.Context, + attempt int, +) (string, error) { + return "v2.0", nil +} + func (w *Workflows) ContinueAsNewWithChildWF( ctx workflow.Context, iterations int, diff --git a/workflow/workflow.go b/workflow/workflow.go index 814924194..575ff1594 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -46,6 +46,21 @@ const ( // Note that if the previous workflow had a Pinned override, that override will be inherited by the new workflow // run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new command. ContinueAsNewVersioningBehaviorAutoUpgrade = internal.ContinueAsNewVersioningBehaviorAutoUpgrade + + // ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time, + // regardless of the workflow's Target Version. After the first workflow task completes, the workflow will use whatever + // Versioning Behavior it is annotated with. If there is no Ramping Version by the time that the first workflow task is + // dispatched, it will be sent to the Current Version. + // + // It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because + // this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow + // is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which + // may be the Current Version instead of the Ramping Version. + // + // Note that if the workflow being continued has a Pinned override, that override will be inherited by the + // new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new + // command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions. + ContinueAsNewVersioningBehaviorUseRampingVersion = internal.ContinueAsNewVersioningBehaviorUseRampingVersion ) // ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.