Skip to content

Commit d8bf9c8

Browse files
authored
temporal worker deployment create|create-version (#990)
Adds implementation of the `temporal worker deployment create` and `temporal worker deployment create-version` CLI commands using only direct gRPC API calls, not the sdk-go client code. Adds some basic unit tests for both commands though due to the server-side validation of proper AWS IAM credentials, the happy-path `temporal worker deployment create-version` call with AWS Lambda compute config is skipped until such time as we can figure out adding real AWS test fixtures. --------- Signed-off-by: Jay Pipes <jay.pipes@temporal.io>
1 parent 030b081 commit d8bf9c8

4 files changed

Lines changed: 661 additions & 0 deletions

File tree

internal/temporalcli/commands.gen.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2878,6 +2878,8 @@ func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWo
28782878
s.Command.Long = "Deployment commands perform operations on Worker Deployments:\n\n```\ntemporal worker deployment [command] [options]\n```\n\nFor example:\n\n```\ntemporal worker deployment list\n```\n\nLists the Deployments in the client's namespace.\n\nArguments can be Worker Deployment Versions associated with\na Deployment, specified using the Deployment name and Build ID.\n\nFor example:\n\n```\ntemporal worker deployment set-current-version \\\n --deployment-name YourDeploymentName --build-id YourBuildID\n```\n\nSets the current Deployment Version for a given Deployment."
28792879
}
28802880
s.Command.Args = cobra.NoArgs
2881+
s.Command.AddCommand(&NewTemporalWorkerDeploymentCreateCommand(cctx, &s).Command)
2882+
s.Command.AddCommand(&NewTemporalWorkerDeploymentCreateVersionCommand(cctx, &s).Command)
28812883
s.Command.AddCommand(&NewTemporalWorkerDeploymentDeleteCommand(cctx, &s).Command)
28822884
s.Command.AddCommand(&NewTemporalWorkerDeploymentDeleteVersionCommand(cctx, &s).Command)
28832885
s.Command.AddCommand(&NewTemporalWorkerDeploymentDescribeCommand(cctx, &s).Command)
@@ -2890,6 +2892,66 @@ func NewTemporalWorkerDeploymentCommand(cctx *CommandContext, parent *TemporalWo
28902892
return &s
28912893
}
28922894

