diff --git a/activity/context.go b/activity/context.go index 3771ab2..e2e77e5 100644 --- a/activity/context.go +++ b/activity/context.go @@ -1,6 +1,7 @@ package activity import ( + "context" "github.com/project-flogo/core/data" "github.com/project-flogo/core/data/metadata" "github.com/project-flogo/core/support/log" @@ -37,6 +38,9 @@ type Context interface { // GetTracingContext returns tracing context associated with the activity GetTracingContext() trace.TracingContext + + // GetCancelContext returns cancel context + GetCancelContext() context.Context } type Host interface { diff --git a/engine/runner/pooled.go b/engine/runner/pooled.go index 9619598..9e5985c 100644 --- a/engine/runner/pooled.go +++ b/engine/runner/pooled.go @@ -3,6 +3,8 @@ package runner import ( "context" "errors" + "time" + "github.com/project-flogo/core/action" "github.com/project-flogo/core/support" "github.com/project-flogo/core/support/log" @@ -99,7 +101,16 @@ func (runner *PooledRunner) Stop() error { worker.Stop() } // check if all actions done till shutdown waiting time - trackPooledRunnerActions.gracefulStop() + if runnercompleted := trackPooledRunnerActions.gracefulStop(); !runnercompleted { // don't call cancel if all runners already completed + for _, worker := range runner.workers { + runner.logger.Debug("Cancelling context for running workers") + worker.Cancel() + } + + // wait to make sure running flow will make cancel entry into DB (if can) + time.Sleep(time.Second * 5) + } + } return nil diff --git a/engine/runner/runner_tracker.go b/engine/runner/runner_tracker.go index 25f3a4a..9158cb0 100644 --- a/engine/runner/runner_tracker.go +++ b/engine/runner/runner_tracker.go @@ -41,7 +41,7 @@ func (rt RunnerTracker) WaitForRunnersCompletion(timeout time.Duration) bool { } } -func (rt RunnerTracker) gracefulStop() { +func (rt RunnerTracker) gracefulStop() (runnercompleted bool) { logger := log.RootLogger() delayedStopInterval := app.GetDelayedStopInterval() if delayedStopInterval != "" { @@ -53,9 +53,11 @@ func (rt RunnerTracker) gracefulStop() { if isTimeout := rt.WaitForRunnersCompletion(duration); isTimeout { logger.Info("All actions not completed before engine shutdown") } else { + runnercompleted = true logger.Info("All actions completed before engine shutdown") } } } + return } diff --git a/engine/runner/worker.go b/engine/runner/worker.go index b96c4b6..519c31a 100644 --- a/engine/runner/worker.go +++ b/engine/runner/worker.go @@ -49,6 +49,7 @@ type ActionWorker struct { Work chan ActionWorkRequest WorkerQueue chan chan ActionWorkRequest QuitChan chan bool + CancelChan chan bool } // NewWorker creates, and returns a new Worker object. Its only argument @@ -61,7 +62,8 @@ func NewWorker(id int, runner *DirectRunner, workerQueue chan chan ActionWorkReq runner: runner, Work: make(chan ActionWorkRequest), WorkerQueue: workerQueue, - QuitChan: make(chan bool)} + QuitChan: make(chan bool), + CancelChan: make(chan bool)} return worker } @@ -104,7 +106,8 @@ func (w ActionWorker) Start() { actionData.arc <- &ActionResult{results: results, err: err} } else if asyncAct, ok := actionData.action.(action.AsyncAction); ok { - err := asyncAct.Run(actionData.context, actionData.inputs, handler) + ctx, cancelFunc := context.WithCancel(actionData.context) + err := asyncAct.Run(ctx, actionData.inputs, handler) if err != nil { logger.Debugf("Action-Worker-%d: Action Run error: %s", w.ID, err.Error()) @@ -130,6 +133,10 @@ func (w ActionWorker) Start() { actionData.arc <- &ActionResult{} } done = true + case <-w.CancelChan: + if !replied { + cancelFunc() // cancel the running job + } } } } @@ -158,6 +165,12 @@ func (w ActionWorker) Stop() { }() } +func (w ActionWorker) Cancel() { + go func() { + w.CancelChan <- true + }() +} + // AsyncResultHandler simple ResultHandler to use in the asynchronous case type AsyncResultHandler struct { done chan bool diff --git a/support/test/context.go b/support/test/context.go index 6c07919..9e8c5aa 100644 --- a/support/test/context.go +++ b/support/test/context.go @@ -1,6 +1,8 @@ package test import ( + "context" + "github.com/project-flogo/core/activity" "github.com/project-flogo/core/data" "github.com/project-flogo/core/data/mapper" @@ -255,3 +257,7 @@ func (c *TestActivityContext) Logger() log.Logger { func (c *TestActivityContext) GetTracingContext() trace.TracingContext { return nil } + +func (c *TestActivityContext) GetCancelContext() context.Context { + return context.TODO() +}