Add deployment version features #611


Merged
Changes from 4 commits
9 changes: 9 additions & 0 deletions dockerfiles/dynamicconfig/docker.yaml
@@ -7,9 +7,18 @@ system.enableActivityEagerExecution:
system.enableEagerWorkflowStart:
  - value: true
    constraints: {}
system.enableDeployments:
  - value: true
    constraints: {}
system.enableDeploymentVersions:
  - value: true
    constraints: {}
frontend.enableUpdateWorkflowExecution:
  - value: true
    constraints: {}
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
  - value: true
    constraints: {}
frontend.workerVersioningWorkflowAPIs:
  - value: true
    constraints: {}
155 changes: 155 additions & 0 deletions features/deployment_versioning/common.go
Member


IMO the generic helpers here should be in the harness and the workflows should be in the feature files themselves (yes even if it means the few lines are copied across them). For the most part we've tried to keep features self contained.

Contributor Author


I was following build_id_versioning/common.go. Copying the workflows everywhere is not an issue (just a few lines), but I wonder if functions like setCurrent that are very deployment specific belong in the harness...

Member


Copying the workflows everywhere is not an issue (just a few lines)

👍

I wonder if functions like setCurrent that are very deployment specific belong to the harness...

It seems like normal client use just like our users are expected to use. Is this how we expect our users to use the deployment client (i.e. wait until the server reports it at a certain value before making the next call)?

Contributor Author


Unfortunately, the server API is eventually consistent because we discover worker properties, as opposed to using a declarative approach... This is something I deeply dislike, but after many team discussions it eventually went that way.

In non-test scenarios it may not be that bad: you may only need to check for deployment/version existence once, since deployments are explicitly deleted and versions are GC'd slowly. Or use an infinite reconcile loop (Kubernetes-style) to assert the desired state, ignoring early failures.

Back to the issue, is it ok to pollute handler code with all this stuff only applicable to deployment features?

Member

@cretz cretz Mar 18, 2025


Back to the issue, is it ok to pollute handler code with all this stuff only applicable to deployment features?

I think it's probably ok (so long as it's well qualified), but if you want to leave it here in common, that's fine. But I do think the workflows should be in their feature files (and I'll mark approved after that).

I do think that you, as the first client user of versioning, are already seeing a difficult user experience for programmatically using the versioning APIs. I think that may need to be brought up in a devexp context.

@@ -0,0 +1,155 @@
package deployment_versioning

import (
	"context"
	"strings"
	"time"

	"github.com/temporalio/features/harness/go/harness"
	enumspb "go.temporal.io/api/enums/v1"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

func WaitForSignalOne(ctx workflow.Context) (string, error) {
	var value string
	workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
	return value + "_v1", nil
}

func WaitForSignalTwo(ctx workflow.Context) (string, error) {
	var value string
	workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
	return value + "_v2", nil
}

func StartWorker(ctx context.Context, r *harness.Runner, version string, versioningBehavior workflow.VersioningBehavior) worker.Worker {
	w := worker.New(r.Client, r.TaskQueue, worker.Options{
		DeploymentOptions: worker.DeploymentOptions{
			UseVersioning:             true,
			Version:                   version,
			DefaultVersioningBehavior: versioningBehavior,
		},
	})
	if strings.HasSuffix(version, "1.0") {
		w.RegisterWorkflowWithOptions(WaitForSignalOne, workflow.RegisterOptions{
			Name: "WaitForSignal",
		})
	} else {
		w.RegisterWorkflowWithOptions(WaitForSignalTwo, workflow.RegisterOptions{
			Name: "WaitForSignal",
		})
	}
	return w
}

func WaitForDeploymentVersion(r *harness.Runner, ctx context.Context, dHandle client.WorkerDeploymentHandle, version string) error {
	return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
		func() bool {
			d, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
			if err != nil {
				return false
			}
			for _, v := range d.Info.VersionSummaries {
				if v.Version == version {
					return true
				}
			}
			return false
		})
}

func WaitForDeployment(r *harness.Runner, ctx context.Context, dHandle client.WorkerDeploymentHandle) error {
	return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
		func() bool {
			_, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
			return err == nil
		})
}

func WaitForWorkflowRunning(r *harness.Runner, ctx context.Context, handle client.WorkflowRun) error {
	return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
		func() bool {
			describeResp, err := r.Client.DescribeWorkflowExecution(ctx, handle.GetID(), handle.GetRunID())
			if err != nil {
				return false
			}
			status := describeResp.WorkflowExecutionInfo.Status
			return enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING == status
		})
}

