Skip to content

Commit bfbba3b

Browse files
Extend Trigger Capability for Event ACKing (#1736)
* Create BaseTriggerCapability * Update capabilities.go * Remove triggerID from AckEvent * Add event timeout to protos * Include AckEvent * Implement atomicTrigger AckEvent * Update capabilities.go * Update base_trigger.go * Add AckEvent to server template * Base trigger * Setters * Update base_trigger.go * Add event store * Update base_trigger.go * Update base_trigger.go * Update tests * Add AckEvent * Add triggerId * Fix base * Fix server gen * fix build * Add AckEvent * Fix triggers * Update runner_test.go * Add EventTimeout and AckEvent * Update base_test.go * update AckEvent * Update BaseTrigger * Update base_trigger.go * Create mem_trigger_event_store.go * use NewMemEventStore * Fix AckEvent * Ad log * Test * AckEvent protos grpc * Add AckEvent server * Update mem storage * Update base_trigger.go * Clean up * Clean up trySend * Add method * Update registry.pb.go * Fix generate * Fix AckEvent tmpl * Use TriggerAndId * Update base_trigger.go * Add triggerEventStore to dependencies * Add UpdateDelivery to Trigger EventStore * Remove eventTimeout * generate
1 parent f4e9399 commit bfbba3b

File tree

32 files changed

+991
-77
lines changed

32 files changed

+991
-77
lines changed

pkg/capabilities/base_trigger.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package capabilities
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"google.golang.org/protobuf/proto"
9+
10+
"github.com/smartcontractkit/chainlink-common/pkg/logger"
11+
)
12+
13+
type PendingEvent struct {
14+
TriggerId string
15+
EventId string
16+
AnyTypeURL string // Payload type
17+
Payload []byte
18+
FirstAt time.Time
19+
LastSentAt time.Time
20+
Attempts int
21+
}
22+
23+
type EventStore interface {
24+
Insert(ctx context.Context, rec PendingEvent) error
25+
UpdateDelivery(ctx context.Context, triggerId string, eventId string, lastSentAt time.Time, attempts int) error
26+
List(ctx context.Context) ([]PendingEvent, error)
27+
DeleteEvent(ctx context.Context, triggerId string, eventId string) error
28+
DeleteEventsForTrigger(ctx context.Context, triggerID string) error
29+
}
30+
31+
// BaseTriggerCapability keeps track of trigger registrations and handles resending events until
32+
// they are ACKd. Events are persisted to be resilient to node restarts.
33+
type BaseTriggerCapability[T proto.Message] struct {
34+
tRetransmit time.Duration // time window for an event being ACKd before we retransmit
35+
store EventStore
36+
newMsg func() T // factory to allocate a new T for unmarshalling
37+
lggr logger.Logger
38+
capabilityId string
39+
40+
mu sync.Mutex
41+
inboxes map[string]chan<- TriggerAndId[T] // triggerID --> registered send channel
42+
pending map[string]map[string]*PendingEvent // triggerID --> eventID --> PendingEvent
43+
44+
ctx context.Context
45+
cancel context.CancelFunc
46+
wg sync.WaitGroup
47+
}
48+
49+
func NewBaseTriggerCapability[T proto.Message](
50+
store EventStore,
51+
newMsg func() T,
52+
lggr logger.Logger,
53+
capabilityId string,
54+
tRetransmit time.Duration,
55+
) *BaseTriggerCapability[T] {
56+
ctx, cancel := context.WithCancel(context.Background())
57+
return &BaseTriggerCapability[T]{
58+
store: store,
59+
newMsg: newMsg,
60+
lggr: lggr,
61+
capabilityId: capabilityId,
62+
tRetransmit: tRetransmit,
63+
mu: sync.Mutex{},
64+
inboxes: make(map[string]chan<- TriggerAndId[T]),
65+
pending: make(map[string]map[string]*PendingEvent),
66+
ctx: ctx,
67+
cancel: cancel,
68+
}
69+
}
70+
71+
func (b *BaseTriggerCapability[T]) Start(ctx context.Context) error {
72+
b.lggr.Info("starting base trigger")
73+
b.ctx, b.cancel = context.WithCancel(ctx)
74+
75+
recs, err := b.store.List(ctx)
76+
if err != nil {
77+
b.lggr.Errorf("failed to load persisted trigger events")
78+
return err
79+
}
80+
81+
// Initialize in-memory persistence
82+
b.pending = make(map[string]map[string]*PendingEvent)
83+
for i := range recs {
84+
r := &recs[i]
85+
if _, ok := b.pending[r.TriggerId]; !ok {
86+
b.pending[r.TriggerId] = map[string]*PendingEvent{}
87+
}
88+
b.pending[r.TriggerId][r.EventId] = r
89+
}
90+
91+
b.wg.Add(1)
92+
go func() {
93+
defer b.wg.Done()
94+
b.retransmitLoop()
95+
}()
96+
return nil
97+
}
98+
99+
func (b *BaseTriggerCapability[T]) Stop() {
100+
b.cancel()
101+
b.wg.Wait()
102+
}
103+
104+
func (b *BaseTriggerCapability[T]) RegisterTrigger(triggerID string, sendCh chan<- TriggerAndId[T]) {
105+
b.mu.Lock()
106+
b.inboxes[triggerID] = sendCh
107+
b.mu.Unlock()
108+
}
109+
110+
func (b *BaseTriggerCapability[T]) UnregisterTrigger(triggerID string) {
111+
b.mu.Lock()
112+
delete(b.inboxes, triggerID)
113+
delete(b.pending, triggerID)
114+
b.mu.Unlock()
115+
if err := b.store.DeleteEventsForTrigger(b.ctx, triggerID); err != nil {
116+
b.lggr.Errorf("Failed to delete events for trigger (TriggerID=%s): %v", triggerID, err)
117+
}
118+
}
119+
120+
func (b *BaseTriggerCapability[T]) DeliverEvent(
121+
ctx context.Context,
122+
te TriggerEvent,
123+
triggerID string,
124+
) error {
125+
rec := PendingEvent{
126+
TriggerId: triggerID,
127+
EventId: te.ID,
128+
AnyTypeURL: te.Payload.GetTypeUrl(),
129+
Payload: te.Payload.GetValue(),
130+
FirstAt: time.Now(),
131+
}
132+
133+
if err := b.store.Insert(ctx, rec); err != nil {
134+
return err
135+
}
136+
137+
b.mu.Lock()
138+
if b.pending[triggerID] == nil {
139+
b.pending[triggerID] = map[string]*PendingEvent{}
140+
}
141+
b.pending[triggerID][te.ID] = &rec
142+
b.mu.Unlock()
143+
144+
b.trySend(rec)
145+
return nil
146+
}
147+
148+
func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId string, eventId string) error {
149+
b.lggr.Infof("Event ACK (triggerID: %s, eventID %s)", triggerId, eventId)
150+
b.mu.Lock()
151+
if eventsForTrigger, ok := b.pending[triggerId]; ok && eventsForTrigger != nil {
152+
delete(eventsForTrigger, eventId)
153+
if len(eventsForTrigger) == 0 {
154+
delete(b.pending, triggerId)
155+
}
156+
}
157+
b.mu.Unlock()
158+
return b.store.DeleteEvent(ctx, triggerId, eventId)
159+
}
160+
161+
func (b *BaseTriggerCapability[T]) retransmitLoop() {
162+
ticker := time.NewTicker(b.tRetransmit / 2)
163+
defer ticker.Stop()
164+
165+
for {
166+
select {
167+
case <-b.ctx.Done():
168+
return
169+
case <-ticker.C:
170+
b.lggr.Debug("retransmitting unacknowledged events")
171+
b.scanPending()
172+
}
173+
}
174+
}
175+
176+
func (b *BaseTriggerCapability[T]) scanPending() {
177+
now := time.Now()
178+
179+
b.mu.Lock()
180+
toResend := make([]PendingEvent, 0, len(b.pending))
181+
for _, pendingForTrigger := range b.pending {
182+
for _, rec := range pendingForTrigger {
183+
if rec.LastSentAt.IsZero() || now.Sub(rec.LastSentAt) >= b.tRetransmit {
184+
toResend = append(toResend, PendingEvent{
185+
TriggerId: rec.TriggerId,
186+
EventId: rec.EventId,
187+
})
188+
}
189+
}
190+
}
191+
b.mu.Unlock()
192+
193+
for _, event := range toResend {
194+
b.trySend(event)
195+
}
196+
}
197+
198+
// trySend attempts a delivery for the given event.
199+
// It updates Attempts and LastSentAt on every attempt locally. Success is determined
200+
// later by an AckEvent call.
201+
func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) {
202+
b.lggr.Infof("resending event (triggerID: %s, eventID: %s)", event.TriggerId, event.EventId)
203+
b.mu.Lock()
204+
eventsForTrigger, ok := b.pending[event.TriggerId]
205+
if !ok || eventsForTrigger == nil {
206+
b.mu.Unlock()
207+
}
208+
209+
rec, ok := eventsForTrigger[event.EventId]
210+
if !ok || rec == nil {
211+
b.mu.Unlock()
212+
}
213+
214+
rec.Attempts++
215+
rec.LastSentAt = time.Now()
216+
217+
typeURL := rec.AnyTypeURL
218+
payloadCopy := append([]byte(nil), rec.Payload...)
219+
sendCh, inboxOk := b.inboxes[event.TriggerId]
220+
attempts := rec.Attempts
221+
lastSent := rec.LastSentAt
222+
b.mu.Unlock()
223+
224+
if err := b.store.UpdateDelivery(b.ctx, event.TriggerId, event.EventId, lastSent, attempts); err != nil {
225+
b.lggr.Errorf("failed to persist delivery update for trigger=%s event=%s: %v", event.TriggerId, event.EventId, err)
226+
}
227+
228+
if !inboxOk {
229+
b.lggr.Errorf("no inbox registered for trigger %s", event.TriggerId)
230+
}
231+
232+
msg := b.newMsg()
233+
if err := proto.Unmarshal(payloadCopy, msg); err != nil {
234+
b.lggr.Errorf("failed to unmarshal payload to message type (typeURL=%s): %v", typeURL, err)
235+
return
236+
}
237+
238+
wrapped := TriggerAndId[T]{
239+
Trigger: msg,
240+
Id: event.EventId,
241+
}
242+
243+
select {
244+
case sendCh <- wrapped:
245+
b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d",
246+
b.capabilityId, event.TriggerId, event.EventId, rec.Attempts)
247+
default:
248+
b.lggr.Warnf("inbox full for trigger %s", event.TriggerId)
249+
}
250+
}

0 commit comments

Comments
 (0)