Skip to content

Commit 53052a6

Browse files
committed
fix: skip CountWorkflow in batch operations when --yes is set
The visibility CountWorkflowExecutions request was issued unconditionally before every batch terminate / signal / cancel / reset. The count is only used to populate the "Start batch against approximately N workflow(s)?" confirmation prompt. When --yes bypasses the prompt entirely, the count result is never read. In clusters whose visibility API is overloaded (e.g. Postgres-backed clusters with many workflows), this CountWorkflow call can time out and prevent batch jobs from being started at all, even though the batch operation itself uses the same query and would succeed. Skipping the count when --yes is set lets these batch jobs proceed unconditionally. Both batch entry points are updated: - commands.workflow.go (terminate / signal / cancel) - commands.workflow_reset.go (reset) When the count is skipped, the prompt text shown by --yes changes from "Start batch against approximately N workflow(s)? y/N" to "Start batch against workflows matching query "<query>"? y/N" so the output remains informative. Adds TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes which uses a gRPC unary interceptor to assert that CountWorkflowExecutionsRequest is *not* sent when --yes is passed, while StartBatchOperationRequest still is. The existing without-yes tests are unaffected. Closes #838
1 parent a6a4f86 commit 53052a6

3 files changed

Lines changed: 103 additions & 11 deletions

File tree

internal/temporalcli/commands.workflow.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -542,13 +542,19 @@ func (s *SingleWorkflowOrBatchOptions) workflowExecOrBatch(
542542
return nil, nil, fmt.Errorf("cannot set run ID when query is set")
543543
}
544544

545-
// Count the workflows that will be affected
546-
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query})
547-
if err != nil {
548-
return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err)
545+
// The count is only used in the confirmation prompt; skip the request when --yes
546+
// bypasses it, so batch jobs can still proceed if the visibility API is timing out.
547+
var promptMessage string
548+
if s.Yes {
549+
promptMessage = fmt.Sprintf("Start batch against workflows matching query %q? y/N", s.Query)
550+
} else {
551+
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: s.Query})
552+
if err != nil {
553+
return nil, nil, fmt.Errorf("failed counting workflows from query: %w", err)
554+
}
555+
promptMessage = fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count)
549556
}
550-
yes, err := cctx.promptYes(
551-
fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), s.Yes)
557+
yes, err := cctx.promptYes(promptMessage, s.Yes)
552558
if err != nil {
553559
return nil, nil, err
554560
} else if !yes {

internal/temporalcli/commands.workflow_reset.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,12 +134,19 @@ func (c *TemporalWorkflowResetCommand) runBatchResetWithPostOps(cctx *CommandCon
134134
PostResetOperations: postOps,
135135
},
136136
}
137-
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query})
138-
if err != nil {
139-
return fmt.Errorf("failed counting workflows from query: %w", err)
137+
// The count is only used in the confirmation prompt; skip the request when --yes
138+
// bypasses it, so batch jobs can still proceed if the visibility API is timing out.
139+
var promptMessage string
140+
if c.Yes {
141+
promptMessage = fmt.Sprintf("Start batch against workflows matching query %q? y/N", c.Query)
142+
} else {
143+
count, err := cl.CountWorkflow(cctx, &workflowservice.CountWorkflowExecutionsRequest{Query: c.Query})
144+
if err != nil {
145+
return fmt.Errorf("failed counting workflows from query: %w", err)
146+
}
147+
promptMessage = fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count)
140148
}
141-
yes, err := cctx.promptYes(
142-
fmt.Sprintf("Start batch against approximately %v workflow(s)? y/N", count.Count), c.Yes)
149+
yes, err := cctx.promptYes(promptMessage, c.Yes)
143150
if err != nil {
144151
return err
145152
}

internal/temporalcli/commands.workflow_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,85 @@ func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflowSuccess_JSON() {
346346
s.NotEmpty(jsonRes["batchJobId"])
347347
}
348348

349+
// TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes verifies that --yes causes
350+
// the batch terminate command to skip the CountWorkflowExecutions call. The count
351+
// is only used for the "Start batch against approximately N workflow(s)?" prompt;
352+
// when --yes bypasses the prompt, issuing it adds latency and prevents batch jobs
353+
// from running on clusters whose visibility API is timing out.
354+
func (s *SharedServerSuite) TestWorkflow_Terminate_BatchWorkflow_SkipsCountWhenYes() {
355+
s.Worker().OnDevWorkflow(func(ctx workflow.Context, a any) (any, error) {
356+
ctx.Done().Receive(ctx, nil)
357+
return nil, ctx.Err()
358+
})
359+
360+
var callLock sync.Mutex
361+
var countCalls, startBatchCalls int
362+
s.CommandHarness.Options.AdditionalClientGRPCDialOptions = append(
363+
s.CommandHarness.Options.AdditionalClientGRPCDialOptions,
364+
grpc.WithChainUnaryInterceptor(func(
365+
ctx context.Context,
366+
method string, req, reply any,
367+
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
368+
) error {
369+
callLock.Lock()
370+
switch req.(type) {
371+
case *workflowservice.CountWorkflowExecutionsRequest:
372+
countCalls++
373+
case *workflowservice.StartBatchOperationRequest:
374+
startBatchCalls++
375+
}
376+
callLock.Unlock()
377+
return invoker(ctx, method, req, reply, cc, opts...)
378+
}),
379+
)
380+
381+
// Start a workflow so the batch query has at least one match. The count assertion
382+
// is independent of the match count (it asserts zero CountWorkflow calls regardless),
383+
// but executing the batch against a real workflow keeps the test path realistic.
384+
searchAttr := "keyword-" + uuid.NewString()
385+
run, err := s.Client.ExecuteWorkflow(
386+
s.Context,
387+
client.StartWorkflowOptions{
388+
TaskQueue: s.Worker().Options.TaskQueue,
389+
SearchAttributes: map[string]any{"CustomKeywordField": searchAttr},
390+
},
391+
DevWorkflow,
392+
"ignored",
393+
)
394+
s.NoError(err)
395+
s.Eventually(func() bool {
396+
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
397+
Query: "CustomKeywordField = '" + searchAttr + "'",
398+
})
399+
s.NoError(err)
400+
return len(resp.Executions) == 1
401+
}, 3*time.Second, 100*time.Millisecond)
402+
403+
res := s.Execute(
404+
"workflow", "terminate",
405+
"--address", s.Address(),
406+
"--query", "CustomKeywordField = '"+searchAttr+"'",
407+
"--reason", "skip-count-test",
408+
"--yes",
409+
)
410+
s.NoError(res.Err)
411+
412+
callLock.Lock()
413+
defer callLock.Unlock()
414+
s.Equal(0, countCalls, "CountWorkflowExecutions must not be called when --yes is set")
415+
s.Equal(1, startBatchCalls, "StartBatchOperation must still be called")
416+
417+
// Sanity-check: the prompt text should reflect the missing count.
418+
s.NotContains(res.Stdout.String(), "approximately")
419+
s.Contains(res.Stdout.String(), "matching query")
420+
421+
// Drain the workflow so the test fixture cleans up.
422+
s.Eventually(func() bool {
423+
err := run.Get(s.Context, nil)
424+
return err != nil
425+
}, 5*time.Second, 100*time.Millisecond)
426+
}
427+
349428
func (s *SharedServerSuite) testTerminateBatchWorkflow(
350429
total int,
351430
rps float32,

0 commit comments

Comments
 (0)