Skip to content

Commit 68aa27d

Browse files
bolekkMStreet3
andauthored
[CAPPL-735] Engine V2 execution phase (minimal impl) (#17590)
* [CAPPL-735] Engine V2 execution phase (minimal impl) * chore: respond to comments * migrate legacy wasm test location * bump chainlink-common * use trigger index as uint64 * chore: fix docstring * refactor: safely cast into to uint64 * fix: lint errors --------- Co-authored-by: Michael Street <[email protected]>
1 parent 2385d84 commit 68aa27d

File tree

23 files changed

+289
-56
lines changed

23 files changed

+289
-56
lines changed

core/scripts/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/shopspring/decimal v1.4.0
3434
github.com/smartcontractkit/chainlink-automation v0.8.1
3535
github.com/smartcontractkit/chainlink-ccip v0.0.0-20250506195202-6a3f20db41c6
36-
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250506185033-ea88ef405511
36+
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250507190601-db395570d649
3737
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250417193446-eeb0a7d1e049
3838
github.com/smartcontractkit/chainlink-deployments-framework v0.0.14
3939
github.com/smartcontractkit/chainlink-evm v0.0.0-20250506144221-ee990aefea6c

core/scripts/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,8 +1191,8 @@ github.com/smartcontractkit/chainlink-ccip v0.0.0-20250506195202-6a3f20db41c6 h1
11911191
github.com/smartcontractkit/chainlink-ccip v0.0.0-20250506195202-6a3f20db41c6/go.mod h1:Jb05WL6lj5H89XGcaaOinxTf4Gdj+vXO4TcUhqTgqIM=
11921192
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250425163923-16aa375957b7 h1:j6Vo/NX2ABsPdGxETC5pfQLcz/h6iLJu/Yx+8AhPa34=
11931193
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250425163923-16aa375957b7/go.mod h1:k3/Z6AvwurPUlfuDFEonRbkkiTSgNSrtVNhJEWNlUZA=
1194-
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250506185033-ea88ef405511 h1:QFkJ2MVmt0ly0W0BnfobicUknn4Qr8u0VCs1WYYo2E0=
1195-
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250506185033-ea88ef405511/go.mod h1:uNF6+noody47ZdmRwymDZAnQ7eKTXLzMKvl41LA63lo=
1194+
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250507190601-db395570d649 h1:VomBm6IeVmLjp2IDXnd8/mKdpYzocDJdvi+lu3ad6Sk=
1195+
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250507190601-db395570d649/go.mod h1:uNF6+noody47ZdmRwymDZAnQ7eKTXLzMKvl41LA63lo=
11961196
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw=
11971197
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0=
11981198
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250417193446-eeb0a7d1e049 h1:7HwYt8rDz1ehTcB28oNipdTZUtV17F2sfkLTLtMJC4c=

core/services/workflows/engine.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package workflows
22

33
import (
44
"context"
5-
"crypto/sha256"
6-
"encoding/hex"
75
"errors"
86
"fmt"
97
"strconv"
@@ -494,21 +492,6 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
494492
}
495493
}
496494

