diff --git a/client/client.go b/client/client.go index 39900bfd8..d70001e43 100644 --- a/client/client.go +++ b/client/client.go @@ -8,12 +8,13 @@ package client import ( "context" "crypto/tls" + "io" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" - "io" "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal" @@ -344,6 +345,29 @@ type ( // See [client.Client.UpdateWithStartWorkflow] and [client.Client.NewWithStartWorkflowOperation]. UpdateWithStartWorkflowOptions = internal.UpdateWithStartWorkflowOptions + // ComputeConfig provides information about a compute configuration. + // + // NOTE: Experimental + ComputeConfig = internal.ComputeConfig + + // ComputeConfigScalingGroup provides information about a scaling group for + // a compute configuration. + // + // NOTE: Experimental + ComputeConfigScalingGroup = internal.ComputeConfigScalingGroup + + // ComputeProvider provides information about a compute provider of a + // compute configuration scaling group. + // + // NOTE: Experimental + ComputeProvider = internal.ComputeProvider + + // ComputeScaler provides information about a compute scaler of a compute + // configuration scaling group. + // + // NOTE: Experimental + ComputeScaler = internal.ComputeScaler + // WorkerDeploymentDescribeOptions provides options for [WorkerDeploymentHandle.Describe]. // // NOTE: Experimental @@ -364,6 +388,12 @@ type ( // NOTE: Experimental WorkerDeploymentDescribeResponse = internal.WorkerDeploymentDescribeResponse + // WorkerDeploymentCreateVersionOptions provides options for + // [WorkerDeploymentHandle.CreateVersion]. + // + // NOTE: Experimental + WorkerDeploymentCreateVersionOptions = internal.WorkerDeploymentCreateVersionOptions + // WorkerDeploymentSetCurrentVersionOptions provides options for // [WorkerDeploymentHandle.SetCurrentVersion]. // diff --git a/go.mod b/go.mod index 54af3aaff..4825a985f 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nexus-rpc/sdk-go v0.6.0 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.62.2 + go.temporal.io/api v1.62.3-0.20260327234204-dbc016f3811d golang.org/x/sync v0.13.0 golang.org/x/sys v0.32.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index 2863c510b..482ee79df 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= -go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.3-0.20260327234204-dbc016f3811d h1:fMCxkfBykqwiUmRTdp0dkJVTCAebY6hzm2O25P3z8Fs= +go.temporal.io/api v1.62.3-0.20260327234204-dbc016f3811d/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/compute.go b/internal/compute.go new file mode 100644 index 000000000..7b490914f --- /dev/null +++ b/internal/compute.go @@ -0,0 +1,46 @@ +package internal + +// ComputeConfig wraps configuration settings for a compute provider and +// scaling for a set of TaskQueue name+type tuples. +type ComputeConfig struct { + // ScalingGroups contains the set of ComputeConfigScalingGroup objects + // associated with the ComputeConfig. The key for the map is the ID of the + // scaling group. + ScalingGroups map[string]*ComputeConfigScalingGroup +} + +// ComputeConfigScalingGroup defines a set of configuration settings for a +// compute provider and scaling for a set of TaskQueue types. +type ComputeConfigScalingGroup struct { + // TaskQueueTypes is the set of task queue types this scaling group serves. + TaskQueueTypes []TaskQueueType + // Provider contains the optional compute provider configuration settings. + Provider *ComputeProvider + // Scaler contains the optional compute scaler configuration settings. + Scaler *ComputeScaler +} + +// ComputeProvider describes configuration settings for a compute provider. +type ComputeProvider struct { + // Type of the compute provider. This string is implementation-specific and + // can be used by implementations to understand how to interpret the + // contents of the details field. + Type string + // Details contains an implementation-specific thing that describes the + // compute provider configuration settings. + Details map[string]any + // NexusEndpoint points at the Nexus service, if the compute provider is a + // Nexus service. + NexusEndpoint string +} + +// ComputeScaler describes configuration settings for a scaler of compute. +type ComputeScaler struct { + // Type of the compute scaler. This string is implementation-specific and + // can be used by implementations to understand how to interpret the + // contents of the details field. + Type string + // Details contains an implementation-specific thing that describes the + // compute scaler configuration settings. + Details map[string]any +} diff --git a/internal/internal_compute.go b/internal/internal_compute.go new file mode 100644 index 000000000..bbf857af7 --- /dev/null +++ b/internal/internal_compute.go @@ -0,0 +1,246 @@ +package internal + +import ( + "fmt" + + "go.temporal.io/api/compute/v1" + enums "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/converter" +) + +func computeConfigFromProto( + dc converter.DataConverter, + msg *compute.ComputeConfig, +) (*ComputeConfig, error) { + if msg == nil { + return nil, nil + } + + res := &ComputeConfig{} + groups := make(map[string]*ComputeConfigScalingGroup, len(msg.ScalingGroups)) + for groupName, group := range msg.ScalingGroups { + g, err := computeConfigScalingGroupFromProto(dc, group) + if err != nil { + return nil, err + } + groups[groupName] = g + } + res.ScalingGroups = groups + return res, nil +} + +func validateComputeConfig(cfg *ComputeConfig) error { + if cfg == nil { + return nil + } + for groupName, group := range cfg.ScalingGroups { + err := validateComputeConfigScalingGroup(groupName, group) + if err != nil { + return err + } + } + return nil +} + +func computeConfigToProto( + dc converter.DataConverter, + cc *ComputeConfig, +) (*compute.ComputeConfig, error) { + if cc == nil { + return nil, nil + } + + groups := make( + map[string]*compute.ComputeConfigScalingGroup, + len(cc.ScalingGroups), + ) + for groupName, group := range cc.ScalingGroups { + g, err := computeConfigScalingGroupToProto(dc, group) + if err != nil { + return nil, err + } + groups[groupName] = g + } + return &compute.ComputeConfig{ + ScalingGroups: groups, + }, nil +} + +func computeConfigScalingGroupFromProto( + dc converter.DataConverter, + msg *compute.ComputeConfigScalingGroup, +) (*ComputeConfigScalingGroup, error) { + if msg == nil { + return nil, nil + } + + res := &ComputeConfigScalingGroup{} + msgTQTs := msg.GetTaskQueueTypes() + tqts := make([]TaskQueueType, len(msgTQTs)) + for x, msgTQT := range msgTQTs { + tqts[x] = taskQueueTypeFromProto(msgTQT) + } + res.TaskQueueTypes = tqts + p, err := computeProviderFromProto(dc, msg.GetProvider()) + if err != nil { + return nil, err + } + res.Provider = p + s, err := computeScalerFromProto(dc, msg.GetScaler()) + if err != nil { + return nil, err + } + res.Scaler = s + return res, nil +} + +func validateComputeConfigScalingGroup( + groupName string, + cfg *ComputeConfigScalingGroup, +) error { + if cfg == nil { + return nil + } + p := cfg.Provider + if p != nil { + if p.Type == "" { + return fmt.Errorf( + "compute config scaling group %s missing provider type", + groupName, + ) + } + if p.Details == nil { + return fmt.Errorf( + "compute config scaling group %s missing provider details", + groupName, + ) + } + } + s := cfg.Scaler + if s != nil { + if s.Type == "" { + return fmt.Errorf( + "compute config scaling group %s missing scaler type", + groupName, + ) + } + if s.Details == nil { + return fmt.Errorf( + "compute config scaling group %s missing scaler details", + groupName, + ) + } + } + return nil +} + +func computeConfigScalingGroupToProto( + dc converter.DataConverter, + group *ComputeConfigScalingGroup, +) (*compute.ComputeConfigScalingGroup, error) { + if group == nil { + return nil, nil + } + + tqts := group.TaskQueueTypes + msgTQTs := make([]enums.TaskQueueType, len(tqts)) + for x, tqt := range tqts { + msgTQTs[x] = taskQueueTypeToProto(tqt) + } + provider, err := computeProviderToProto(dc, group.Provider) + if err != nil { + return nil, err + } + scaler, err := computeScalerToProto(dc, group.Scaler) + if err != nil { + return nil, err + } + return &compute.ComputeConfigScalingGroup{ + TaskQueueTypes: msgTQTs, + Provider: provider, + Scaler: scaler, + }, nil +} + +func computeProviderToProto( + dc converter.DataConverter, + p *ComputeProvider, +) (*compute.ComputeProvider, error) { + if p == nil { + return nil, nil + } + res := &compute.ComputeProvider{ + Type: p.Type, + } + details := p.Details + enc, err := dc.ToPayload(&details) + if err != nil { + return nil, err + } + res.Details = enc + return res, nil +} + +func computeScalerToProto( + dc converter.DataConverter, + s *ComputeScaler, +) (*compute.ComputeScaler, error) { + if s == nil { + return nil, nil + } + res := &compute.ComputeScaler{ + Type: s.Type, + } + details := s.Details + enc, err := dc.ToPayload(&details) + if err != nil { + return nil, err + } + res.Details = enc + return res, nil +} + +func computeProviderFromProto( + dc converter.DataConverter, + msg *compute.ComputeProvider, +) (*ComputeProvider, error) { + if msg == nil { + return nil, nil + } + + res := &ComputeProvider{ + Type: msg.GetType(), + NexusEndpoint: msg.GetNexusEndpoint(), + } + details := make(map[string]any) + if details != nil { + err := dc.FromPayload(msg.GetDetails(), details) + if err != nil { + return nil, err + } + res.Details = details + } + return res, nil +} + +func computeScalerFromProto( + dc converter.DataConverter, + msg *compute.ComputeScaler, +) (*ComputeScaler, error) { + if msg == nil { + return nil, nil + } + + res := &ComputeScaler{ + Type: msg.GetType(), + } + details := make(map[string]any) + if details != nil { + err := dc.FromPayload(msg.GetDetails(), details) + if err != nil { + return nil, err + } + res.Details = details + } + return res, nil +} diff --git a/internal/internal_worker_deployment_client.go b/internal/internal_worker_deployment_client.go index 1193fe638..3acbec41a 100644 --- a/internal/internal_worker_deployment_client.go +++ b/internal/internal_worker_deployment_client.go @@ -8,6 +8,7 @@ import ( "time" "go.temporal.io/api/common/v1" + "go.temporal.io/api/compute/v1" "go.temporal.io/api/deployment/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" @@ -129,9 +130,9 @@ func workerDeploymentVersionSummariesFromProto(summaries []*deployment.WorkerDep return result } -func workerDeploymentInfoFromProto(info *deployment.WorkerDeploymentInfo) WorkerDeploymentInfo { +func workerDeploymentInfoFromProto(info *deployment.WorkerDeploymentInfo) (WorkerDeploymentInfo, error) { if info == nil { - return WorkerDeploymentInfo{} + return WorkerDeploymentInfo{}, nil } return WorkerDeploymentInfo{ @@ -141,7 +142,7 @@ func workerDeploymentInfoFromProto(info *deployment.WorkerDeploymentInfo) Worker RoutingConfig: workerDeploymentRoutingConfigFromProto(info.RoutingConfig), LastModifierIdentity: info.LastModifierIdentity, ManagerIdentity: info.ManagerIdentity, - } + }, nil } @@ -186,12 +187,69 @@ func (h *workerDeploymentHandleImpl) Describe(ctx context.Context, options Worke return WorkerDeploymentDescribeResponse{}, err } + info, err := workerDeploymentInfoFromProto(resp.GetWorkerDeploymentInfo()) + if err != nil { + return WorkerDeploymentDescribeResponse{}, err + } + return WorkerDeploymentDescribeResponse{ ConflictToken: resp.GetConflictToken(), - Info: workerDeploymentInfoFromProto(resp.GetWorkerDeploymentInfo()), + Info: info, }, nil } +func (h *workerDeploymentHandleImpl) CreateVersion( + ctx context.Context, + options WorkerDeploymentCreateVersionOptions, +) (WorkerDeploymentCreateVersionResponse, error) { + if err := h.validate(); err != nil { + return WorkerDeploymentCreateVersionResponse{}, err + } + if err := h.workflowClient.ensureInitialized(ctx); err != nil { + return WorkerDeploymentCreateVersionResponse{}, err + } + + identity := h.workflowClient.identity + if options.Identity != "" { + identity = options.Identity + } + + dc := WithContext(ctx, h.workflowClient.dataConverter) + if dc == nil { + dc = converter.GetDefaultDataConverter() + } + + var computeConfig *compute.ComputeConfig + if options.ComputeConfig != nil { + if err := validateComputeConfig(options.ComputeConfig); err != nil { + return WorkerDeploymentCreateVersionResponse{}, err + } + cc, err := computeConfigToProto(dc, options.ComputeConfig) + if err != nil { + return WorkerDeploymentCreateVersionResponse{}, err + } + computeConfig = cc + } + + request := &workflowservice.CreateWorkerDeploymentVersionRequest{ + Namespace: h.workflowClient.namespace, + DeploymentVersion: &deployment.WorkerDeploymentVersion{ + DeploymentName: h.Name, + BuildId: h.buildIdToVersionStr(options.BuildID), + }, + Identity: identity, + ComputeConfig: computeConfig, + } + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + _, err := h.workflowClient.workflowService.CreateWorkerDeploymentVersion(grpcCtx, request) + if err != nil { + return WorkerDeploymentCreateVersionResponse{}, err + } + return WorkerDeploymentCreateVersionResponse{}, nil +} + func (h *workerDeploymentHandleImpl) SetCurrentVersion(ctx context.Context, options WorkerDeploymentSetCurrentVersionOptions) (WorkerDeploymentSetCurrentVersionResponse, error) { if err := h.validate(); err != nil { return WorkerDeploymentSetCurrentVersionResponse{}, err @@ -339,9 +397,9 @@ func workerDeploymentDrainageInfoFromProto(drainageInfo *deployment.VersionDrain } } -func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersionInfo) WorkerDeploymentVersionInfo { +func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersionInfo) (WorkerDeploymentVersionInfo, error) { if info == nil { - return WorkerDeploymentVersionInfo{} + return WorkerDeploymentVersionInfo{}, nil } //lint:ignore SA1019 ignore deprecated versioning APIs version := workerDeploymentVersionFromProtoOrString(info.DeploymentVersion, info.Version) @@ -349,6 +407,13 @@ func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersi // Should never happen unless server is sending junk data version = &WorkerDeploymentVersion{} } + + dc := converter.GetDefaultDataConverter() + + cc, err := computeConfigFromProto(dc, info.ComputeConfig) + if err != nil { + return WorkerDeploymentVersionInfo{}, err + } return WorkerDeploymentVersionInfo{ Version: *version, CreateTime: safeAsTime(info.CreateTime), @@ -359,7 +424,8 @@ func workerDeploymentVersionInfoFromProto(info *deployment.WorkerDeploymentVersi TaskQueuesInfos: workerDeploymentTaskQueuesInfosFromProto(info.TaskQueueInfos), DrainageInfo: workerDeploymentDrainageInfoFromProto(info.DrainageInfo), Metadata: info.Metadata.GetEntries(), - } + ComputeConfig: cc, + }, nil } func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, options WorkerDeploymentDescribeVersionOptions) (WorkerDeploymentVersionDescription, error) { @@ -390,8 +456,13 @@ func (h *workerDeploymentHandleImpl) DescribeVersion(ctx context.Context, option return WorkerDeploymentVersionDescription{}, err } + info, err := workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()) + if err != nil { + return WorkerDeploymentVersionDescription{}, err + } + return WorkerDeploymentVersionDescription{ - Info: workerDeploymentVersionInfoFromProto(resp.GetWorkerDeploymentVersionInfo()), + Info: info, }, nil } diff --git a/internal/worker_deployment_client.go b/internal/worker_deployment_client.go index f89885dee..e4a5117fb 100644 --- a/internal/worker_deployment_client.go +++ b/internal/worker_deployment_client.go @@ -119,6 +119,34 @@ type ( Info WorkerDeploymentInfo } + // WorkerDeploymentCreateVersionOptions provides options for + // [WorkerDeploymentClient.GetHandler.CreateVersion]. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.WorkerDeploymentCreateVersionOptions] + WorkerDeploymentCreateVersionOptions struct { + // BuildID - The Build ID within this deployment to create as a version. + BuildID string + + // ComputeConfig contains the optional compute configuration to use for + // the WorkerDeployment. + ComputeConfig *ComputeConfig + + // Identity - The identity of the client who initiated this request. + // + // Optional: defaults to the identity of the underlying workflow client. + Identity string + } + + // WorkerDeploymentCreateVersionResponse is the response for + // [WorkerDeploymentHandle.CreateVersion]. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.WorkerDeploymentCreateVersionResponse] + WorkerDeploymentCreateVersionResponse struct{} + // WorkerDeploymentSetCurrentVersionOptions provides options for // [WorkerDeploymentHandle.SetCurrentVersion]. // @@ -180,7 +208,7 @@ type ( ConflictToken []byte // PreviousVersion - The Version that was current before executing this operation, if any. - // + // // Deprecated: in favor of API idempotency. Use `Describe` before this API to get the previous // state. Pass the `ConflictToken` returned by `Describe` to this API to avoid race conditions. PreviousVersion *WorkerDeploymentVersion @@ -392,6 +420,10 @@ type ( // Metadata - A user-defined set of key-values attached to this Version. Metadata map[string]*commonpb.Payload + + // ComputeConfig contains the optional compute configuration to use for + // the WorkerDeployment. + ComputeConfig *ComputeConfig } // WorkerDeploymentVersionDescription is the response for @@ -487,6 +519,12 @@ type ( // NOTE: Experimental Describe(ctx context.Context, options WorkerDeploymentDescribeOptions) (WorkerDeploymentDescribeResponse, error) + // CreateVersion creates a new WorkerDeploymentVersion in this Worker + // Deployment. + // + // NOTE: Experimental + CreateVersion(ctx context.Context, options WorkerDeploymentCreateVersionOptions) (WorkerDeploymentCreateVersionResponse, error) + // SetCurrentVersion changes the Current Version for this Worker Deployment. // // It also unsets the Ramping Version when it matches the Version being set as Current.