Skip to content

Commit 2258e52

Browse files
Added support for sequencing of actions based on the key (#294)
1 parent 2ce7fe8 commit 2258e52

File tree

4 files changed

+294
-25
lines changed

4 files changed

+294
-25
lines changed

trigger/config.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ func (hc *HandlerConfig) UnmarshalJSON(d []byte) error {
104104
// ActionConfig is the configuration for the Action
105105
type ActionConfig struct {
106106
*action.Config
107-
If string `json:"if,omitempty"`
108-
Input map[string]interface{} `json:"input,omitempty"`
109-
Output map[string]interface{} `json:"output,omitempty"`
110-
111-
Act action.Action `json:"-,omitempty"`
112-
}
107+
If string `json:"if,omitempty"`
108+
Input map[string]interface{} `json:"input,omitempty"`
109+
Output map[string]interface{} `json:"output,omitempty"`
110+
SequenceKey string `json:"seqKey,omitempty"`
111+
Act action.Action `json:"-,omitempty"`
112+
}

trigger/handler.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"runtime/debug"
8+
"sync"
9+
"time"
10+
711
"github.com/project-flogo/core/action"
812
"github.com/project-flogo/core/data"
913
"github.com/project-flogo/core/data/coerce"
1014
"github.com/project-flogo/core/data/expression"
1115
"github.com/project-flogo/core/data/mapper"
1216
"github.com/project-flogo/core/data/property"
1317
"github.com/project-flogo/core/support/log"
14-
"runtime/debug"
15-
"time"
1618
)
1719

1820
var handlerLog = log.ChildLogger(log.RootLogger(), "handler")
@@ -30,6 +32,7 @@ type actImpl struct {
3032
condition expression.Expr
3133
actionInputMapper mapper.Mapper
3234
actionOutputMapper mapper.Mapper
35+
sequenceKey mapper.Mapper
3336
}
3437

