Skip to content

Commit 1747d53

Browse files
authored
Add support for Worker Deployment Versions (Go worker) (#338)
## Summary This PR adds `--deployment-name`, `--deployment-build-id`, and `--default-versioning-behavior` flags to the Omes worker CLI. ## Context Omes currently only supports the legacy Build ID based worker versioning scheme for the Go worker. This PR adds flags for Deployment Versions, and initializes the worker accordingly. We continue to support the legacy Build ID based path, but I've added a comment stating that those flags are deprecated. We can phase out this path entirely in the near future. Out of scope: Python / TypeScript / Java / .NET worker wiring ## Files changed | File | Change | |---|---| | `cmd/clioptions/worker.go` | Added `DeploymentName`, `DeploymentBuildID`, `DefaultVersioningBehavior` fields on `WorkerOptions` and registered `--deployment-name`, `--deployment-build-id`, `--default-versioning-behavior` flags. Clarified `--build-id` description as legacy and mutually exclusive with `--deployment-name`. | | `workers/go/worker/worker.go` | Hoisted `worker.Options` construction out of the per-task-queue goroutine. When `--deployment-name` is set, builds `worker.DeploymentOptions` from `DeploymentName` + `DeploymentBuildID` + resolved `DefaultVersioningBehavior`; otherwise keeps the legacy `BuildID` + `UseBuildIDForVersioning` path (now commented as deprecated). Added `parseVersioningBehavior` helper (accepts `pinned`, `auto-upgrade`; defaults to `auto-upgrade` on empty). Added up-front validation for the three mutually-exclusive / required-together flag combinations. | ## Test plan - [x] `go build ./...` passes - [x] Baseline scenario (no versioning flags) completes 3 iterations in ~2s with embedded server - [x] Deployment-versioned worker starts cleanly and logs `"Started Worker" ... "BuildID": "v1"` under `--deployment-name omes-test --deployment-build-id v1 --default-versioning-behavior auto-upgrade` - [x] `--build-id` + `--deployment-name` → fatal `--build-id and --deployment-name are mutually exclusive; use --deployment-build-id with --deployment-name` - [x] `--deployment-name` without `--deployment-build-id` → fatal `--deployment-build-id is required when --deployment-name is set` - [x] `--deployment-build-id` without `--deployment-name` → fatal `--deployment-build-id requires --deployment-name` - [x] `--default-versioning-behavior bogus` → fatal `invalid --default-versioning-behavior "bogus" (expected pinned or auto-upgrade)` - [x] Legacy `--build-id` path compiles and reaches the deprecated code branch (runtime end-to-end depends on task-queue Build-ID assignment rules, unchanged from prior behavior)
1 parent 4084919 commit 1747d53

2 files changed

Lines changed: 62 additions & 16 deletions

File tree

cmd/clioptions/worker.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
// WorkerOptions for setting up worker parameters
88
type WorkerOptions struct {
99
BuildID string
10+
DeploymentName string
11+
DeploymentBuildID string
12+
DefaultVersioningBehavior string
1013
MaxConcurrentActivityPollers int
1114
MaxConcurrentWorkflowPollers int
1215
ActivityPollerAutoscaleMax int // overrides MaxConcurrentActivityPollers
@@ -25,7 +28,10 @@ func (m *WorkerOptions) FlagSet() *pflag.FlagSet {
2528
return m.fs
2629
}
2730
m.fs = pflag.NewFlagSet("worker_options", pflag.ExitOnError)
28-
m.fs.StringVar(&m.BuildID, "build-id", "", "Build ID")
31+
m.fs.StringVar(&m.BuildID, "build-id", "", "DEPRECATED: Build ID for legacy Build-ID-based worker versioning. Temporal Server will soon stop supporting the Rules-Based Versioning APIs that back this flag - use --deployment-name and --deployment-build-id instead. Mutually exclusive with --deployment-name")
32+
m.fs.StringVar(&m.DeploymentName, "deployment-name", "", "Worker Deployment name. When set, enables Worker Deployment Versioning and must be combined with --deployment-build-id")
33+
m.fs.StringVar(&m.DeploymentBuildID, "deployment-build-id", "", "Build ID within the Worker Deployment. Required when --deployment-name is set")
34+
m.fs.StringVar(&m.DefaultVersioningBehavior, "default-versioning-behavior", "", "Default versioning behavior for workflows that don't set one at registration. One of: pinned, auto-upgrade. Defaults to auto-upgrade when --deployment-name is set")
2935
m.fs.IntVar(&m.MaxConcurrentActivityPollers, "worker-max-concurrent-activity-pollers", 0, "Max concurrent activity pollers")
3036
m.fs.IntVar(&m.MaxConcurrentWorkflowPollers, "worker-max-concurrent-workflow-pollers", 0, "Max concurrent workflow pollers")
3137
m.fs.IntVar(&m.MaxConcurrentActivities, "worker-max-concurrent-activities", 0, "Max concurrent activities")

workers/go/worker/worker.go

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,61 @@ func makePollerBehavior(simple, auto int) worker.PollerBehavior {
6868
})
6969
}
7070

