Skip to content

[CAPPL-735] Engine V2 execution phase (minimal impl) #17590

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/shopspring/decimal v1.4.0
github.com/smartcontractkit/chainlink-automation v0.8.1
github.com/smartcontractkit/chainlink-ccip v0.0.0-20250506195202-6a3f20db41c6
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250506185033-ea88ef405511
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250507190601-db395570d649
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250417193446-eeb0a7d1e049
github.com/smartcontractkit/chainlink-deployments-framework v0.0.14
github.com/smartcontractkit/chainlink-evm v0.0.0-20250506144221-ee990aefea6c
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1191,8 +1191,8 @@ github.com/smartcontractkit/chainlink-ccip v0.0.0-20250506195202-6a3f20db41c6 h1
github.com/smartcontractkit/chainlink-ccip v0.0.0-20250506195202-6a3f20db41c6/go.mod h1:Jb05WL6lj5H89XGcaaOinxTf4Gdj+vXO4TcUhqTgqIM=
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250425163923-16aa375957b7 h1:j6Vo/NX2ABsPdGxETC5pfQLcz/h6iLJu/Yx+8AhPa34=
github.com/smartcontractkit/chainlink-ccip/chains/solana v0.0.0-20250425163923-16aa375957b7/go.mod h1:k3/Z6AvwurPUlfuDFEonRbkkiTSgNSrtVNhJEWNlUZA=
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250506185033-ea88ef405511 h1:QFkJ2MVmt0ly0W0BnfobicUknn4Qr8u0VCs1WYYo2E0=
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250506185033-ea88ef405511/go.mod h1:uNF6+noody47ZdmRwymDZAnQ7eKTXLzMKvl41LA63lo=
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250507190601-db395570d649 h1:VomBm6IeVmLjp2IDXnd8/mKdpYzocDJdvi+lu3ad6Sk=
github.com/smartcontractkit/chainlink-common v0.7.1-0.20250507190601-db395570d649/go.mod h1:uNF6+noody47ZdmRwymDZAnQ7eKTXLzMKvl41LA63lo=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7 h1:9wh1G+WbXwPVqf0cfSRSgwIcaXTQgvYezylEAfwmrbw=
github.com/smartcontractkit/chainlink-common/pkg/monitoring v0.0.0-20250415235644-8703639403c7/go.mod h1:yaDOAZF6MNB+NGYpxGCUc+owIdKrjvFW0JODdTcQ3V0=
github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20250417193446-eeb0a7d1e049 h1:7HwYt8rDz1ehTcB28oNipdTZUtV17F2sfkLTLtMJC4c=
Expand Down
19 changes: 1 addition & 18 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package workflows

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -494,21 +492,6 @@ func (e *Engine) stepUpdateLoop(ctx context.Context, executionID string, stepUpd
}
}

func generateExecutionID(workflowID, eventID string) (string, error) {
s := sha256.New()
_, err := s.Write([]byte(workflowID))
if err != nil {
return "", err
}

_, err = s.Write([]byte(eventID))
if err != nil {
return "", err
}

return hex.EncodeToString(s.Sum(nil)), nil
}

