Skip to content

Commit 3a71085

Browse files
authored
Add batch workflow delete (#201)
* Add batch workflow delete * update test
1 parent 855c281 commit 3a71085

5 files changed

Lines changed: 109 additions & 9 deletions

File tree

batch/batch_commands.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,21 @@ func BatchSignal(c *cli.Context) error {
135135
return startBatchJob(c, &req)
136136
}
137137

138+
// BatchDelete delete a list of workflows
139+
func BatchDelete(c *cli.Context) error {
140+
operator := common.GetCurrentUserFromEnv()
141+
142+
req := workflowservice.StartBatchOperationRequest{
143+
Operation: &workflowservice.StartBatchOperationRequest_DeletionOperation{
144+
DeletionOperation: &batch.BatchOperationDeletion{
145+
Identity: operator,
146+
},
147+
},
148+
}
149+
150+
return startBatchJob(c, &req)
151+
}
152+
138153
// startBatchJob starts a batch job
139154
func startBatchJob(c *cli.Context, req *workflowservice.StartBatchOperationRequest) error {
140155
namespace, err := common.RequiredFlag(c, common.FlagNamespace)

common/defs-flags.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const (
66
FlagAddrDefinition = "The host and port (formatted as host:port) for the Temporal Frontend Service."
77
FlagNSAliasDefinition = "Identifies a Namespace in the Temporal Workflow."
88
FlagMetadataDefinition = "Contains gRPC metadata to send with requests (formatted as key=value)."
9-
FlagTLSDefinition = "Enable TLS encryption without additional options such as mTLS or client certificates"
9+
FlagTLSDefinition = "Enable TLS encryption without additional options such as mTLS or client certificates."
1010
FlagTLSCertPathDefinition = "Path to x509 certificate."
1111
FlagTLSKeyPathDefinition = "Path to private certificate key."
1212
FlagTLSCaPathDefinition = "Path to server CA certificate."
@@ -65,6 +65,7 @@ const (
6565
FlagCancelWorkflow = "Cancel Workflow Execution with given Workflow Id."
6666
FlagWorkflowIDTerminate = "Terminate Workflow Execution with given Workflow Id."
6767
FlagQueryTerminate = "Terminate Workflow Executions with given List Filter."
68+
FlagQueryDelete = "Delete Workflow Executions with given List Filter."
6869
FlagEventIDDefinition = "The Event Id for any Event after WorkflowTaskStarted you want to reset to (exclusive). It can be WorkflowTaskCompleted, WorkflowTaskFailed or others."
6970
FlagQueryResetBatch = "Visibility Query of Search Attributes describing the Workflow Executions to reset. See https://docs.temporal.io/docs/tctl/workflow/list#--query."
7071
FlagInputFileReset = "Input file that specifies Workflow Executions to reset. Each line contains one Workflow Id as the base Run and, optionally, a Run Id."

tests/workflow_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,51 @@ func (s *e2eSuite) TestWorkflowTerminate_Batch() {
300300
}, 10*time.Second, time.Second, "timed out awaiting for workflows termination")
301301
}
302302

303+
func (s *e2eSuite) TestWorkflowDelete_Batch() {
304+
s.T().Parallel()
305+
306+
testserver, app, _ := s.setUpTestEnvironment()
307+
defer func() {
308+
_ = testserver.Stop()
309+
}()
310+
311+
w := s.newWorker(testserver, testTq, func(r worker.Registry) {
312+
r.RegisterWorkflow(awaitsignal.Workflow)
313+
})
314+
defer w.Stop()
315+
316+
c := testserver.Client()
317+
318+
ids := []string{"1", "2", "3"}
319+
for _, id := range ids {
320+
_, err := c.ExecuteWorkflow(
321+
context.Background(),
322+
sdkclient.StartWorkflowOptions{ID: id, TaskQueue: testTq},
323+
awaitsignal.Workflow,
324+
)
325+
s.NoError(err)
326+
}
327+
328+
err := app.Run([]string{"", "workflow", "delete", "--query", "WorkflowId = '1' OR WorkflowId = '2'", "--reason", "test", "--yes", "--namespace", testNamespace})
329+
s.NoError(err)
330+
331+
awaitTaskQueuePoller(s, c, testTq)
332+
awaitBatchJob(s, c, testNamespace)
333+
334+
s.Eventually(func() bool {
335+
wfs, err := c.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{
336+
Namespace: testNamespace,
337+
})
338+
s.NoError(err)
339+
340+
if len(wfs.GetExecutions()) == 1 && wfs.GetExecutions()[0].GetExecution().GetWorkflowId() == "3" {
341+
return true
342+
}
343+
344+
return false
345+
}, 10*time.Second, time.Second, "timed out awaiting for workflows termination")
346+
}
347+
303348
// awaitTaskQueuePoller used mostly for more explicit failure message
304349
func awaitTaskQueuePoller(s *e2eSuite, c sdkclient.Client, taskqueue string) {
305350
s.Eventually(func() bool {

workflow/workflow.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,37 @@ func NewWorkflowCommands() []*cli.Command {
245245
Name: "delete",
246246
Usage: common.DeleteWorkflowDefinition,
247247
UsageText: common.WorkflowDeleteUsageText,
248-
Flags: common.FlagsForExecution,
248+
Flags: []cli.Flag{
249+
&cli.StringFlag{
250+
Name: common.FlagWorkflowID,
251+
Aliases: common.FlagWorkflowIDAlias,
252+
Usage: common.FlagWorkflowIDTerminate,
253+
Category: common.CategoryMain,
254+
},
255+
&cli.StringFlag{
256+
Name: common.FlagRunID,
257+
Aliases: common.FlagRunIDAlias,
258+
Usage: common.FlagRunIdDefinition,
259+
Category: common.CategoryMain,
260+
},
261+
&cli.StringFlag{
262+
Name: common.FlagQuery,
263+
Aliases: common.FlagQueryAlias,
264+
Usage: common.FlagQueryDelete,
265+
Category: common.CategoryMain,
266+
},
267+
&cli.StringFlag{
268+
Name: common.FlagReason,
269+
Usage: common.FlagReasonDefinition,
270+
Category: common.CategoryMain,
271+
},
272+
&cli.BoolFlag{
273+
Name: common.FlagYes,
274+
Aliases: common.FlagYesAlias,
275+
Usage: common.FlagYesDefinition,
276+
Category: common.CategoryMain,
277+
},
278+
},
249279
Action: func(c *cli.Context) error {
250280
return DeleteWorkflow(c)
251281
},

workflow/workflow_commands.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -375,9 +375,9 @@ func printReplayableHistory(c *cli.Context, iter iterator.Iterator[*historypb.Hi
375375
func TerminateWorkflow(c *cli.Context) error {
376376
if c.String(common.FlagQuery) != "" {
377377
return batch.BatchTerminate(c)
378-
} else {
379-
return terminateWorkflowByID(c)
380378
}
379+
380+
return terminateWorkflowByID(c)
381381
}
382382

383383
// terminateWorkflowByID terminates a single workflow execution
@@ -406,8 +406,17 @@ func terminateWorkflowByID(c *cli.Context) error {
406406
return nil
407407
}
408408

409-
// DeleteWorkflow deletes a workflow execution.
409+
// DeleteWorkflow deletes workflow executions based on filter parameters
410410
func DeleteWorkflow(c *cli.Context) error {
411+
if c.String(common.FlagQuery) != "" {
412+
return batch.BatchDelete(c)
413+
}
414+
415+
return deleteWorkflowByID(c)
416+
}
417+
418+
// deleteWorkflowByID deletes a single workflow execution
419+
func deleteWorkflowByID(c *cli.Context) error {
411420
nsName, err := common.RequiredFlag(c, common.FlagNamespace)
412421
if err != nil {
413422
return err
@@ -439,9 +448,9 @@ func DeleteWorkflow(c *cli.Context) error {
439448
func CancelWorkflow(c *cli.Context) error {
440449
if c.String(common.FlagQuery) != "" {
441450
return batch.BatchCancel(c)
442-
} else {
443-
return cancelWorkflowByID(c)
444451
}
452+
453+
return cancelWorkflowByID(c)
445454
}
446455

447456
// cancelWorkflowByID cancels a single workflow execution
@@ -472,9 +481,9 @@ func cancelWorkflowByID(c *cli.Context) error {
472481
func SignalWorkflow(c *cli.Context) error {
473482
if c.String(common.FlagQuery) != "" {
474483
return batch.BatchSignal(c)
475-
} else {
476-
return signalWorkflowByID(c)
477484
}
485+
486+
return signalWorkflowByID(c)
478487
}
479488

480489
// signalWorkflowByID signals a single workflow execution

0 commit comments

Comments
 (0)