diff --git a/request-cancel-external-workflow/README.md b/request-cancel-external-workflow/README.md new file mode 100644 index 00000000..d8379ddf --- /dev/null +++ b/request-cancel-external-workflow/README.md @@ -0,0 +1,22 @@ +## Cancellation + +Make sure the [Temporal Server is running locally](https://docs.temporal.io/docs/server/quick-install). + +From the root of the project, start a Worker: + +```bash +go run cancellation/worker/main.go +``` + +Start the Workflow Execution: + +```bash +go run cancellation/starter/main.go +``` + +Cancel the Workflow Execution: + +```bash +go run cancellation/cancel/main.go +``` + diff --git a/request-cancel-external-workflow/activity.go b/request-cancel-external-workflow/activity.go new file mode 100644 index 00000000..64814583 --- /dev/null +++ b/request-cancel-external-workflow/activity.go @@ -0,0 +1,41 @@ +package requestcancelexternalworkflow + +import ( + "context" + "time" + + "go.temporal.io/sdk/activity" +) + +// @@@SNIPSTART samples-go-cancellation-activity-definition +type Activities struct{} + +func (a *Activities) ActivityToBeCanceled(ctx context.Context) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("activity started, to cancel the Workflow Execution, use 'go run cancellation/cancel/main.go " + + "-w ' or use the CLI: 'tctl wf cancel -w '") + for { + select { + case <-time.After(1 * time.Second): + logger.Info("heartbeating...") + activity.RecordHeartbeat(ctx, "") + case <-ctx.Done(): + logger.Info("context is cancelled") + return "I am canceled by Done", nil + } + } +} + +func (a *Activities) CleanupActivity(ctx context.Context) error { + logger := activity.GetLogger(ctx) + logger.Info("Cleanup Activity started") + return nil +} + +func (a *Activities) ActivityToBeSkipped(ctx context.Context) error { + logger := activity.GetLogger(ctx) + logger.Info("this Activity will be skipped due to cancellation") + return nil +} + +// @@@SNIPEND diff --git a/request-cancel-external-workflow/cancel/main.go b/request-cancel-external-workflow/cancel/main.go new file mode 100644 index 00000000..d18e8fe0 --- /dev/null +++ b/request-cancel-external-workflow/cancel/main.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "flag" + "log" + + "go.temporal.io/sdk/client" +) + +// @@@SNIPSTART samples-go-cancellation-cancel-workflow-execution-trigger +func main() { + var workflowID string + flag.StringVar(&workflowID, "wid", "workflowID-to-cancel", "workflowID of the Workflow Execution to be canceled.") + flag.Parse() + + if workflowID == "" { + flag.PrintDefaults() + return + } + + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + //err = c.CancelWorkflow(context.Background(), workflowID, "") + //if err != nil { + // log.Fatalln("Unable to cancel Workflow Execution", err) + //} + log.Println("SIGNALLING") + err = c.SignalWorkflow(context.Background(), workflowID, "", "cancelme", "cancelled") + if err != nil { + log.Fatalln("Unable to signal Workflow Execution", err) + } + log.Println("Workflow Execution cancelled", "WorkflowID", workflowID) +} + +// @@@SNIPEND diff --git a/request-cancel-external-workflow/starter/main.go b/request-cancel-external-workflow/starter/main.go new file mode 100644 index 00000000..0d0d5bd2 --- /dev/null +++ b/request-cancel-external-workflow/starter/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "flag" + requestcancelexternalworkflow "github.com/temporalio/samples-go/request-cancel-external-workflow" + "log" + + "go.temporal.io/sdk/client" +) + +// @@@SNIPSTART samples-go-cancellation-workflow-execution-starter +func main() { + var workflowID string + flag.StringVar(&workflowID, "w", "workflowID-to-cancel", "w is the workflowID of the workflow to be canceled.") + flag.Parse() + + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + workflowOptions := client.StartWorkflowOptions{ + ID: workflowID, + TaskQueue: "cancel-activity", + } + + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, requestcancelexternalworkflow.CancellingWorkflow) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) +} + +// @@@SNIPEND diff --git a/request-cancel-external-workflow/worker/main.go b/request-cancel-external-workflow/worker/main.go new file mode 100644 index 00000000..d1ec33d2 --- /dev/null +++ b/request-cancel-external-workflow/worker/main.go @@ -0,0 +1,36 @@ +package main + +import ( + requestcancelexternalworkflow "github.com/temporalio/samples-go/request-cancel-external-workflow" + "log" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + + "github.com/temporalio/samples-go/cancellation" +) + +// @@@SNIPSTART samples-go-cancellation-worker-starter +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "cancel-activity", worker.Options{}) + + w.RegisterWorkflow(requestcancelexternalworkflow.CancellingWorkflow) + w.RegisterWorkflow(requestcancelexternalworkflow.ChildWorkflow) + w.RegisterActivity(&cancellation.Activities{}) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} + +// @@@SNIPEND diff --git a/request-cancel-external-workflow/workflow.go b/request-cancel-external-workflow/workflow.go new file mode 100644 index 00000000..3135f231 --- /dev/null +++ b/request-cancel-external-workflow/workflow.go @@ -0,0 +1,104 @@ +package requestcancelexternalworkflow + +import ( + "errors" + "go.temporal.io/sdk/workflow" + "time" +) + +// @@@SNIPSTART samples-go-cancellation-workflow-definition +// YourWorkflow is a Workflow Definition that shows how it can be canceled. +func CancellingWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 5 * time.Second, + WaitForCancellation: true, + } + ctx = workflow.WithActivityOptions(ctx, ao) + logger := workflow.GetLogger(ctx) + logger.Info("cancel workflow started") + var a *Activities // Used to call Activities by function pointer + defer func() { + + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + return + } + + // When the Workflow is canceled, it has to get a new disconnected context to execute any Activities + newCtx, _ := workflow.NewDisconnectedContext(ctx) + err := workflow.ExecuteActivity(newCtx, a.CleanupActivity).Get(ctx, nil) + if err != nil { + logger.Error("CleanupActivity failed", "Error", err) + } + }() + + var cancelled string + + workflow.Go(ctx, func(ctx workflow.Context) { + sigChan := workflow.GetSignalChannel(ctx, "cancelme") + sel := workflow.NewSelector(ctx) + sel.AddReceive(sigChan, func(c workflow.ReceiveChannel, more bool) { + sigChan.Receive(ctx, &cancelled) + logger.Info("received signal", cancelled) + }) + + sel.Select(ctx) + }) + + childCtx, _ := workflow.WithCancel(ctx) + childCtx = workflow.WithChildOptions(childCtx, workflow.ChildWorkflowOptions{ + WaitForCancellation: true, + WorkflowID: "mykid", + }) + childFuture := workflow.ExecuteChildWorkflow(childCtx, ChildWorkflow) + + if err := workflow.Await(ctx, func() bool { + return cancelled == "cancelled" + }); err != nil { + return err + } + logger.Info("cancelling child") + var childResult string + + if err := workflow.RequestCancelExternalWorkflow(ctx, "mykid", "").Get(ctx, &childResult); err != nil { + return err + } + + //cancelChild() + if err := childFuture.Get(childCtx, &childResult); err != nil { + logger.Error("child raised error", "err", err) + } + + logger.Info("child result is", "result", childResult) + + logger.Info("Workflow Execution complete.") + + return nil +} +func ChildWorkflow(ctx workflow.Context) (myresult string, err error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 5 * time.Second, + WaitForCancellation: true, + } + ctx = workflow.WithActivityOptions(ctx, ao) + logger := workflow.GetLogger(ctx) + logger.Info("CHILD: workflow started") + myresult = "result: pristine" + defer func() { + logger.Info("CHILD: defer") + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + return + } + myresult = "result: child cancellation result" + err = nil + logger.Info("CHILD: cancel workflow") + }() + var neverSet bool + if aerr := workflow.Await(ctx, func() bool { + return neverSet + }); aerr != nil { + return "result: await errd", aerr + } + return "result: final value", nil +} diff --git a/request-cancel-external-workflow/workflow_test.go b/request-cancel-external-workflow/workflow_test.go new file mode 100644 index 00000000..9875ddb1 --- /dev/null +++ b/request-cancel-external-workflow/workflow_test.go @@ -0,0 +1,183 @@ +package requestcancelexternalworkflow + +import ( + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" + "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/workflow" + "testing" + "time" +) + +const childWorkflowID string = "luke" +const signalName string = "mysignal" +const signalValueDone string = "done" + +func MyWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 5 * time.Second, + WaitForCancellation: true, + } + ctx = workflow.WithActivityOptions(ctx, ao) + info := workflow.GetInfo(ctx) + + logger := log.With(workflow.GetLogger(ctx), "type", info.WorkflowType, "wid", info.WorkflowExecution.ID) + logger.Info("workflow started", "info", workflow.GetInfo(ctx)) + + childCtx := workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ + WorkflowID: childWorkflowID, + WaitForCancellation: true, + }) + + var signalValue string + var childFuture workflow.Future + childFuture = workflow.ExecuteChildWorkflow(childCtx, MyChild) + sigChan := workflow.GetSignalChannel(ctx, signalName) + sel := workflow.NewSelector(ctx) + workflow.Go(ctx, func(ctx workflow.Context) { + sel.AddReceive(sigChan, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &signalValue) + logger.Info("signal received", "signal", signalName, signalValue) + }) + sel.Select(ctx) + }) + + if err := workflow.Await(ctx, func() bool { + return signalValue == signalValueDone + }); err != nil { + logger.Error("await err", "err", err) + } + logger.Info("released from block") + if err := workflow.RequestCancelExternalWorkflow(childCtx, childWorkflowID, "").Get(ctx, nil); err != nil { + logger.Error("request cancel err", "err", err) + } + if err := childFuture.Get(ctx, nil); err != nil { + logger.Error("child GET err", "err", err) + } + return nil +} +func MyChild(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 5 * time.Second, + WaitForCancellation: true, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + info := workflow.GetInfo(ctx) + logger := log.With(workflow.GetLogger(ctx), "type", info.WorkflowType, "wid", info.WorkflowExecution.ID) + logger.Info("workflow started", "info", workflow.GetInfo(ctx)) + + var signalValue string + sigChan := workflow.GetSignalChannel(ctx, signalName) + sel := workflow.NewSelector(ctx) + workflow.Go(ctx, func(ctx workflow.Context) { + sel.AddReceive(sigChan, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &signalValue) + logger.Info("signal received", "signal", signalName, signalValue) + }) + sel.Select(ctx) + }) + + if err := workflow.Await(ctx, func() bool { + return signalValue == signalValueDone + }); err != nil { + logger.Error("await err", "err", err) + } + logger.Info("released from block") + return nil +} + +// https://docs.temporal.io/docs/go/testing/ +type CancelTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite + env *testsuite.TestWorkflowEnvironment +} + +// SetupSuite https://pkg.go.dev/github.com/stretchr/testify/suite#SetupAllSuite +func (s *CancelTestSuite) SetupSuite() { + +} + +// SetupTest https://pkg.go.dev/github.com/stretchr/testify/suite#SetupTestSuite +// CAREFUL not to put this `env` inside the SetupSuite or else you will +// get interleaved test times between parallel tests (testify runs suite tests in parallel) +func (s *CancelTestSuite) SetupTest() { + s.env = s.NewTestWorkflowEnvironment() +} + +// BeforeTest https://pkg.go.dev/github.com/stretchr/testify/suite#BeforeTest +func (s *CancelTestSuite) BeforeTest(suiteName, testName string) { + +} + +// AfterTest https://pkg.go.dev/github.com/stretchr/testify/suite#AfterTest +func (s *CancelTestSuite) AfterTest(suiteName, testName string) { + s.env.AssertExpectations(s.T()) +} + +func (s *CancelTestSuite) Test_Cancel_NoMocky() { + + s.env.RegisterWorkflow(MyWorkflow) + s.env.RegisterWorkflow(MyChild) + //var cancelRequestCalled bool + var childCancelInfo *workflow.Info + var childCompletedInfo *workflow.Info + s.env.SetOnChildWorkflowCanceledListener(func(workflowInfo *workflow.Info) { + childCancelInfo = workflowInfo + }) + s.env.SetOnChildWorkflowCompletedListener(func(workflowInfo *workflow.Info, result converter.EncodedValue, err error) { + childCompletedInfo = workflowInfo + }) + //s.env.OnRequestCancelExternalWorkflow(mock.Anything, childWorkflowID, "").Run(func(args mock.Arguments) { + // cancelRequestCalled = true + //}).Return(nil).Once() + + s.env.RegisterDelayedCallback(func() { + s.env.SignalWorkflowByID("default-test-workflow-id", signalName, signalValueDone) + }, time.Second*1) + s.env.ExecuteWorkflow(MyWorkflow) + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) + //s.True(cancelRequestCalled) + s.NotNil(childCancelInfo) + s.NotNil(childCompletedInfo) + +} +func (s *CancelTestSuite) Test_Cancel_WithMocky() { + + s.env.RegisterWorkflow(MyWorkflow) + s.env.RegisterWorkflow(MyChild) + //var cancelRequestCalled bool + var childCancelInfo *workflow.Info + var childCompletedInfo *workflow.Info + // this mock is broken...MyChild does not require any arguments but the workflowInterceptor in our test suite panics without args + s.env.OnWorkflow(MyChild, mock.Anything).Return(func(ctx workflow.Context) error { + return nil + }) + s.env.SetOnChildWorkflowCanceledListener(func(workflowInfo *workflow.Info) { + childCancelInfo = workflowInfo + }) + s.env.SetOnChildWorkflowCompletedListener(func(workflowInfo *workflow.Info, result converter.EncodedValue, err error) { + childCompletedInfo = workflowInfo + }) + s.env.OnRequestCancelExternalWorkflow(mock.Anything, childWorkflowID, "").Return(nil).Once() + + s.env.RegisterDelayedCallback(func() { + s.env.SignalWorkflowByID("default-test-workflow-id", signalName, signalValueDone) + }, time.Second*1) + s.env.ExecuteWorkflow(MyWorkflow) + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) + //s.True(cancelRequestCalled) + s.NotNil(childCancelInfo) + s.NotNil(childCompletedInfo) + +} +func TestWorkflow(t *testing.T) { + suite.Run(t, &CancelTestSuite{}) +}