@@ -19,11 +19,13 @@ import (
1919 wfpb "github.com/smartcontractkit/chainlink-protos/workflows/go/v2"
2020)
2121
22+ // Here we need to distinguish between variables that should survive a replay and
23+ // variables that need to be re-initialized every replay.
2224type execution [T any ] struct {
2325 fetchRequestsCounter int
2426 response T
2527 ctx context.Context
26- capabilityResponses map [int32 ]<- chan * sdkpb.CapabilityResponse
28+ capabilityResponses map [int32 ]* asyncResponse [ sdkpb.CapabilityRequest , sdkpb. CapabilityResponse ]
2729 secretsResponses map [int32 ]<- chan * secretsResponse
2830 pendingCallsLimiter limits.ResourcePoolLimiter [int ]
2931 lock sync.RWMutex
@@ -37,12 +39,62 @@ type execution[T any] struct {
3739 nodeSeed int64
3840 donLogCount uint32
3941 nodeLogCount uint32
42+ awaiting []int32
43+ // suspensionEnabled gates the suspend/resume behaviour. When false, the
44+ // execution behaves as it did before suspension was introduced:
45+ // awaitCapabilities blocks until each response is available and callCapAsync
46+ // always dispatches a fresh call. When true, awaitCapabilities returns
47+ // errSuspendExecution while responses are pending and callCapAsync replays
48+ // recorded calls instead of re-dispatching them.
49+ suspensionEnabled bool
50+ }
51+
// asyncResponse pairs a request of type I with the channel on which its
// response of type O is delivered, and caches the response once it has been
// received so that later lookups do not have to touch the channel again.
type asyncResponse[I, O any] struct {
	// mu guards resp.
	mu sync.Mutex

	// ch is the channel the response is received from.
	ch <-chan *O

	// resp is the cached response; it stays nil until wait has received one.
	resp *O

	// req is the request this response belongs to.
	req *I
}

// wait returns the response for this call. If a response has already been
// cached it is returned immediately; otherwise wait blocks until either a
// response arrives on the channel (which is then cached) or ctx is done, in
// which case ctx.Err() is returned.
//
// If the channel is closed without delivering a value, wait returns the cached
// response when one exists and an error otherwise. It never reports a closed
// channel as a successful nil response, and a closed channel never overwrites
// a response that has already been cached.
func (a *asyncResponse[I, O]) wait(ctx context.Context) (*O, error) {
	if resp := a.getResp(ctx); resp != nil {
		return resp, nil
	}

	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case o, ok := <-a.ch:
		a.mu.Lock()
		defer a.mu.Unlock()

		if !ok {
			// A receive on a closed channel yields a nil value. Another
			// waiter may have consumed and cached the real response in the
			// meantime, so prefer the cache over clobbering it with nil.
			if a.resp != nil {
				return a.resp, nil
			}
			return nil, errors.New("async response channel closed before a response was delivered")
		}

		a.resp = o
		return o, nil
	}
}

// getResp returns the cached response without blocking, or nil if none has
// been cached yet. The context parameter is not used; it is kept so existing
// call sites continue to compile.
func (a *asyncResponse[I, O]) getResp(_ context.Context) *O {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.resp
}
4180
4281// callCapAsync async calls a capability by placing execution results onto a
4382// channel and storing each channel with a unique identifier for future
4483// retrieval on await.
4584func (e * execution [T ]) callCapAsync (ctx context.Context , req * sdkpb.CapabilityRequest ) error {
85+ if e .suspensionEnabled {
86+ // check if there is already an item in the capabilityResponses for a given callback id.
87+ // if there is -> integrity check (matching requests); return without firing goroutine.
88+ // else -> legacy path.
89+ if asyncResponse , ok := e .capabilityResponses [req .CallbackId ]; ok {
90+ // there is already an item for this callback id; we must therefore be replaying
91+ // perform an integrity check to enforce determinism and return early.
92+ if ! proto .Equal (asyncResponse .req , req ) {
93+ return errors .New ("non-determinism error" )
94+ }
95+ return nil
96+ }
97+ }
4698 // Acquire a slot from the pool limiter to bound concurrency.
4799 free , err := e .pendingCallsLimiter .Wait (ctx , 1 )
48100 if err != nil {
@@ -52,7 +104,13 @@ func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRe
52104 ch := make (chan * sdkpb.CapabilityResponse , 1 )
53105 e .lock .Lock ()
54106 defer e .lock .Unlock ()
55- e .capabilityResponses [req .CallbackId ] = ch
107+ e .capabilityResponses [req .CallbackId ] = & asyncResponse [
108+ sdkpb.CapabilityRequest ,
109+ sdkpb.CapabilityResponse ,
110+ ]{
111+ ch : ch ,
112+ req : req ,
113+ }
56114
57115 go func () {
58116 defer free ()
@@ -82,25 +140,61 @@ func (e *execution[T]) callCapAsync(ctx context.Context, req *sdkpb.CapabilityRe
82140 return nil
83141}
84142
// errSuspendExecution is the sentinel error awaitCapabilities returns when
// suspension is enabled and at least one requested response is not yet available.
143+ var errSuspendExecution = errors .New ("__SUSPEND_EXECUTION__" )
144+
85145func (e * execution [T ]) awaitCapabilities (ctx context.Context , acr * sdkpb.AwaitCapabilitiesRequest ) (* sdkpb.AwaitCapabilitiesResponse , error ) {
86146 responses := make (map [int32 ]* sdkpb.CapabilityResponse , len (acr .Ids ))
87147
88148 e .lock .Lock ()
89149 defer e .lock .Unlock ()
150+
151+ if ! e .suspensionEnabled {
152+ // Legacy behaviour: block until each requested response is available,
153+ // then consume and remove it from the store.
154+ for _ , callId := range acr .Ids {
155+ ar , ok := e .capabilityResponses [callId ]
156+ if ! ok {
157+ return nil , fmt .Errorf ("failed to get call from store : %d" , callId )
158+ }
159+
160+ select {
161+ case <- ctx .Done ():
162+ return nil , fmt .Errorf ("failed to wait for capability response %d : %w" , callId , ctx .Err ())
163+ case resp := <- ar .ch :
164+ responses [callId ] = resp
165+ }
166+
167+ delete (e .capabilityResponses , callId )
168+ }
169+
170+ return & sdkpb.AwaitCapabilitiesResponse {
171+ Responses : responses ,
172+ }, nil
173+ }
174+
175+ // for all ids, check whether we have a response
176+ // if yes: return the response
177+ // if no: return suspend execution error, record the ids for which we are still waiting
178+ // to be picked up by the runWasm routine
179+ responsesForAll := true
90180 for _ , callId := range acr .Ids {
91- ch , ok := e .capabilityResponses [callId ]
181+ ar , ok := e .capabilityResponses [callId ]
92182 if ! ok {
93183 return nil , fmt .Errorf ("failed to get call from store : %d" , callId )
94184 }
95185
96- select {
97- case <- ctx .Done ():
98- return nil , fmt .Errorf ("failed to wait for capability response %d : %w" , callId , ctx .Err ())
99- case resp := <- ch :
100- responses [callId ] = resp
186+ resp := ar .getResp (ctx )
187+ if resp == nil {
188+ responsesForAll = false
189+ break
101190 }
102191
103- delete (e .capabilityResponses , callId )
192+ responses [callId ] = resp
193+ }
194+
195+ if ! responsesForAll {
196+ e .awaiting = acr .Ids
197+ return nil , errSuspendExecution
104198 }
105199
106200 return & sdkpb.AwaitCapabilitiesResponse {
0 commit comments