Skip to content

Cancelling in flight flow and task #279

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
4 changes: 4 additions & 0 deletions activity/context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package activity

import (
"context"
"github.com/project-flogo/core/data"
"github.com/project-flogo/core/data/metadata"
"github.com/project-flogo/core/support/log"
Expand Down Expand Up @@ -37,6 +38,9 @@ type Context interface {

// GetTracingContext returns tracing context associated with the activity
GetTracingContext() trace.TracingContext

// GetCancelContext returns cancel context
GetCancelContext() context.Context
}

type Host interface {
Expand Down
13 changes: 12 additions & 1 deletion engine/runner/pooled.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package runner
import (
"context"
"errors"
"time"

"github.com/project-flogo/core/action"
"github.com/project-flogo/core/support"
"github.com/project-flogo/core/support/log"
Expand Down Expand Up @@ -99,7 +101,16 @@ func (runner *PooledRunner) Stop() error {
worker.Stop()
}
// check if all actions done till shutdown waiting time
trackPooledRunnerActions.gracefulStop()
if runnercompleted := trackPooledRunnerActions.gracefulStop(); !runnercompleted { // don't call cancel if all runners already completed
for _, worker := range runner.workers {
runner.logger.Debug("Cancelling context for running workers")
worker.Cancel()
}

// wait to make sure running flow will make cancel entry into DB (if can)
time.Sleep(time.Second * 5)
}

}

return nil
Expand Down
4 changes: 3 additions & 1 deletion engine/runner/runner_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (rt RunnerTracker) WaitForRunnersCompletion(timeout time.Duration) bool {
}
}

func (rt RunnerTracker) gracefulStop() {
func (rt RunnerTracker) gracefulStop() (runnercompleted bool) {
logger := log.RootLogger()
delayedStopInterval := app.GetDelayedStopInterval()
if delayedStopInterval != "" {
Expand All @@ -53,9 +53,11 @@ func (rt RunnerTracker) gracefulStop() {
if isTimeout := rt.WaitForRunnersCompletion(duration); isTimeout {
logger.Info("All actions not completed before engine shutdown")
} else {
runnercompleted = true
logger.Info("All actions completed before engine shutdown")
}
}

}
return
}
17 changes: 15 additions & 2 deletions engine/runner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type ActionWorker struct {
Work chan ActionWorkRequest
WorkerQueue chan chan ActionWorkRequest
QuitChan chan bool
CancelChan chan bool
}

// NewWorker creates, and returns a new Worker object. Its only argument
Expand All @@ -61,7 +62,8 @@ func NewWorker(id int, runner *DirectRunner, workerQueue chan chan ActionWorkReq
runner: runner,
Work: make(chan ActionWorkRequest),
WorkerQueue: workerQueue,
QuitChan: make(chan bool)}
QuitChan: make(chan bool),
CancelChan: make(chan bool)}

return worker
}
Expand Down Expand Up @@ -104,7 +106,8 @@ func (w ActionWorker) Start() {
actionData.arc <- &ActionResult{results: results, err: err}

} else if asyncAct, ok := actionData.action.(action.AsyncAction); ok {
err := asyncAct.Run(actionData.context, actionData.inputs, handler)
ctx, cancelFunc := context.WithCancel(actionData.context)
err := asyncAct.Run(ctx, actionData.inputs, handler)

if err != nil {
logger.Debugf("Action-Worker-%d: Action Run error: %s", w.ID, err.Error())
Expand All @@ -130,6 +133,10 @@ func (w ActionWorker) Start() {
actionData.arc <- &ActionResult{}
}
done = true
case <-w.CancelChan:
if !replied {
cancelFunc() // cancel the running job
}
}
}
}
Expand Down Expand Up @@ -158,6 +165,12 @@ func (w ActionWorker) Stop() {
}()
}

func (w ActionWorker) Cancel() {
go func() {
w.CancelChan <- true
}()
}

// AsyncResultHandler simple ResultHandler to use in the asynchronous case
type AsyncResultHandler struct {
done chan bool
Expand Down
6 changes: 6 additions & 0 deletions support/test/context.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package test

import (
"context"

"github.com/project-flogo/core/activity"
"github.com/project-flogo/core/data"
"github.com/project-flogo/core/data/mapper"
Expand Down Expand Up @@ -255,3 +257,7 @@ func (c *TestActivityContext) Logger() log.Logger {
func (c *TestActivityContext) GetTracingContext() trace.TracingContext {
return nil
}

func (c *TestActivityContext) GetCancelContext() context.Context {
return context.TODO()
}