2895+
type TemporalWorkerDeploymentCreateCommand struct {
2896+
Parent *TemporalWorkerDeploymentCommand
2897+
Command cobra.Command
2898+
DeploymentNameOptions
2899+
}
2900+
2901+
func NewTemporalWorkerDeploymentCreateCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentCreateCommand {
2902+
var s TemporalWorkerDeploymentCreateCommand
2903+
s.Parent = parent
2904+
s.Command.DisableFlagsInUseLine = true
2905+
s.Command.Use = "create [flags]"
2906+
s.Command.Short = "Create a new Worker Deployment"
2907+
if hasHighlighting {
2908+
s.Command.Long = "Create a new Worker Deployment:\n\n\x1b[1mtemporal worker deployment create [options]\x1b[0m\n\nWorker Deployments are lazily created the first time a Worker polls the\nTemporal Server and specifies a VersionOverride. However, if you need to\npre-define a compute configuration (for instance to set up a serverless\nWorker), you need to call \x1b[1mtemporal worker deployment create-version\x1b[0m and\npass in the name of the Worker Deployment. The \x1b[1mtemporal worker\ndeployment create\x1b[0m command allows you to pre-define a Worker Deployment\nso that calls to \x1b[1mtemporal worker deployment create-version\x1b[0m will\nsucceed.\n\nIf a Worker Deployment with the supplied name already exists, this\ncommand will return an error.\n\nNote: This is an experimental feature and may change in the future."
2909+
} else {
2910+
s.Command.Long = "Create a new Worker Deployment:\n\n```\ntemporal worker deployment create [options]\n```\n\nWorker Deployments are lazily created the first time a Worker polls the\nTemporal Server and specifies a VersionOverride. However, if you need to\npre-define a compute configuration (for instance to set up a serverless\nWorker), you need to call `temporal worker deployment create-version` and\npass in the name of the Worker Deployment. The `temporal worker\ndeployment create` command allows you to pre-define a Worker Deployment\nso that calls to `temporal worker deployment create-version` will\nsucceed.\n\nIf a Worker Deployment with the supplied name already exists, this\ncommand will return an error.\n\nNote: This is an experimental feature and may change in the future."
2911+
}
2912+
s.Command.Args = cobra.NoArgs
2913+
s.DeploymentNameOptions.BuildFlags(s.Command.Flags())
2914+
s.Command.Run = func(c *cobra.Command, args []string) {
2915+
if err := s.run(cctx, args); err != nil {
2916+
cctx.Options.Fail(err)
2917+
}
2918+
}
2919+
return &s
2920+
}
2921+
2922+
type TemporalWorkerDeploymentCreateVersionCommand struct {
2923+
Parent *TemporalWorkerDeploymentCommand
2924+
Command cobra.Command
2925+
DeploymentVersionOptions
2926+
AwsLambdaFunctionArn string
2927+
AwsLambdaAssumeRoleArn string
2928+
AwsLambdaAssumeRoleExternalId string
2929+
}
2930+
2931+
func NewTemporalWorkerDeploymentCreateVersionCommand(cctx *CommandContext, parent *TemporalWorkerDeploymentCommand) *TemporalWorkerDeploymentCreateVersionCommand {
2932+
var s TemporalWorkerDeploymentCreateVersionCommand
2933+
s.Parent = parent
2934+
s.Command.DisableFlagsInUseLine = true
2935+
s.Command.Use = "create-version [flags]"
2936+
s.Command.Short = "Create a new Worker Deployment Version"
2937+
if hasHighlighting {
2938+
s.Command.Long = "\nCreate a new Worker Deployment Version:\n\n\x1b[1mtemporal worker deployment create-version [options]\x1b[0m\n\nConfigure a Worker Deployment Version's compute configuration as needed.\nFor example, pass compute provider information for an AWS Lambda function\nthat spawns a Worker in the Worker Deployment:\n\n\x1b[1mtemporal worker deployment create-version \\\n --namespace YourNamespaceName \\\n --deployment-name YourDeploymentName \\\n --build-id YourBuildID \\\n --aws-lambda-function-arn LambdaFunctionARN \\\n --aws-lambda-assume-role-arn LambdaAssumeRoleARN \\\n --aws-lambda-assume-role-external-id LambdaAssumeRoleExternalID\x1b[0m\n\nIf a Worker Deployment Version with the supplied BuildID already exists,\nthis command will return an error.\n\nNote: This is an experimental feature and may change in the future."
2939+
} else {
2940+
s.Command.Long = "\nCreate a new Worker Deployment Version:\n\n```\ntemporal worker deployment create-version [options]\n```\n\nConfigure a Worker Deployment Version's compute configuration as needed.\nFor example, pass compute provider information for an AWS Lambda function\nthat spawns a Worker in the Worker Deployment:\n\n```\ntemporal worker deployment create-version \\\n --namespace YourNamespaceName \\\n --deployment-name YourDeploymentName \\\n --build-id YourBuildID \\\n --aws-lambda-function-arn LambdaFunctionARN \\\n --aws-lambda-assume-role-arn LambdaAssumeRoleARN \\\n --aws-lambda-assume-role-external-id LambdaAssumeRoleExternalID\n```\n\nIf a Worker Deployment Version with the supplied BuildID already exists,\nthis command will return an error.\n\nNote: This is an experimental feature and may change in the future."
2941+
}
2942+
s.Command.Args = cobra.NoArgs
2943+
s.Command.Flags().StringVar(&s.AwsLambdaFunctionArn, "aws-lambda-function-arn", "", "Qualified (contains version suffix) or unqualified AWS Lambda function ARN to invoke when there are no active pollers for task queue targets in the Worker Deployment.")
2944+
s.Command.Flags().StringVar(&s.AwsLambdaAssumeRoleArn, "aws-lambda-assume-role-arn", "", "AWS IAM role ARN that the Temporal server will assume when invoking the Lambda function that spawns a new Worker in this Worker Deployment Version. Required when --aws-lambda-function-arn is specified.")
2945+
s.Command.Flags().StringVar(&s.AwsLambdaAssumeRoleExternalId, "aws-lambda-assume-role-external-id", "", "Temporal server will enforce that the AWS IAM trust policy associated with the AWS IAM role specified in --aws-lambda-assume-role-arn has an aws:ExternalId condition that matches the supplied value. Required when --aws-lambda-function-arn is specified.")
2946+
s.DeploymentVersionOptions.BuildFlags(s.Command.Flags())
2947+
s.Command.Run = func(c *cobra.Command, args []string) {
2948+
if err := s.run(cctx, args); err != nil {
2949+
cctx.Options.Fail(err)
2950+
}
2951+
}
2952+
return &s
2953+
}
2954+
28932955
type TemporalWorkerDeploymentDeleteCommand struct {
28942956
Parent *TemporalWorkerDeploymentCommand
28952957
Command cobra.Command

internal/temporalcli/commands.worker.deployment.go

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,24 @@ package temporalcli
33
import (
44
"errors"
55
"fmt"
6+
"slices"
7+
"strings"
68
"time"
79

810
"github.com/fatih/color"
11+
"github.com/google/uuid"
912
"github.com/temporalio/cli/internal/printer"
1013
"go.temporal.io/api/common/v1"
14+
commonpb "go.temporal.io/api/common/v1"
15+
computepb "go.temporal.io/api/compute/v1"
16+
"go.temporal.io/api/deployment/v1"
1117
deploymentpb "go.temporal.io/api/deployment/v1"
1218
enumspb "go.temporal.io/api/enums/v1"
1319
"go.temporal.io/api/serviceerror"
1420
taskqueuepb "go.temporal.io/api/taskqueue/v1"
1521
"go.temporal.io/api/workflowservice/v1"
1622
"go.temporal.io/sdk/client"
23+
"go.temporal.io/sdk/converter"
1724
"go.temporal.io/sdk/worker"
1825
)
1926

@@ -111,6 +118,24 @@ type formattedWorkerDeploymentVersionInfoType struct {
111118
DrainageInfo formattedDrainageInfo `json:"drainageInfo"`
112119
TaskQueuesInfos []formattedTaskQueueInfoRowType `json:"taskQueuesInfos"`
113120
Metadata map[string]*common.Payload `json:"metadata"`
121+
ComputeConfig *formattedComputeConfig `json:"computeConfig,omitempty"`
122+
}
123+
124+
type formattedComputeConfig struct {
125+
ScalingGroups map[string]formattedComputeConfigScalingGroup `json:"scalingGroups"`
126+
}
127+
128+
type formattedComputeConfigScalingGroup struct {
129+
Provider *formattedComputeConfigProvider `json:"provider,omitempty"`
130+
Scaler *formattedComputeConfigScaler `json:"scaler,omitempty"`
131+
}
132+
133+
type formattedComputeConfigProvider struct {
134+
Type string `json:"type"`
135+
}
136+
137+
type formattedComputeConfigScaler struct {
138+
Type string `json:"type"`
114139
}
115140

116141
func drainageStatusToStr(drainage client.WorkerDeploymentVersionDrainageStatus) (string, error) {
@@ -354,6 +379,39 @@ func formatDrainageInfoProto(drainageInfo *deploymentpb.VersionDrainageInfo) (fo
354379
}, nil
355380
}
356381

382+
func formatComputeConfigProto(cc *computepb.ComputeConfig) *formattedComputeConfig {
383+
if cc == nil {
384+
return nil
385+
}
386+
msgSGs := cc.GetScalingGroups()
387+
if len(msgSGs) == 0 {
388+
return nil
389+
}
390+
sgs := make(map[string]formattedComputeConfigScalingGroup, len(msgSGs))
391+
for name, msgSG := range msgSGs {
392+
p := msgSG.GetProvider()
393+
s := msgSG.GetScaler()
394+
if p == nil && s == nil {
395+
continue
396+
}
397+
sg := formattedComputeConfigScalingGroup{}
398+
if p != nil {
399+
sg.Provider = &formattedComputeConfigProvider{
400+
Type: p.GetType(),
401+
}
402+
}
403+
if p != nil {
404+
sg.Scaler = &formattedComputeConfigScaler{
405+
Type: s.GetType(),
406+
}
407+
}
408+
sgs[name] = sg
409+
}
410+
return &formattedComputeConfig{
411+
ScalingGroups: sgs,
412+
}
413+
}
414+
357415
// workerDeploymentVersionInfoProtoToRows converts gRPC proto types to formatted types for display.
358416
func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, includeStats bool) (formattedWorkerDeploymentVersionInfoType, error) {
359417
tqi, err := formatTaskQueuesInfosProto(taskQueueInfos, includeStats)
@@ -366,6 +424,8 @@ func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerD
366424
return formattedWorkerDeploymentVersionInfoType{}, err
367425
}
368426

