Skip to content

Commit 69dfddf

Browse files
authored
CLI Support for UpdateTaskQueueConfig's fairness weight overrides (#987)
## What was changed Add support for setting fairness key weight overrides. ## Why? We should expose this feature to users. ## Checklist 2. How was this tested: Newly added tests and tested against a local dev server 3. Any docs updates needed? Yes; https://docs.temporal.io/cli/task-queue
1 parent 61044f2 commit 69dfddf

4 files changed

Lines changed: 367 additions & 53 deletions

File tree

internal/temporalcli/commands.gen.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2537,6 +2537,8 @@ type TemporalTaskQueueConfigSetCommand struct {
25372537
QueueRpsLimitReason string
25382538
FairnessKeyRpsLimitDefault string
25392539
FairnessKeyRpsLimitReason string
2540+
FairnessKeyWeight []string
2541+
FairnessKeyWeightClearAll bool
25402542
}
25412543

25422544
func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *TemporalTaskQueueConfigCommand) *TemporalTaskQueueConfigSetCommand {
@@ -2546,9 +2548,9 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
25462548
s.Command.Use = "set [flags]"
25472549
s.Command.Short = "Set Task Queue configuration"
25482550
if hasHighlighting {
2549-
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
2551+
s.Command.Long = "Update configuration settings for a Task Queue.\n\n\x1b[1mtemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\x1b[0m\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
25502552
} else {
2551-
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string>\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default"
2553+
s.Command.Long = "Update configuration settings for a Task Queue.\n\n```\ntemporal task-queue config set \\\n --task-queue YourTaskQueue \\\n --task-queue-type activity \\\n --namespace YourNamespace \\\n --queue-rps-limit <requests_per_second:float> \\\n --queue-rps-limit-reason <reason_string> \\\n --fairness-key-rps-limit-default <requests_per_second:float> \\\n --fairness-key-rps-limit-reason <reason_string> \\\n --fairness-key-weight-set HighPriority=2.0 \\\n --fairness-key-weight-set LowPriority=0.5\n```\n\nThis command supports updating:\n- Queue rate limits: Controls the overall rate limit of the task queue.\n This setting overrides the worker rate limit if set.\n Unless modified, this is the system-defined rate limit.\n- Fairness key rate limit defaults: Sets default rate limits for fairness keys.\n If set, each individual fairness key will be limited to this rate,\n scaled by the weight of the fairness key.\n- Fairness key weight overrides: Set custom weights for specific fairness keys.\n Weights control the relative share of capacity each key receives.\n\nTo unset a rate limit, pass in 'default', for example: --queue-rps-limit default\nTo unset specific fairness weights, use --fairness-key-weight-unset <key>\nTo unset all fairness weights, use --fairness-key-weight-unset-all"
25522554
}
25532555
s.Command.Args = cobra.NoArgs
25542556
s.Command.Flags().StringVarP(&s.TaskQueue, "task-queue", "t", "", "Task Queue name. Required.")
@@ -2562,6 +2564,8 @@ func NewTemporalTaskQueueConfigSetCommand(cctx *CommandContext, parent *Temporal
25622564
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitDefault, "fairness-key-rps-limit-default", "", "Fairness key rate limit default in requests per second. Accepts a float; or 'default' to unset.")
25632565
overrideFlagDisplayType(s.Command.Flags().Lookup("fairness-key-rps-limit-default"), "float|default")
25642566
s.Command.Flags().StringVar(&s.FairnessKeyRpsLimitReason, "fairness-key-rps-limit-reason", "", "Reason for fairness key rate limit update.")
2567+
s.Command.Flags().StringArrayVar(&s.FairnessKeyWeight, "fairness-key-weight", nil, "Set or unset fairness key weight overrides in format key=weight or key=default. Use key=weight to set a positive weight value; use key=default to unset. Can be specified multiple times. Example: --fairness-key-weight HighPriority=2.0 --fairness-key-weight LowPriority=default.")
2568+
s.Command.Flags().BoolVar(&s.FairnessKeyWeightClearAll, "fairness-key-weight-clear-all", false, "Unset all fairness key weight overrides. Cannot be used with --fairness-key-weight.")
25652569
s.Command.Run = func(c *cobra.Command, args []string) {
25662570
if err := s.run(cctx, args); err != nil {
25672571
cctx.Options.Fail(err)

internal/temporalcli/commands.taskqueue_config.go

Lines changed: 175 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,15 @@ import (
99
enums "go.temporal.io/api/enums/v1"
1010
"go.temporal.io/api/taskqueue/v1"
1111
"go.temporal.io/api/workflowservice/v1"
12+
"golang.org/x/exp/maps"
1213
)
1314

1415
// TaskQueueConfigGetCommand handles getting task queue configuration
1516
func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []string) error {
1617
// Validate inputs before dialing client
17-
taskQueue := c.TaskQueue
18+
taskQueue := strings.TrimSpace(c.TaskQueue)
1819
if taskQueue == "" {
19-
return fmt.Errorf("taskQueue name is required")
20+
return fmt.Errorf("task queue name is required and cannot be empty")
2021
}
2122

2223
taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
@@ -56,12 +57,99 @@ func (c *TemporalTaskQueueConfigGetCommand) run(cctx *CommandContext, args []str
5657
return printTaskQueueConfig(cctx, resp.Config)
5758
}
5859

60+
const (
61+
// maxFairnessKeyLength matches server-side limit in temporal/common/priorities/priority_util.go
62+
maxFairnessKeyLength = 64
63+
)
64+
65+
// printZeroRateLimitWarning prints a warning when a rate limit is set to 0
66+
func printZeroRateLimitWarning(p *printer.Printer, limitType string) {
67+
p.Printlnf("WARNING: Setting %s to 0 will STOP ALL TRAFFIC on this task queue.", limitType)
68+
p.Println(" This will prevent any tasks from being dispatched until the limit is changed.")
69+
}
70+
71+
// parseFairnessKeyWeights parses "key=weight" or "key=default" format strings
72+
// Returns separate maps for set and unset operations, or an error if there are duplicate keys, invalid weights, or malformed input
73+
// If inputs is empty, returns nil for both maps
74+
func parseFairnessKeyWeights(inputs []string) (setWeights map[string]float32, unsetKeys []string, err error) {
75+
if len(inputs) == 0 {
76+
return nil, nil, nil
77+
}
78+
79+
setWeights = make(map[string]float32)
80+
unsetKeysMap := make(map[string]bool) // Track unset keys in a map to check for duplicates
81+
seen := make(map[string]bool)
82+
83+
for _, input := range inputs {
84+
parts := strings.SplitN(input, "=", 2)
85+
if len(parts) != 2 {
86+
return nil, nil, fmt.Errorf("invalid format: %q (expected key=weight or key=default)", input)
87+
}
88+
89+
key := parts[0]
90+
if key == "" {
91+
return nil, nil, fmt.Errorf("empty key in: %q", input)
92+
}
93+
94+
// Check for duplicate keys across both set and unset
95+
if seen[key] {
96+
return nil, nil, fmt.Errorf("duplicate fairness key %q specified multiple times", key)
97+
}
98+
seen[key] = true
99+
100+
valueStr := parts[1]
101+
if valueStr == "" {
102+
return nil, nil, fmt.Errorf("empty value for key %q", key)
103+
}
104+
105+
// Check if this is an unset operation (value is "default")
106+
// Do this before validating key length since we don't care about length when unsetting
107+
if strings.EqualFold(valueStr, "default") {
108+
unsetKeysMap[key] = true
109+
continue
110+
}
111+
112+
// Validate key length only for set operations (server enforces 64 byte limit)
113+
if len(key) > maxFairnessKeyLength {
114+
return nil, nil, fmt.Errorf("fairness key %q exceeds maximum length of %d bytes", key, maxFairnessKeyLength)
115+
}
116+
117+
// Parse as weight
118+
weight, err := strconv.ParseFloat(valueStr, 32)
119+
if err != nil {
120+
return nil, nil, fmt.Errorf("invalid weight %q for key %q: must be a number or 'default'", valueStr, key)
121+
}
122+
123+
// Validate weight is positive - server handles clamping to its configured range
124+
if weight <= 0 {
125+
return nil, nil, fmt.Errorf("weight for key %q must be positive", key)
126+
}
127+
128+
setWeights[key] = float32(weight)
129+
}
130+
131+
// Convert unset map to slice
132+
if len(unsetKeysMap) > 0 {
133+
unsetKeys = maps.Keys(unsetKeysMap)
134+
}
135+
136+
// Return nil instead of empty maps/slices
137+
if len(setWeights) == 0 {
138+
setWeights = nil
139+
}
140+
if len(unsetKeys) == 0 {
141+
unsetKeys = nil
142+
}
143+
144+
return setWeights, unsetKeys, nil
145+
}
146+
59147
// TaskQueueConfigSetCommand handles setting task queue configuration
60148
func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []string) error {
61149
// Validate inputs before dialing client
62-
taskQueue := c.TaskQueue
150+
taskQueue := strings.TrimSpace(c.TaskQueue)
63151
if taskQueue == "" {
64-
return fmt.Errorf("taskQueue name is required")
152+
return fmt.Errorf("task queue name is required and cannot be empty")
65153
}
66154

67155
taskQueueType, err := parseTaskQueueType(c.TaskQueueType.Value)
@@ -79,43 +167,57 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str
79167

80168
// Helper to parse RPS values for a given flag name.
81169
// Accepts "default" or a non-negative float string.
82-
parseRPS := func(flagName string) (*taskqueue.RateLimit, error) {
170+
// Returns (rateLimit, isZero, error)
171+
parseRPS := func(flagName string) (*taskqueue.RateLimit, bool, error) {
83172
raw := strings.TrimSpace(c.Command.Flags().Lookup(flagName).Value.String())
84173
if raw == "" {
85-
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
174+
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
86175
}
87176
if strings.EqualFold(raw, "default") {
88177
// Unset: returning nil RateLimit removes the existing rate limit.
89-
return nil, nil
178+
return nil, false, nil
90179
}
91180
v, err := strconv.ParseFloat(raw, 32)
92181
if err != nil {
93-
return nil, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
182+
return nil, false, fmt.Errorf("invalid value for --%s: must be a non-negative number or 'default'", flagName)
94183
}
95184
if v < 0 {
96-
return nil, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
185+
return nil, false, fmt.Errorf("invalid value for --%s: must be >= 0 or 'default'", flagName)
97186
}
98-
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, nil
187+
isZero := v == 0
188+
return &taskqueue.RateLimit{RequestsPerSecond: float32(v)}, isZero, nil
99189
}
100190

191+
// Parse and validate queue rate limit
101192
var queueRpsLimitParsed *taskqueue.RateLimit
193+
var queueRateLimitIsZero bool
102194
if c.Command.Flags().Changed("queue-rps-limit") {
103195
var err error
104-
if queueRpsLimitParsed, err = parseRPS("queue-rps-limit"); err != nil {
196+
queueRpsLimitParsed, queueRateLimitIsZero, err = parseRPS("queue-rps-limit")
197+
if err != nil {
105198
return err
106199
}
107-
} else if c.Command.Flags().Changed("queue-rps-limit-reason") {
108-
return fmt.Errorf("queue-rps-limit-reason can only be set if queue-rps-limit is updated")
200+
201+
// Warn about zero rate limit (stops all traffic)
202+
if queueRateLimitIsZero {
203+
printZeroRateLimitWarning(cctx.Printer, "queue rate limit")
204+
}
109205
}
110206

207+
// Parse and validate fairness key rate limit default
111208
var fairnessKeyRpsLimitDefaultParsed *taskqueue.RateLimit
209+
var fairnessRateLimitIsZero bool
112210
if c.Command.Flags().Changed("fairness-key-rps-limit-default") {
113211
var err error
114-
if fairnessKeyRpsLimitDefaultParsed, err = parseRPS("fairness-key-rps-limit-default"); err != nil {
212+
fairnessKeyRpsLimitDefaultParsed, fairnessRateLimitIsZero, err = parseRPS("fairness-key-rps-limit-default")
213+
if err != nil {
115214
return err
116215
}
117-
} else if c.Command.Flags().Changed("fairness-key-rps-limit-default-reason") {
118-
return fmt.Errorf("fairness-key-rps-limit-default-reason can only be set if fairness-key-rps-limit-default is updated")
216+
217+
// Warn about zero rate limit
218+
if fairnessRateLimitIsZero {
219+
printZeroRateLimitWarning(cctx.Printer, "fairness key rate limit default")
220+
}
119221
}
120222

121223
cl, err := dialClient(cctx, &c.Parent.Parent.ClientOptions)
@@ -152,10 +254,66 @@ func (c *TemporalTaskQueueConfigSetCommand) run(cctx *CommandContext, args []str
152254
}
153255
}
154256

257+
// Validate at least one configuration change is requested
258+
hasAnyUpdate := c.Command.Flags().Changed("queue-rps-limit") ||
259+
c.Command.Flags().Changed("fairness-key-rps-limit-default") ||
260+
len(c.FairnessKeyWeight) > 0 ||
261+
c.FairnessKeyWeightClearAll
262+
263+
if !hasAnyUpdate {
264+
return fmt.Errorf("at least one configuration update must be specified (use --help to see available options)")
265+
}
266+
267+
// Handle fairness weight overrides
268+
// Validate mutual exclusivity of clear-all with other weight operations
269+
if c.FairnessKeyWeightClearAll {
270+
if len(c.FairnessKeyWeight) > 0 {
271+
return fmt.Errorf("--fairness-key-weight-clear-all cannot be used with --fairness-key-weight")
272+
}
273+
}
274+
275+
// Parse fairness key weights (handles both set and unset operations)
276+
setWeights, unsetKeys, err := parseFairnessKeyWeights(c.FairnessKeyWeight)
277+
if err != nil {
278+
return err
279+
}
280+
request.SetFairnessWeightOverrides = setWeights
281+
request.UnsetFairnessWeightOverrides = unsetKeys
282+
283+
// Handle clear all
284+
if c.FairnessKeyWeightClearAll {
285+
// Need to fetch current config to get all keys to unset
286+
descResp, err := cl.WorkflowService().DescribeTaskQueue(cctx, &workflowservice.DescribeTaskQueueRequest{
287+
Namespace: namespace,
288+
TaskQueue: &taskqueue.TaskQueue{
289+
Name: taskQueue,
290+
Kind: enums.TASK_QUEUE_KIND_NORMAL,
291+
},
292+
TaskQueueType: taskQueueType,
293+
ReportConfig: true,
294+
})
295+
if err != nil {
296+
return fmt.Errorf("error fetching current config for clear-all: %w", err)
297+
}
298+
var overrides map[string]float32
299+
if descResp.Config != nil {
300+
overrides = descResp.Config.FairnessWeightOverrides
301+
}
302+
keys := maps.Keys(overrides)
303+
if len(keys) > 0 {
304+
request.UnsetFairnessWeightOverrides = keys
305+
cctx.Printer.Printlnf("Unsetting %d fairness weight override(s)", len(keys))
306+
} else {
307+
cctx.Printer.Println("No fairness weight overrides found to unset")
308+
// Don't return error, just proceed with no-op update
309+
}
310+
}
311+
155312
// Call the API
156313
resp, err := cl.WorkflowService().UpdateTaskQueueConfig(cctx, request)
157314
if err != nil {
158-
return fmt.Errorf("error updating task queue config: %w", err)
315+
// Provide more context in error message
316+
return fmt.Errorf("failed to update task queue config for %s/%s: %w", namespace, taskQueue, err)
159317
}
160318

161319
cctx.Printer.Println("Successfully updated task queue configuration")

0 commit comments

Comments
 (0)