Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (b *builder) integrationTest() error {
if *devServerFlag {
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
CachedDownload: testsuite.CachedDownload{
Version: "v1.6.2-server-1.31.0-151.6",
Version: "v1.7.0",
},
ClientOptions: &client.Options{
HostPort: "127.0.0.1:7233",
Expand Down
20 changes: 20 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,24 @@ const (
//
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorAutoUpgrade]
ContinueAsNewVersioningBehaviorAutoUpgrade = 1

// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
// regardless of the workflow's Target Version.
//
// After the first workflow task completes, the workflow will use whatever Versioning Behavior it is annotated with. If
// there is no Ramping Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
//
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
// may be the Current Version instead of the Ramping Version.
//
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
//
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorUseRampingVersion]
ContinueAsNewVersioningBehaviorUseRampingVersion = 2
)

// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.
Expand Down Expand Up @@ -3176,6 +3194,8 @@ func continueAsNewVersioningBehaviorToProto(t ContinueAsNewVersioningBehavior) e
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED
case ContinueAsNewVersioningBehaviorAutoUpgrade:
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
case ContinueAsNewVersioningBehaviorUseRampingVersion:
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION
default:
panic("unknown continue-as-new versioning behavior type")
}
Expand Down
6 changes: 1 addition & 5 deletions test/nexus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1937,11 +1937,7 @@ func TestAsyncOperationCompletionCustomFailureConverter(t *testing.T) {
var appErr *temporal.ApplicationError
require.ErrorAs(t, err, &appErr)
require.Equal(t, "async failure", appErr.Message())
if os.Getenv("DISABLE_NEW_NEXUS_ERROR_FORMAT_TESTS") != "" {
require.Equal(t, "NexusFailure", appErr.Type())
} else {
require.Equal(t, "", appErr.Type())
}
require.Equal(t, "NexusFailure", appErr.Type())
var details nexus.Failure
require.NoError(t, appErr.Details(&details))
require.Equal(t, "custom", details.Metadata["type"])
Expand Down
84 changes: 84 additions & 0 deletions test/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,3 +1454,87 @@ func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithVersionUpgrade() {
ts.NoError(wfHandle.Get(ctx, &result))
ts.Equal("v2.0", result)
}

func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithRampingVersion() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
}
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

deploymentName := "deploy-test-" + uuid.NewString()
v1 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "1.0",
}
v2 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "2.0",
}

// Keep current at 1.0 and ramp 0% to 2.0. A continued run with
// UseRampingVersion should still start on 2.0.
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v1,
},
})
worker1.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV1, workflow.RegisterOptions{
Name: "ContinueAsNewWithRampingVersion",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})
ts.NoError(worker1.Start())
defer worker1.Stop()

worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v2,
},
})
worker2.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV2, workflow.RegisterOptions{
Name: "ContinueAsNewWithRampingVersion",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})
ts.NoError(worker2.Start())
defer worker2.Stop()

dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
ts.waitForWorkerDeployment(ctx, dHandle)
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
ts.NoError(err)

ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1)
response2, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
BuildID: v1.BuildID,
ConflictToken: response1.ConflictToken,
})
ts.NoError(err)
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, "")

wfHandle, err := ts.client.ExecuteWorkflow(
ctx,
ts.startWorkflowOptions("test-continueasnew-with-ramping-version"),
"ContinueAsNewWithRampingVersion",
0,
)
ts.NoError(err)
ts.waitForWorkflowRunningOnVersion(ctx, wfHandle, v1.BuildID)

ts.waitForWorkerDeploymentVersion(ctx, dHandle, v2)
// Set ramping percentage to 0%, ramping should still occur due to CaN.
_, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
BuildID: v2.BuildID,
ConflictToken: response2.ConflictToken,
Percentage: float32(0.0),
})
ts.NoError(err)
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, v2.BuildID)

ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "continue-as-new", nil))

var result string
ts.NoError(wfHandle.Get(ctx, &result))
ts.Equal("v2.0", result)
}
27 changes: 27 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,33 @@ func (w *Workflows) ContinueAsNewWithVersionUpgradeV2(
return "v2.0", nil
}

func (w *Workflows) ContinueAsNewWithRampingVersionV1(
ctx workflow.Context,
attempt int,
) (string, error) {
if attempt > 0 {
return "v1.0", nil
}

workflow.GetSignalChannel(ctx, "continue-as-new").Receive(ctx, nil)

return "", workflow.NewContinueAsNewErrorWithOptions(
ctx,
workflow.ContinueAsNewErrorOptions{
InitialVersioningBehavior: workflow.ContinueAsNewVersioningBehaviorUseRampingVersion,
},
"ContinueAsNewWithRampingVersion",
attempt+1,
)
}

func (w *Workflows) ContinueAsNewWithRampingVersionV2(
ctx workflow.Context,
attempt int,
) (string, error) {
return "v2.0", nil
}

func (w *Workflows) ContinueAsNewWithChildWF(
ctx workflow.Context,
iterations int,
Expand Down
15 changes: 15 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ const (
// Note that if the previous workflow had a Pinned override, that override will be inherited by the new workflow
// run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new command.
ContinueAsNewVersioningBehaviorAutoUpgrade = internal.ContinueAsNewVersioningBehaviorAutoUpgrade

// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
// regardless of the workflow's Target Version. After the first workflow task completes, the workflow will use whatever
// Versioning Behavior it is annotated with. If there is no Ramping Version by the time that the first workflow task is
// dispatched, it will be sent to the Current Version.
//
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
// may be the Current Version instead of the Ramping Version.
//
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
ContinueAsNewVersioningBehaviorUseRampingVersion = internal.ContinueAsNewVersioningBehaviorUseRampingVersion
)

// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.
Expand Down