427+
computeConfig := formatComputeConfigProto(deploymentInfo.GetComputeConfig())
428+
369429
return formattedWorkerDeploymentVersionInfoType{
370430
DeploymentName: deploymentInfo.GetDeploymentVersion().GetDeploymentName(),
371431
BuildID: deploymentInfo.GetDeploymentVersion().GetBuildId(),
@@ -377,9 +437,28 @@ func workerDeploymentVersionInfoProtoToRows(deploymentInfo *deploymentpb.WorkerD
377437
DrainageInfo: drainage,
378438
TaskQueuesInfos: tqi,
379439
Metadata: deploymentInfo.GetMetadata().GetEntries(),
440+
ComputeConfig: computeConfig,
380441
}, nil
381442
}
382443

444+
func computeConfigSummaryStr(cc *computepb.ComputeConfig) string {
445+
if cc == nil {
446+
return ""
447+
}
448+
providers := []string{}
449+
for _, sg := range cc.GetScalingGroups() {
450+
p := sg.GetProvider()
451+
if p == nil {
452+
continue
453+
}
454+
pt := p.GetType()
455+
if !slices.Contains(providers, pt) {
456+
providers = append(providers, pt)
457+
}
458+
}
459+
return strings.Join(providers, ",")
460+
}
461+
383462
// printWorkerDeploymentVersionInfoProto prints worker deployment version info from proto types.
384463
func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo *deploymentpb.WorkerDeploymentVersionInfo, taskQueueInfos []*workflowservice.DescribeWorkerDeploymentVersionResponse_VersionTaskQueue, msg string, opts printVersionInfoOptions) error {
385464
fDeploymentInfo, err := workerDeploymentVersionInfoProtoToRows(deploymentInfo, taskQueueInfos, opts.showStats)
@@ -400,6 +479,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo
400479
drainageLastChangedTime = deploymentInfo.GetDrainageInfo().GetLastChangedTime().AsTime()
401480
drainageLastCheckedTime = deploymentInfo.GetDrainageInfo().GetLastCheckedTime().AsTime()
402481
}
482+
computeConfigSummary := computeConfigSummaryStr(deploymentInfo.GetComputeConfig())
403483

