Skip to content

Commit a0051e7

Browse files
feat(wasm/host): suspend & resume executions on await
Suspends a guest by trapping await_capabilities when an awaited capability response is not yet ready, waits for the response, and resumes by re-instantiating the guest in a fresh store while preserving the fuel/epoch budget. Gated by ModuleConfig.SuspensionEnabled. Adds the suspended_executions and non_deterministic_input standard tests.
1 parent ba5692a commit a0051e7

16 files changed

Lines changed: 582 additions & 130 deletions

File tree

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ require (
4242
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
4343
github.com/scylladb/go-reflectx v1.0.1
4444
github.com/shopspring/decimal v1.4.0
45-
github.com/smartcontractkit/chain-selectors v1.0.100
45+
github.com/smartcontractkit/chain-selectors v1.0.104
4646
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260528204832-58c7145c53f8
4747
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4
48-
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129347b8b
48+
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260629155926-02c54659e848
4949
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
5050
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b
5151
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0
@@ -139,6 +139,7 @@ require (
139139
github.com/rogpeppe/go-internal v1.14.1 // indirect
140140
github.com/ryanuber/go-glob v1.0.0 // indirect
141141
github.com/sanity-io/litter v1.5.5 // indirect
142+
github.com/smartcontractkit/chainlink-protos v0.0.0-20260629155926-02c54659e848 // indirect
142143
github.com/stretchr/objx v0.5.2 // indirect
143144
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
144145
github.com/x448/float16 v0.8.4 // indirect

go.sum

Lines changed: 16 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/settings/cresettings/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ flowchart
259259
VaultMaxPerOracleUnexpiredBlobCount{{VaultMaxPerOracleUnexpiredBlobCount}}:::bound
260260
PerOwner.VaultCiphertextSizeLimit{{PerOwner.VaultCiphertextSizeLimit}}:::bound
261261
PerOwner.VaultSecretsLimit{{PerOwner.VaultSecretsLimit}}:::bound
262+
PerOwner.SuspendOnAwaitEnabled[/PerOwner.SuspendOnAwaitEnabled\]:::gate
262263
end
263264
264265
handleRequest-->Store.FetchWorkflowArtifacts-->host.NewModule-->Engine.init-->Engine.runTriggerSubscriptionPhase-->triggers-->Engine.handleAllTriggerEvents-->Engine.startExecution

pkg/settings/cresettings/defaults.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@
5353
"WorkflowLimit": "1000",
5454
"WorkflowExecutionConcurrencyLimit": "5",
5555
"VaultCiphertextSizeLimit": "2kb",
56-
"VaultSecretsLimit": "100"
56+
"VaultSecretsLimit": "100",
57+
"SuspendOnAwaitEnabled": "false"
5758
},
5859
"PerWorkflow": {
5960
"TriggerRegistrationsTimeout": "10s",

pkg/settings/cresettings/defaults.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ WorkflowLimit = '1000'
5454
WorkflowExecutionConcurrencyLimit = '5'
5555
VaultCiphertextSizeLimit = '2kb'
5656
VaultSecretsLimit = '100'
57+
SuspendOnAwaitEnabled = 'false'
5758

5859
[PerWorkflow]
5960
TriggerRegistrationsTimeout = '10s'

pkg/settings/cresettings/settings.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ var Default = Schema{
145145
WorkflowLimit: Int(1000),
146146
WorkflowExecutionConcurrencyLimit: Int(5),
147147

148+
SuspendOnAwaitEnabled: Bool(false),
149+
148150
// DANGER(cedric): Be extremely careful changing these vault limits below as they act as a default value
149151
// used by the Vault OCR plugin -- changing these values could cause issues with the plugin during an image
150152
// upgrade as nodes apply the old and new values inconsistently. A safe upgrade path
@@ -337,6 +339,8 @@ type Owners struct {
337339
WorkflowExecutionConcurrencyLimit Setting[int] `unit:"{workflow}"`
338340
VaultCiphertextSizeLimit Setting[config.Size]
339341
VaultSecretsLimit Setting[int] `unit:"{secret}"`
342+
343+
SuspendOnAwaitEnabled Setting[bool]
340344
}
341345

342346
type Workflows struct {

pkg/workflows/wasm/host/execution.go

Lines changed: 103 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ import (
1919
wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"
2020
)
2121

22+
// Here we need to distinguish between variables that should survive a replay and
23+
// variables that need to be re-initialized every replay.
2224
type execution[T any] struct {
2325
fetchRequestsCounter int
2426
response T
2527
ctx context.Context
26-
capabilityResponses map[int32]<-chan *sdkpb.CapabilityResponse
28+
capabilityResponses map[int32]*asyncResponse[sdkpb.CapabilityRequest, sdkpb.CapabilityResponse]
2729
secretsResponses map[int32]<-chan *secretsResponse
2830
pendingCallsLimiter limits.ResourcePoolLimiter[int]
2931
lock sync.RWMutex
@@ -37,12 +39,62 @@ type execution[T any] struct {
3739
nodeSeed int64
3840
donLogCount uint32
3941
nodeLogCount uint32
42+
awaiting []int32
43+
// suspendOnAwait gates the suspend/resume behaviour. When false, the
44+
// execution behaves as it did before suspension was introduced:
45+
// awaitCapabilities blocks until each response is available and callCapAsync
46+
// always dispatches a fresh call. When true, awaitCapabilities returns
47+
// errSuspendExecution while responses are pending and callCapAsync replays
48+
// recorded calls instead of re-dispatching them.
49+
suspendOnAwait bool
50+
}
51+
52+
// asyncResponse pairs an in-flight request with the channel its response is
// delivered on, and caches that response once it has been received so it can
// be read again without touching the channel.
//
// I is the request type and O is the response type. mu guards resp; the
// methods below never reassign ch or req.
type asyncResponse[I, O any] struct {
	mu   sync.Mutex
	ch   <-chan *O
	resp *O
	req  *I
}

// wait returns the response for this request, blocking until one is available
// or ctx is done.
//
// A response that was already received is returned from the cache without
// reading ch again. Otherwise wait blocks on ch and caches whatever it
// receives before returning it.
//
// It returns ctx.Err() if ctx is done first, and an error if ch is closed
// without delivering a value. A nil ch never becomes ready, so wait then
// returns only when ctx is done.
func (a *asyncResponse[I, O]) wait(ctx context.Context) (*O, error) {
	if resp := a.getResp(ctx); resp != nil {
		return resp, nil
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case o, ok := <-a.ch:
		if !ok {
			// A closed channel yields the zero value forever. Surface that
			// explicitly instead of caching nil and reporting success with no
			// response.
			return nil, errors.New("response channel closed before a response was delivered")
		}
		a.mu.Lock()
		defer a.mu.Unlock()
		a.resp = o
		return o, nil
	}
}

// getResp returns the cached response, or nil if none has been stored by wait
// yet. It never blocks on ch. The context parameter is unused and kept only so
// existing call sites stay valid.
func (a *asyncResponse[I, O]) getResp(_ context.Context) *O {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.resp
}
4180

4281
// callCapAsync async calls a capability by placing execution results onto a
4382
// channel and storing each channel with a unique identifier for future
4483
// retrieval on await.
4584
func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRequest) error {
85+
if e.suspendOnAwait {
86+
// check if there is already an item in the capabilityResponses for a given callback id.
87+
// if there is -> integrity check (matching requests); return without firing goroutine.
88+
// else -> legacy path.
89+
if asyncResponse, ok := e.capabilityResponses[req.CallbackId]; ok {
90+
// there is already an item for this callback id; we must therefore be replaying
91+
// perform an integrity check to enforce determinism and return early.
92+
if !proto.Equal(asyncResponse.req, req) {
93+
return errors.New("non-determinism error")
94+
}
95+
return nil
96+
}
97+
}
4698
// Acquire a slot from the pool limiter to bound concurrency.
4799
free, err := e.pendingCallsLimiter.Wait(ctx, 1)
48100
if err != nil {
@@ -52,7 +104,13 @@ func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRe
52104
ch := make(chan *sdkpb.CapabilityResponse, 1)
53105
e.lock.Lock()
54106
defer e.lock.Unlock()
55-
e.capabilityResponses[req.CallbackId] = ch
107+
e.capabilityResponses[req.CallbackId] = &asyncResponse[
108+
sdkpb.CapabilityRequest,
109+
sdkpb.CapabilityResponse,
110+
]{
111+
ch: ch,
112+
req: req,
113+
}
56114

57115
go func() {
58116
defer free()
@@ -82,25 +140,61 @@ func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRe
82140
return nil
83141
}
84142

143+
// errSuspendExecution is the sentinel error awaitCapabilities returns, when
// suspendOnAwait is enabled, if a requested capability response has not been
// received yet.
var errSuspendExecution = errors.New("__SUSPEND_EXECUTION__")
144+
85145
func (e *execution[T]) awaitCapabilities(ctx context.Context, acr *sdkpb.AwaitCapabilitiesRequest) (*sdkpb.AwaitCapabilitiesResponse, error) {
86146
responses := make(map[int32]*sdkpb.CapabilityResponse, len(acr.Ids))
87147

88148
e.lock.Lock()
89149
defer e.lock.Unlock()
150+
151+
if !e.suspendOnAwait {
152+
// Legacy behaviour: block until each requested response is available,
153+
// then consume and remove it from the store.
154+
for _, callId := range acr.Ids {
155+
ar, ok := e.capabilityResponses[callId]
156+
if !ok {
157+
return nil, fmt.Errorf("failed to get call from store : %d", callId)
158+
}
159+
160+
select {
161+
case <-ctx.Done():
162+
return nil, fmt.Errorf("failed to wait for capability response %d : %w", callId, ctx.Err())
163+
case resp := <-ar.ch:
164+
responses[callId] = resp
165+
}
166+
167+
delete(e.capabilityResponses, callId)
168+
}
169+
170+
return &sdkpb.AwaitCapabilitiesResponse{
171+
Responses: responses,
172+
}, nil
173+
}
174+
175+
// for all ids, check whether we have a response
176+
// if yes: return the response
177+
// if no: return suspend execution error, record the ids for which we are still waiting
178+
// to be picked up by the runWasm routine
179+
responsesForAll := true
90180
for _, callId := range acr.Ids {
91-
ch, ok := e.capabilityResponses[callId]
181+
ar, ok := e.capabilityResponses[callId]
92182
if !ok {
93183
return nil, fmt.Errorf("failed to get call from store : %d", callId)
94184
}
95185

96-
select {
97-
case <-ctx.Done():
98-
return nil, fmt.Errorf("failed to wait for capability response %d : %w", callId, ctx.Err())
99-
case resp := <-ch:
100-
responses[callId] = resp
186+
resp := ar.getResp(ctx)
187+
if resp == nil {
188+
responsesForAll = false
189+
break
101190
}
102191

103-
delete(e.capabilityResponses, callId)
192+
responses[callId] = resp
193+
}
194+
195+
if !responsesForAll {
196+
e.awaiting = acr.Ids
197+
return nil, errSuspendExecution
104198
}
105199

106200
return &sdkpb.AwaitCapabilitiesResponse{

pkg/workflows/wasm/host/execution_await_order_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestAwaitCapabilities_headOfLineBlocksOnEarlierID(t *testing.T) {
7070

7171
exec := &execution[*sdkpb.ExecutionResult]{
7272
ctx: t.Context(),
73-
capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse),
73+
capabilityResponses: make(map[int32]*asyncResponse[sdkpb.CapabilityRequest, sdkpb.CapabilityResponse]),
7474
pendingCallsLimiter: limits.GlobalResourcePoolLimiter(cresettings.Default.PerWorkflow.CapabilityConcurrencyLimit.DefaultValue),
7575
executor: stub,
7676
}

pkg/workflows/wasm/host/execution_semaphore_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ var _ ExecutionHelper = (*slowCapStub)(nil)
5959
func newTestExec(maxPending int, stub ExecutionHelper) *execution[*sdkpb.ExecutionResult] {
6060
return &execution[*sdkpb.ExecutionResult]{
6161
ctx: context.Background(),
62-
capabilityResponses: make(map[int32]<-chan *sdkpb.CapabilityResponse),
62+
capabilityResponses: make(map[int32]*asyncResponse[sdkpb.CapabilityRequest, sdkpb.CapabilityResponse]),
6363
secretsResponses: make(map[int32]<-chan *secretsResponse),
6464
pendingCallsLimiter: limits.GlobalResourcePoolLimiter[int](maxPending),
6565
executor: stub,

0 commit comments

Comments
 (0)