func SignalAll(r *harness.Runner, ctx context.Context, handles []client.WorkflowRun) error {
	for _, handle := range handles {
		if err := r.Client.SignalWorkflow(ctx, handle.GetID(), handle.GetRunID(), "start-signal", "prefix"); err != nil {
			return err
		}
	}
	return nil
}
Member


I think this should be inlined where used, since it relies on certain expectations of the workflow shape. It's only used for a single handle anywhere anyway, and signalling is not so much of a burden that it needs a helper.


func SetCurrent(r *harness.Runner, ctx context.Context, deploymentName string, version string) error {
	dHandle := r.Client.WorkerDeploymentClient().GetHandle(deploymentName)

	if err := WaitForDeployment(r, ctx, dHandle); err != nil {
		return err
	}

	response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
	if err != nil {
		return err
	}

	if err := WaitForDeploymentVersion(r, ctx, dHandle, version); err != nil {
		return err
	}

	_, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
		Version:       version,
		ConflictToken: response1.ConflictToken,
	})

	return err
}

func SetRamp(r *harness.Runner, ctx context.Context, deploymentName string, version string, percentage float32) error {
	dHandle := r.Client.WorkerDeploymentClient().GetHandle(deploymentName)

	if err := WaitForDeployment(r, ctx, dHandle); err != nil {
		return err
	}

	response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
	if err != nil {
		return err
	}

	if err := WaitForDeploymentVersion(r, ctx, dHandle, version); err != nil {
		return err
	}

	// Use the caller-supplied percentage rather than a hard-coded 100.
	_, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
		Version:       version,
		ConflictToken: response1.ConflictToken,
		Percentage:    percentage,
	})

	return err
}

func ServerSupportsDeployments(ctx context.Context, r *harness.Runner) bool {
	// No system capability, only dynamic config in the namespace, so we need to just try...
	iter, err := r.Client.WorkerDeploymentClient().List(ctx, client.WorkerDeploymentListOptions{})
	if err != nil {
		return false
	}
	// Need to call `HasNext` to actually contact the server
	for iter.HasNext() {
		if _, err := iter.Next(); err != nil {
			return false
		}
	}
	return true
}
14 changes: 14 additions & 0 deletions features/deployment_versioning/routing_auto_upgrade/README.md
@@ -0,0 +1,14 @@
# Deployment Versioning: Routing AutoUpgrade

When the Current Version for a Deployment changes, already started workflows
will transition to the new version if they are `AutoUpgrade`.

# Detailed spec

* Create a random deployment name `deployment_name`
* Start a `deployment_name.1-0` worker and register workflow type `WaitForSignal` as `AutoUpgrade`; the implementation of that workflow should finish by returning `prefix_v1`.
* Start a `deployment_name.2-0` worker and register workflow type `WaitForSignal` as `AutoUpgrade`; the implementation of that workflow should finish by returning `prefix_v2`.
* Set the Current Version for `deployment_name` to `deployment_name.1-0`
* Start `workflow_1` of type `WaitForSignal`; it should start as `AutoUpgrade` on version `deployment_name.1-0`
* Set the Current Version for `deployment_name` to `deployment_name.2-0`
* Signal the workflow. The workflow (auto-upgrade) should exit returning `prefix_v2`.
90 changes: 90 additions & 0 deletions features/deployment_versioning/routing_auto_upgrade/feature.go
@@ -0,0 +1,90 @@
package routing_auto_upgrade

import (
	"context"
	"time"

	"github.com/google/uuid"
	"github.com/temporalio/features/features/deployment_versioning"
	"github.com/temporalio/features/harness/go/harness"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

var deploymentName = uuid.NewString()

// Wrap for correct Feature.Path
func WaitForSignalOne(ctx workflow.Context) (string, error) {
	return deployment_versioning.WaitForSignalOne(ctx)
}

var Feature = harness.Feature{
	Workflows: []interface{}{
		harness.WorkflowWithOptions{
			Workflow: WaitForSignalOne,
			Options: workflow.RegisterOptions{
				Name:               "WaitForSignal",
				VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
			},
		},
	},
	Execute: Execute,
	WorkerOptions: worker.Options{
		DeploymentOptions: worker.DeploymentOptions{
			UseVersioning: true,
			Version:       deploymentName + ".1.0",
		},
	},
	CheckHistory:    CheckHistory,
	ExpectRunResult: "prefix_v2",
}

var worker2 worker.Worker

func Execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) {
	if supported := deployment_versioning.ServerSupportsDeployments(ctx, r); !supported {
		return nil, r.Skip("server does not support deployment versioning")
	}

	worker2 = deployment_versioning.StartWorker(ctx, r, deploymentName+".2.0",
		workflow.VersioningBehaviorAutoUpgrade)
	if err := worker2.Start(); err != nil {
		return nil, err
	}

	if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".1.0"); err != nil {
		return nil, err
	}

	run, err := r.Client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
		TaskQueue:                r.TaskQueue,
		ID:                       "workflow_1",
		WorkflowExecutionTimeout: 1 * time.Minute,
	}, "WaitForSignal")
	if err != nil {
		return nil, err
	}

	if err := deployment_versioning.WaitForWorkflowRunning(r, ctx, run); err != nil {
		return nil, err
	}

	if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".2.0"); err != nil {
		return nil, err
	}

	if err := deployment_versioning.SignalAll(r, ctx, []client.WorkflowRun{run}); err != nil {
		return nil, err
	}

	return run, nil
}

