Skip to content

Commit aaded97

Browse files
author
Josh Berry
authored
Merge main into next-server (#768)
This PR brings all the latest `main` changes to the `next-server` branch.
2 parents 122ac6f + 16302b6 commit aaded97

11 files changed

Lines changed: 387 additions & 50 deletions

temporalcli/client.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,20 +39,25 @@ func (c *ClientOptions) dialClient(cctx *CommandContext) (client.Client, error)
3939

4040
// Headers
4141
if len(c.GrpcMeta) > 0 {
42-
headers := make(stringMapHeadersProvider, len(c.GrpcMeta))
43-
for _, kv := range c.GrpcMeta {
44-
pieces := strings.SplitN(kv, "=", 2)
45-
if len(pieces) != 2 {
46-
return nil, fmt.Errorf("gRPC meta of %q does not have '='", kv)
47-
}
48-
headers[pieces[0]] = pieces[1]
42+
headers, err := NewStringMapHeaderProvider(c.GrpcMeta)
43+
if err != nil {
44+
return nil, fmt.Errorf("grpc-meta %s", err)
4945
}
5046
clientOptions.HeadersProvider = headers
5147
}
5248

5349
// Remote codec
5450
if c.CodecEndpoint != "" {
55-
interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, c.CodecAuth)
51+
codecHeaders, err := NewStringMapHeaderProvider(c.CodecHeader)
52+
if err != nil {
53+
return nil, fmt.Errorf("codec-header %s", err)
54+
}
55+
56+
if c.CodecAuth != "" {
57+
codecHeaders["Authorization"] = c.CodecAuth
58+
}
59+
60+
interceptor, err := payloadCodecInterceptor(c.Namespace, c.CodecEndpoint, codecHeaders)
5661
if err != nil {
5762
return nil, fmt.Errorf("failed creating payload codec interceptor: %w", err)
5863
}
@@ -145,16 +150,16 @@ func fixedHeaderOverrideInterceptor(
145150
return invoker(ctx, method, req, reply, cc, opts...)
146151
}
147152

148-
func payloadCodecInterceptor(namespace, codecEndpoint, codecAuth string) (grpc.UnaryClientInterceptor, error) {
153+
func payloadCodecInterceptor(namespace, codecEndpoint string, codecHeaders stringMapHeadersProvider) (grpc.UnaryClientInterceptor, error) {
149154
codecEndpoint = strings.ReplaceAll(codecEndpoint, "{namespace}", namespace)
150155

151156
payloadCodec := converter.NewRemotePayloadCodec(
152157
converter.RemotePayloadCodecOptions{
153158
Endpoint: codecEndpoint,
154159
ModifyRequest: func(req *http.Request) error {
155160
req.Header.Set("X-Namespace", namespace)
156-
if codecAuth != "" {
157-
req.Header.Set("Authorization", codecAuth)
161+
for headerName, headerValue := range codecHeaders {
162+
req.Header.Set(headerName, headerValue)
158163
}
159164
return nil
160165
},
@@ -185,6 +190,18 @@ func (s stringMapHeadersProvider) GetHeaders(context.Context) (map[string]string
185190
return s, nil
186191
}
187192

193+
func NewStringMapHeaderProvider(config []string) (stringMapHeadersProvider, error) {
194+
headers := make(stringMapHeadersProvider, len(config))
195+
for _, kv := range config {
196+
pieces := strings.SplitN(kv, "=", 2)
197+
if len(pieces) != 2 {
198+
return nil, fmt.Errorf("%q does not have '='", kv)
199+
}
200+
headers[pieces[0]] = pieces[1]
201+
}
202+
return headers, nil
203+
}
204+
188205
var DataConverterWithRawValue = converter.NewCompositeDataConverter(
189206
rawValuePayloadConverter{},
190207
converter.NewNilPayloadConverter(),

temporalcli/commands.gen.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type ClientOptions struct {
3232
TlsServerName string
3333
CodecEndpoint string
3434
CodecAuth string
35+
CodecHeader []string
3536
}
3637

3738
func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
@@ -41,7 +42,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
4142
cctx.BindFlagEnvVar(f.Lookup("namespace"), "TEMPORAL_NAMESPACE")
4243
f.StringVar(&v.ApiKey, "api-key", "", "API key for request.")
4344
cctx.BindFlagEnvVar(f.Lookup("api-key"), "TEMPORAL_API_KEY")
44-
f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. format as a `KEY=VALUE` pair May be passed multiple times to set multiple headers.")
45+
f.StringArrayVar(&v.GrpcMeta, "grpc-meta", nil, "HTTP headers for requests. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.")
4546
f.BoolVar(&v.Tls, "tls", false, "Enable base TLS encryption. Does not have additional options like mTLS or client certs.")
4647
cctx.BindFlagEnvVar(f.Lookup("tls"), "TEMPORAL_TLS")
4748
f.StringVar(&v.TlsCertPath, "tls-cert-path", "", "Path to x509 certificate. Can't be used with --tls-cert-data.")
@@ -64,6 +65,7 @@ func (v *ClientOptions) buildFlags(cctx *CommandContext, f *pflag.FlagSet) {
6465
cctx.BindFlagEnvVar(f.Lookup("codec-endpoint"), "TEMPORAL_CODEC_ENDPOINT")
6566
f.StringVar(&v.CodecAuth, "codec-auth", "", "Authorization header for Codec Server requests.")
6667
cctx.BindFlagEnvVar(f.Lookup("codec-auth"), "TEMPORAL_CODEC_AUTH")
68+
f.StringArrayVar(&v.CodecHeader, "codec-header", nil, "HTTP headers for requests to codec server. Format as a `KEY=VALUE` pair. May be passed multiple times to set multiple headers.")
6769
}
6870

6971
type OverlapPolicyOptions struct {
@@ -2861,6 +2863,7 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
28612863
s.Command.AddCommand(&NewTemporalWorkflowResultCommand(cctx, &s).Command)
28622864
s.Command.AddCommand(&NewTemporalWorkflowShowCommand(cctx, &s).Command)
28632865
s.Command.AddCommand(&NewTemporalWorkflowSignalCommand(cctx, &s).Command)
2866+
s.Command.AddCommand(&NewTemporalWorkflowSignalWithStartCommand(cctx, &s).Command)
28642867
s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command)
28652868
s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command)
28662869
s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command)
@@ -3055,6 +3058,7 @@ type TemporalWorkflowListCommand struct {
30553058
Query string
30563059
Archived bool
30573060
Limit int
3061+
PageSize int
30583062
}
30593063

30603064
func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowListCommand {
@@ -3072,6 +3076,7 @@ func NewTemporalWorkflowListCommand(cctx *CommandContext, parent *TemporalWorkfl
30723076
s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Content for an SQL-like `QUERY` List Filter.")
30733077
s.Command.Flags().BoolVar(&s.Archived, "archived", false, "Limit output to archived Workflow Executions.")
30743078
s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Maximum number of Workflow Executions to display.")
3079+
s.Command.Flags().IntVar(&s.PageSize, "page-size", 0, "Maximum number of Workflow Executions to fetch at a time from the server.")
30753080
s.Command.Run = func(c *cobra.Command, args []string) {
30763081
if err := s.run(cctx, args); err != nil {
30773082
cctx.Options.Fail(err)
@@ -3288,6 +3293,52 @@ func NewTemporalWorkflowSignalCommand(cctx *CommandContext, parent *TemporalWork
32883293
return &s
32893294
}
32903295

3296+
type TemporalWorkflowSignalWithStartCommand struct {
3297+
Parent *TemporalWorkflowCommand
3298+
Command cobra.Command
3299+
SharedWorkflowStartOptions
3300+
WorkflowStartOptions
3301+
PayloadInputOptions
3302+
SignalName string
3303+
SignalInput []string
3304+
SignalInputFile []string
3305+
SignalInputMeta []string
3306+
SignalInputBase64 bool
3307+
}
3308+
3309+
func NewTemporalWorkflowSignalWithStartCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowSignalWithStartCommand {
3310+
var s TemporalWorkflowSignalWithStartCommand
3311+
s.Parent = parent
3312+
s.Command.DisableFlagsInUseLine = true
3313+
s.Command.Use = "signal-with-start [flags]"
3314+
s.Command.Short = "Send a message to a Workflow Execution, start the execution if it isn't running"
3315+
if hasHighlighting {
3316+
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n\x1b[1mtemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\x1b[0m"
3317+
} else {
3318+
s.Command.Long = "Send an asynchronous notification (Signal) to a Workflow Execution.\nIf the Workflow Execution is not running or is not found, it starts the \nworkflow then sends the signal.\n\n```\ntemporal workflow signal-with-start \\\n --signal-name YourSignal \\\n --signal-input '{\"some-key\": \"some-value\"}' \\\n --workflow-id YourWorkflowId \\\n --type YourWorkflowType \\\n --task-queue YourTaskQueue \\\n --input '{\"some-key\": \"some-value\"}'\n```"
3319+
}
3320+
s.Command.Args = cobra.NoArgs
3321+
s.Command.Flags().StringVar(&s.SignalName, "signal-name", "", "Signal name. Required. Aliased as \"--signal-type\".")
3322+
_ = cobra.MarkFlagRequired(s.Command.Flags(), "signal-name")
3323+
s.Command.Flags().StringArrayVar(&s.SignalInput, "signal-input", nil, "Signal input value. Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input-file. Can be passed multiple times to pass multiple arguments.")
3324+
s.Command.Flags().StringArrayVar(&s.SignalInputFile, "signal-input-file", nil, "A path or paths for input file(s). Use JSON content or set --signal-input-meta to override. Can't be combined with --signal-input. Can be passed multiple times to pass multiple arguments.")
3325+
s.Command.Flags().StringArrayVar(&s.SignalInputMeta, "signal-input-meta", nil, "Input signal payload metadata as a `KEY=VALUE` pair. When the KEY is \"encoding\", this overrides the default (\"json/plain\"). Can be passed multiple times.")
3326+
s.Command.Flags().BoolVar(&s.SignalInputBase64, "signal-input-base64", false, "Assume signal inputs are base64-encoded and attempt to decode them.")
3327+
s.SharedWorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
3328+
s.WorkflowStartOptions.buildFlags(cctx, s.Command.Flags())
3329+
s.PayloadInputOptions.buildFlags(cctx, s.Command.Flags())
3330+
s.Command.Flags().SetNormalizeFunc(aliasNormalizer(map[string]string{
3331+
"name": "type",
3332+
"signal-type": "signal-name",
3333+
}))
3334+
s.Command.Run = func(c *cobra.Command, args []string) {
3335+
if err := s.run(cctx, args); err != nil {
3336+
cctx.Options.Fail(err)
3337+
}
3338+
}
3339+
return &s
3340+
}
3341+
32913342
type TemporalWorkflowStackCommand struct {
32923343
Parent *TemporalWorkflowCommand
32933344
Command cobra.Command

temporalcli/commands.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ import (
2323
"github.com/temporalio/cli/temporalcli/internal/printer"
2424
"github.com/temporalio/ui-server/v2/server/version"
2525
"go.temporal.io/api/common/v1"
26+
commonpb "go.temporal.io/api/common/v1"
2627
"go.temporal.io/api/failure/v1"
2728
"go.temporal.io/api/temporalproto"
29+
"go.temporal.io/sdk/converter"
2830
"go.temporal.io/sdk/temporal"
2931
"go.temporal.io/server/common/headers"
3032
"google.golang.org/grpc"
@@ -589,3 +591,20 @@ func fromApplicationError(err *temporal.ApplicationError) (*structuredError, err
589591
Details: deets,
590592
}, nil
591593
}
594+
595+
func encodeMapToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
596+
if len(in) == 0 {
597+
return nil, nil
598+
}
599+
// search attributes always use default dataconverter
600+
dc := converter.GetDefaultDataConverter()
601+
out := make(map[string]*commonpb.Payload, len(in))
602+
for key, val := range in {
603+
payload, err := dc.ToPayload(val)
604+
if err != nil {
605+
return nil, err
606+
}
607+
out[key] = payload
608+
}
609+
return out, nil
610+
}

temporalcli/commands.schedule.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
schedpb "go.temporal.io/api/schedule/v1"
1717
"go.temporal.io/api/workflowservice/v1"
1818
"go.temporal.io/sdk/client"
19-
"go.temporal.io/sdk/converter"
2019
"go.temporal.io/server/common/primitives/timestamp"
2120
)
2221

@@ -257,7 +256,7 @@ func toScheduleAction(sw *SharedWorkflowStartOptions, i *PayloadInputOptions) (c
257256
if err != nil {
258257
return nil, err
259258
}
260-
untypedSearchAttributes, err := encodeSearchAttributesToPayloads(opts.SearchAttributes)
259+
untypedSearchAttributes, err := encodeMapToPayloads(opts.SearchAttributes)
261260
if err != nil {
262261
return nil, err
263262
}
@@ -605,20 +604,3 @@ func formatDuration(d time.Duration) string {
605604
s = strings.TrimSpace(s)
606605
return s
607606
}
608-
609-
func encodeSearchAttributesToPayloads(in map[string]any) (map[string]*commonpb.Payload, error) {
610-
if len(in) == 0 {
611-
return nil, nil
612-
}
613-
// search attributes always use default dataconverter
614-
dc := converter.GetDefaultDataConverter()
615-
out := make(map[string]*commonpb.Payload, len(in))
616-
for key, val := range in {
617-
payload, err := dc.ToPayload(val)
618-
if err != nil {
619-
return nil, err
620-
}
621-
out[key] = payload
622-
}
623-
return out, nil
624-
}

temporalcli/commands.worker.deployment_test.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,6 @@ type jsonDeploymentVersionInfoType struct {
6161
Metadata map[string]*common.Payload `json:"metadata"`
6262
}
6363

64-
/*
65-
type jsonDeploymentReachabilityInfoType struct {
66-
DeploymentInfo jsonDeploymentInfoType `json:"deploymentInfo"`
67-
Reachability string `json:"reachability"`
68-
LastUpdateTime time.Time `json:"lastUpdateTime"`
69-
}
70-
71-
type jsonDeploymentListEntryType struct {
72-
Deployment jsonDeploymentType `json:"deployment"`
73-
CreateTime time.Time `json:"createTime"`
74-
IsCurrent bool `json:"isCurrent"`
75-
}
76-
*/
77-
7864
func (s *SharedServerSuite) TestDeployment_Set_Current_Version() {
7965
deploymentName := uuid.NewString()
8066
buildId := uuid.NewString()

temporalcli/commands.workflow_exec.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ import (
1212
"time"
1313

1414
"github.com/fatih/color"
15+
"github.com/google/uuid"
1516
"github.com/temporalio/cli/temporalcli/internal/printer"
1617
"go.temporal.io/api/common/v1"
18+
commonpb "go.temporal.io/api/common/v1"
1719
"go.temporal.io/api/enums/v1"
20+
enumspb "go.temporal.io/api/enums/v1"
1821
"go.temporal.io/api/history/v1"
22+
taskqueuepb "go.temporal.io/api/taskqueue/v1"
1923
"go.temporal.io/api/temporalproto"
24+
"go.temporal.io/api/workflowservice/v1"
2025
"go.temporal.io/sdk/client"
26+
"google.golang.org/protobuf/types/known/durationpb"
2127
)
2228

2329
func (c *TemporalWorkflowStartCommand) run(cctx *CommandContext, args []string) error {
@@ -92,6 +98,112 @@ func (c *TemporalWorkflowExecuteCommand) run(cctx *CommandContext, args []string
9298
return err
9399
}
94100

101+
func (c *TemporalWorkflowSignalWithStartCommand) run(cctx *CommandContext, _ []string) error {
102+
if c.SharedWorkflowStartOptions.WorkflowId == "" {
103+
return fmt.Errorf("--workflow-id flag must be provided")
104+
}
105+
106+
cl, err := c.Parent.ClientOptions.dialClient(cctx)
107+
if err != nil {
108+
return err
109+
}
110+
defer cl.Close()
111+
112+
wfStartOpts, err := buildStartOptions(&c.SharedWorkflowStartOptions, &c.WorkflowStartOptions)
113+
if err != nil {
114+
return err
115+
}
116+
wfInput, err := c.buildRawInputPayloads()
117+
if err != nil {
118+
return err
119+
}
120+
121+
signalPayloadInputOpts := PayloadInputOptions{
122+
Input: c.SignalInput,
123+
InputFile: c.SignalInputFile,
124+
InputMeta: c.InputMeta,
125+
InputBase64: c.SignalInputBase64,
126+
}
127+
signalInput, err := signalPayloadInputOpts.buildRawInputPayloads()
128+
if err != nil {
129+
return err
130+
}
131+
132+
var retryPolicy *common.RetryPolicy
133+
if wfStartOpts.RetryPolicy != nil {
134+
retryPolicy = &commonpb.RetryPolicy{
135+
MaximumInterval: durationpb.New(wfStartOpts.RetryPolicy.MaximumInterval),
136+
InitialInterval: durationpb.New(wfStartOpts.RetryPolicy.InitialInterval),
137+
BackoffCoefficient: wfStartOpts.RetryPolicy.BackoffCoefficient,
138+
MaximumAttempts: wfStartOpts.RetryPolicy.MaximumAttempts,
139+
NonRetryableErrorTypes: wfStartOpts.RetryPolicy.NonRetryableErrorTypes,
140+
}
141+
}
142+
var memo *common.Memo
143+
if wfStartOpts.Memo != nil {
144+
fields, err := encodeMapToPayloads(wfStartOpts.Memo)
145+
if err != nil {
146+
return err
147+
}
148+
memo = &common.Memo{Fields: fields}
149+
}
150+
var searchAttr *common.SearchAttributes
151+
if wfStartOpts.SearchAttributes != nil {
152+
fields, err := encodeMapToPayloads(wfStartOpts.SearchAttributes)
153+
if err != nil {
154+
return err
155+
}
156+
searchAttr = &common.SearchAttributes{IndexedFields: fields}
157+
}
158+
159+
if wfStartOpts.VersioningOverride != (client.VersioningOverride{}) {
160+
cctx.Logger.Warn("VersioningOverride is not configured for the signal-with-start command")
161+
}
162+
163+
// We have to use the raw signal service call here because the Go SDK's
164+
// signal-with-start call doesn't accept multiple signal arguments.
165+
resp, err := cl.WorkflowService().SignalWithStartWorkflowExecution(
166+
cctx,
167+
&workflowservice.SignalWithStartWorkflowExecutionRequest{
168+
Namespace: c.Parent.Namespace,
169+
RequestId: uuid.NewString(),
170+
WorkflowId: c.WorkflowId,
171+
WorkflowType: &common.WorkflowType{Name: c.Type},
172+
TaskQueue: &taskqueuepb.TaskQueue{Name: c.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
173+
Input: wfInput,
174+
WorkflowExecutionTimeout: durationpb.New(wfStartOpts.WorkflowExecutionTimeout),
175+
WorkflowRunTimeout: durationpb.New(wfStartOpts.WorkflowRunTimeout),
176+
WorkflowTaskTimeout: durationpb.New(wfStartOpts.WorkflowTaskTimeout),
177+
SignalName: c.SignalName,
178+
SignalInput: signalInput,
179+
Identity: clientIdentity(),
180+
RetryPolicy: retryPolicy,
181+
CronSchedule: wfStartOpts.CronSchedule,
182+
Memo: memo,
183+
SearchAttributes: searchAttr,
184+
WorkflowIdReusePolicy: wfStartOpts.WorkflowIDReusePolicy,
185+
WorkflowIdConflictPolicy: wfStartOpts.WorkflowIDConflictPolicy,
186+
},
187+
)
188+
if err != nil {
189+
return err
190+
}
191+
cctx.Printer.Println(color.MagentaString("Running execution:"))
192+
return cctx.Printer.PrintStructured(struct {
193+
WorkflowId string `json:"workflowId"`
194+
RunId string `json:"runId"`
195+
Type string `json:"type"`
196+
Namespace string `json:"namespace"`
197+
TaskQueue string `json:"taskQueue"`
198+
}{
199+
WorkflowId: c.WorkflowId,
200+
RunId: resp.RunId,
201+
Type: c.Type,
202+
Namespace: c.Parent.Namespace,
203+
TaskQueue: c.TaskQueue,
204+
}, printer.StructuredOptions{})
205+
}
206+
95207
type workflowJSONResult struct {
96208
WorkflowId string `json:"workflowId"`
97209
RunId string `json:"runId"`

0 commit comments

Comments
 (0)