497-
func generateExecutionID(workflowID, eventID string) (string, error) {
498-
s := sha256.New()
499-
_, err := s.Write([]byte(workflowID))
500-
if err != nil {
501-
return "", err
502-
}
503-
504-
_, err = s.Write([]byte(eventID))
505-
if err != nil {
506-
return "", err
507-
}
508-
509-
return hex.EncodeToString(s.Sum(nil)), nil
510-
}
511-
512495
// startExecution kicks off a new workflow execution when a trigger event is received.
513496
func (e *Engine) startExecution(ctx context.Context, executionID string, triggerID string, event *values.Map) error {
514497
e.meterReports.Add(executionID, NewMeteringReport())
@@ -741,7 +724,7 @@ func (e *Engine) worker(ctx context.Context) {
741724
continue
742725
}
743726

744-
executionID, err := generateExecutionID(e.workflow.id, te.ID)
727+
executionID, err := types.GenerateExecutionID(e.workflow.id, te.ID)
745728
if err != nil {
746729
e.logger.With(platform.KeyTriggerID, te.ID).Errorf("could not generate execution ID: %v", err)
747730
continue

core/services/workflows/engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1826,7 +1826,7 @@ func basicTestTrigger(t *testing.T) *mockTriggerCapability {
18261826
}
18271827

18281828
func TestEngine_WithCustomComputeStep(t *testing.T) {
1829-
cmd := "core/services/workflows/test/wasm/cmd"
1829+
cmd := "core/services/workflows/test/wasm/legacy/cmd"
18301830

18311831
ctx := testutils.Context(t)
18321832
log := logger.TestLogger(t)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package types
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/hex"
6+
)
7+
8+
// hash of (workflowID, triggerEventID)
9+
// TODO(CAPPL-838): improve for V2
10+
func GenerateExecutionID(workflowID, triggerEventID string) (string, error) {
11+
s := sha256.New()
12+
_, err := s.Write([]byte(workflowID))
13+
if err != nil {
14+
return "", err
15+
}
16+
17+
_, err = s.Write([]byte(triggerEventID))
18+
if err != nil {
19+
return "", err
20+
}
21+
22+
return hex.EncodeToString(s.Sum(nil)), nil
23+
}

core/services/workflows/v2/config.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ const (
4242
defaultMaxTriggerSubscriptions = 10
4343
defaultTriggerEventQueueSize = 1000
4444

45-
defaultMaxConcurrentWorkflowExecutions = 100
46-
defaultMaxConcurrentCapabilityCalls = 10
45+
defaultMaxConcurrentWorkflowExecutions = 100
46+
defaultMaxConcurrentCapabilityCallsPerWorkflow = 10
47+
defaultWorkflowExecutionTimeoutMs = 1000 * 60 * 10 // 10 minutes
48+
defaultCapabilityCallTimeoutMs = 1000 * 60 * 8 // 8 minutes
4749

4850
defaultShutdownTimeoutMs = 5000
4951
)
@@ -57,8 +59,10 @@ type EngineLimits struct {
5759
MaxTriggerSubscriptions uint16
5860
TriggerEventQueueSize uint16
5961

60-
MaxConcurrentWorkflowExecutions uint16
61-
MaxConcurrentCapabilityCalls uint16
62+
MaxConcurrentWorkflowExecutions uint16
63+
MaxConcurrentCapabilityCallsPerWorkflow uint16
64+
WorkflowExecutionTimeoutMs uint32
65+
CapabilityCallTimeoutMs uint32
6266

6367
ShutdownTimeoutMs uint32
6468
}
@@ -133,8 +137,14 @@ func (l *EngineLimits) setDefaultLimits() {
133137
if l.MaxConcurrentWorkflowExecutions == 0 {
134138
l.MaxConcurrentWorkflowExecutions = defaultMaxConcurrentWorkflowExecutions
135139
}
136-
if l.MaxConcurrentCapabilityCalls == 0 {
137-
l.MaxConcurrentCapabilityCalls = defaultMaxConcurrentCapabilityCalls
140+
if l.MaxConcurrentCapabilityCallsPerWorkflow == 0 {
141+
l.MaxConcurrentCapabilityCallsPerWorkflow = defaultMaxConcurrentCapabilityCallsPerWorkflow
142+
}
143+
if l.WorkflowExecutionTimeoutMs == 0 {
144+
l.WorkflowExecutionTimeoutMs = defaultWorkflowExecutionTimeoutMs
145+
}
146+
if l.CapabilityCallTimeoutMs == 0 {
147+
l.CapabilityCallTimeoutMs = defaultCapabilityCallTimeoutMs
138148
}
139149
if l.ShutdownTimeoutMs == 0 {
140150
l.ShutdownTimeoutMs = defaultShutdownTimeoutMs

core/services/workflows/v2/engine.go

Lines changed: 92 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@ import (
1212
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1313
"github.com/smartcontractkit/chainlink-common/pkg/services"
1414

15+
cappb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
16+
sdkpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/v2/pb"
17+
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
1518
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/v2/pb"
1619
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/internal"
1720
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/types"
21+
"github.com/smartcontractkit/chainlink/v2/core/utils/safe"
1822
)
1923

2024
type Engine struct {
@@ -31,13 +35,18 @@ type Engine struct {
3135

3236
allTriggerEventsQueueCh chan enqueuedTriggerEvent
3337
executionsSemaphore chan struct{}
38+
capCallsSemaphore chan struct{}
3439
}
3540

3641
type enqueuedTriggerEvent struct {
37-
event capabilities.TriggerResponse
38-
timestamp time.Time
42+
triggerCapID string
43+
triggerIndex int
44+
timestamp time.Time
45+
event capabilities.TriggerResponse
3946
}
4047

48+
var _ host.CapabilityExecutor = (*Engine)(nil)
49+
4150
func NewEngine(ctx context.Context, cfg *EngineConfig) (*Engine, error) {
4251
err := cfg.Validate()
4352
if err != nil {
@@ -48,6 +57,7 @@ func NewEngine(ctx context.Context, cfg *EngineConfig) (*Engine, error) {
4857
triggers: make(map[string]capabilities.TriggerCapability),
4958
allTriggerEventsQueueCh: make(chan enqueuedTriggerEvent, cfg.LocalLimits.TriggerEventQueueSize),
5059
executionsSemaphore: make(chan struct{}, cfg.LocalLimits.MaxConcurrentWorkflowExecutions),
60+
capCallsSemaphore: make(chan struct{}, cfg.LocalLimits.MaxConcurrentCapabilityCallsPerWorkflow),
5161
}
5262
engine.Service, engine.srvcEng = services.Config{
5363
Name: "WorkflowEngineV2",
@@ -102,7 +112,15 @@ func (e *Engine) init(ctx context.Context) {
102112
return
103113
}
104114

105-
err := e.runTriggerSubscriptionPhase(ctx)
115+
err := e.cfg.Module.SetCapabilityExecutor(e)
116+
if err != nil {
117+
e.cfg.Lggr.Errorw("Workflow Engine initialization failed", "err", err)
118+
// TODO(CAPPL-736): observability
119+
e.cfg.Hooks.OnInitialized(err)
120+
return
121+
}
122+
123+
err = e.runTriggerSubscriptionPhase(ctx)
106124
if err != nil {
107125
e.cfg.Lggr.Errorw("Workflow Engine initialization failed", "err", err)
108126
// TODO(CAPPL-736): observability
@@ -185,7 +203,7 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error {
185203
}
186204

187205
// start listening for trigger events only if all registrations succeeded
188-
for _, triggerEventCh := range eventChans {
206+
for idx, triggerEventCh := range eventChans {
189207
e.srvcEng.Go(func(srvcCtx context.Context) {
190208
for {
191209
select {
@@ -197,8 +215,10 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error {
197215
}
198216
select {
199217
case e.allTriggerEventsQueueCh <- enqueuedTriggerEvent{
200-
event: event,
201-
timestamp: e.cfg.Clock.Now(),
218+
triggerCapID: subs.Subscriptions[idx].Id,
219+
triggerIndex: idx,
220+
timestamp: e.cfg.Clock.Now(),
221+
event: event,
202222
}:
203223
default: // queue full, drop the event
204224
// TODO(CAPPL-736): observability
@@ -216,15 +236,15 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
216236
select {
217237
case <-ctx.Done():
218238
return
219-
case queueElem, isOpen := <-e.allTriggerEventsQueueCh:
239+
case queueHead, isOpen := <-e.allTriggerEventsQueueCh:
220240
if !isOpen {
221241
return
222242
}
223243
// TODO(CAPPL-737): check if expired
224244
select {
225245
case e.executionsSemaphore <- struct{}{}: // block if too many concurrent workflow executions
226246
e.srvcEng.Go(func(srvcCtx context.Context) {
227-
e.startNewWorkflowExecution(srvcCtx, queueElem.event)
247+
e.startExecution(srvcCtx, queueHead)
228248
<-e.executionsSemaphore
229249
})
230250
case <-ctx.Done():
@@ -234,8 +254,70 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
234254
}
235255
}
236256

237-
func (e *Engine) startNewWorkflowExecution(_ context.Context, _ capabilities.TriggerResponse) {
238-
// TODO(CAPPL-735): implement execution phase
257+
// startExecution initiates a new workflow execution, blocking until completed
258+
func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueuedTriggerEvent) {
259+
triggerEvent := wrappedTriggerEvent.event.Event
260+
executionID, err := types.GenerateExecutionID(e.cfg.WorkflowID, triggerEvent.ID)
261+
if err != nil {
262+
// TODO(CAPPL-736): observability
263+
return
264+
}
265+
266+
subCtx, cancel := context.WithTimeout(ctx, time.Millisecond*time.Duration(e.cfg.LocalLimits.WorkflowExecutionTimeoutMs))
267+
defer cancel()
268+
269+
tid, err := safe.IntToUint64(wrappedTriggerEvent.triggerIndex)
270+
if err != nil {
271+
// TODO(CAPPL-736): observability
272+
return
273+
}
274+
275+
result, err := e.cfg.Module.Execute(subCtx, &wasmpb.ExecuteRequest{
276+
Id: executionID,
277+
Request: &wasmpb.ExecuteRequest_Trigger{
278+
Trigger: &sdkpb.Trigger{
279+
Id: tid,
280+
Payload: triggerEvent.Payload,
281+
},
282+
},
283+
MaxResponseSize: uint64(e.cfg.LocalLimits.ModuleExecuteMaxResponseSizeBytes),
284+
// TODO(CAPPL-729): pass workflow config
285+
})
286+
if err != nil {
287+
e.cfg.Lggr.Errorw("Workflow execution failed", "err", err)
288+
// TODO(CAPPL-736): observability
289+
return
290+
}
291+
// TODO(CAPPL-736): handle execution result
292+
e.cfg.Lggr.Debugw("Workflow execution finished", "executionID", executionID, "result", result)
293+
e.cfg.Hooks.OnExecutionFinished(executionID)
294+
}
295+
296+
func (e *Engine) CallCapability(ctx context.Context, request *cappb.CapabilityRequest) (*cappb.CapabilityResponse, error) {
297+
select {
298+
case e.capCallsSemaphore <- struct{}{}: // block if too many concurrent capability calls
299+
case <-ctx.Done():
300+
return nil, ctx.Err()
301+
}
302+
defer func() { <-e.capCallsSemaphore }()
303+
304+
// TODO (CAPPL-735): use request.Metadata.WorkflowExecutionId to associate the call with a specific execution
305+
capability, err := e.cfg.CapRegistry.GetExecutable(ctx, request.CapabilityId)
306+
if err != nil {
307+
return nil, fmt.Errorf("trigger capability not found: %w", err)
308+
}
309+
310+
capReq, err := cappb.CapabilityRequestFromProto(request)
311+
if err != nil {
312+
return nil, fmt.Errorf("failed to convert capability request: %w", err)
313+
}
314+
315+
// TODO(CAPPL-737): run with a timeout
316+
capResp, err := capability.Execute(ctx, capReq)
317+
if err != nil {
318+
return nil, fmt.Errorf("failed to execute capability: %w", err)
319+
}
320+
return cappb.CapabilityResponseToProto(capResp), nil
239321
}
240322

241323
func (e *Engine) close() error {

core/services/workflows/v2/engine_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func TestEngine_Init(t *testing.T) {
4242
require.NoError(t, err)
4343

4444
module.EXPECT().Start().Once()
45+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil).Once()
4546
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(0), nil).Once()
4647
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil).Once()
4748
require.NoError(t, engine.Start(t.Context()))
@@ -62,6 +63,7 @@ func TestEngine_Start_RateLimited(t *testing.T) {
6263

6364
module := modulemocks.NewModuleV2(t)
6465
module.EXPECT().Start()
66+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
6567
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(0), nil).Times(2)
6668
module.EXPECT().Close()
6769
capreg := regmocks.NewCapabilitiesRegistry(t)
@@ -123,6 +125,7 @@ func TestEngine_TriggerSubscriptions(t *testing.T) {
123125

124126
module := modulemocks.NewModuleV2(t)
125127
module.EXPECT().Start()
128+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
126129
module.EXPECT().Close()
127130
capreg := regmocks.NewCapabilitiesRegistry(t)
128131
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil)
@@ -215,3 +218,61 @@ func newTriggerSubs(n int) *wasmpb.ExecutionResult {
215218
},
216219
}
217220
}
221+
222+
func TestEngine_Execution(t *testing.T) {
223+
t.Parallel()
224+
225+
module := modulemocks.NewModuleV2(t)
226+
module.EXPECT().Start()
227+
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
228+
module.EXPECT().Close()
229+
capreg := regmocks.NewCapabilitiesRegistry(t)
230+
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil)
231+
232+
initDoneCh := make(chan error)
233+
subscribedToTriggersCh := make(chan []string, 1)
234+
executionFinishedCh := make(chan string)
235+
236+
cfg := defaultTestConfig(t)
237+
cfg.Module = module
238+
cfg.CapRegistry = capreg
239+
cfg.Hooks = v2.LifecycleHooks{
240+
OnInitialized: func(err error) {
241+
initDoneCh <- err
242+
},
243+
OnSubscribedToTriggers: func(triggerIDs []string) {
244+
subscribedToTriggersCh <- triggerIDs
245+
},
246+
OnExecutionFinished: func(executionID string) {
247+
executionFinishedCh <- executionID
248+
},
249+
}
250+
251+
t.Run("successful execution with no capability calls", func(t *testing.T) {
252+
engine, err := v2.NewEngine(t.Context(), cfg)
253+
require.NoError(t, err)
254+
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(1), nil).Once()
255+
trigger := capmocks.NewTriggerCapability(t)
256+
capreg.EXPECT().GetTrigger(matches.AnyContext, "id_0").Return(trigger, nil).Once()
257+
eventCh := make(chan capabilities.TriggerResponse)
258+
trigger.EXPECT().RegisterTrigger(matches.AnyContext, mock.Anything).Return(eventCh, nil).Once()
259+
trigger.EXPECT().UnregisterTrigger(matches.AnyContext, mock.Anything).Return(nil).Once()
260+
261+
require.NoError(t, engine.Start(t.Context()))
262+
263+
require.NoError(t, <-initDoneCh) // successful trigger registration
264+
require.Equal(t, []string{"id_0"}, <-subscribedToTriggersCh)
265+
266+
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(nil, nil).Once()
267+
eventCh <- capabilities.TriggerResponse{
268+
Event: capabilities.TriggerEvent{
269+
TriggerType: "[email protected]",
270+
ID: "event_012345",
271+
Payload: nil,
272+
},
273+
}
274+
<-executionFinishedCh
275+
276+
require.NoError(t, engine.Close())
277+
})
278+
}

0 commit comments

Comments
 (0)