3538
type handlerImpl struct {
@@ -79,6 +82,7 @@ func NewHandler(config *HandlerConfig, acts []action.Action, mf mapper.Factory,
7982

8083
handler := &handlerImpl{config: config, acts: make([]actImpl, len(acts)), runner: runner, logger: handlerLogger}
8184
var err error
85+
var hasSequenceKey bool
8286

8387
//todo we could filter inputs/outputs based on the metadata, maybe make this an option
8488
for i, act := range acts {
@@ -105,6 +109,22 @@ func NewHandler(config *HandlerConfig, acts []action.Action, mf mapper.Factory,
105109
return nil, err
106110
}
107111
}
112+
113+
if config.Actions[i].SequenceKey != "" {
114+
hasSequenceKey = true
115+
handler.logger.Infof("Handler [%s] is configured with sequence key [%s]", handler.Name(), config.Actions[i].SequenceKey)
116+
mappings := map[string]interface{}{"key": config.Actions[i].SequenceKey}
117+
sequenceKey, err := mf.NewMapper(mappings)
118+
if err != nil {
119+
return nil, err
120+
}
121+
handler.acts[i].sequenceKey = sequenceKey
122+
}
123+
}
124+
if hasSequenceKey {
125+
// Create a new handler with sequence key support
126+
seqKeyHandler := &seqKeyHandlerImpl{config: handler.config, acts: handler.acts, runner: handler.runner, logger: handlerLogger, seqKeyChannelMap: sync.Map{}}
127+
return seqKeyHandler, nil
108128
}
109129

110130
return handler, nil

trigger/handler_test.go

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@ package trigger
22

33
import (
44
"context"
5-
"github.com/project-flogo/core/support/log"
65
"testing"
76

7+
"github.com/project-flogo/core/support/log"
8+
89
"github.com/project-flogo/core/action"
910
"github.com/project-flogo/core/data/expression"
1011
"github.com/project-flogo/core/data/mapper"
1112
"github.com/project-flogo/core/data/metadata"
1213
"github.com/project-flogo/core/data/resolve"
13-
"github.com/project-flogo/core/engine/runner"
1414
"github.com/stretchr/testify/assert"
1515
)
1616

@@ -44,24 +44,12 @@ func TestNewHandler(t *testing.T) {
4444
expf := expression.NewFactory(defResolver)
4545

4646
//Action not specified
47-
handler, err := NewHandler(hCfg, nil, mf, expf, runner.NewDirect(), log.RootLogger())
47+
handler, err := NewHandler(hCfg, nil, mf, expf, nil, log.RootLogger())
4848
assert.NotNil(t, err, "Actions not specified.")
4949

50-
//Parent not defined in the Handler Config
51-
handler, err = NewHandler(hCfg, []action.Action{&MockAction{}}, mf, expf, runner.NewDirect(), log.RootLogger())
52-
_, err = handler.Handle(context.Background(), map[string]interface{}{"anInput": "input"})
53-
assert.NotNil(t, err, "Parent not defined.")
54-
55-
//Parent defined.
56-
hCfg.parent = &Config{Id: "sampleTrig"}
57-
handler, err = NewHandler(hCfg, []action.Action{&MockAction{}}, mf, expf, runner.NewDirect(), log.RootLogger())
58-
assert.Nil(t, err)
59-
assert.NotNil(t, handler)
60-
50+
//HandlerSettings configured
51+
handler, err = NewHandler(hCfg, []action.Action{&MockAction{}}, mf, expf, nil, log.RootLogger())
6152
assert.NotNil(t, handler.Settings())
62-
out, err := handler.Handle(context.Background(), map[string]interface{}{"anInput": "input"})
63-
64-
assert.Equal(t, "output", out["anOutput"])
6553
}
6654

6755
func TestHandlerContext(t *testing.T) {

trigger/sequenceKeyHandler.go

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
package trigger
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"runtime/debug"
8+
"sync"
9+
"time"
10+
11+
"github.com/project-flogo/core/action"
12+
"github.com/project-flogo/core/data"
13+
"github.com/project-flogo/core/data/coerce"
14+
"github.com/project-flogo/core/data/property"
15+
"github.com/project-flogo/core/support/log"
16+
)
17+
18+
type SeqKayActionWrapper func()
19+
20+
type seqKeyHandlerImpl struct {
21+
runner action.Runner
22+
logger log.Logger
23+
config *HandlerConfig
24+
acts []actImpl
25+
eventData map[string]string
26+
seqKeyChannelMap sync.Map
27+
}
28+
29+
func (h *seqKeyHandlerImpl) Name() string {
30+
return h.config.Name
31+
}
32+
33+
func (h *seqKeyHandlerImpl) Schemas() *SchemaConfig {
34+
return h.config.Schemas
35+
}
36+
37+
func (h *seqKeyHandlerImpl) Settings() map[string]interface{} {
38+
return h.config.Settings
39+
}
40+
41+
func (h *seqKeyHandlerImpl) Logger() log.Logger {
42+
return h.logger
43+
}
44+
45+
func (h *seqKeyHandlerImpl) SetDefaultEventData(data map[string]string) {
46+
h.eventData = data
47+
}
48+
49+
func (h *seqKeyHandlerImpl) GetSetting(setting string) (interface{}, bool) {
50+
51+
if h.config == nil {
52+
return nil, false
53+
}
54+
55+
val, exists := h.config.Settings[setting]
56+
57+
if !exists {
58+
val, exists = h.config.parent.Settings[setting]
59+
}
60+
61+
return val, exists
62+
}
63+
64+
func (h *seqKeyHandlerImpl) Handle(ctx context.Context, triggerData interface{}) (results map[string]interface{}, err error) {
65+
handlerName := "Handler"
66+
if h.config != nil && h.config.Name != "" {
67+
handlerName = h.config.Name
68+
}
69+
newCtx := NewHandlerContext(ctx, h.config)
70+
71+
defer func() {
72+
h.Logger().Debugf("Handler [%s] for event id [%s] completed in %s", handlerName, GetHandlerEventIdFromContext(newCtx), time.Since(GetHandleStartTimeFromContext(newCtx)).String())
73+
if r := recover(); r != nil {
74+
h.Logger().Warnf("Unhandled Error while handling handler [%s]: %v", h.Name(), r)
75+
if h.Logger().DebugEnabled() {
76+
h.Logger().Debugf("StackTrace: %s", debug.Stack())
77+
}
78+
err = fmt.Errorf("Unhandled Error while handling handler [%s]: %v", h.Name(), r)
79+
}
80+
}()
81+
82+
var triggerValues map[string]interface{}
83+
84+
if triggerData == nil {
85+
triggerValues = make(map[string]interface{})
86+
} else if values, ok := triggerData.(map[string]interface{}); ok {
87+
triggerValues = values
88+
} else if value, ok := triggerData.(data.StructValue); ok {
89+
triggerValues = value.ToMap()
90+
} else {
91+
return nil, fmt.Errorf("unsupported trigger data: %v", triggerData)
92+
}
93+
94+
var act actImpl
95+
scope := data.NewSimpleScope(triggerValues, nil)
96+
for _, v := range h.acts {
97+
if v.condition == nil {
98+
act = v
99+
break
100+
}
101+
val, err := v.condition.Eval(scope)
102+
if err != nil {
103+
return nil, err
104+
}
105+
if val == nil {
106+
return nil, errors.New("expression has nil result")
107+
}
108+
condition, ok := val.(bool)
109+
if !ok {
110+
return nil, errors.New("expression has a non-bool result")
111+
}
112+
if condition {
113+
act = v
114+
break
115+
}
116+
}
117+
118+
if act.act == nil {
119+
log.RootLogger().Warnf("no action to execute")
120+
return nil, nil
121+
}
122+
123+
if act.sequenceKey != nil {
124+
sequenceKeyObj, err := act.sequenceKey.Apply(scope)
125+
if err != nil {
126+
return nil, err
127+
}
128+
if sequenceKeyObj == nil {
129+
h.logger.Warnf("SequenceKey is evaluated to nil. Running action in concurrent mode")
130+
return h.runAction(newCtx, act, scope, triggerValues, handlerName)
131+
} else {
132+
sequenceKeyString, _ := sequenceKeyObj["key"].(string)
133+
if sequenceKeyString == "" {
134+
h.logger.Warnf("SequenceKey is evaluated to empty string. Running action in concurrent mode")
135+
return h.runAction(newCtx, act, scope, triggerValues, handlerName)
136+
}
137+
// Run actions in sequencial mode for matching key
138+
// Check if a channel is already created for the sequence key
139+
runActionChannel, _ := h.seqKeyChannelMap.Load(sequenceKeyString)
140+
if runActionChannel == nil {
141+
// Create a new channel for the sequence key
142+
runActionChannel = make(chan SeqKayActionWrapper, 100)
143+
h.seqKeyChannelMap.Store(sequenceKeyString, runActionChannel)
144+
// Start a go routine to listen on the channel
145+
go h.seqKeyActionListener(runActionChannel.(chan SeqKayActionWrapper), sequenceKeyString)
146+
}
147+
148+
resultChann := make(chan ExecResult)
149+
runActionWrapper := func() {
150+
h.runSeqKeyBasedAction(newCtx, act, scope, triggerValues, handlerName, resultChann)
151+
}
152+
// Send the action to the channel
153+
runActionChannel.(chan SeqKayActionWrapper) <- runActionWrapper
154+
155+
// Wait for the reply
156+
result := <-resultChann
157+
return result.results, result.err
158+
}
159+
} else {
160+
// Run action in concurrent mode
161+
return h.runAction(newCtx, act, scope, triggerValues, handlerName)
162+
}
163+
}
164+
165+
func (h *seqKeyHandlerImpl) seqKeyActionListener(seqActionChannel chan SeqKayActionWrapper, seqKey string) {
166+
for seqKayBasedAction := range seqActionChannel {
167+
h.logger.Infof("Running action[%s] for sequence key [%s]", h.Name(), seqKey)
168+
seqKayBasedAction()
169+
h.logger.Infof("Action[%s] for sequence key [%s] completed", h.Name(), seqKey)
170+
}
171+
}
172+
173+
type ExecResult struct {
174+
results map[string]interface{}
175+
err error
176+
}
177+
178+
func (h *seqKeyHandlerImpl) runAction(ctx context.Context, act actImpl, scope data.Scope, triggerValues map[string]interface{}, handlerName string) (results map[string]interface{}, err error) {
179+
180+
newCtx := NewHandlerContext(ctx, h.config)
181+
h.Logger().Infof("Executing handler [%s] for event Id [%s]", handlerName, GetHandlerEventIdFromContext(newCtx))
182+
eventData := h.eventData
183+
184+
// check if any event data was attached to the context
185+
if ctxEventData, _ := ExtractEventDataFromContext(newCtx); ctxEventData != nil {
186+
//use this event data values and add missing default event values
187+
for key, value := range eventData {
188+
if _, exists := ctxEventData[key]; !exists {
189+
ctxEventData[key] = value
190+
}
191+
}
192+
eventData = ctxEventData
193+
}
194+
195+
PostHandlerEvent(STARTED, h.Name(), h.config.parent.Id, eventData)
196+
var inputMap map[string]interface{}
197+
198+
if act.actionInputMapper != nil {
199+
inputMap, err = act.actionInputMapper.Apply(scope)
200+
if err != nil {
201+
return nil, err
202+
}
203+
} else {
204+
inputMap = triggerValues
205+
}
206+
207+
if ioMd := act.act.IOMetadata(); ioMd != nil {
208+
for name, tv := range ioMd.Input {
209+
if val, ok := inputMap[name]; ok {
210+
inputMap[name], err = coerce.ToType(val, tv.Type())
211+
if err != nil {
212+
return nil, err
213+
}
214+
}
215+
}
216+
}
217+
218+
if property.IsPropertySnapshotEnabled() {
219+
if inputMap == nil {
220+
inputMap = make(map[string]interface{})
221+
}
222+
// Take snapshot of current app properties
223+
propSnapShot := make(map[string]interface{}, len(property.DefaultManager().GetProperties()))
224+
for k, v := range property.DefaultManager().GetProperties() {
225+
propSnapShot[k] = v
226+
}
227+
inputMap["_PROPERTIES"] = propSnapShot
228+
}
229+
230+
results, err = h.runner.RunAction(ctx, act.act, inputMap)
231+
if err != nil {
232+
PostHandlerEvent(FAILED, h.Name(), h.config.parent.Id, eventData)
233+
return nil, err
234+
}
235+
236+
PostHandlerEvent(COMPLETED, h.Name(), h.config.parent.Id, eventData)
237+
238+
if act.actionOutputMapper != nil {
239+
outScope := data.NewSimpleScope(results, nil)
240+
results, err = act.actionOutputMapper.Apply(outScope)
241+
}
242+
243+
return results, err
244+
}
245+
246+
func (h *seqKeyHandlerImpl) runSeqKeyBasedAction(ctx context.Context, act actImpl, scope data.Scope, triggerValues map[string]interface{}, handlerName string, resultChan chan ExecResult) {
247+
results, err := h.runAction(ctx, act, scope, triggerValues, handlerName)
248+
resultChan <- ExecResult{results: results, err: err}
249+
}
250+
251+
func (h *seqKeyHandlerImpl) String() string {
252+
triggerId := ""
253+
if h.config.parent != nil {
254+
triggerId = h.config.parent.Id
255+
}
256+
handlerId := "Handler"
257+
if h.config.Name != "" {
258+
handlerId = h.config.Name
259+
}
260+
return fmt.Sprintf("Trigger[%s].%s", triggerId, handlerId)
261+
}

0 commit comments

Comments
 (0)