Skip to content

Commit ebf4b2b

Browse files
authored
Add CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION (#2319)
* bump api go to 1.62.11 * bump api go to 1.62.11 for test package * surface UseRampingVersion as a CaN versioning option. Added test. Bumped CLI dep to support use ramping version on CaN * fix rebase conflicts * fix for go check * update doc strings (remove function references) * correct nexus error type assertion
1 parent 3536724 commit ebf4b2b

6 files changed

Lines changed: 148 additions & 6 deletions

File tree

internal/cmd/build/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (b *builder) integrationTest() error {
121121
if *devServerFlag {
122122
devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
123123
CachedDownload: testsuite.CachedDownload{
124-
Version: "v1.6.2-server-1.31.0-151.6",
124+
Version: "v1.7.0",
125125
},
126126
ClientOptions: &client.Options{
127127
HostPort: "127.0.0.1:7233",

internal/workflow.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,24 @@ const (
9393
//
9494
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorAutoUpgrade]
9595
ContinueAsNewVersioningBehaviorAutoUpgrade = 1
96+
97+
// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
98+
// regardless of the workflow's Target Version.
99+
//
100+
// After the first workflow task completes, the workflow will use whatever Versioning Behavior it is annotated with. If
101+
// there is no Ramping Version by the time that the first workflow task is dispatched, it will be sent to the Current Version.
102+
//
103+
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
104+
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
105+
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
106+
// may be the Current Version instead of the Ramping Version.
107+
//
108+
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
109+
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
110+
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
111+
//
112+
// Exposed as: [go.temporal.io/sdk/workflow.ContinueAsNewVersioningBehaviorUseRampingVersion]
113+
ContinueAsNewVersioningBehaviorUseRampingVersion = 2
96114
)
97115

98116
// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.
@@ -3176,6 +3194,8 @@ func continueAsNewVersioningBehaviorToProto(t ContinueAsNewVersioningBehavior) e
31763194
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_UNSPECIFIED
31773195
case ContinueAsNewVersioningBehaviorAutoUpgrade:
31783196
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_AUTO_UPGRADE
3197+
case ContinueAsNewVersioningBehaviorUseRampingVersion:
3198+
return enumspb.CONTINUE_AS_NEW_VERSIONING_BEHAVIOR_USE_RAMPING_VERSION
31793199
default:
31803200
panic("unknown continue-as-new versioning behavior type")
31813201
}

test/nexus_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1937,11 +1937,7 @@ func TestAsyncOperationCompletionCustomFailureConverter(t *testing.T) {
19371937
var appErr *temporal.ApplicationError
19381938
require.ErrorAs(t, err, &appErr)
19391939
require.Equal(t, "async failure", appErr.Message())
1940-
if os.Getenv("DISABLE_NEW_NEXUS_ERROR_FORMAT_TESTS") != "" {
1941-
require.Equal(t, "NexusFailure", appErr.Type())
1942-
} else {
1943-
require.Equal(t, "", appErr.Type())
1944-
}
1940+
require.Equal(t, "NexusFailure", appErr.Type())
19451941
var details nexus.Failure
19461942
require.NoError(t, appErr.Details(&details))
19471943
require.Equal(t, "custom", details.Metadata["type"])

test/worker_deployment_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1454,3 +1454,87 @@ func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithVersionUpgrade() {
14541454
ts.NoError(wfHandle.Get(ctx, &result))
14551455
ts.Equal("v2.0", result)
14561456
}
1457+
1458+
func (ts *WorkerDeploymentTestSuite) TestContinueAsNewWithRampingVersion() {
1459+
if os.Getenv("DISABLE_SERVER_1_27_TESTS") != "" {
1460+
ts.T().Skip("temporal server 1.27+ required")
1461+
}
1462+
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
1463+
defer cancel()
1464+
1465+
deploymentName := "deploy-test-" + uuid.NewString()
1466+
v1 := worker.WorkerDeploymentVersion{
1467+
DeploymentName: deploymentName,
1468+
BuildID: "1.0",
1469+
}
1470+
v2 := worker.WorkerDeploymentVersion{
1471+
DeploymentName: deploymentName,
1472+
BuildID: "2.0",
1473+
}
1474+
1475+
// Keep current at 1.0 and ramp 0% to 2.0. A continued run with
1476+
// UseRampingVersion should still start on 2.0.
1477+
worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{
1478+
DeploymentOptions: worker.DeploymentOptions{
1479+
UseVersioning: true,
1480+
Version: v1,
1481+
},
1482+
})
1483+
worker1.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV1, workflow.RegisterOptions{
1484+
Name: "ContinueAsNewWithRampingVersion",
1485+
VersioningBehavior: workflow.VersioningBehaviorPinned,
1486+
})
1487+
ts.NoError(worker1.Start())
1488+
defer worker1.Stop()
1489+
1490+
worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{
1491+
DeploymentOptions: worker.DeploymentOptions{
1492+
UseVersioning: true,
1493+
Version: v2,
1494+
},
1495+
})
1496+
worker2.RegisterWorkflowWithOptions(ts.workflows.ContinueAsNewWithRampingVersionV2, workflow.RegisterOptions{
1497+
Name: "ContinueAsNewWithRampingVersion",
1498+
VersioningBehavior: workflow.VersioningBehaviorPinned,
1499+
})
1500+
ts.NoError(worker2.Start())
1501+
defer worker2.Stop()
1502+
1503+
dHandle := ts.client.WorkerDeploymentClient().GetHandle(deploymentName)
1504+
ts.waitForWorkerDeployment(ctx, dHandle)
1505+
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
1506+
ts.NoError(err)
1507+
1508+
ts.waitForWorkerDeploymentVersion(ctx, dHandle, v1)
1509+
response2, err := dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
1510+
BuildID: v1.BuildID,
1511+
ConflictToken: response1.ConflictToken,
1512+
})
1513+
ts.NoError(err)
1514+
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, "")
1515+
1516+
wfHandle, err := ts.client.ExecuteWorkflow(
1517+
ctx,
1518+
ts.startWorkflowOptions("test-continueasnew-with-ramping-version"),
1519+
"ContinueAsNewWithRampingVersion",
1520+
0,
1521+
)
1522+
ts.NoError(err)
1523+
ts.waitForWorkflowRunningOnVersion(ctx, wfHandle, v1.BuildID)
1524+
1525+
ts.waitForWorkerDeploymentVersion(ctx, dHandle, v2)
1526+
// Set ramping percentage to 0%, ramping should still occur due to CaN.
1527+
_, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
1528+
BuildID: v2.BuildID,
1529+
ConflictToken: response2.ConflictToken,
1530+
Percentage: float32(0.0),
1531+
})
1532+
ts.NoError(err)
1533+
ts.waitForWorkerDeploymentRoutingConfigPropagation(ctx, deploymentName, v1.BuildID, v2.BuildID)
1534+
1535+
ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "continue-as-new", nil))
1536+
1537+
var result string
1538+
ts.NoError(wfHandle.Get(ctx, &result))
1539+
ts.Equal("v2.0", result)
1540+
}

