Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (i *Indexer) cmdLeaderlog(m *telebot.Message) {

// Check DB first
stored, storeErr := i.store.GetLeaderSchedule(ctx, targetEpoch)
if storeErr == nil && stored != nil {
if storeErr == nil && stored != nil && i.scheduleNonceMatches(ctx, stored) {
msg := FormatScheduleForTelegram(stored, i.poolName, i.leaderlogTZ, i.leaderlogTimeFormat)
i.bot.Send(m.Chat, msg)
return
Expand All @@ -443,7 +443,7 @@ func (i *Indexer) cmdLeaderlog(m *telebot.Message) {
}
}

epochNonce, nonceErr := i.nonceTracker.GetNonceForEpoch(targetEpoch - 1)
epochNonce, nonceErr := i.nonceTracker.GetVerifiedNonceForEpoch(targetEpoch - 1)
if nonceErr != nil {
replyEpoch(fmt.Sprintf("Failed to get nonce for epoch %d (using epoch %d nonce): %v", targetEpoch, targetEpoch-1, nonceErr))
return
Expand Down Expand Up @@ -533,7 +533,7 @@ func (i *Indexer) cmdLeaderlog(m *telebot.Message) {

// Check for stored schedule first
stored, err := i.store.GetLeaderSchedule(ctx, targetEpoch)
if err == nil && stored != nil {
if err == nil && stored != nil && i.scheduleNonceMatches(ctx, stored) {
msg := FormatScheduleForTelegram(stored, i.poolName, i.leaderlogTZ, i.leaderlogTimeFormat)
i.bot.Send(m.Chat, msg)
return
Expand All @@ -553,7 +553,7 @@ func (i *Indexer) cmdLeaderlog(m *telebot.Message) {
}
}

epochNonce, err := i.nonceTracker.GetNonceForEpoch(targetEpoch - 1)
epochNonce, err := i.nonceTracker.GetVerifiedNonceForEpoch(targetEpoch - 1)
if err != nil {
reply(fmt.Sprintf("Failed to get nonce for epoch %d (using epoch %d nonce): %v", targetEpoch, targetEpoch-1, err))
return
Expand Down Expand Up @@ -634,7 +634,7 @@ func (i *Indexer) cmdNonce(m *telebot.Message) {
label = "next"
}

nonce, err := i.nonceTracker.GetNonceForEpoch(epoch)
nonce, err := i.nonceTracker.GetVerifiedNonceForEpoch(epoch)
if err != nil {
i.bot.Send(m.Chat, fmt.Sprintf("Error getting %s epoch nonce: %v", label, err))
return
Expand Down Expand Up @@ -921,6 +921,9 @@ func (i *Indexer) cmdNextBlock(m *telebot.Message) {
// Try DB first with short timeout
ctxShort, cancelShort := context.WithTimeout(context.Background(), 10*time.Second)
schedule, err := i.store.GetLeaderSchedule(ctxShort, currentEpoch)
if err == nil && schedule != nil && !i.scheduleNonceMatches(ctxShort, schedule) {
schedule = nil
}
cancelShort()

// If no schedule, auto-compute it
Expand All @@ -937,7 +940,7 @@ func (i *Indexer) cmdNextBlock(m *telebot.Message) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
defer cancel()

epochNonce, nonceErr := i.nonceTracker.GetNonceForEpoch(currentEpoch - 1)
epochNonce, nonceErr := i.nonceTracker.GetVerifiedNonceForEpoch(currentEpoch - 1)
if nonceErr != nil {
reply(fmt.Sprintf("Failed to get nonce for epoch %d (using epoch %d nonce): %v", currentEpoch, currentEpoch-1, nonceErr))
return
Expand Down Expand Up @@ -1064,6 +1067,25 @@ func (i *Indexer) cmdNextBlock(m *telebot.Message) {
i.bot.Send(m.Chat, msg)
}

// scheduleNonceMatches reports whether a stored leader schedule was built
// with the nonce the tracker currently returns for the preceding epoch
// (schedule.Epoch - 1).
//
// It returns false for a nil schedule, when the nonce lookup fails, or when
// the hex-encoded nonce differs from schedule.EpochNonce; the latter two
// cases are logged. The ctx argument is not consulted by this function.
func (i *Indexer) scheduleNonceMatches(ctx context.Context, schedule *LeaderSchedule) bool {
	if schedule == nil {
		return false
	}

	// A schedule for epoch N is derived from the nonce of epoch N-1.
	want, lookupErr := i.nonceTracker.GetVerifiedNonceForEpoch(schedule.Epoch - 1)
	if lookupErr != nil {
		log.Printf("Failed to load expected nonce for epoch %d: %v", schedule.Epoch, lookupErr)
		return false
	}

	// Encode once; the value is needed for both the comparison and the log line.
	wantHex := hex.EncodeToString(want)
	if schedule.EpochNonce != wantHex {
		log.Printf("Ignoring stale schedule for epoch %d: cached nonce %s != expected %s",
			schedule.Epoch, schedule.EpochNonce, wantHex)
		return false
	}
	return true
}

func (i *Indexer) cmdVersion(m *telebot.Message) {
if !i.isGroupAllowed(m) {
return
Expand Down
32 changes: 16 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ const (

// Channel to broadcast block events to connected clients
var clients = make(map[*websocket.Conn]bool) // connected clients
var clientsMutex sync.RWMutex // protects clients map
var broadcast = make(chan interface{}, 100) // broadcast channel (buffered to prevent deadlock)
var clientsMutex sync.RWMutex // protects clients map
var broadcast = make(chan interface{}, 100) // broadcast channel (buffered to prevent deadlock)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
Expand Down Expand Up @@ -114,14 +114,14 @@ type Indexer struct {
leaderlogEnabled bool
leaderlogTZ string
leaderlogTimeFormat string
store Store
nonceTracker *NonceTracker
leaderlogMu sync.Mutex
leaderlogCalcing map[int]bool // epochs currently being calculated
leaderlogFailed map[int]time.Time // cooldown: epoch -> last failure time
scheduleExists map[int]bool // cached: epoch -> schedule already in DB
syncer *ChainSyncer // nil in lite mode
lastBlockTime int64 // atomic: unix timestamp of last block received
store Store
nonceTracker *NonceTracker
leaderlogMu sync.Mutex
leaderlogCalcing map[int]bool // epochs currently being calculated
leaderlogFailed map[int]time.Time // cooldown: epoch -> last failure time
scheduleExists map[int]bool // cached: epoch -> schedule already in DB
syncer *ChainSyncer // nil in lite mode
lastBlockTime int64 // atomic: unix timestamp of last block received
}

type BlockEvent struct {
Expand Down Expand Up @@ -1256,7 +1256,7 @@ func (i *Indexer) checkLeaderlogTrigger(slot uint64) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
existing, err := i.store.GetLeaderSchedule(ctx, nextEpoch)
cancel()
if err == nil && existing != nil {
if err == nil && existing != nil && i.scheduleNonceMatches(ctx, existing) {
i.scheduleExists[nextEpoch] = true
i.leaderlogMu.Unlock()
return
Expand Down Expand Up @@ -1306,7 +1306,7 @@ func (i *Indexer) calculateAndPostLeaderlog(epoch int) bool {

// Get epoch nonce (local first, Koios fallback)
// For epoch N leaderlog, use the nonce computed during epoch N-1
epochNonce, err := i.nonceTracker.GetNonceForEpoch(epoch - 1)
epochNonce, err := i.nonceTracker.GetVerifiedNonceForEpoch(epoch - 1)
if err != nil {
log.Printf("Failed to get nonce for epoch %d (tried epoch %d nonce): %v", epoch, epoch-1, err)
return false
Expand Down Expand Up @@ -1445,15 +1445,15 @@ func (i *Indexer) backfillSchedules(ctx context.Context) error {
failed := 0

for epoch := shelleyStart + 1; epoch <= i.epoch; epoch++ {
// Check if schedule already exists
// Check if schedule already exists and matches expected nonce
existing, _ := i.store.GetLeaderSchedule(ctx, epoch)
if existing != nil {
if existing != nil && i.scheduleNonceMatches(ctx, existing) {
skipped++
continue
}

// Get nonce from DB
nonce, err := i.store.GetFinalNonce(ctx, epoch)
// Get nonce from DB (epoch N schedule uses nonce from epoch N-1)
nonce, err := i.store.GetFinalNonce(ctx, epoch-1)
if err != nil || nonce == nil {
continue // no nonce available for this epoch
}
Expand Down
101 changes: 81 additions & 20 deletions nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,21 @@ type NonceTracker struct {
blockCount int
candidateFroze bool // whether candidate nonce was frozen this epoch
networkMagic int
fullMode bool // true = genesis-seeded rolling nonce, false = lite (zero-seeded)
fullMode bool // true = genesis-seeded rolling nonce, false = lite (zero-seeded)
verifiedNonces map[int][]byte // in-memory cache of verified epoch nonces (full mode only)
}

// NewNonceTracker creates a NonceTracker and attempts to restore state from DB.
// In full mode, the initial nonce is seeded with the Shelley genesis hash.
// In lite mode, the initial nonce is zero (current behavior).
func NewNonceTracker(store Store, koiosClient *koios.Client, epoch, networkMagic int, fullMode bool) *NonceTracker {
nt := &NonceTracker{
store: store,
koiosClient: koiosClient,
currentEpoch: epoch,
networkMagic: networkMagic,
fullMode: fullMode,
store: store,
koiosClient: koiosClient,
currentEpoch: epoch,
networkMagic: networkMagic,
fullMode: fullMode,
verifiedNonces: make(map[int][]byte),
}

// Try to restore evolving nonce from DB for current epoch
Expand Down Expand Up @@ -316,6 +318,65 @@ func (nt *NonceTracker) GetNonceForEpoch(epoch int) ([]byte, error) {
return nonce, nil
}

// GetVerifiedNonceForEpoch returns the nonce for epoch after checking it
// against a value recomputed via ComputeEpochNonce.
//
// Lite mode (fullMode == false) delegates straight to GetNonceForEpoch.
//
// Full mode:
//  1. Return the entry from the in-memory verifiedNonces map if present.
//  2. Otherwise recompute the nonce with ComputeEpochNonce (10 minute timeout).
//  3. Read the final nonce stored in the DB; if it equals the recomputed
//     value, remember the recomputed value in memory and return the stored one.
//  4. Otherwise write the recomputed value to the DB (logging a correction
//     when a differing value was stored), remember it in memory, and return it.
//
// A failure to persist is logged but does not fail the call. An error is
// returned only when the recomputation itself fails.
func (nt *NonceTracker) GetVerifiedNonceForEpoch(epoch int) ([]byte, error) {
	if !nt.fullMode {
		return nt.GetNonceForEpoch(epoch)
	}

	// Fast path: a previously verified value avoids another recomputation.
	nt.mu.Lock()
	hit, found := nt.verifiedNonces[epoch]
	nt.mu.Unlock()
	if found {
		return hit, nil
	}

	// remember records a verified nonce in the in-memory map under the lock.
	remember := func(nonce []byte) {
		nt.mu.Lock()
		nt.verifiedNonces[epoch] = nonce
		nt.mu.Unlock()
	}

	// Cache miss: recompute from local chain data.
	recomputeCtx, stopRecompute := context.WithTimeout(context.Background(), 10*time.Minute)
	defer stopRecompute()
	fresh, err := nt.ComputeEpochNonce(recomputeCtx, epoch)
	if err != nil {
		return nil, fmt.Errorf("failed to verify nonce for epoch %d: %w", epoch, err)
	}

	// Compare against whatever the DB currently holds for this epoch.
	readCtx, stopRead := context.WithTimeout(context.Background(), 5*time.Second)
	defer stopRead()
	persisted, readErr := nt.store.GetFinalNonce(readCtx, epoch)
	havePersisted := readErr == nil && persisted != nil

	if havePersisted && bytes.Equal(persisted, fresh) {
		// DB already agrees with the recomputed value; nothing to repair.
		remember(fresh)
		return persisted, nil
	}

	// Either the DB has no usable entry or it disagrees; the source label
	// written alongside the nonce distinguishes the two situations.
	source := "computed-verified"
	if havePersisted {
		log.Printf("Correcting stale cached nonce for epoch %d: cached %x != computed %x", epoch, persisted, fresh)
		source = "computed-correction"
	}

	writeCtx, stopWrite := context.WithTimeout(context.Background(), 5*time.Second)
	defer stopWrite()
	if writeErr := nt.store.SetFinalNonce(writeCtx, epoch, fresh, source); writeErr != nil {
		// Best effort: the recomputed value is still returned and cached in memory.
		log.Printf("Failed to persist verified nonce for epoch %d: %v", epoch, writeErr)
	}

	remember(fresh)
	return fresh, nil
}

// ComputeEpochNonce computes the epoch nonce for targetEpoch entirely from local chain data.
// Streams all blocks from Shelley genesis, evolving the nonce and freezing at the
// stability window of each epoch, then computing:
Expand Down Expand Up @@ -600,26 +661,26 @@ func (nt *NonceTracker) BackfillNonces(ctx context.Context) error {

// IntegrityResult holds the verification result for a single epoch.
type IntegrityResult struct {
Epoch int
Computed string // hex
DBStored string // hex, or "n/a"
Koios string // hex, or "n/a"
DBMatch bool
Epoch int
Computed string // hex
DBStored string // hex, or "n/a"
Koios string // hex, or "n/a"
DBMatch bool
KoiosMatch bool
}

// IntegrityReport is the summary of a full nonce integrity check.
type IntegrityReport struct {
TotalEpochs int
KoiosMatched int
TotalEpochs int
KoiosMatched int
KoiosMismatched int
KoiosUnavail int
DBMatched int
DBMismatched int
VrfErrors int
TotalBlocks int
Duration time.Duration
FirstMismatch int // epoch of first Koios mismatch, 0 if none
KoiosUnavail int
DBMatched int
DBMismatched int
VrfErrors int
TotalBlocks int
Duration time.Duration
FirstMismatch int // epoch of first Koios mismatch, 0 if none
}

// NonceIntegrityCheck recomputes all epoch nonces from raw VRF outputs and
Expand Down