4 changes: 2 additions & 2 deletions comprehensive_test.go
@@ -675,7 +675,7 @@ func TestStoreBlockBatch(t *testing.T) {
 		{Slot: 300, Epoch: 1, BlockHash: "h3", VrfOutput: make([]byte, 64), NetworkMagic: MainnetNetworkMagic},
 	}
 
-	if err := store.InsertBlockBatch(ctx, blocks); err != nil {
+	if _, err := store.InsertBlockBatch(ctx, blocks); err != nil {
 		t.Fatalf("InsertBlockBatch: %v", err)
 	}
 
@@ -696,7 +696,7 @@ func TestStoreBlockBatch(t *testing.T) {
 	}
 
 	// Duplicate batch should not error (ON CONFLICT DO NOTHING)
-	if err := store.InsertBlockBatch(ctx, blocks); err != nil {
+	if _, err := store.InsertBlockBatch(ctx, blocks); err != nil {
 		t.Fatalf("InsertBlockBatch duplicate: %v", err)
 	}
 
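With the count now returned, the duplicate case could also be asserted directly rather than only checked for the absence of an error. A possible follow-up assertion, not part of this diff, reusing the test's store, ctx, and blocks, and assuming InsertBlockBatch reports only rows actually written:

	// Hypothetical extra check: replaying the same batch should insert nothing,
	// since every slot already exists and ON CONFLICT DO NOTHING skips them.
	n, err := store.InsertBlockBatch(ctx, blocks)
	if err != nil {
		t.Fatalf("InsertBlockBatch duplicate: %v", err)
	}
	if n != 0 {
		t.Fatalf("expected 0 rows inserted for duplicate batch, got %d", n)
	}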
17 changes: 9 additions & 8 deletions db.go
@@ -269,10 +269,10 @@ func (s *PgStore) GetForgedSlots(ctx context.Context, epoch int) ([]uint64, error) {
 	return slots, rows.Err()
 }
 
-func (s *PgStore) InsertBlockBatch(ctx context.Context, blocks []BlockData) error {
+func (s *PgStore) InsertBlockBatch(ctx context.Context, blocks []BlockData) (int, error) {
 	tx, err := s.pool.Begin(ctx)
 	if err != nil {
-		return err
+		return 0, err
 	}
 	defer tx.Rollback(ctx)
 
@@ -281,7 +281,7 @@ func (s *PgStore) InsertBlockBatch(ctx context.Context, blocks []BlockData) error {
 		slot BIGINT, epoch INT, block_hash TEXT, vrf_output BYTEA, nonce_value BYTEA
 	) ON COMMIT DROP`)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
 	rows := make([][]interface{}, len(blocks))
@@ -297,18 +297,19 @@ func (s *PgStore) InsertBlockBatch(ctx context.Context, blocks []BlockData) error {
 		pgx.CopyFromRows(rows),
 	)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	// Merge into blocks — duplicates silently skipped
-	_, err = tx.Exec(ctx, `INSERT INTO blocks (slot, epoch, block_hash, vrf_output, nonce_value)
+	// Merge into blocks — duplicates silently skipped, count actually inserted
+	result, err := tx.Exec(ctx, `INSERT INTO blocks (slot, epoch, block_hash, vrf_output, nonce_value)
 		SELECT slot, epoch, block_hash, vrf_output, nonce_value FROM blocks_staging
 		ON CONFLICT (slot) DO NOTHING`)
 	if err != nil {
-		return err
+		return 0, err
 	}
 
-	return tx.Commit(ctx)
+	inserted := int(result.RowsAffected())
+	return inserted, tx.Commit(ctx)
 }
 
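The count here comes straight from pgx: Exec returns a pgconn.CommandTag, and RowsAffected on an INSERT ... SELECT ... ON CONFLICT DO NOTHING reflects only the rows actually written, not the ones the conflict clause skipped. A minimal standalone sketch of that behavior, assuming pgx v5 with a placeholder DSN and table (not from this repo):

	package main

	import (
		"context"
		"fmt"
		"log"

		"github.com/jackc/pgx/v5/pgxpool"
	)

	func main() {
		ctx := context.Background()

		// Placeholder connection string; substitute your own.
		pool, err := pgxpool.New(ctx, "postgres://localhost:5432/example")
		if err != nil {
			log.Fatal(err)
		}
		defer pool.Close()

		// RowsAffected counts only rows the INSERT actually wrote;
		// rows skipped by ON CONFLICT DO NOTHING are excluded.
		tag, err := pool.Exec(ctx,
			`INSERT INTO blocks (slot) VALUES ($1) ON CONFLICT (slot) DO NOTHING`, int64(300))
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("inserted %d row(s)\n", tag.RowsAffected())
	}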
14 changes: 12 additions & 2 deletions main.go
@@ -520,7 +520,7 @@ func (i *Indexer) flushBlockBatch(batch []BlockData) {
 
 	// Large batches (historical sync) — bulk insert via CopyFrom
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-	err := i.store.InsertBlockBatch(ctx, batch)
+	inserted, err := i.store.InsertBlockBatch(ctx, batch)
 	cancel()
 
 	if err != nil {
@@ -532,7 +532,17 @@ func (i *Indexer) flushBlockBatch(batch []BlockData) {
 		return
 	}
 
-	// Blocks inserted via CopyFrom, evolve nonce in-memory with single DB persist
+	if inserted < len(batch) {
+		// Overlap window: some blocks already existed (reconnect overlap).
+		// Blocks arrive in slot order, so duplicates are at the start.
+		// Slice to only the new blocks for nonce evolution.
+		dupes := len(batch) - inserted
+		log.Printf("Batch had %d/%d duplicates (overlap), evolving %d new blocks", dupes, len(batch), inserted)
+		i.nonceTracker.ProcessBatch(batch[dupes:])
+		return
+	}
+
+	// All blocks were new — evolve nonce in-memory with single DB persist
 	i.nonceTracker.ProcessBatch(batch)
 }
 
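Note that the slice arithmetic rests on an ordering invariant rather than per-row feedback from the database: blocks arrive in slot order and the reconnect overlap replays the oldest part of the window first, so the skipped duplicates are guaranteed to be a prefix of batch. A toy illustration of that assumption in plain Go, no database involved:

	package main

	import "fmt"

	func main() {
		// Suppose slots 100 and 110 already exist in the DB and the batch
		// replays them first (reconnect overlap), followed by new blocks.
		batch := []uint64{100, 110, 120, 130}
		inserted := 2                  // count reported by InsertBlockBatch
		dupes := len(batch) - inserted // duplicates sit at the front of the batch
		fmt.Println(batch[dupes:])     // prints [120 130]: only new blocks evolve the nonce
	}

If duplicates could appear anywhere other than the front, this subtraction would feed the wrong blocks to the nonce tracker; the slot-ordered delivery is what makes the prefix assumption safe.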
19 changes: 12 additions & 7 deletions store.go
@@ -33,7 +33,7 @@ type BlockVrfRows interface {
 // Both SQLite and PostgreSQL backends implement this interface.
 type Store interface {
 	InsertBlock(ctx context.Context, slot uint64, epoch int, blockHash string, vrfOutput, nonceValue []byte) (bool, error)
-	InsertBlockBatch(ctx context.Context, blocks []BlockData) error
+	InsertBlockBatch(ctx context.Context, blocks []BlockData) (int, error)
 	UpsertEvolvingNonce(ctx context.Context, epoch int, nonce []byte, blockCount int) error
 	SetCandidateNonce(ctx context.Context, epoch int, nonce []byte) error
 	SetFinalNonce(ctx context.Context, epoch int, nonce []byte, source string) error
@@ -344,10 +344,10 @@ func (s *SqliteStore) GetForgedSlots(ctx context.Context, epoch int) ([]uint64, error) {
 	return slots, rows.Err()
 }
 
-func (s *SqliteStore) InsertBlockBatch(ctx context.Context, blocks []BlockData) error {
+func (s *SqliteStore) InsertBlockBatch(ctx context.Context, blocks []BlockData) (int, error) {
 	tx, err := s.db.BeginTx(ctx, nil)
 	if err != nil {
-		return fmt.Errorf("begin tx: %w", err)
+		return 0, fmt.Errorf("begin tx: %w", err)
 	}
 	defer tx.Rollback()
 
@@ -357,18 +357,23 @@ func (s *SqliteStore) InsertBlockBatch(ctx context.Context, blocks []BlockData) error {
 		ON CONFLICT (slot) DO NOTHING`,
 	)
 	if err != nil {
-		return fmt.Errorf("prepare: %w", err)
+		return 0, fmt.Errorf("prepare: %w", err)
 	}
 	defer stmt.Close()
 
+	inserted := 0
 	for _, b := range blocks {
 		nonceValue := vrfNonceValueForEpoch(b.VrfOutput, b.Epoch, b.NetworkMagic)
-		if _, err := stmt.ExecContext(ctx, int64(b.Slot), b.Epoch, b.BlockHash, b.VrfOutput, nonceValue); err != nil {
-			return fmt.Errorf("insert slot %d: %w", b.Slot, err)
+		result, err := stmt.ExecContext(ctx, int64(b.Slot), b.Epoch, b.BlockHash, b.VrfOutput, nonceValue)
+		if err != nil {
+			return 0, fmt.Errorf("insert slot %d: %w", b.Slot, err)
+		}
+		if n, _ := result.RowsAffected(); n > 0 {
+			inserted++
 		}
 	}
 
-	return tx.Commit()
+	return inserted, tx.Commit()
 }
 
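The per-row counting leans on database/sql semantics: an INSERT suppressed by ON CONFLICT DO NOTHING reports zero affected rows, which is how RowsAffected distinguishes real inserts from skips. This holds for common Go SQLite drivers such as mattn/go-sqlite3, though RowsAffected support is formally driver-dependent. A minimal standalone check of that behavior, assuming mattn/go-sqlite3:

	package main

	import (
		"database/sql"
		"fmt"
		"log"

		_ "github.com/mattn/go-sqlite3" // assumed driver; any SQLite driver reporting RowsAffected works
	)

	func main() {
		db, err := sql.Open("sqlite3", ":memory:")
		if err != nil {
			log.Fatal(err)
		}
		defer db.Close()

		if _, err := db.Exec(`CREATE TABLE blocks (slot INTEGER PRIMARY KEY)`); err != nil {
			log.Fatal(err)
		}

		for _, slot := range []int64{100, 100} { // second insert is a duplicate
			res, err := db.Exec(`INSERT INTO blocks (slot) VALUES (?) ON CONFLICT (slot) DO NOTHING`, slot)
			if err != nil {
				log.Fatal(err)
			}
			n, _ := res.RowsAffected()
			fmt.Printf("slot %d: %d row(s) inserted\n", slot, n) // prints 1, then 0
		}
	}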
23 changes: 15 additions & 8 deletions sync.go
@@ -138,20 +138,27 @@ func (s *ChainSyncer) Stop() {
 }
 
 // getIntersectPoints determines where to start syncing from.
+// Uses an overlap window of 100 blocks (~2000 slots) on reconnect to prevent
+// gaps from blocks lost in the gouroboros muxer buffer when a connection dies.
+// ON CONFLICT DO NOTHING in InsertBlockBatch handles the resulting duplicates.
 func (s *ChainSyncer) getIntersectPoints(ctx context.Context) ([]pcommon.Point, error) {
 	if s.store == nil {
 		return []pcommon.Point{pcommon.NewPointOrigin()}, nil
 	}
 
-	// Check if we have existing data to resume from
-	lastSlot, err := s.store.GetLastSyncedSlot(ctx)
-	if err == nil && lastSlot > 0 {
-		log.Printf("Resuming sync from last synced slot %d", lastSlot)
-		point, err := s.getIntersectForSlot(ctx, lastSlot)
-		if err == nil {
-			return []pcommon.Point{point}, nil
+	// Resume with overlap: fetch last 100 blocks and intersect at the oldest.
+	// This backs up ~2000 slots so any blocks lost in the muxer's buffer
+	// during a connection timeout get re-delivered by the node.
+	blocks, err := s.store.GetLastNBlocks(ctx, 100)
+	if err == nil && len(blocks) > 0 {
+		oldest := blocks[len(blocks)-1] // GetLastNBlocks returns DESC order
+		log.Printf("Resuming sync from slot %d (overlap: %d blocks back from tip slot %d)",
+			oldest.Slot, len(blocks), blocks[0].Slot)
+		hashBytes, decErr := hex.DecodeString(oldest.BlockHash)
+		if decErr == nil {
+			return []pcommon.Point{pcommon.NewPoint(oldest.Slot, hashBytes)}, nil
 		}
-		log.Printf("Could not get intersect for slot %d, falling back to Shelley genesis: %v", lastSlot, err)
+		log.Printf("Could not decode block hash at slot %d: %v", oldest.Slot, decErr)
 	}
 
 	// Start from Shelley genesis (skip Byron)
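GetLastNBlocks itself is not shown in this diff. The resume logic assumes it returns the most recent rows newest-first, so the oldest block of the overlap window is the final element. A hypothetical sketch of the PostgreSQL variant the caller implies; the real query, and which BlockData fields it populates, may differ:

	// Hypothetical implementation of the contract getIntersectPoints relies on:
	// newest rows first (DESC), so the oldest block of the window comes last.
	func (s *PgStore) GetLastNBlocks(ctx context.Context, n int) ([]BlockData, error) {
		rows, err := s.pool.Query(ctx,
			`SELECT slot, epoch, block_hash FROM blocks ORDER BY slot DESC LIMIT $1`, n)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		var blocks []BlockData
		for rows.Next() {
			var b BlockData
			var slot int64
			if err := rows.Scan(&slot, &b.Epoch, &b.BlockHash); err != nil {
				return nil, err
			}
			b.Slot = uint64(slot)
			blocks = append(blocks, b)
		}
		return blocks, rows.Err()
	}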