From 073e0265c555b1b4dd62686ebea3965af7ca4fde Mon Sep 17 00:00:00 2001 From: vanilcha Date: Tue, 21 Mar 2023 11:07:31 +0530 Subject: [PATCH 1/7] cancel context for activities when app is stopped --- activity/context.go | 4 ++++ engine/runner/pooled.go | 11 ++++++++++- engine/runner/runner_tracker.go | 4 +++- engine/runner/worker.go | 17 +++++++++++++++-- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/activity/context.go b/activity/context.go index 3771ab22..e2e77e55 100644 --- a/activity/context.go +++ b/activity/context.go @@ -1,6 +1,7 @@ package activity import ( + "context" "github.com/project-flogo/core/data" "github.com/project-flogo/core/data/metadata" "github.com/project-flogo/core/support/log" @@ -37,6 +38,9 @@ type Context interface { // GetTracingContext returns tracing context associated with the activity GetTracingContext() trace.TracingContext + + // GetCancelContext returns cancel context + GetCancelContext() context.Context } type Host interface { diff --git a/engine/runner/pooled.go b/engine/runner/pooled.go index 96195985..23dd07b8 100644 --- a/engine/runner/pooled.go +++ b/engine/runner/pooled.go @@ -6,6 +6,7 @@ import ( "github.com/project-flogo/core/action" "github.com/project-flogo/core/support" "github.com/project-flogo/core/support/log" + "time" ) // PooledRunner is a action runner that queues and runs a action in a worker pool @@ -99,7 +100,15 @@ func (runner *PooledRunner) Stop() error { worker.Stop() } // check if all actions done till shutdown waiting time - trackPooledRunnerActions.gracefulStop() + if runnercompleted := trackPooledRunnerActions.gracefulStop(); !runnercompleted { // don't call cancel if all runners already completed + for _, worker := range runner.workers { + runner.logger.Debug("Cancelling context for running workers") + worker.Cancel() + } + time.Sleep(time.Second * 2) + // wait for 1-2 secs to make sure running flow will make cancel entry into DB (if can) + } + } return nil diff --git a/engine/runner/runner_tracker.go b/engine/runner/runner_tracker.go index 25f3a4ab..9158cb05 100644 --- a/engine/runner/runner_tracker.go +++ b/engine/runner/runner_tracker.go @@ -41,7 +41,7 @@ func (rt RunnerTracker) WaitForRunnersCompletion(timeout time.Duration) bool { } } -func (rt RunnerTracker) gracefulStop() { +func (rt RunnerTracker) gracefulStop() (runnercompleted bool) { logger := log.RootLogger() delayedStopInterval := app.GetDelayedStopInterval() if delayedStopInterval != "" { @@ -53,9 +53,11 @@ func (rt RunnerTracker) gracefulStop() { if isTimeout := rt.WaitForRunnersCompletion(duration); isTimeout { logger.Info("All actions not completed before engine shutdown") } else { + runnercompleted = true logger.Info("All actions completed before engine shutdown") } } } + return } diff --git a/engine/runner/worker.go b/engine/runner/worker.go index b96c4b66..519c31a9 100644 --- a/engine/runner/worker.go +++ b/engine/runner/worker.go @@ -49,6 +49,7 @@ type ActionWorker struct { Work chan ActionWorkRequest WorkerQueue chan chan ActionWorkRequest QuitChan chan bool + CancelChan chan bool } // NewWorker creates, and returns a new Worker object. Its only argument @@ -61,7 +62,8 @@ func NewWorker(id int, runner *DirectRunner, workerQueue chan chan ActionWorkReq runner: runner, Work: make(chan ActionWorkRequest), WorkerQueue: workerQueue, - QuitChan: make(chan bool)} + QuitChan: make(chan bool), + CancelChan: make(chan bool)} return worker } @@ -104,7 +106,8 @@ func (w ActionWorker) Start() { actionData.arc <- &ActionResult{results: results, err: err} } else if asyncAct, ok := actionData.action.(action.AsyncAction); ok { - err := asyncAct.Run(actionData.context, actionData.inputs, handler) + ctx, cancelFunc := context.WithCancel(actionData.context) + err := asyncAct.Run(ctx, actionData.inputs, handler) if err != nil { logger.Debugf("Action-Worker-%d: Action Run error: %s", w.ID, err.Error()) @@ -130,6 +133,10 @@ func (w ActionWorker) Start() { actionData.arc <- &ActionResult{} } done = true + case <-w.CancelChan: + if !replied { + cancelFunc() // cancel the running job + } } } } @@ -158,6 +165,12 @@ func (w ActionWorker) Stop() { }() } +func (w ActionWorker) Cancel() { + go func() { + w.CancelChan <- true + }() +} + // AsyncResultHandler simple ResultHandler to use in the asynchronous case type AsyncResultHandler struct { done chan bool From 068a3a710edb7548677f66f55d0f8b869bc12b04 Mon Sep 17 00:00:00 2001 From: Abhijit Wakchaure Date: Mon, 10 Apr 2023 16:08:15 +0530 Subject: [PATCH 2/7] fix build issue --- support/test/context.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/support/test/context.go b/support/test/context.go index 6c079194..9e8c5aac 100644 --- a/support/test/context.go +++ b/support/test/context.go @@ -1,6 +1,8 @@ package test import ( + "context" + "github.com/project-flogo/core/activity" "github.com/project-flogo/core/data" "github.com/project-flogo/core/data/mapper" @@ -255,3 +257,7 @@ func (c *TestActivityContext) Logger() log.Logger { func (c *TestActivityContext) GetTracingContext() trace.TracingContext { return nil } + +func (c *TestActivityContext) GetCancelContext() context.Context { + return context.TODO() +} From cbebca0cabd20f42f91100f63da8efd7b94f5f88 Mon Sep 17 00:00:00 2001 From: vanilcha Date: Thu, 11 May 2023 12:57:19 +0530 Subject: [PATCH 3/7] Added graceful timeout to 1 min --- engine/runner/pooled.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/engine/runner/pooled.go b/engine/runner/pooled.go index 23dd07b8..a463e877 100644 --- a/engine/runner/pooled.go +++ b/engine/runner/pooled.go @@ -3,10 +3,11 @@ package runner import ( "context" "errors" + "time" + "github.com/project-flogo/core/action" "github.com/project-flogo/core/support" "github.com/project-flogo/core/support/log" - "time" ) // PooledRunner is a action runner that queues and runs a action in a worker pool @@ -105,7 +106,7 @@ func (runner *PooledRunner) Stop() error { runner.logger.Debug("Cancelling context for running workers") worker.Cancel() } - time.Sleep(time.Second * 2) + time.Sleep(time.Minute * 1) // wait for 1-2 secs to make sure running flow will make cancel entry into DB (if can) } From 82dec4c5ef7daea41b313dc5708f63935e4504ab Mon Sep 17 00:00:00 2001 From: vanilcha Date: Thu, 11 May 2023 14:30:07 +0530 Subject: [PATCH 4/7] exposed context cancel duration to env var --- engine/runner/pooled.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/engine/runner/pooled.go b/engine/runner/pooled.go index a463e877..2e013df4 100644 --- a/engine/runner/pooled.go +++ b/engine/runner/pooled.go @@ -3,6 +3,7 @@ package runner import ( "context" "errors" + "os" "time" "github.com/project-flogo/core/action" @@ -10,6 +11,10 @@ import ( "github.com/project-flogo/core/support/log" ) +const ( + EnvDelayedCancelContextInterval = "FLOGO_APP_DELAYED_CANCEL_CONTEXT_INTERVAL" +) + // PooledRunner is a action runner that queues and runs a action in a worker pool type PooledRunner struct { workerQueue chan chan ActionWorkRequest @@ -106,8 +111,16 @@ func (runner *PooledRunner) Stop() error { runner.logger.Debug("Cancelling context for running workers") worker.Cancel() } - time.Sleep(time.Minute * 1) - // wait for 1-2 secs to make sure running flow will make cancel entry into DB (if can) + + // wait to make sure running flow will make cancel entry into DB (if can) + timeout := GetEnvVar(EnvDelayedCancelContextInterval) + timeDuration, ok := timeout.(int) + if ok && timeDuration > 0 { + time.Sleep(time.Second * time.Duration(timeDuration)) + } else { + time.Sleep(time.Second * 2) + } + } } @@ -147,3 +160,11 @@ func (runner *PooledRunner) RunAction(ctx context.Context, act action.Action, in //Run rejected return nil, errors.New("runner not active") } + +func GetEnvVar(varName string) interface{} { + intervalEnv := os.Getenv(varName) + if len(intervalEnv) > 0 { + return intervalEnv + } + return "" +} From 386448899ad9b015e2e761872a36d305d21a670b Mon Sep 17 00:00:00 2001 From: vanilcha Date: Thu, 18 May 2023 14:59:33 +0530 Subject: [PATCH 5/7] handled timeout parsing --- engine/runner/pooled.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/engine/runner/pooled.go b/engine/runner/pooled.go index 2e013df4..3a443014 100644 --- a/engine/runner/pooled.go +++ b/engine/runner/pooled.go @@ -4,6 +4,7 @@ import ( "context" "errors" "os" + "strconv" "time" "github.com/project-flogo/core/action" @@ -114,8 +115,8 @@ func (runner *PooledRunner) Stop() error { // wait to make sure running flow will make cancel entry into DB (if can) timeout := GetEnvVar(EnvDelayedCancelContextInterval) - timeDuration, ok := timeout.(int) - if ok && timeDuration > 0 { + timeDuration, err := strconv.Atoi(timeout) + if err == nil && timeDuration > 0 { time.Sleep(time.Second * time.Duration(timeDuration)) } else { time.Sleep(time.Second * 2) @@ -161,7 +162,7 @@ func (runner *PooledRunner) RunAction(ctx context.Context, act action.Action, in return nil, errors.New("runner not active") } -func GetEnvVar(varName string) interface{} { +func GetEnvVar(varName string) string { intervalEnv := os.Getenv(varName) if len(intervalEnv) > 0 { return intervalEnv From 61579f6ae7fc37810d3da1c93f51e1700a1f6eaa Mon Sep 17 00:00:00 2001 From: vanilcha Date: Thu, 6 Jul 2023 15:59:52 +0530 Subject: [PATCH 6/7] removed env varible for delayed cancelled context --- engine/runner/pooled.go | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/engine/runner/pooled.go b/engine/runner/pooled.go index 3a443014..d180ac04 100644 --- a/engine/runner/pooled.go +++ b/engine/runner/pooled.go @@ -3,8 +3,6 @@ package runner import ( "context" "errors" - "os" - "strconv" "time" "github.com/project-flogo/core/action" @@ -114,14 +112,7 @@ func (runner *PooledRunner) Stop() error { } // wait to make sure running flow will make cancel entry into DB (if can) - timeout := GetEnvVar(EnvDelayedCancelContextInterval) - timeDuration, err := strconv.Atoi(timeout) - if err == nil && timeDuration > 0 { - time.Sleep(time.Second * time.Duration(timeDuration)) - } else { - time.Sleep(time.Second * 2) - } - + time.Sleep(time.Second * 5) } } @@ -161,11 +152,3 @@ func (runner *PooledRunner) RunAction(ctx context.Context, act action.Action, in //Run rejected return nil, errors.New("runner not active") } - -func GetEnvVar(varName string) string { - intervalEnv := os.Getenv(varName) - if len(intervalEnv) > 0 { - return intervalEnv - } - return "" -} From 30882b42b3ef0869ea25f1fb0ea810d087db9760 Mon Sep 17 00:00:00 2001 From: vanilcha Date: Thu, 6 Jul 2023 16:05:51 +0530 Subject: [PATCH 7/7] removed env var --- engine/runner/pooled.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/engine/runner/pooled.go b/engine/runner/pooled.go index d180ac04..9e5985c0 100644 --- a/engine/runner/pooled.go +++ b/engine/runner/pooled.go @@ -10,10 +10,6 @@ import ( "github.com/project-flogo/core/support/log" ) -const ( - EnvDelayedCancelContextInterval = "FLOGO_APP_DELAYED_CANCEL_CONTEXT_INTERVAL" -) - // PooledRunner is a action runner that queues and runs a action in a worker pool type PooledRunner struct { workerQueue chan chan ActionWorkRequest