Skip to content

Commit 36cbd6b

Browse files
committed
[CAPPL-735] Engine V2 execution phase (minimal impl)
1 parent 98fa28d commit 36cbd6b

File tree

5 files changed

+187
-34
lines changed

5 files changed

+187
-34
lines changed

core/services/workflows/engine.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package workflows
22

33
import (
44
"context"
5-
"crypto/sha256"
6-
"encoding/hex"
75
"errors"
86
"fmt"
97
"strconv"
@@ -494,21 +492,6 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
494492
}
495493
}
496494

497-
func generateExecutionID(workflowID, eventID string) (string, error) {
498-
s := sha256.New()
499-
_, err := s.Write([]byte(workflowID))
500-
if err != nil {
501-
return "", err
502-
}
503-
504-
_, err = s.Write([]byte(eventID))
505-
if err != nil {
506-
return "", err
507-
}
508-
509-
return hex.EncodeToString(s.Sum(nil)), nil
510-
}
511-
512495
// startExecution kicks off a new workflow execution when a trigger event is received.
513496
func (e *Engine) startExecution(ctx context.Context, executionID string, triggerID string, event *values.Map) error {
514497
e.meterReports.Add(executionID, NewMeteringReport())
@@ -741,7 +724,7 @@ func (e *Engine) worker(ctx context.Context) {
741724
continue
742725
}
743726

744-
executionID, err := generateExecutionID(e.workflow.id, te.ID)
727+
executionID, err := types.GenerateExecutionID(e.workflow.id, te.ID)
745728
if err != nil {
746729
e.logger.With(platform.KeyTriggerID, te.ID).Errorf("could not generate execution ID: %v", err)
747730
continue
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package types
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/hex"
6+
)
7+
8+
// hash of (workflowID, triggerEventID)
9+
// TODO(CAPPL-838): improve for V2
10+
func GenerateExecutionID(workflowID, triggerEventID string) (string, error) {
11+
s := sha256.New()
12+
_, err := s.Write([]byte(workflowID))
13+
if err != nil {
14+
return "", err
15+
}
16+
17+
_, err = s.Write([]byte(triggerEventID))
18+
if err != nil {
19+
return "", err
20+
}
21+
22+
return hex.EncodeToString(s.Sum(nil)), nil
23+
}

core/services/workflows/v2/config.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ const (
4242
defaultMaxTriggerSubscriptions = 10
4343
defaultTriggerEventQueueSize = 1000
4444

45-
defaultMaxConcurrentWorkflowExecutions = 100
46-
defaultMaxConcurrentCapabilityCalls = 10
45+
defaultMaxConcurrentWorkflowExecutions = 100
46+
defaultMaxConcurrentCapabilityCallsPerWorkflow = 10
47+
defaultWorkflowExecutionTimeoutMs = 1000 * 60 * 10 // 10 minutes
48+
defaultCapabilityCallTimeoutMs = 1000 * 60 * 8 // 8 minutes
4749

4850
defaultShutdownTimeoutMs = 5000
4951
)
@@ -57,8 +59,10 @@ type EngineLimits struct {
5759
MaxTriggerSubscriptions uint16
5860
TriggerEventQueueSize uint16
5961

60-
MaxConcurrentWorkflowExecutions uint16
61-
MaxConcurrentCapabilityCalls uint16
62+
MaxConcurrentWorkflowExecutions uint16
63+
MaxConcurrentCapabilityCallsPerWorkflow uint16
64+
WorkflowExecutionTimeoutMs uint32
65+
CapabilityCallTimeoutMs uint32
6266

6367
ShutdownTimeoutMs uint32
6468
}
@@ -133,8 +137,14 @@ func (l *EngineLimits) setDefaultLimits() {
133137
if l.MaxConcurrentWorkflowExecutions == 0 {
134138
l.MaxConcurrentWorkflowExecutions = defaultMaxConcurrentWorkflowExecutions
135139
}
136-
if l.MaxConcurrentCapabilityCalls == 0 {
137-
l.MaxConcurrentCapabilityCalls = defaultMaxConcurrentCapabilityCalls
140+
if l.MaxConcurrentCapabilityCallsPerWorkflow == 0 {
141+
l.MaxConcurrentCapabilityCallsPerWorkflow = defaultMaxConcurrentCapabilityCallsPerWorkflow
142+
}
143+
if l.WorkflowExecutionTimeoutMs == 0 {
144+
l.WorkflowExecutionTimeoutMs = defaultWorkflowExecutionTimeoutMs
145+
}
146+
if l.CapabilityCallTimeoutMs == 0 {
147+
l.CapabilityCallTimeoutMs = defaultCapabilityCallTimeoutMs
138148
}
139149
if l.ShutdownTimeoutMs == 0 {
140150
l.ShutdownTimeoutMs = defaultShutdownTimeoutMs

core/services/workflows/v2/engine.go

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strconv"
78
"sync"
89
"time"
910

@@ -12,6 +13,9 @@ import (
1213
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1314
"github.com/smartcontractkit/chainlink-common/pkg/services"
1415

16+
cappb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
17+
sdkpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/v2/pb"
18+
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
1519
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/v2/pb"
1620
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/internal"
1721
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/types"
@@ -31,13 +35,18 @@ type Engine struct {
3135

3236
allTriggerEventsQueueCh chan enqueuedTriggerEvent
3337
executionsSemaphore chan struct{}
38+
capCallsSemaphore chan struct{}
3439
}
3540

3641
type enqueuedTriggerEvent struct {
37-
event capabilities.TriggerResponse
38-
timestamp time.Time
42+
triggerCapID string
43+
triggerIndex int
44+
timestamp time.Time
45+
event capabilities.TriggerResponse
3946
}
4047

48+
var _ host.CapabilityExecutor = (*Engine)(nil)
49+
4150
func NewEngine(ctx context.Context, cfg *EngineConfig) (*Engine, error) {
4251
err := cfg.Validate()
4352
if err != nil {
@@ -48,6 +57,7 @@ func NewEngine(ctx context.Context, cfg *EngineConfig) (*Engine, error) {
4857
triggers: make(map[string]capabilities.TriggerCapability),
4958
allTriggerEventsQueueCh: make(chan enqueuedTriggerEvent, cfg.LocalLimits.TriggerEventQueueSize),
5059
executionsSemaphore: make(chan struct{}, cfg.LocalLimits.MaxConcurrentWorkflowExecutions),
60+
capCallsSemaphore: make(chan struct{}, cfg.LocalLimits.MaxConcurrentCapabilityCallsPerWorkflow),
5161
}
5262
engine.Service, engine.srvcEng = services.Config{
5363
Name: "WorkflowEngineV2",
@@ -102,7 +112,15 @@ func (e *Engine) init(ctx context.Context) {
102112
return
103113
}
104114

105-
err := e.runTriggerSubscriptionPhase(ctx)
115+
err := e.cfg.Module.SetCapabilityExecutor(e)
116+
if err != nil {
117+
e.cfg.Lggr.Errorw("Workflow Engine initialization failed", "err", err)
118+
// TODO(CAPPL-736): observability
119+
e.cfg.Hooks.OnInitialized(err)
120+
return
121+
}
122+
123+
err = e.runTriggerSubscriptionPhase(ctx)
106124
if err != nil {
107125
e.cfg.Lggr.Errorw("Workflow Engine initialization failed", "err", err)
108126
// TODO(CAPPL-736): observability
@@ -185,7 +203,7 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error {
185203
}
186204

187205
// start listening for trigger events only if all registrations succeeded
188-
for _, triggerEventCh := range eventChans {
206+
for idx, triggerEventCh := range eventChans {
189207
e.srvcEng.Go(func(srvcCtx context.Context) {
190208
for {
191209
select {
@@ -197,8 +215,10 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error {
197215
}
198216
select {
199217
case e.allTriggerEventsQueueCh <- enqueuedTriggerEvent{
200-
event: event,
201-
timestamp: e.cfg.Clock.Now(),
218+
triggerCapID: subs.Subscriptions[idx].Id,
219+
triggerIndex: idx,
220+
timestamp: e.cfg.Clock.Now(),
221+
event: event,
202222
}:
203223
default: // queue full, drop the event
204224
// TODO(CAPPL-736): observability
@@ -216,15 +236,15 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
216236
select {
217237
case <-ctx.Done():
218238
return
219-
case queueElem, isOpen := <-e.allTriggerEventsQueueCh:
239+
case queueHead, isOpen := <-e.allTriggerEventsQueueCh:
220240
if !isOpen {
221241
return
222242
}
223243
// TODO(CAPPL-737): check if expired
224244
select {
225245
case e.executionsSemaphore <- struct{}{}: // block if too many concurrent workflow executions
226246
e.srvcEng.Go(func(srvcCtx context.Context) {
227-
e.startNewWorkflowExecution(srvcCtx, queueElem.event)
247+
e.startExecution(srvcCtx, queueHead)
228248
<-e.executionsSemaphore
229249
})
230250
case <-ctx.Done():
@@ -234,8 +254,64 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
234254
}
235255
}
236256

237-
func (e *Engine) startNewWorkflowExecution(_ context.Context, _ capabilities.TriggerResponse) {
238-
// TODO(CAPPL-735): implement execution phase
257+
// new workflow execution, blocking until completed
258+
func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueuedTriggerEvent) {
259+
triggerEvent := wrappedTriggerEvent.event.Event
260+
executionID, err := types.GenerateExecutionID(e.cfg.WorkflowID, triggerEvent.ID)
261+
if err != nil {
262+
// TODO(CAPPL-736): observability
263+
return
264+
}
265+
266+
subCtx, cancel := context.WithTimeout(ctx, time.Millisecond*time.Duration(e.cfg.LocalLimits.WorkflowExecutionTimeoutMs))
267+
defer cancel()
268+
result, err := e.cfg.Module.Execute(subCtx, &wasmpb.ExecuteRequest{
269+
Id: executionID,
270+
Request: &wasmpb.ExecuteRequest_Trigger{
271+
Trigger: &sdkpb.Trigger{
272+
Id: strconv.FormatInt(int64(wrappedTriggerEvent.triggerIndex), 10), // TODO: change to an integer oncce proto is refactored
273+
Payload: triggerEvent.Payload,
274+
},
275+
},
276+
MaxResponseSize: uint64(e.cfg.LocalLimits.ModuleExecuteMaxResponseSizeBytes),
277+
// no Config needed
278+
})
279+
if err != nil {
280+
e.cfg.Lggr.Errorw("Workflow execution failed", "err", err)
281+
// TODO(CAPPL-736): observability
282+
return
283+
}
284+
// TODO(CAPPL-736): handle execution result
285+
e.cfg.Lggr.Debugw("Workflow execution finished", "executionID", executionID, "result", result)
286+
e.cfg.Hooks.OnExecutionFinished(executionID)
287+
}
288+
289+
// blocking
290+
func (e *Engine) CallCapability(ctx context.Context, request *cappb.CapabilityRequest) (*cappb.CapabilityResponse, error) {
291+
select {
292+
case e.capCallsSemaphore <- struct{}{}: // block if too many concurrent capability calls
293+
case <-ctx.Done():
294+
return nil, ctx.Err()
295+
}
296+
defer func() { <-e.capCallsSemaphore }()
297+
298+
// TODO (CAPPL-735): use request.Metadata.WorkflowExecutionId to associate the call with a specific execution
299+
capability, err := e.cfg.CapRegistry.GetExecutable(ctx, request.CapabilityId)
300+
if err != nil {
301+
return nil, fmt.Errorf("trigger capability not found: %w", err)
302+
}
303+
304+
capReq, err := cappb.CapabilityRequestFromProto(request)
305+
if err != nil {
306+
return nil, fmt.Errorf("failed to convert capability request: %w", err)
307+
}
308+
309+
// TODO(CAPPL-737): run with a timeout
310+
capResp, err := capability.Execute(ctx, capReq)
311+
if err != nil {
312+
return nil, fmt.Errorf("failed to execute capability: %w", err)
313+
}
314+
return cappb.CapabilityResponseToProto(capResp), nil
239315
}
240316

241317
func (e *Engine) close() error {

core/services/workflows/v2/engine_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func TestEngine_Init(t *testing.T) {
4242
require.NoError(t, err)
4343

4444
module.EXPECT().Start().Once()
45+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil).Once()
4546
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(0), nil).Once()
4647
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil).Once()
4748
require.NoError(t, engine.Start(t.Context()))
@@ -62,6 +63,7 @@ func TestEngine_Start_RateLimited(t *testing.T) {
6263

6364
module := modulemocks.NewModuleV2(t)
6465
module.EXPECT().Start()
66+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
6567
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(0), nil).Times(2)
6668
module.EXPECT().Close()
6769
capreg := regmocks.NewCapabilitiesRegistry(t)
@@ -123,6 +125,7 @@ func TestEngine_TriggerSubscriptions(t *testing.T) {
123125

124126
module := modulemocks.NewModuleV2(t)
125127
module.EXPECT().Start()
128+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
126129
module.EXPECT().Close()
127130
capreg := regmocks.NewCapabilitiesRegistry(t)
128131
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil)
@@ -215,3 +218,61 @@ func newTriggerSubs(n int) *wasmpb.ExecutionResult {
215218
},
216219
}
217220
}
221+
222+
func TestEngine_Execution(t *testing.T) {
223+
t.Parallel()
224+
225+
module := modulemocks.NewModuleV2(t)
226+
module.EXPECT().Start()
227+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
228+
module.EXPECT().Close()
229+
capreg := regmocks.NewCapabilitiesRegistry(t)
230+
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil)
231+
232+
initDoneCh := make(chan error)
233+
subscribedToTriggersCh := make(chan []string, 1)
234+
executionFinishedCh := make(chan string)
235+
236+
cfg := defaultTestConfig(t)
237+
cfg.Module = module
238+
cfg.CapRegistry = capreg
239+
cfg.Hooks = v2.LifecycleHooks{
240+
OnInitialized: func(err error) {
241+
initDoneCh <- err
242+
},
243+
OnSubscribedToTriggers: func(triggerIDs []string) {
244+
subscribedToTriggersCh <- triggerIDs
245+
},
246+
OnExecutionFinished: func(executionID string) {
247+
executionFinishedCh <- executionID
248+
},
249+
}
250+
251+
t.Run("successful execution with no capability calls", func(t *testing.T) {
252+
engine, err := v2.NewEngine(t.Context(), cfg)
253+
require.NoError(t, err)
254+
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(1), nil).Once()
255+
trigger := capmocks.NewTriggerCapability(t)
256+
capreg.EXPECT().GetTrigger(matches.AnyContext, "id_0").Return(trigger, nil).Once()
257+
eventCh := make(chan capabilities.TriggerResponse)
258+
trigger.EXPECT().RegisterTrigger(matches.AnyContext, mock.Anything).Return(eventCh, nil).Once()
259+
trigger.EXPECT().UnregisterTrigger(matches.AnyContext, mock.Anything).Return(nil).Once()
260+
261+
require.NoError(t, engine.Start(t.Context()))
262+
263+
require.NoError(t, <-initDoneCh) // successful trigger registration
264+
require.Equal(t, []string{"id_0"}, <-subscribedToTriggersCh)
265+
266+
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(nil, nil).Once()
267+
eventCh <- capabilities.TriggerResponse{
268+
Event: capabilities.TriggerEvent{
269+
TriggerType: "[email protected]",
270+
ID: "event_012345",
271+
Payload: nil,
272+
},
273+
}
274+
<-executionFinishedCh
275+
276+
require.NoError(t, engine.Close())
277+
})
278+
}

0 commit comments

Comments
 (0)