Skip to content

Commit 461c9f1

Browse files
authored
feat: configurable delay confirmations for chainsync events (#423)
Fixes #418 Signed-off-by: Aurora Gaffney <aurora@blinklabs.io>
1 parent 513579b commit 461c9f1

3 files changed

Lines changed: 101 additions & 28 deletions

File tree

input/chainsync/chainsync.go

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"log/slog"
2323
"math"
2424
"net/http"
25+
"slices"
2526
"strings"
2627
"time"
2728

@@ -69,6 +70,8 @@ type ChainSync struct {
6970
dialFamily string
7071
kupoUrl string
7172
kupoClient *kugo.Client
73+
delayConfirmations uint
74+
delayBuffer [][]event.Event
7275
}
7376

7477
type ChainSyncStatus struct {
@@ -304,6 +307,22 @@ func (c *ChainSync) handleRollBackward(
304307
nil,
305308
NewRollbackEvent(point),
306309
)
310+
// Remove rolled-back events from buffer
311+
if len(c.delayBuffer) > 0 {
312+
// We iterate backwards to avoid the issues with deleting from a list while iterating over it
313+
for i := len(c.delayBuffer) - 1; i >= 0; i-- {
314+
for _, evt := range c.delayBuffer[i] {
315+
// Look for block event
316+
if blockEvtCtx, ok := evt.Context.(BlockContext); ok {
317+
// Delete event batch if slot is after rollback point
318+
if blockEvtCtx.SlotNumber > point.Slot {
319+
c.delayBuffer = slices.Delete(c.delayBuffer, i, i+1)
320+
break
321+
}
322+
}
323+
}
324+
}
325+
}
307326
c.eventChan <- evt
308327

309328
// updating status after roll backward
@@ -323,36 +342,73 @@ func (c *ChainSync) handleRollForward(
323342
blockData any,
324343
tip ochainsync.Tip,
325344
) error {
345+
var block ledger.Block
346+
var err error
347+
tmpEvents := make([]event.Event, 0, 20)
326348
switch v := blockData.(type) {
327349
case ledger.Block:
328-
evt := event.New("chainsync.block", time.Now(), NewBlockContext(v, c.networkMagic), NewBlockEvent(v, c.includeCbor))
329-
c.eventChan <- evt
330-
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash().String(), tip.Point.Slot, hex.EncodeToString(tip.Point.Hash))
350+
block = v
331351
case ledger.BlockHeader:
332352
blockSlot := v.SlotNumber()
333-
block, err := c.oConn.BlockFetch().Client.GetBlock(ocommon.Point{Slot: blockSlot, Hash: v.Hash().Bytes()})
353+
block, err = c.oConn.BlockFetch().Client.GetBlock(ocommon.Point{Slot: blockSlot, Hash: v.Hash().Bytes()})
334354
if err != nil {
335355
return err
336356
}
337357
if block == nil {
338358
return errors.New("blockfetch returned empty")
339359
}
340-
blockEvt := event.New("chainsync.block", time.Now(), NewBlockHeaderContext(v), NewBlockEvent(block, c.includeCbor))
341-
c.eventChan <- blockEvt
342-
for t, transaction := range block.Transactions() {
343-
resolvedInputs, err := resolveTransactionInputs(transaction, c)
344-
if err != nil {
345-
return err
346-
}
347-
if t < 0 || t > math.MaxUint32 {
348-
return errors.New("invalid number of transactions")
360+
default:
361+
return errors.New("unknown type")
362+
}
363+
blockEvt := event.New("chainsync.block", time.Now(), NewBlockHeaderContext(block.Header()), NewBlockEvent(block, c.includeCbor))
364+
tmpEvents = append(tmpEvents, blockEvt)
365+
for t, transaction := range block.Transactions() {
366+
resolvedInputs, err := resolveTransactionInputs(transaction, c)
367+
if err != nil {
368+
return err
369+
}
370+
if t < 0 || t > math.MaxUint32 {
371+
return errors.New("invalid number of transactions")
372+
}
373+
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionContext(block, transaction, uint32(t), c.networkMagic),
374+
NewTransactionEvent(block, transaction, c.includeCbor, resolvedInputs))
375+
tmpEvents = append(tmpEvents, txEvt)
376+
}
377+
updateTip := ochainsync.Tip{
378+
Point: ocommon.Point{
379+
Slot: block.SlotNumber(),
380+
Hash: block.Hash().Bytes(),
381+
},
382+
BlockNumber: block.BlockNumber(),
383+
}
384+
if c.delayConfirmations == 0 {
385+
// Send events immediately if no delay confirmations configured
386+
for _, evt := range tmpEvents {
387+
c.eventChan <- evt
388+
}
389+
} else {
390+
// Add events to delay buffer
391+
c.delayBuffer = append(c.delayBuffer, tmpEvents)
392+
// Send oldest events and remove from buffer if delay buffer is larger than configured delay confirmations
393+
if uint(len(c.delayBuffer)) > c.delayConfirmations {
394+
for _, evt := range c.delayBuffer[0] {
395+
// Look for block event
396+
if blockEvt, ok := evt.Payload.(BlockEvent); ok {
397+
// Populate current point for update status based on most recently sent events
398+
updateTip = ochainsync.Tip{
399+
Point: ocommon.Point{
400+
Slot: blockEvt.Block.SlotNumber(),
401+
Hash: blockEvt.Block.Hash().Bytes(),
402+
},
403+
BlockNumber: blockEvt.Block.BlockNumber(),
404+
}
405+
}
406+
c.eventChan <- evt
349407
}
350-
txEvt := event.New("chainsync.transaction", time.Now(), NewTransactionContext(block, transaction, uint32(t), c.networkMagic),
351-
NewTransactionEvent(block, transaction, c.includeCbor, resolvedInputs))
352-
c.eventChan <- txEvt
408+
c.delayBuffer = slices.Delete(c.delayBuffer, 0, 1)
353409
}
354-
c.updateStatus(v.SlotNumber(), v.BlockNumber(), v.Hash().String(), tip.Point.Slot, hex.EncodeToString(tip.Point.Hash))
355410
}
411+
c.updateStatus(updateTip.Point.Slot, updateTip.BlockNumber, hex.EncodeToString(updateTip.Point.Hash), tip.Point.Slot, hex.EncodeToString(tip.Point.Hash))
356412
return nil
357413
}
358414

input/chainsync/options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,16 @@ func WithBulkMode(bulkMode bool) ChainSyncOptionFunc {
108108
}
109109
}
110110

111+
// WithKupoUrl specifies the URL for a Kupo instance that will be queried for additional information
111112
func WithKupoUrl(kupoUrl string) ChainSyncOptionFunc {
112113
return func(c *ChainSync) {
113114
c.kupoUrl = kupoUrl
114115
}
115116
}
117+
118+
// WithDelayConfirmationCount specifies the number of confirmations (subsequent blocks) are required before an event will be emitted
119+
func WithDelayConfirmations(count uint) ChainSyncOptionFunc {
120+
return func(c *ChainSync) {
121+
c.delayConfirmations = count
122+
}
123+
}

input/chainsync/plugin.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@ import (
2626
)
2727

2828
var cmdlineOptions struct {
29-
network string
30-
networkMagic uint
31-
address string
32-
socketPath string
33-
ntcTcp bool
34-
bulkMode bool
35-
intersectTip bool
36-
intersectPoint string
37-
includeCbor bool
38-
autoReconnect bool
39-
kupoUrl string
29+
network string
30+
networkMagic uint
31+
address string
32+
socketPath string
33+
ntcTcp bool
34+
bulkMode bool
35+
intersectTip bool
36+
intersectPoint string
37+
includeCbor bool
38+
autoReconnect bool
39+
kupoUrl string
40+
delayConfirmations uint
4041
}
4142

4243
func init() {
@@ -126,6 +127,13 @@ func init() {
126127
DefaultValue: "",
127128
Dest: &(cmdlineOptions.kupoUrl),
128129
},
130+
{
131+
Name: "delay-confirmations",
132+
Type: plugin.PluginOptionTypeUint,
133+
Description: "number of confirmations required before emitting events",
134+
DefaultValue: uint(0),
135+
Dest: &(cmdlineOptions.delayConfirmations),
136+
},
129137
},
130138
},
131139
)
@@ -151,6 +159,7 @@ func NewFromCmdlineOptions() plugin.Plugin {
151159
WithIncludeCbor(cmdlineOptions.includeCbor),
152160
WithAutoReconnect(cmdlineOptions.autoReconnect),
153161
WithKupoUrl(cmdlineOptions.kupoUrl),
162+
WithDelayConfirmations(cmdlineOptions.delayConfirmations),
154163
}
155164
if cmdlineOptions.intersectPoint != "" {
156165
intersectPoints := []ocommon.Point{}

0 commit comments

Comments
 (0)