func CheckHistory(ctx context.Context, r *harness.Runner, run client.WorkflowRun) error {
	// Shut down the 2.0 worker
	worker2.Stop()
	return r.CheckHistoryDefault(ctx, run)
}
14 changes: 14 additions & 0 deletions features/deployment_versioning/routing_pinned/README.md
@@ -0,0 +1,14 @@
# Deployment Versioning: Version Routing Pinned

When the Current Version for a Deployment changes, already started workflows
will never transition to the new version if they are `Pinned`.

# Detailed spec

* Create a random deployment name `deployment_name`
* Start a `deployment_name.1-0` worker and register workflow type `WaitForSignal` as `Pinned`; the implementation of that workflow should finish by returning `prefix_v1`.
* Start a `deployment_name.2-0` worker and register workflow type `WaitForSignal` as `AutoUpgrade`; the implementation of that workflow should finish by returning `prefix_v2`.
* Set the Current Version for `deployment_name` to `deployment_name.1-0`
* Start `workflow_1` of type `WaitForSignal`; it should start as `Pinned` on version `deployment_name.1-0`
* Set the Current Version for `deployment_name` to `deployment_name.2-0`
* Signal the workflow. The workflow (pinned) should exit returning `prefix_v1`.
89 changes: 89 additions & 0 deletions features/deployment_versioning/routing_pinned/feature.go
@@ -0,0 +1,89 @@
package routing_pinned

import (
	"context"
	"time"

	"github.com/google/uuid"
	"github.com/temporalio/features/features/deployment_versioning"
	"github.com/temporalio/features/harness/go/harness"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

var deploymentName = uuid.NewString()

// Wrap for correct Feature.Path
func WaitForSignalOne(ctx workflow.Context) (string, error) {
	return deployment_versioning.WaitForSignalOne(ctx)
}

var Feature = harness.Feature{
	Workflows: []interface{}{
		harness.WorkflowWithOptions{
			Workflow: WaitForSignalOne,
			Options: workflow.RegisterOptions{
				Name:               "WaitForSignal",
				VersioningBehavior: workflow.VersioningBehaviorPinned,
			},
		},
	},
	Execute: Execute,
	WorkerOptions: worker.Options{
		DeploymentOptions: worker.DeploymentOptions{
			UseVersioning: true,
			Version:       deploymentName + ".1.0",
		},
	},
	CheckHistory:    CheckHistory,
	ExpectRunResult: "prefix_v1",
}

var worker2 worker.Worker

func Execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) {
	if supported := deployment_versioning.ServerSupportsDeployments(ctx, r); !supported {
		return nil, r.Skip("server does not support deployment versioning")
	}

	worker2 = deployment_versioning.StartWorker(ctx, r, deploymentName+".2.0",
		workflow.VersioningBehaviorAutoUpgrade)
	if err := worker2.Start(); err != nil {
		return nil, err
	}

	if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".1.0"); err != nil {
		return nil, err
	}

	run, err := r.Client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
		TaskQueue:                r.TaskQueue,
		ID:                       "workflow_1",
		WorkflowExecutionTimeout: 1 * time.Minute,
	}, "WaitForSignal")
	if err != nil {
		return nil, err
	}

	if err := deployment_versioning.WaitForWorkflowRunning(r, ctx, run); err != nil {
		return nil, err
	}

	if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".2.0"); err != nil {
		return nil, err
	}

	if err := deployment_versioning.SignalAll(r, ctx, []client.WorkflowRun{run}); err != nil {
		return nil, err
	}

	return run, nil
}

func CheckHistory(ctx context.Context, r *harness.Runner, run client.WorkflowRun) error {
	// Shut down the 2.0 worker
	worker2.Stop()
	return r.CheckHistoryDefault(ctx, run)
}