-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync.go
More file actions
343 lines (306 loc) · 11.2 KB
/
sync.go
File metadata and controls
343 lines (306 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package main
import (
"context"
"encoding/hex"
"fmt"
"log"
"sync/atomic"
"time"
ouroboros "github.com/blinklabs-io/gouroboros"
"github.com/blinklabs-io/gouroboros/ledger"
"github.com/blinklabs-io/gouroboros/ledger/allegra"
"github.com/blinklabs-io/gouroboros/ledger/alonzo"
"github.com/blinklabs-io/gouroboros/ledger/babbage"
"github.com/blinklabs-io/gouroboros/ledger/conway"
"github.com/blinklabs-io/gouroboros/ledger/mary"
"github.com/blinklabs-io/gouroboros/ledger/shelley"
"github.com/blinklabs-io/gouroboros/protocol/chainsync"
pcommon "github.com/blinklabs-io/gouroboros/protocol/common"
)
// intersectPoint identifies a chain point by slot number and hex-encoded
// block hash. It is the value type of shelleyIntersectPoints, which records
// the last Byron block per network so that ChainSync starts from the first
// Shelley block after that point.
type intersectPoint struct {
// Slot is the slot number of the block.
Slot uint64
// Hash is the block hash as a hex string; it is decoded with
// hex.DecodeString before being handed to the protocol layer.
Hash string
}
// shelleyIntersectPoints maps a network magic to the last Byron block of that
// network. Intersecting there makes ChainSync deliver the first Shelley block
// next. Networks without an entry are synced from origin.
var shelleyIntersectPoints = map[int]intersectPoint{
	MainnetNetworkMagic: {
		Slot: 4492799,
		Hash: "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457",
	},
	PreprodNetworkMagic: {
		Slot: 1598399,
		Hash: "7e16781b40ebf8b6da18f7b5e8ade855d6738095ef2f1c58c77e88b6e45997a4",
	},
}
// ChainSyncer performs full historical chain sync from Shelley genesis to tip
// using the gouroboros NtN (Node-to-Node) ChainSync protocol.
//
// Construct it with NewChainSyncer, run it with Start and terminate it with
// Stop. The progress-tracking fields at the bottom are plain (non-atomic)
// values that are only written from handleRollForward.
type ChainSyncer struct {
// store supplies previously synced blocks for resuming (GetLastNBlocks,
// GetBlockHash). When nil, sync starts from origin.
store Store
// networkMagic selects the network: it is sent in the handshake, used for
// slot-to-epoch conversion and for the Shelley intersect lookup.
networkMagic int
// nodeAddress is the TCP address dialed by Start.
nodeAddress string
// onBlock, if non-nil, is called for every post-Byron header from which a
// VRF output could be extracted.
onBlock func(slot uint64, epoch int, blockHash string, vrfOutput []byte)
// onCaughtUp, if non-nil, is invoked in its own goroutine the first time a
// delivered block is within 120 slots of the node's tip.
onCaughtUp func()
// conn is the connection created by Start and closed by Stop.
conn *ouroboros.Connection
// NOTE(review): the three counters below are accessed with the
// sync/atomic functions on bare 64-bit integers, which relies on the
// fields being 64-bit aligned on 32-bit platforms — confirm, or migrate to
// atomic.Uint64 / atomic.Int64.
blockssynced uint64 // atomic counter for progress logging
tipSlot uint64 // atomic: current tip slot for progress tracking
chanLen int64 // atomic: current block channel occupancy (set by writer)
// chanCap is not assigned by NewChainSyncer; while it stays zero the
// "buf" percentage in the progress log is always 0.
// NOTE(review): presumably set by the code that owns the block channel — confirm.
chanCap int64 // channel capacity for pressure calculation
// caughtUp flips to true once the syncer is near the tip; Start treats a
// connection error after that point as a normal termination.
caughtUp atomic.Bool
// Progress tracking
syncStart time.Time // time the first post-Byron block was received
lastLogTime time.Time // time of the previous progress log line
lastLogSlot uint64 // slot reported in the previous progress log line
currentEra string // era name of the most recently seen block
currentEpoch int // epoch of the most recently seen block
}
// NewChainSyncer builds a ChainSyncer for the given network and node.
//
// store may be nil, in which case Start syncs from origin. onBlock and
// onCaughtUp are optional callbacks and may be nil. All remaining fields
// (counters, channel capacity, progress state) start at their zero values.
func NewChainSyncer(store Store, networkMagic int, nodeAddress string,
	onBlock func(slot uint64, epoch int, blockHash string, vrfOutput []byte),
	onCaughtUp func(),
) *ChainSyncer {
	syncer := new(ChainSyncer)
	syncer.store = store
	syncer.networkMagic = networkMagic
	syncer.nodeAddress = nodeAddress
	syncer.onBlock = onBlock
	syncer.onCaughtUp = onCaughtUp
	return syncer
}
// Start begins the historical chain sync. It blocks until the connection
// reports an error or is closed, or until ctx is cancelled.
//
// Return values:
//   - ctx.Err() when ctx is cancelled;
//   - nil when the connection ends after the syncer has caught up with the tip;
//   - a wrapped error when setup fails or the connection ends before catching up.
//
// The connection is closed on every return path after it has been created.
func (s *ChainSyncer) Start(ctx context.Context) error {
	// Determine intersect point
	intersectPoints, err := s.getIntersectPoints(ctx)
	if err != nil {
		return fmt.Errorf("getting intersect points: %w", err)
	}
	log.Printf("Starting historical chain sync from slot %d", intersectPoints[0].Slot)

	// Configure ChainSync callbacks
	chainSyncCfg := chainsync.NewConfig(
		chainsync.WithRollForwardFunc(s.handleRollForward),
		chainsync.WithRollBackwardFunc(s.handleRollBackward),
		chainsync.WithPipelineLimit(100),
	)

	// Connect to node via NtN (required for TCP connections)
	// Keepalive DISABLED during historical sync: enabling keepalive causes
	// ChainSync to stall after ~2000 blocks (gouroboros muxer interaction),
	// dropping throughput from ~2500 blk/s to ~6 blk/s.
	// Without keepalive the node kills us every ~97s (ExceededTimeLimit),
	// but the retry loop handles reconnects seamlessly at ~1800 blk/s sustained.
	errChan := make(chan error, 1)
	conn, connErr := ouroboros.NewConnection(
		ouroboros.WithNetworkMagic(uint32(s.networkMagic)),
		ouroboros.WithNodeToNode(true),
		ouroboros.WithKeepAlive(false),
		ouroboros.WithChainSyncConfig(chainSyncCfg),
		ouroboros.WithErrorChan(errChan),
	)
	if connErr != nil {
		return fmt.Errorf("creating connection: %w", connErr)
	}
	s.conn = conn

	if err := conn.Dial("tcp", s.nodeAddress); err != nil {
		// Release anything a partially established connection may hold,
		// mirroring the Sync failure path below.
		conn.Close()
		return fmt.Errorf("connecting to %s: %w", s.nodeAddress, err)
	}
	log.Printf("Connected to node at %s for historical sync", s.nodeAddress)

	// Start chain sync from intersect point
	if err := conn.ChainSync().Client.Sync(intersectPoints); err != nil {
		conn.Close()
		return fmt.Errorf("starting chain sync: %w", err)
	}

	// Wait for sync to end (error or closed connection) or context cancellation
	select {
	case <-ctx.Done():
		conn.Close()
		return ctx.Err()
	case err, ok := <-errChan:
		conn.Close()
		if s.caughtUp.Load() {
			return nil
		}
		// A receive from a closed channel (or an explicit nil send) yields a
		// nil error; wrapping that with %w would produce a malformed message,
		// so report the shutdown explicitly instead.
		if !ok || err == nil {
			return fmt.Errorf("chain sync error: connection to %s closed before catching up", s.nodeAddress)
		}
		return fmt.Errorf("chain sync error: %w", err)
	}
}
// Stop closes the connection created by Start, if there is one. It is a
// no-op when Start has not yet created a connection.
func (s *ChainSyncer) Stop() {
	active := s.conn
	if active == nil {
		return
	}
	active.Close()
}
// getIntersectPoints determines where to start syncing from.
//
// Resolution order:
//  1. No store configured: origin.
//  2. Store has blocks: the oldest of the last 100 stored blocks. This overlap
//     window (~2000 slots) prevents gaps from blocks lost in the gouroboros
//     muxer buffer when a connection dies; ON CONFLICT DO NOTHING in
//     InsertBlockBatch handles the resulting duplicates.
//  3. A known Shelley intersect point for the network (skips Byron).
//  4. Origin (Preview and other networks).
//
// A store failure or an undecodable stored hash is logged and treated as
// "nothing to resume from"; an error is returned only when the built-in
// Shelley intersect hash cannot be decoded.
func (s *ChainSyncer) getIntersectPoints(ctx context.Context) ([]pcommon.Point, error) {
	// Number of most recent stored blocks to back up over when resuming.
	const resumeOverlapBlocks = 100

	if s.store == nil {
		return []pcommon.Point{pcommon.NewPointOrigin()}, nil
	}

	// Resume with overlap: fetch the last blocks and intersect at the oldest,
	// so blocks lost during a connection timeout get re-delivered by the node.
	blocks, err := s.store.GetLastNBlocks(ctx, resumeOverlapBlocks)
	switch {
	case err != nil:
		// Previously ignored silently, which hid the reason for a full resync.
		log.Printf("Could not load last %d blocks for resume, falling back to default intersect: %v",
			resumeOverlapBlocks, err)
	case len(blocks) > 0:
		oldest := blocks[len(blocks)-1] // GetLastNBlocks returns DESC order
		hashBytes, decErr := hex.DecodeString(oldest.BlockHash)
		if decErr == nil {
			log.Printf("Resuming sync from slot %d (overlap: %d blocks back from tip slot %d)",
				oldest.Slot, len(blocks), blocks[0].Slot)
			return []pcommon.Point{pcommon.NewPoint(oldest.Slot, hashBytes)}, nil
		}
		log.Printf("Could not decode block hash at slot %d: %v", oldest.Slot, decErr)
	}

	// Start from Shelley genesis (skip Byron)
	if ip, ok := shelleyIntersectPoints[s.networkMagic]; ok {
		hashBytes, err := hex.DecodeString(ip.Hash)
		if err != nil {
			return nil, fmt.Errorf("decoding intersect hash: %w", err)
		}
		return []pcommon.Point{pcommon.NewPoint(ip.Slot, hashBytes)}, nil
	}

	// Preview and other networks: start from origin
	return []pcommon.Point{pcommon.NewPointOrigin()}, nil
}
// getIntersectForSlot builds an intersect point for a known slot by looking
// up that slot's block hash in the store.
//
// It returns an error when no store is configured, when the store has no hash
// for the slot, or when the stored hash is not valid hex. On error the
// returned point is the zero value and must not be used.
func (s *ChainSyncer) getIntersectForSlot(ctx context.Context, slot uint64) (pcommon.Point, error) {
	// getIntersectPoints tolerates a nil store; fail cleanly here as well
	// instead of panicking on the method call.
	if s.store == nil {
		return pcommon.Point{}, fmt.Errorf("no block hash for slot %d: store not configured", slot)
	}

	hash, err := s.store.GetBlockHash(ctx, slot)
	if err != nil {
		return pcommon.Point{}, fmt.Errorf("no block hash for slot %d: %w", slot, err)
	}

	hashBytes, err := hex.DecodeString(hash)
	if err != nil {
		return pcommon.Point{}, fmt.Errorf("decoding hash: %w", err)
	}
	return pcommon.NewPoint(slot, hashBytes), nil
}
// blockTypeToEra returns the era name for a gouroboros block type constant.
// Block types outside Shelley..Conway (including Byron) are rendered as
// "unknown(<n>)".
func blockTypeToEra(blockType uint) string {
	var name string
	switch blockType {
	case ledger.BlockTypeShelley:
		name = "Shelley"
	case ledger.BlockTypeAllegra:
		name = "Allegra"
	case ledger.BlockTypeMary:
		name = "Mary"
	case ledger.BlockTypeAlonzo:
		name = "Alonzo"
	case ledger.BlockTypeBabbage:
		name = "Babbage"
	case ledger.BlockTypeConway:
		name = "Conway"
	default:
		name = fmt.Sprintf("unknown(%d)", blockType)
	}
	return name
}
// handleRollForward is the ChainSync roll-forward callback used during NtN
// sync. For each post-Byron header it:
//
//   - records the node's tip slot,
//   - logs era and epoch transitions,
//   - hands slot, epoch, hash and VRF output to onBlock (if set),
//   - logs a progress line every 5,000 delivered blocks, and
//   - fires onCaughtUp once when the block is within 120 slots of the tip.
//
// Byron blocks and headers without an extractable VRF output are skipped. An
// error is returned only when data is not a ledger.BlockHeader.
func (s *ChainSyncer) handleRollForward(ctx chainsync.CallbackContext, blockType uint, data any, tip chainsync.Tip) error {
	// Keep the tip current even for blocks that are skipped below.
	atomic.StoreUint64(&s.tipSlot, tip.Point.Slot)

	// Byron blocks carry no VRF data.
	switch blockType {
	case ledger.BlockTypeByronEbb, ledger.BlockTypeByronMain:
		return nil
	}

	// The first post-Byron block starts the clock for all rate calculations.
	received := time.Now()
	if s.syncStart.IsZero() {
		s.syncStart, s.lastLogTime = received, received
	}

	// NtN mode delivers headers rather than full blocks.
	hdr, isHeader := data.(ledger.BlockHeader)
	if !isHeader {
		return fmt.Errorf("expected BlockHeader, got %T", data)
	}

	slot := hdr.SlotNumber()
	hashHex := hdr.Hash().String()
	epoch := SlotToEpoch(slot, s.networkMagic)

	// Era boundary.
	era := blockTypeToEra(blockType)
	if s.currentEra != era {
		log.Printf("[sync] ━━━ entering %s era at slot %d (epoch %d) ━━━", era, slot, epoch)
		s.currentEra = era
	}

	// Epoch boundary; nothing is logged while the previous epoch is still 0.
	if s.currentEpoch > 0 && s.currentEpoch != epoch {
		log.Printf("[sync] epoch %d started at slot %d", epoch, slot)
	}
	s.currentEpoch = epoch

	// Headers without a VRF output are not delivered and not counted.
	vrf := extractVrfFromHeader(blockType, hdr)
	if vrf == nil {
		return nil
	}

	if s.onBlock != nil {
		s.onBlock(slot, epoch, hashHex, vrf)
	}

	// Progress line every 5,000 delivered blocks.
	synced := atomic.AddUint64(&s.blockssynced, 1)
	if synced%5000 == 0 {
		knownTip := atomic.LoadUint64(&s.tipSlot)

		var pct float64
		if knownTip > 0 {
			pct = float64(slot) / float64(knownTip) * 100
		}

		total := received.Sub(s.syncStart)
		window := received.Sub(s.lastLogTime)

		// ETA is based on the slot rate since the previous progress line.
		var eta time.Duration
		if window.Seconds() > 0 && slot > s.lastLogSlot {
			slotRate := float64(slot-s.lastLogSlot) / window.Seconds()
			if slotRate > 0 && knownTip > slot {
				eta = time.Duration(float64(knownTip-slot)/slotRate) * time.Second
			}
		}

		blockRate := float64(synced) / total.Seconds()

		queued := atomic.LoadInt64(&s.chanLen)
		var bufPct float64
		if s.chanCap > 0 {
			bufPct = float64(queued) / float64(s.chanCap) * 100
		}

		log.Printf("[sync] slot %d/%d (%.1f%%) | epoch %d | %s | %.0f blk/s | buf %.0f%% | elapsed %s | ETA %s",
			slot, knownTip, pct, epoch, era, blockRate, bufPct,
			total.Round(time.Second), eta.Round(time.Second))

		s.lastLogTime = received
		s.lastLogSlot = slot
	}

	// Caught up means within 120 slots (~2 minutes) of the node's tip.
	nearTip := tip.Point.Slot > 0 && slot+120 >= tip.Point.Slot
	if nearTip && !s.caughtUp.Load() {
		s.caughtUp.Store(true)

		took := time.Since(s.syncStart)
		avgRate := float64(synced) / took.Seconds()
		log.Printf("[sync] caught up in %s | %d blocks | avg %.0f blk/s",
			took.Round(time.Second), synced, avgRate)

		if s.onCaughtUp != nil {
			go s.onCaughtUp()
		}
	}

	return nil
}
// handleRollBackward is the ChainSync roll-backward callback. It only logs
// the rollback target slot and returns nil; no syncer state is rewound and
// nothing is removed from the store here.
func (s *ChainSyncer) handleRollBackward(ctx chainsync.CallbackContext, point pcommon.Point, tip chainsync.Tip) error {
log.Printf("Chain sync rollback to slot %d", point.Slot)
// Sync simply continues from the rollback point. Per the original author's
// note, the nonce state may be slightly off afterwards but self-corrects as
// blocks are re-processed, because InsertBlock uses ON CONFLICT DO NOTHING
// and duplicates are therefore safe.
// NOTE(review): blocks already delivered from an abandoned fork are not
// undone by this handler — confirm that downstream consumers tolerate that.
// NOTE(review): the chain-sync protocol presumably sends a rollback to the
// intersect point right after intersecting, so this log line is expected on
// every (re)connect rather than being rare — confirm.
return nil
}
// extractVrfFromHeader returns the VRF output carried by an NtN block header.
//
// Shelley through Alonzo headers expose it as Body.NonceVrf.Output; Babbage
// and Conway headers expose it as Body.VrfResult.Output. Byron block types
// yield nil, and an unrecognised header type is logged and also yields nil.
func extractVrfFromHeader(blockType uint, header ledger.BlockHeader) []byte {
	// Byron has no VRF data.
	if blockType == ledger.BlockTypeByronEbb || blockType == ledger.BlockTypeByronMain {
		return nil
	}

	// Later eras embed the earlier eras' header types, but a type switch only
	// matches the concrete type, so every era needs its own case.
	switch eraHeader := header.(type) {
	case *shelley.ShelleyBlockHeader:
		return eraHeader.Body.NonceVrf.Output
	case *allegra.AllegraBlockHeader:
		return eraHeader.Body.NonceVrf.Output
	case *mary.MaryBlockHeader:
		return eraHeader.Body.NonceVrf.Output
	case *alonzo.AlonzoBlockHeader:
		return eraHeader.Body.NonceVrf.Output
	case *babbage.BabbageBlockHeader:
		return eraHeader.Body.VrfResult.Output
	case *conway.ConwayBlockHeader:
		return eraHeader.Body.VrfResult.Output
	default:
		log.Printf("Could not extract VRF from header type %T (blockType=%d)", header, blockType)
		return nil
	}
}