Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (b *builder) integrationTest() error {
if *devServerFlag {
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
CachedDownload: testsuite.CachedDownload{
Version: "v1.6.2-server-1.31.0-151.6",
Version: "v1.7.0",
},
ClientOptions: &client.Options{
HostPort: "127.0.0.1:7233",
Expand Down
19 changes: 19 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ const (
//
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorAutoUpgrade]
ContinueAsNewVersioningBehaviorAutoUpgrade = 1

// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
// regardless of the workflow's Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of f(workflow_id, ramp_percentage), should we say something along the lines of:

  // The Target Version is chosen with the default formula:
  //   if calcRampThreshold(workflow_id) <= ramp_percentage:
  //     target=ramping_version
  //   else:
  //     target=current_version

I feel like f(workflow_id, ramp_percentage) will invite lots of questions.
I am ok being transparent about this formula -- we won't change it by default after GA because that could cause workflows to "roll back" accidentally, even with same ramp percentage

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calcRampThreshold is the name of the server function we actually use, so people (or claude) can look it up if they're interested

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to remove it entirely, I don't think we need to leak server versioning internals here (I also don't anticipate people finding this particularly necessary)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We define Ramping, Current, and Target version here fwiw https://docs.temporal.io/worker-versioning#versioning-definitions fwiw, so people can look those up there

// task completes, the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping
// Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
//
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
// may be the Current Version instead of the Ramping Version.
//
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
//
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorUseRampingVersion]
ContinueAsNewVersioningBehaviorUseRampingVersion = 2
)

// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.
Expand Down Expand Up @@ -3176,6 +3193,8 @@ func continueAsNewVersioningBehaviorToProto(t ContinueAsNewVersioningBehavior) e
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED
case ContinueAsNewVersioningBehaviorAutoUpgrade:
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
case ContinueAsNewVersioningBehaviorUseRampingVersion:
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION
default:
panic("unknown continue-as-new versioning behavior type")
}
Expand Down
84 changes: 84 additions & 0 deletions test/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1454,3 +1454,87 @@ func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithVersionUpgrade() {
ts.NoError(wfHandle.Get(ctx, &result))
ts.Equal("v2.0", result)
}

func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithRampingVersion() {
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
ts.T().Skip("temporal server 1.27+ required")
}
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

deploymentName := "deploy-test-" + uuid.NewString()
v1 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "1.0",
}
v2 := worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildID: "2.0",
}

// Keep current at 1.0 and ramp 0% to 2.0. A continued run with
// UseRampingVersion should still start on 2.0.
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v1,
},
})
worker1.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV1, workflow.RegisterOptions{
Name: "ContinueAsNewWithRampingVersion",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})
ts.NoError(worker1.Start())
defer worker1.Stop()

worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: v2,
},
})
worker2.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV2, workflow.RegisterOptions{
Name: "ContinueAsNewWithRampingVersion",
VersioningBehavior: workflow.VersioningBehaviorPinned,
})
ts.NoError(worker2.Start())
defer worker2.Stop()

dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
ts.waitForWorkerDeployment(ctx, dHandle)
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
ts.NoError(err)

ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1)
response2, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
BuildID: v1.BuildID,
ConflictToken: response1.ConflictToken,
})
ts.NoError(err)
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, "")

wfHandle, err := ts.client.ExecuteWorkflow(
ctx,
ts.startWorkflowOptions("test-continueasnew-with-ramping-version"),
"ContinueAsNewWithRampingVersion",
0,
)
ts.NoError(err)
ts.waitForWorkflowRunningOnVersion(ctx, wfHandle, v1.BuildID)

ts.waitForWorkerDeploymentVersion(ctx, dHandle, v2)
// Set ramping percentage to 0%, ramping should still occur due to CaN.
_, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
BuildID: v2.BuildID,
ConflictToken: response2.ConflictToken,
Percentage: float32(0.0),
})
ts.NoError(err)
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, v2.BuildID)

ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "continue-as-new", nil))

var result string
ts.NoError(wfHandle.Get(ctx, &result))
ts.Equal("v2.0", result)
}
27 changes: 27 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,33 @@ func (w *Workflows) ContinueAsNewWithVersionUpgradeV2(
return "v2.0", nil
}

func (w *Workflows) ContinueAsNewWithRampingVersionV1(
ctx workflow.Context,
attempt int,
) (string, error) {
if attempt > 0 {
return "v1.0", nil
}

workflow.GetSignalChannel(ctx, "continue-as-new").Receive(ctx, nil)

return "", workflow.NewContinueAsNewErrorWithOptions(
ctx,
workflow.ContinueAsNewErrorOptions{
InitialVersioningBehavior: workflow.ContinueAsNewVersioningBehaviorUseRampingVersion,
},
"ContinueAsNewWithRampingVersion",
attempt+1,
)
}

func (w *Workflows) ContinueAsNewWithRampingVersionV2(
ctx workflow.Context,
attempt int,
) (string, error) {
return "v2.0", nil
}

func (w *Workflows) ContinueAsNewWithChildWF(
ctx workflow.Context,
iterations int,
Expand Down
15 changes: 15 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,21 @@ const (
// Note that if the previous workflow had a Pinned override, that override will be inherited by the new workflow
// run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new command.
ContinueAsNewVersioningBehaviorAutoUpgrade = internal.ContinueAsNewVersioningBehaviorAutoUpgrade

// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
// regardless of the workflow's Target Version (according to f(workflow_id, ramp_percentage)). After the first workflow
// task completes, the workflow will use whatever Versioning Behavior it is annotated with. If there is no Ramping
// Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
//
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
// may be the Current Version instead of the Ramping Version.
//
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
ContinueAsNewVersioningBehaviorUseRampingVersion = internal.ContinueAsNewVersioningBehaviorUseRampingVersion
)

// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.
Expand Down
Loading