Skip to content

Add deployment version features #611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dockerfiles/dynamicconfig/docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,18 @@ system.enableActivityEagerExecution:
system.enableEagerWorkflowStart:
- value: true
constraints: {}
system.enableDeployments:
- value: true
constraints: {}
system.enableDeploymentVersions:
- value: true
constraints: {}
frontend.enableUpdateWorkflowExecution:
- value: true
constraints: {}
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
- value: true
constraints: {}
frontend.workerVersioningWorkflowAPIs:
- value: true
constraints: {}
127 changes: 127 additions & 0 deletions features/deployment_versioning/common.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the generic helpers here should be in the harness and the workflows should be in the feature files themselves (yes even if it means the few lines are copied across them). For the most part we've tried to keep features self contained.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following build_id_versioning/common.go. Coping the workflows everywhere is not an issue (just a few lines), but I wonder if functions like setCurrent that are very deployment specific belong to the harness...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coping the workflows everywhere is not an issue (just a few lines)

👍

I wonder if functions like setCurrent that are very deployment specific belong to the harness...

It seems like normal client use just like our users are expected to use. Is this how we expect our users to use the deployment client (i.e. wait until the server reports it at a certain value before making the next call)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the server API is eventually consistent because we discover worker properties, as opposed to using a declarative approach... This is something I deeply dislike, but after many team discussions It eventually went that way..

In non-test scenarios it may not be that bad, you may only need to check for deployments/versions existence once, since deployments are explicitly deleted and versions are GC slowly. Or use an infinite loop (k8) to change desired state, ignoring early failures.

Back to the issue, is it ok to pollute handler code with all this stuff only applicable to deployment features?

Copy link
Member

@cretz cretz Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to the issue, is it ok to pollute handler code with all this stuff only applicable to deployment features?

I think it's probably ok (so long as well qualified), but if you want to leave here in common that's fine. But I do think the workflows should be in their feature files (and I'll mark approved after that).

I do think that you as the first client user of versioning are already seeing difficult user experience for programmatically using versioning APIs. I think that may need to be brought up in a devexp context.

Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package deployment_versioning

import (
"context"
"time"

"github.com/temporalio/features/harness/go/harness"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

func StartWorker(ctx context.Context, r *harness.Runner, version string, versioningBehavior workflow.VersioningBehavior, waitForSignal func(workflow.Context) (string, error)) worker.Worker {
w := worker.New(r.Client, r.TaskQueue, worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: version,
DefaultVersioningBehavior: versioningBehavior,
},
})
w.RegisterWorkflowWithOptions(waitForSignal, workflow.RegisterOptions{
Name: "WaitForSignal",
})
return w
}

func WaitForDeploymentVersion(r *harness.Runner, ctx context.Context, dHandle client.WorkerDeploymentHandle, version string) error {
return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
func() bool {
d, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
if err != nil {
return false
}
for _, v := range d.Info.VersionSummaries {
if v.Version == version {
return true
}
}
return false
})
}

func WaitForDeployment(r *harness.Runner, ctx context.Context, dHandle client.WorkerDeploymentHandle) error {
return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
func() bool {
_, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
return err == nil
})
}

func WaitForWorkflowRunning(r *harness.Runner, ctx context.Context, handle client.WorkflowRun) error {
return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
func() bool {
describeResp, err := r.Client.DescribeWorkflowExecution(ctx, handle.GetID(), handle.GetRunID())
if err != nil {
return false
}
status := describeResp.WorkflowExecutionInfo.Status
return enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING == status
})
}

func SetCurrent(r *harness.Runner, ctx context.Context, deploymentName string, version string) error {
dHandle := r.Client.WorkerDeploymentClient().GetHandle(deploymentName)

if err := WaitForDeployment(r, ctx, dHandle); err != nil {
return err
}

response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
if err != nil {
return err
}

if err := WaitForDeploymentVersion(r, ctx, dHandle, version); err != nil {
return err
}

_, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
Version: version,
ConflictToken: response1.ConflictToken,
})