// startExecution kicks off a new workflow execution when a trigger event is received.
func (e *Engine) startExecution(ctx context.Context, executionID string, triggerID string, event *values.Map) error {
e.meterReports.Add(executionID, NewMeteringReport())
Expand Down Expand Up @@ -741,7 +724,7 @@ func (e *Engine) worker(ctx context.Context) {
continue
}

executionID, err := generateExecutionID(e.workflow.id, te.ID)
executionID, err := types.GenerateExecutionID(e.workflow.id, te.ID)
if err != nil {
e.logger.With(platform.KeyTriggerID, te.ID).Errorf("could not generate execution ID: %v", err)
continue
Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1826,7 +1826,7 @@ func basicTestTrigger(t *testing.T) *mockTriggerCapability {
}

func TestEngine_WithCustomComputeStep(t *testing.T) {
cmd := "core/services/workflows/test/wasm/cmd"
cmd := "core/services/workflows/test/wasm/legacy/cmd"

ctx := testutils.Context(t)
log := logger.TestLogger(t)
Expand Down
23 changes: 23 additions & 0 deletions core/services/workflows/types/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package types

import (
"crypto/sha256"
"encoding/hex"
)

// hash of (workflowID, triggerEventID)
// TODO(CAPPL-838): improve for V2
func GenerateExecutionID(workflowID, triggerEventID string) (string, error) {
s := sha256.New()
_, err := s.Write([]byte(workflowID))
if err != nil {
return "", err
}

_, err = s.Write([]byte(triggerEventID))
if err != nil {
return "", err
}

return hex.EncodeToString(s.Sum(nil)), nil
}
22 changes: 16 additions & 6 deletions core/services/workflows/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ const (
defaultMaxTriggerSubscriptions = 10
defaultTriggerEventQueueSize = 1000

defaultMaxConcurrentWorkflowExecutions = 100
defaultMaxConcurrentCapabilityCalls = 10
defaultMaxConcurrentWorkflowExecutions = 100
defaultMaxConcurrentCapabilityCallsPerWorkflow = 10
defaultWorkflowExecutionTimeoutMs = 1000 * 60 * 10 // 10 minutes
defaultCapabilityCallTimeoutMs = 1000 * 60 * 8 // 8 minutes

defaultShutdownTimeoutMs = 5000
)
Expand All @@ -57,8 +59,10 @@ type EngineLimits struct {
MaxTriggerSubscriptions uint16
TriggerEventQueueSize uint16

MaxConcurrentWorkflowExecutions uint16
MaxConcurrentCapabilityCalls uint16
MaxConcurrentWorkflowExecutions uint16
MaxConcurrentCapabilityCallsPerWorkflow uint16
WorkflowExecutionTimeoutMs uint32
CapabilityCallTimeoutMs uint32
Comment on lines +64 to +65
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use time.Duration for these? Sheds the conversion on use, and makes the literals more readable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, for context, the other variables are already labeled Ms and uint32.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this pattern is used throughout the capabilities and gateway packages, can move to a separate discussion because changing here makes this now inconsistent with other areas of the code.


ShutdownTimeoutMs uint32
}
Expand Down Expand Up @@ -133,8 +137,14 @@ func (l *EngineLimits) setDefaultLimits() {
if l.MaxConcurrentWorkflowExecutions == 0 {
l.MaxConcurrentWorkflowExecutions = defaultMaxConcurrentWorkflowExecutions
}
if l.MaxConcurrentCapabilityCalls == 0 {
l.MaxConcurrentCapabilityCalls = defaultMaxConcurrentCapabilityCalls
if l.MaxConcurrentCapabilityCallsPerWorkflow == 0 {
l.MaxConcurrentCapabilityCallsPerWorkflow = defaultMaxConcurrentCapabilityCallsPerWorkflow
}
if l.WorkflowExecutionTimeoutMs == 0 {
l.WorkflowExecutionTimeoutMs = defaultWorkflowExecutionTimeoutMs
}
if l.CapabilityCallTimeoutMs == 0 {
l.CapabilityCallTimeoutMs = defaultCapabilityCallTimeoutMs
}
if l.ShutdownTimeoutMs == 0 {
l.ShutdownTimeoutMs = defaultShutdownTimeoutMs
Expand Down
102 changes: 92 additions & 10 deletions core/services/workflows/v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"

cappb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
sdkpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/sdk/v2/pb"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"
wasmpb "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/v2/pb"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/internal"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/types"
"github.com/smartcontractkit/chainlink/v2/core/utils/safe"
)

type Engine struct {
Expand All @@ -31,13 +35,18 @@ type Engine struct {

allTriggerEventsQueueCh chan enqueuedTriggerEvent
executionsSemaphore chan struct{}
capCallsSemaphore chan struct{}
}

type enqueuedTriggerEvent struct {
event capabilities.TriggerResponse
timestamp time.Time
triggerCapID string
triggerIndex int
timestamp time.Time
event capabilities.TriggerResponse
}

var _ host.CapabilityExecutor = (*Engine)(nil)

func NewEngine(ctx context.Context, cfg *EngineConfig) (*Engine, error) {
err := cfg.Validate()
if err != nil {
Expand All @@ -48,6 +57,7 @@ func NewEngine(ctx context.Context, cfg *EngineConfig) (*Engine, error) {
triggers: make(map[string]capabilities.TriggerCapability),
allTriggerEventsQueueCh: make(chan enqueuedTriggerEvent, cfg.LocalLimits.TriggerEventQueueSize),
executionsSemaphore: make(chan struct{}, cfg.LocalLimits.MaxConcurrentWorkflowExecutions),
capCallsSemaphore: make(chan struct{}, cfg.LocalLimits.MaxConcurrentCapabilityCallsPerWorkflow),
}
engine.Service, engine.srvcEng = services.Config{
Name: "WorkflowEngineV2",
Expand Down Expand Up @@ -102,7 +112,15 @@ func (e *Engine) init(ctx context.Context) {
return
}

err := e.runTriggerSubscriptionPhase(ctx)
err := e.cfg.Module.SetCapabilityExecutor(e)
if err != nil {
e.cfg.Lggr.Errorw("Workflow Engine initialization failed", "err", err)
// TODO(CAPPL-736): observability
e.cfg.Hooks.OnInitialized(err)
return
}

err = e.runTriggerSubscriptionPhase(ctx)
if err != nil {
e.cfg.Lggr.Errorw("Workflow Engine initialization failed", "err", err)
// TODO(CAPPL-736): observability
Expand Down Expand Up @@ -185,7 +203,7 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error {
}

// start listening for trigger events only if all registrations succeeded
for _, triggerEventCh := range eventChans {
for idx, triggerEventCh := range eventChans {
e.srvcEng.Go(func(srvcCtx context.Context) {
for {
select {
Expand All @@ -197,8 +215,10 @@ func (e *Engine) runTriggerSubscriptionPhase(ctx context.Context) error {
}
select {
case e.allTriggerEventsQueueCh <- enqueuedTriggerEvent{
event: event,
timestamp: e.cfg.Clock.Now(),
triggerCapID: subs.Subscriptions[idx].Id,
triggerIndex: idx,
timestamp: e.cfg.Clock.Now(),
event: event,
}:
default: // queue full, drop the event
// TODO(CAPPL-736): observability
Expand All @@ -216,15 +236,15 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
select {
case <-ctx.Done():
return
case queueElem, isOpen := <-e.allTriggerEventsQueueCh:
case queueHead, isOpen := <-e.allTriggerEventsQueueCh:
if !isOpen {
return
}
// TODO(CAPPL-737): check if expired
select {
case e.executionsSemaphore <- struct{}{}: // block if too many concurrent workflow executions
e.srvcEng.Go(func(srvcCtx context.Context) {
e.startNewWorkflowExecution(srvcCtx, queueElem.event)
e.startExecution(srvcCtx, queueHead)
<-e.executionsSemaphore
})
case <-ctx.Done():
Expand All @@ -234,8 +254,70 @@ func (e *Engine) handleAllTriggerEvents(ctx context.Context) {
}
}

func (e *Engine) startNewWorkflowExecution(_ context.Context, _ capabilities.TriggerResponse) {
// TODO(CAPPL-735): implement execution phase
// startExecution initiates a new workflow execution, blocking until completed
func (e *Engine) startExecution(ctx context.Context, wrappedTriggerEvent enqueuedTriggerEvent) {
triggerEvent := wrappedTriggerEvent.event.Event
executionID, err := types.GenerateExecutionID(e.cfg.WorkflowID, triggerEvent.ID)
if err != nil {
// TODO(CAPPL-736): observability
return
}

subCtx, cancel := context.WithTimeout(ctx, time.Millisecond*time.Duration(e.cfg.LocalLimits.WorkflowExecutionTimeoutMs))
defer cancel()

tid, err := safe.IntToUint64(wrappedTriggerEvent.triggerIndex)
if err != nil {
// TODO(CAPPL-736): observability
return
}

result, err := e.cfg.Module.Execute(subCtx, &wasmpb.ExecuteRequest{
Id: executionID,
Request: &wasmpb.ExecuteRequest_Trigger{
Trigger: &sdkpb.Trigger{
Id: tid,
Payload: triggerEvent.Payload,
},
},
MaxResponseSize: uint64(e.cfg.LocalLimits.ModuleExecuteMaxResponseSizeBytes),
// TODO(CAPPL-729): pass workflow config
})
if err != nil {
e.cfg.Lggr.Errorw("Workflow execution failed", "err", err)
// TODO(CAPPL-736): observability
return
}
// TODO(CAPPL-736): handle execution result
e.cfg.Lggr.Debugw("Workflow execution finished", "executionID", executionID, "result", result)
e.cfg.Hooks.OnExecutionFinished(executionID)
}

func (e *Engine) CallCapability(ctx context.Context, request *cappb.CapabilityRequest) (*cappb.CapabilityResponse, error) {
select {
case e.capCallsSemaphore <- struct{}{}: // block if too many concurrent capability calls
case <-ctx.Done():
return nil, ctx.Err()
}
defer func() { <-e.capCallsSemaphore }()

// TODO (CAPPL-735): use request.Metadata.WorkflowExecutionId to associate the call with a specific execution
capability, err := e.cfg.CapRegistry.GetExecutable(ctx, request.CapabilityId)
if err != nil {
return nil, fmt.Errorf("trigger capability not found: %w", err)
}

capReq, err := cappb.CapabilityRequestFromProto(request)
if err != nil {
return nil, fmt.Errorf("failed to convert capability request: %w", err)
}

// TODO(CAPPL-737): run with a timeout
capResp, err := capability.Execute(ctx, capReq)
if err != nil {
return nil, fmt.Errorf("failed to execute capability: %w", err)
}
return cappb.CapabilityResponseToProto(capResp), nil
}

func (e *Engine) close() error {
Expand Down
61 changes: 61 additions & 0 deletions core/services/workflows/v2/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestEngine_Init(t *testing.T) {
require.NoError(t, err)

module.EXPECT().Start().Once()
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil).Once()
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(0), nil).Once()
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil).Once()
require.NoError(t, engine.Start(t.Context()))
Expand All @@ -62,6 +63,7 @@ func TestEngine_Start_RateLimited(t *testing.T) {

module := modulemocks.NewModuleV2(t)
module.EXPECT().Start()
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(0), nil).Times(2)
module.EXPECT().Close()
capreg := regmocks.NewCapabilitiesRegistry(t)
Expand Down Expand Up @@ -123,6 +125,7 @@ func TestEngine_TriggerSubscriptions(t *testing.T) {

module := modulemocks.NewModuleV2(t)
module.EXPECT().Start()
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
module.EXPECT().Close()
capreg := regmocks.NewCapabilitiesRegistry(t)
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil)
Expand Down Expand Up @@ -215,3 +218,61 @@ func newTriggerSubs(n int) *wasmpb.ExecutionResult {
},
}
}

func TestEngine_Execution(t *testing.T) {
t.Parallel()

module := modulemocks.NewModuleV2(t)
module.EXPECT().Start()
module.EXPECT().SetCapabilityExecutor(mock.Anything).Return(nil)
module.EXPECT().Close()
capreg := regmocks.NewCapabilitiesRegistry(t)
capreg.EXPECT().LocalNode(matches.AnyContext).Return(capabilities.Node{}, nil)

initDoneCh := make(chan error)
subscribedToTriggersCh := make(chan []string, 1)
executionFinishedCh := make(chan string)

cfg := defaultTestConfig(t)
cfg.Module = module
cfg.CapRegistry = capreg
cfg.Hooks = v2.LifecycleHooks{
OnInitialized: func(err error) {
initDoneCh <- err
},
OnSubscribedToTriggers: func(triggerIDs []string) {
subscribedToTriggersCh <- triggerIDs
},
OnExecutionFinished: func(executionID string) {
executionFinishedCh <- executionID
},
}

t.Run("successful execution with no capability calls", func(t *testing.T) {
engine, err := v2.NewEngine(t.Context(), cfg)
require.NoError(t, err)
module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(newTriggerSubs(1), nil).Once()
trigger := capmocks.NewTriggerCapability(t)
capreg.EXPECT().GetTrigger(matches.AnyContext, "id_0").Return(trigger, nil).Once()
eventCh := make(chan capabilities.TriggerResponse)
trigger.EXPECT().RegisterTrigger(matches.AnyContext, mock.Anything).Return(eventCh, nil).Once()
trigger.EXPECT().UnregisterTrigger(matches.AnyContext, mock.Anything).Return(nil).Once()

require.NoError(t, engine.Start(t.Context()))

require.NoError(t, <-initDoneCh) // successful trigger registration
require.Equal(t, []string{"id_0"}, <-subscribedToTriggersCh)

module.EXPECT().Execute(matches.AnyContext, mock.Anything).Return(nil, nil).Once()
eventCh <- capabilities.TriggerResponse{
Event: capabilities.TriggerEvent{
TriggerType: "[email protected]",
ID: "event_012345",
Payload: nil,
},
}
<-executionFinishedCh

require.NoError(t, engine.Close())
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit, could you add a second test that runs the test executable used to test the module? It'll ensure that there's no gaps to re-use it. I wouldn't remove this test, since it's easier to debug if things go wrong.

I don't use config in that test yet, but it probably should opt to soon.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is blocked until SetCapabilityExecutor is implemented on the host. So I've split this test out to a separate ticket.

Loading
Loading