71+
func parseVersioningBehavior(s string) (workflow.VersioningBehavior, error) {
72+
switch s {
73+
case "", "auto-upgrade":
74+
return workflow.VersioningBehaviorAutoUpgrade, nil
75+
case "pinned":
76+
return workflow.VersioningBehaviorPinned, nil
77+
default:
78+
return 0, fmt.Errorf("invalid --default-versioning-behavior %q (expected pinned or auto-upgrade)", s)
79+
}
80+
}
81+
7182
func runWorkers(client client.Client, taskQueues []string, options clioptions.WorkerOptions) error {
83+
workerOpts := worker.Options{
84+
MaxConcurrentActivityExecutionSize: options.MaxConcurrentActivities,
85+
MaxConcurrentWorkflowTaskExecutionSize: options.MaxConcurrentWorkflowTasks,
86+
ActivityTaskPollerBehavior: makePollerBehavior(
87+
options.MaxConcurrentActivityPollers,
88+
options.ActivityPollerAutoscaleMax,
89+
),
90+
WorkflowTaskPollerBehavior: makePollerBehavior(
91+
options.MaxConcurrentWorkflowPollers,
92+
options.WorkflowPollerAutoscaleMax,
93+
),
94+
WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond,
95+
}
96+
if options.DeploymentName != "" {
97+
if options.BuildID != "" {
98+
return fmt.Errorf("--build-id and --deployment-name are mutually exclusive; use --deployment-build-id with --deployment-name")
99+
}
100+
if options.DeploymentBuildID == "" {
101+
return fmt.Errorf("--deployment-build-id is required when --deployment-name is set")
102+
}
103+
behavior, err := parseVersioningBehavior(options.DefaultVersioningBehavior)
104+
if err != nil {
105+
return err
106+
}
107+
workerOpts.DeploymentOptions = worker.DeploymentOptions{
108+
UseVersioning: true,
109+
Version: worker.WorkerDeploymentVersion{
110+
DeploymentName: options.DeploymentName,
111+
BuildID: options.DeploymentBuildID,
112+
},
113+
DefaultVersioningBehavior: behavior,
114+
}
115+
} else if options.DeploymentBuildID != "" {
116+
return fmt.Errorf("--deployment-build-id requires --deployment-name")
117+
} else if options.BuildID != "" {
118+
// DEPRECATED: BuildID and UseBuildIDForVersioning select the legacy
119+
// Rules-Based Versioning APIs, which Temporal Server will soon stop
120+
// supporting. New users should use --deployment-name and
121+
// --deployment-build-id above, which drive Worker Deployment Versioning.
122+
workerOpts.BuildID = options.BuildID
123+
workerOpts.UseBuildIDForVersioning = true
124+
}
125+
72126
errCh := make(chan error, len(taskQueues))
73127
ebbFlowActivities := ebbandflow.Activities{}
74128
clientActivities := kitchensink.ClientActivities{
@@ -84,21 +138,7 @@ func runWorkers(client client.Client, taskQueues []string, options clioptions.Wo
84138
for _, taskQueue := range taskQueues {
85139
taskQueue := taskQueue
86140
go func() {
87-
w := worker.New(client, taskQueue, worker.Options{
88-
BuildID: options.BuildID,
89-
UseBuildIDForVersioning: options.BuildID != "",
90-
MaxConcurrentActivityExecutionSize: options.MaxConcurrentActivities,
91-
MaxConcurrentWorkflowTaskExecutionSize: options.MaxConcurrentWorkflowTasks,
92-
ActivityTaskPollerBehavior: makePollerBehavior(
93-
options.MaxConcurrentActivityPollers,
94-
options.ActivityPollerAutoscaleMax,
95-
),
96-
WorkflowTaskPollerBehavior: makePollerBehavior(
97-
options.MaxConcurrentWorkflowPollers,
98-
options.WorkflowPollerAutoscaleMax,
99-
),
100-
WorkerActivitiesPerSecond: options.WorkerActivitiesPerSecond,
101-
})
141+
w := worker.New(client, taskQueue, workerOpts)
102142
w.RegisterWorkflowWithOptions(kitchensink.KitchenSinkWorkflow, workflow.RegisterOptions{Name: "kitchenSink"})
103143
w.RegisterActivityWithOptions(kitchensink.Noop, activity.RegisterOptions{Name: "noop"})
104144
w.RegisterActivityWithOptions(kitchensink.Delay, activity.RegisterOptions{Name: "delay"})

0 commit comments

Comments
 (0)