return err
}

func SetRamp(r *harness.Runner, ctx context.Context, deploymentName string, version string, percentage float32) error {
dHandle := r.Client.WorkerDeploymentClient().GetHandle(deploymentName)

if err := WaitForDeployment(r, ctx, dHandle); err != nil {
return err
}

response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
if err != nil {
return err
}

if err := WaitForDeploymentVersion(r, ctx, dHandle, version); err != nil {
return err
}

_, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
Version: version,
ConflictToken: response1.ConflictToken,
Percentage: float32(100.0),
})

return err
}

func ServerSupportsDeployments(ctx context.Context, r *harness.Runner) bool {
// No system capability, only dynamic config in namespace, need to just try...
iter, err := r.Client.WorkerDeploymentClient().List(ctx, client.WorkerDeploymentListOptions{})
if err != nil {
return false
}
// Need to call `HasNext` to contact the server
for iter.HasNext() {
_, err := iter.Next()
if err != nil {
return false
}
}
return true
}
14 changes: 14 additions & 0 deletions features/deployment_versioning/routing_auto_upgrade/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Deployment Versioning: Routing AutoUpgrade

When the Current Version for a Deployment changes, already started workflows
will transition to the new version if they are `AutoUpgrade`.

# Detailed spec

* Create a random deployment name `deployment_name`
* Start a `deployment_name.1-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v1`.
* Start a `deployment_name.2-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v2`.
* Set Current version for `deployment_name` to `deployment_name.1-0`
* Start `workflow_1` of type `WaitForSignal`, it should start AutoUpgrade and with version `deployment_name.1-0`
* Set Current version for `deployment_name` to `deployment_name.2-0`
* Signal workflow. The workflow (pinned) should exit returning `prefix_v2`.
97 changes: 97 additions & 0 deletions features/deployment_versioning/routing_auto_upgrade/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package routing_auto_upgrade

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/temporalio/features/features/deployment_versioning"
"github.com/temporalio/features/harness/go/harness"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

var deploymentName = uuid.NewString()

func WaitForSignalOne(ctx workflow.Context) (string, error) {
var value string
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
return value + "_v1", nil
}

func WaitForSignalTwo(ctx workflow.Context) (string, error) {
var value string
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
return value + "_v2", nil
}

var Feature = harness.Feature{
Workflows: []interface{}{
harness.WorkflowWithOptions{
Workflow: WaitForSignalOne,
Options: workflow.RegisterOptions{
Name: "WaitForSignal",
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
},
},
},
Execute: Execute,
WorkerOptions: worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: deploymentName + ".1.0",
},
},
CheckHistory: CheckHistory,
ExpectRunResult: "prefix_v2",
}

var worker2 worker.Worker

func Execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) {
if supported := deployment_versioning.ServerSupportsDeployments(ctx, r); !supported {
return nil, r.Skip(fmt.Sprintf("server does not support deployment versioning"))
}

worker2 = deployment_versioning.StartWorker(ctx, r, deploymentName+".2.0",
workflow.VersioningBehaviorAutoUpgrade, WaitForSignalTwo)
if err := worker2.Start(); err != nil {
return nil, err
}

if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".1.0"); err != nil {
return nil, err
}

run, err := r.Client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
TaskQueue: r.TaskQueue,
ID: "workflow_1",
WorkflowExecutionTimeout: 1 * time.Minute,
}, "WaitForSignal")

if err != nil {
return nil, err
}

if err := deployment_versioning.WaitForWorkflowRunning(r, ctx, run); err != nil {
return nil, err
}

if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".2.0"); err != nil {
return nil, err
}

if err := r.Client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "start-signal", "prefix"); err != nil {
return nil, err
}

return run, nil
}

func CheckHistory(ctx context.Context, r *harness.Runner, run client.WorkflowRun) error {
// Shut down the 2.0 worker
worker2.Stop()
return r.CheckHistoryDefault(ctx, run)
}
14 changes: 14 additions & 0 deletions features/deployment_versioning/routing_pinned/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Deployment Versioning: Version Routing Pinned