404484
printMe := struct {
405485
DeploymentName string
@@ -413,6 +493,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo
413493
DrainageLastChangedTime time.Time `cli:",cardOmitEmpty"`
414494
DrainageLastCheckedTime time.Time `cli:",cardOmitEmpty"`
415495
Metadata map[string]*common.Payload `cli:",cardOmitEmpty"`
496+
ComputeConfigSummary string `cli:",cardOmitEmpty"`
416497
}{
417498
DeploymentName: deploymentInfo.GetDeploymentVersion().GetDeploymentName(),
418499
BuildID: deploymentInfo.GetDeploymentVersion().GetBuildId(),
@@ -425,6 +506,7 @@ func printWorkerDeploymentVersionInfoProto(cctx *CommandContext, deploymentInfo
425506
DrainageLastChangedTime: drainageLastChangedTime,
426507
DrainageLastCheckedTime: drainageLastCheckedTime,
427508
Metadata: deploymentInfo.GetMetadata().GetEntries(),
509+
ComputeConfigSummary: computeConfigSummary,
428510
}
429511
err := cctx.Printer.PrintStructured(printMe, printer.StructuredOptions{})
430512
if err != nil {
@@ -601,6 +683,34 @@ func (c *TemporalWorkerDeploymentCommand) getConflictToken(cctx *CommandContext,
601683
return resp.ConflictToken, nil
602684
}
603685

686+
func (c *TemporalWorkerDeploymentCreateCommand) run(cctx *CommandContext, args []string) error {
687+
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
688+
if err != nil {
689+
return err
690+
}
691+
defer cl.Close()
692+
693+
ns := c.Parent.Parent.Namespace
694+
identity := c.Parent.Parent.Identity
695+
deploymentName := c.Name
696+
requestID := uuid.NewString()
697+
698+
request := &workflowservice.CreateWorkerDeploymentRequest{
699+
Namespace: ns,
700+
DeploymentName: deploymentName,
701+
Identity: identity,
702+
RequestId: requestID,
703+
}
704+
705+
_, err = cl.WorkflowService().CreateWorkerDeployment(cctx, request)
706+
if err != nil {
707+
return fmt.Errorf("error creating worker deployment: %w", err)
708+
}
709+
710+
cctx.Printer.Println("Successfully created worker deployment")
711+
return nil
712+
}
713+
604714
func (c *TemporalWorkerDeploymentDescribeCommand) run(cctx *CommandContext, args []string) error {
605715
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
606716
if err != nil {
@@ -773,6 +883,92 @@ func (c *TemporalWorkerDeploymentManagerIdentityUnsetCommand) run(cctx *CommandC
773883
return nil
774884
}
775885

886+
func validateAWSLambdaProviderDetails(details map[string]any) error {
887+
for _, key := range []string{"arn", "role", "role_external_id"} {
888+
if _, ok := details[key]; !ok {
889+
return fmt.Errorf("missing required AWS Lambda provider detail: %s", key)
890+
}
891+
}
892+
return nil
893+
}
894+
895+
// awsLambdaProviderDetailsPayload returns the encoded Payload representing AWS
896+
// Lambda compute provider details.
897+
func (c *TemporalWorkerDeploymentCreateVersionCommand) awsLambdaProviderDetailsPayload() (*commonpb.Payload, error) {
898+
// Map keys from temporal-auto-scaled-workers:
899+
// https://github.com/temporalio/temporal-auto-scaled-workers/blob/c4a7e69b6504365d7e5326b0b8e6cd95e3293f96/wci/workflow/compute_provider/aws_lambda.go#L16-L20
900+
providerDetails := map[string]any{
901+
"arn": c.AwsLambdaFunctionArn,
902+
}
903+
if c.AwsLambdaAssumeRoleArn != "" {
904+
providerDetails["role"] = c.AwsLambdaAssumeRoleArn
905+
}
906+
if c.AwsLambdaAssumeRoleExternalId != "" {
907+
providerDetails["role_external_id"] = c.AwsLambdaAssumeRoleExternalId
908+
}
909+
err := validateAWSLambdaProviderDetails(providerDetails)
910+
if err != nil {
911+
return nil, err
912+
}
913+
dc := converter.GetDefaultDataConverter()
914+
return dc.ToPayload(&providerDetails)
915+
}
916+
917+
func (c *TemporalWorkerDeploymentCreateVersionCommand) run(cctx *CommandContext, args []string) error {
918+
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
919+
if err != nil {
920+
return err
921+
}
922+
defer cl.Close()
923+
924+
ns := c.Parent.Parent.Namespace
925+
buildID := c.BuildId
926+
identity := c.Parent.Parent.Identity
927+
deploymentName := c.DeploymentName
928+
requestID := uuid.NewString()
929+
930+
var cc *computepb.ComputeConfig
931+
if c.AwsLambdaFunctionArn != "" {
932+
detailsPayload, err := c.awsLambdaProviderDetailsPayload()
933+
if err != nil {
934+
return err
935+
}
936+
cc = &computepb.ComputeConfig{
937+
ScalingGroups: map[string]*computepb.ComputeConfigScalingGroup{
938+
"default": {
939+
Provider: &computepb.ComputeProvider{
940+
Type: "aws-lambda",
941+
Details: detailsPayload,
942+
},
943+
Scaler: &computepb.ComputeScaler{
944+
// Hard-coded: no-sync is the only supported algorithm
945+
// in temporal-auto-scaled-workers as of 2026-04-01.
946+
Type: "no-sync",
947+
},
948+
},
949+
},
950+
}
951+
}
952+
request := &workflowservice.CreateWorkerDeploymentVersionRequest{
953+
Namespace: ns,
954+
DeploymentVersion: &deployment.WorkerDeploymentVersion{
955+
DeploymentName: deploymentName,
956+
BuildId: buildID,
957+
},
958+
Identity: identity,
959+
ComputeConfig: cc,
960+
RequestId: requestID,
961+
}
962+
963+
_, err = cl.WorkflowService().CreateWorkerDeploymentVersion(cctx, request)
964+
if err != nil {
965+
return fmt.Errorf("error creating worker deployment version: %w", err)
966+
}
967+
968+
cctx.Printer.Println("Successfully created worker deployment version")
969+
return nil
970+
}
971+
776972
func (c *TemporalWorkerDeploymentDeleteVersionCommand) run(cctx *CommandContext, args []string) error {
777973
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
778974
if err != nil {

0 commit comments

Comments
 (0)