test/workflow_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,33 @@ func (w *Workflows) ContinueAsNewWithVersionUpgradeV2(
635635
return "v2.0", nil
636636
}
637637

638+
func (w *Workflows) ContinueAsNewWithRampingVersionV1(
639+
ctx workflow.Context,
640+
attempt int,
641+
) (string, error) {
642+
if attempt > 0 {
643+
return "v1.0", nil
644+
}
645+
646+
workflow.GetSignalChannel(ctx, "continue-as-new").Receive(ctx, nil)
647+
648+
return "", workflow.NewContinueAsNewErrorWithOptions(
649+
ctx,
650+
workflow.ContinueAsNewErrorOptions{
651+
InitialVersioningBehavior: workflow.ContinueAsNewVersioningBehaviorUseRampingVersion,
652+
},
653+
"ContinueAsNewWithRampingVersion",
654+
attempt+1,
655+
)
656+
}
657+
658+
func (w *Workflows) ContinueAsNewWithRampingVersionV2(
659+
ctx workflow.Context,
660+
attempt int,
661+
) (string, error) {
662+
return "v2.0", nil
663+
}
664+
638665
func (w *Workflows) ContinueAsNewWithChildWF(
639666
ctx workflow.Context,
640667
iterations int,

workflow/workflow.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@ const (
4646
// Note that if the previous workflow had a Pinned override, that override will be inherited by the new workflow
4747
// run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new command.
4848
ContinueAsNewVersioningBehaviorAutoUpgrade = internal.ContinueAsNewVersioningBehaviorAutoUpgrade
49+
50+
// ContinueAsNewVersioningBehaviorUseRampingVersion - Use the Ramping Version of the workflow's task queue at start time,
51+
// regardless of the workflow's Target Version. After the first workflow task completes, the workflow will use whatever
52+
// Versioning Behavior it is annotated with. If there is no Ramping Version by the time that the first workflow task is
53+
// dispatched, it will be sent to the Current Version.
54+
//
55+
// It is highly discouraged to use this if the workflow is annotated with AutoUpgrade behavior, because
56+
// this setting ONLY applies to the first task of the workflow. If, after the first task, the workflow
57+
// is AutoUpgrade, it will behave like a normal AutoUpgrade workflow and go to the Target Version, which
58+
// may be the Current Version instead of the Ramping Version.
59+
//
60+
// Note that if the workflow being continued has a Pinned override, that override will be inherited by the
61+
// new workflow run regardless of the ContinueAsNewVersioningBehavior specified in the continue-as-new
62+
// command. Versioning Override always takes precedence until it's removed manually via UpdateWorkflowExecutionOptions.
63+
ContinueAsNewVersioningBehaviorUseRampingVersion = internal.ContinueAsNewVersioningBehaviorUseRampingVersion
4964
)
5065

5166
// ContinueAsNewSuggestedReason specifies why ContinueAsNewSuggested is true. Multiple reasons can be true at the same time.

0 commit comments

Comments
 (0)