When the Current Version for a Deployment changes, already started workflows
will never transition to the new version if they are `Pinned`.

# Detailed spec

* Create a random deployment name `deployment_name`
* Start a `deployment_name.1-0` worker, register workflow type `WaitForSignal` as `Pinned`, the implementation of that workflow should end returning `prefix_v1`.
* Start a `deployment_name.2-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v2`.
* Set Current version for `deployment_name` to `deployment_name.1-0`
* Start `workflow_1` of type `WaitForSignal`, it should start pinned and with version `deployment_name.1-0`
* Set Current version for `deployment_name` to `deployment_name.2-0`
* Signal workflow. The workflow (pinned) should exit returning `prefix_v1`.
96 changes: 96 additions & 0 deletions features/deployment_versioning/routing_pinned/feature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package routing_pinned

import (
"context"
"fmt"
"time"

"github.com/google/uuid"
"github.com/temporalio/features/features/deployment_versioning"
"github.com/temporalio/features/harness/go/harness"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

var deploymentName = uuid.NewString()

func WaitForSignalOne(ctx workflow.Context) (string, error) {
var value string
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
return value + "_v1", nil
}

func WaitForSignalTwo(ctx workflow.Context) (string, error) {
var value string
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
return value + "_v2", nil
}

var Feature = harness.Feature{
Workflows: []interface{}{
harness.WorkflowWithOptions{
Workflow: WaitForSignalOne,
Options: workflow.RegisterOptions{
Name: "WaitForSignal",
VersioningBehavior: workflow.VersioningBehaviorPinned,
},
},
},
Execute: Execute,
WorkerOptions: worker.Options{
DeploymentOptions: worker.DeploymentOptions{
UseVersioning: true,
Version: deploymentName + ".1.0",
},
},
CheckHistory: CheckHistory,
ExpectRunResult: "prefix_v1",
}
var worker2 worker.Worker

func Execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) {
if supported := deployment_versioning.ServerSupportsDeployments(ctx, r); !supported {
return nil, r.Skip(fmt.Sprintf("server does not support deployment versioning"))
}

worker2 = deployment_versioning.StartWorker(ctx, r, deploymentName+".2.0",
workflow.VersioningBehaviorAutoUpgrade, WaitForSignalTwo)
if err := worker2.Start(); err != nil {
return nil, err
}

if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".1.0"); err != nil {
return nil, err
}

run, err := r.Client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
TaskQueue: r.TaskQueue,
ID: "workflow_1",
WorkflowExecutionTimeout: 1 * time.Minute,
}, "WaitForSignal")

if err != nil {
return nil, err
}

if err := deployment_versioning.WaitForWorkflowRunning(r, ctx, run); err != nil {
return nil, err
}

if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".2.0"); err != nil {
return nil, err
}

if err := r.Client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "start-signal", "prefix"); err != nil {
return nil, err
}

return run, nil
}

func CheckHistory(ctx context.Context, r *harness.Runner, run client.WorkflowRun) error {
// Shut down the 2.0 worker
worker2.Stop()
return r.CheckHistoryDefault(ctx, run)
}
14 changes: 14 additions & 0 deletions features/deployment_versioning/routing_with_override/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Deployment Versioning: Routing with Override

It is possible to override the Version of a new workflow with Start Workflow
Options, so that it is pinned to a Version different from the Current one in that Deployment.


# Detailed spec

* Create a random deployment name `deployment_name`
* Start a `deployment_name.1-0` worker, register workflow type `WaitForSignal` as `Pinned`, the implementation of that workflow should end returning `prefix_v1`.
* Start a `deployment_name.2-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v2`.
* Set Current version for `deployment_name` to `deployment_name.2-0`
* Start `workflow_1` of type `WaitForSignal`, and override for `Pinned` to `deployment_name.1.0`. It should start Pinned and with version `deployment_name.1-0`.
* Signal workflow. The workflow (pinned) should exit returning `prefix_v1`.
Loading
Loading