Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o goduckbot .
docker pull wcatz/goduckbot:latest
```

Multi-arch images (amd64/arm64) are published to [Docker Hub](https://hub.docker.com/r/wcatz/goduckbot) via GitHub Actions on every merge to master and tagged release.
ARM64 images are published to [Docker Hub](https://hub.docker.com/r/wcatz/goduckbot) built locally and pushed on every release.

## Helm Chart

Expand Down
222 changes: 76 additions & 146 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,73 @@ func (i *Indexer) cmdEpoch(m *telebot.Message) {
i.bot.Send(m.Chat, msg)
}

// queryStakeForLeaderlog retrieves pool and total stake for the given target epoch.
// Tries NtC first with the appropriate mark/set/go snapshot, then falls back to Koios.
// This is the single authoritative stake-fetch path for all leaderlog calculations.
func (i *Indexer) queryStakeForLeaderlog(ctx context.Context, targetEpoch int) (poolStake, totalStake uint64, err error) {
curEpoch := i.getCurrentEpoch()

// Try NtC if within mark/set/go snapshot range
if i.nodeQuery != nil {
var snap SnapshotType
ntcAvailable := true
switch targetEpoch {
case curEpoch + 1:
snap = SnapshotMark
case curEpoch:
snap = SnapshotSet
case curEpoch - 1:
snap = SnapshotGo
default:
ntcAvailable = false
}
if ntcAvailable {
ntcCtx, ntcCancel := context.WithTimeout(ctx, 5*time.Minute)
snapshots, snapErr := i.nodeQuery.QueryPoolStakeSnapshots(ntcCtx, i.bech32PoolId)
ntcCancel()
if snapErr != nil {
log.Printf("NtC stake query for epoch %d failed: %v", targetEpoch, snapErr)
} else {
switch snap {
case SnapshotMark:
poolStake, totalStake = snapshots.PoolStakeMark, snapshots.TotalStakeMark
case SnapshotSet:
poolStake, totalStake = snapshots.PoolStakeSet, snapshots.TotalStakeSet
case SnapshotGo:
poolStake, totalStake = snapshots.PoolStakeGo, snapshots.TotalStakeGo
}
if poolStake > 0 {
return poolStake, totalStake, nil
}
}
}
}

// Koios fallback: try target epoch first, then recent epochs
if i.koios != nil {
for _, tryEpoch := range []int{targetEpoch, curEpoch, curEpoch - 1, curEpoch - 2} {
epochNo := koios.EpochNo(tryEpoch)
poolHist, histErr := i.koios.GetPoolHistory(ctx, koios.PoolID(i.bech32PoolId), &epochNo, nil)
if histErr != nil || len(poolHist.Data) == 0 {
continue
}
poolStake = uint64(poolHist.Data[0].ActiveStake.IntPart())
epochInfo, infoErr := i.koios.GetEpochInfo(ctx, &epochNo, nil)
if infoErr != nil || len(epochInfo.Data) == 0 {
poolStake = 0
continue
}
totalStake = uint64(epochInfo.Data[0].ActiveStake.IntPart())
if tryEpoch != targetEpoch {
log.Printf("Using Koios stake fallback for epoch %d (from epoch %d)", targetEpoch, tryEpoch)
}
return poolStake, totalStake, nil
}
}

return 0, 0, fmt.Errorf("no stake data available for epoch %d from NtC or Koios", targetEpoch)
}

func (i *Indexer) cmdLeaderlog(m *telebot.Message) {
if !i.isAllowed(m) {
return
Expand Down Expand Up @@ -417,14 +484,11 @@ func (i *Indexer) cmdLeaderlog(m *telebot.Message) {

// Parse argument: "next", "current", or epoch number
var targetEpoch int
var snapType SnapshotType
switch {
case args == "next":
targetEpoch = i.getCurrentEpoch() + 1
snapType = SnapshotMark
case args == "current":
targetEpoch = i.getCurrentEpoch()
snapType = SnapshotSet
default:
parsed, err := strconv.Atoi(args)
if err != nil {
Expand Down Expand Up @@ -460,69 +524,9 @@ func (i *Indexer) cmdLeaderlog(m *telebot.Message) {
return
}

// Try NtC first if epoch is within snapshot range (mark/set/go)
var poolStake, totalStake uint64
curEpoch := i.getCurrentEpoch()
if i.nodeQuery != nil {
var snap SnapshotType
ntcAvailable := true
switch targetEpoch {
case curEpoch + 1:
snap = SnapshotMark
case curEpoch:
snap = SnapshotSet
case curEpoch - 1:
snap = SnapshotGo
default:
ntcAvailable = false
}
if ntcAvailable {
ntcCtx, ntcCancel := context.WithTimeout(ctx, 5*time.Minute)
snapshots, snapErr := i.nodeQuery.QueryPoolStakeSnapshots(ntcCtx, i.bech32PoolId)
ntcCancel()
if snapErr != nil {
log.Printf("NtC stake query for epoch %d failed: %v", targetEpoch, snapErr)
} else {
switch snap {
case SnapshotMark:
poolStake = snapshots.PoolStakeMark
totalStake = snapshots.TotalStakeMark
case SnapshotSet:
poolStake = snapshots.PoolStakeSet
totalStake = snapshots.TotalStakeSet
case SnapshotGo:
poolStake = snapshots.PoolStakeGo
totalStake = snapshots.TotalStakeGo
}
}
}
}

// Koios fallback for epochs outside NtC range or NtC failure
if poolStake == 0 && i.koios != nil {
curEpoch := i.getCurrentEpoch()
for _, tryEpoch := range []int{targetEpoch, curEpoch, curEpoch - 1, curEpoch - 2} {
epochNo := koios.EpochNo(tryEpoch)
poolHist, histErr := i.koios.GetPoolHistory(ctx, koios.PoolID(i.bech32PoolId), &epochNo, nil)
if histErr != nil || len(poolHist.Data) == 0 {
continue
}
poolStake = uint64(poolHist.Data[0].ActiveStake.IntPart())
epochInfo, infoErr := i.koios.GetEpochInfo(ctx, &epochNo, nil)
if infoErr != nil || len(epochInfo.Data) == 0 {
poolStake = 0
continue
}
totalStake = uint64(epochInfo.Data[0].ActiveStake.IntPart())
if tryEpoch != targetEpoch {
log.Printf("Using Koios stake fallback for /leaderlog %d (from epoch %d)", targetEpoch, tryEpoch)
}
break
}
}

if poolStake == 0 || totalStake == 0 {
replyEpoch(fmt.Sprintf("Failed to get stake for epoch %d from NtC or Koios", targetEpoch))
poolStake, totalStake, stakeErr := i.queryStakeForLeaderlog(ctx, targetEpoch)
if stakeErr != nil {
replyEpoch(fmt.Sprintf("Failed to get stake for epoch %d: %v", targetEpoch, stakeErr))
return
}

Expand Down Expand Up @@ -576,51 +580,9 @@ func (i *Indexer) cmdLeaderlog(m *telebot.Message) {
return
}

// Try NtC first for stake data
var poolStake, totalStake uint64
if i.nodeQuery != nil {
ntcCtx, ntcCancel := context.WithTimeout(ctx, 5*time.Minute)
snapshots, snapErr := i.nodeQuery.QueryPoolStakeSnapshots(ntcCtx, i.bech32PoolId)
ntcCancel()
if snapErr != nil {
log.Printf("NtC stake query for epoch %d failed: %v", targetEpoch, snapErr)
} else {
switch snapType {
case SnapshotSet:
poolStake = snapshots.PoolStakeSet
totalStake = snapshots.TotalStakeSet
default:
poolStake = snapshots.PoolStakeMark
totalStake = snapshots.TotalStakeMark
}
}
}

// Koios fallback if NtC unavailable or failed (try recent epochs since Koios may lag)
if poolStake == 0 && i.koios != nil {
curEpoch := i.getCurrentEpoch()
for _, tryEpoch := range []int{curEpoch, curEpoch - 1, curEpoch - 2} {
epochNo := koios.EpochNo(tryEpoch)
poolHist, histErr := i.koios.GetPoolHistory(ctx, koios.PoolID(i.bech32PoolId), &epochNo, nil)
if histErr != nil || len(poolHist.Data) == 0 {
continue
}
poolStake = uint64(poolHist.Data[0].ActiveStake.IntPart())
epochInfo, infoErr := i.koios.GetEpochInfo(ctx, &epochNo, nil)
if infoErr != nil || len(epochInfo.Data) == 0 {
poolStake = 0
continue
}
totalStake = uint64(epochInfo.Data[0].ActiveStake.IntPart())
if tryEpoch != targetEpoch {
log.Printf("Using Koios stake fallback for /leaderlog (from epoch %d)", tryEpoch)
}
break
}
}

if poolStake == 0 || totalStake == 0 {
reply(fmt.Sprintf("Failed to get stake for epoch %d from NtC or Koios", targetEpoch))
poolStake, totalStake, stakeErr := i.queryStakeForLeaderlog(ctx, targetEpoch)
if stakeErr != nil {
reply(fmt.Sprintf("Failed to get stake for epoch %d: %v", targetEpoch, stakeErr))
return
}

Expand Down Expand Up @@ -994,41 +956,9 @@ func (i *Indexer) cmdNextBlock(m *telebot.Message) {
return
}

var poolStake, totalStake uint64
if i.nodeQuery != nil {
ntcCtx, ntcCancel := context.WithTimeout(ctx, 5*time.Minute)
snapshots, snapErr := i.nodeQuery.QueryPoolStakeSnapshots(ntcCtx, i.bech32PoolId)
ntcCancel()
if snapErr != nil {
log.Printf("NtC stake query failed, trying Koios fallback: %v", snapErr)
} else {
poolStake = snapshots.PoolStakeSet
totalStake = snapshots.TotalStakeSet
}
}
if poolStake == 0 && i.koios != nil {
// Try recent epochs (Koios may lag 1-2 epochs behind)
for _, tryEpoch := range []int{currentEpoch, currentEpoch - 1, currentEpoch - 2} {
epochNo := koios.EpochNo(tryEpoch)
poolHist, histErr := i.koios.GetPoolHistory(ctx, koios.PoolID(i.bech32PoolId), &epochNo, nil)
if histErr != nil || len(poolHist.Data) == 0 {
log.Printf("Koios GetPoolHistory(%d) empty or error: %v", tryEpoch, histErr)
continue
}
poolStake = uint64(poolHist.Data[0].ActiveStake.IntPart())
epochInfo, infoErr := i.koios.GetEpochInfo(ctx, &epochNo, nil)
if infoErr != nil || len(epochInfo.Data) == 0 {
log.Printf("Koios GetEpochInfo(%d) failed: %v", tryEpoch, infoErr)
poolStake = 0
continue
}
totalStake = uint64(epochInfo.Data[0].ActiveStake.IntPart())
log.Printf("Using Koios stake fallback for /nextblock (from epoch %d)", tryEpoch)
break
}
}
if poolStake == 0 || totalStake == 0 {
reply("No stake data available from NtC or Koios")
poolStake, totalStake, stakeErr := i.queryStakeForLeaderlog(ctx, currentEpoch)
if stakeErr != nil {
reply(fmt.Sprintf("No stake data available: %v", stakeErr))
return
}

Expand Down
34 changes: 0 additions & 34 deletions comprehensive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,40 +819,6 @@ func TestStoreForgedSlots(t *testing.T) {
}
}

func TestStoreStreamBlockNonces(t *testing.T) {
store := newTestStore(t)
ctx := context.Background()

_, _ = store.InsertBlock(ctx, 300, 2, "h3", []byte{3}, []byte{33})
_, _ = store.InsertBlock(ctx, 100, 1, "h1", []byte{1}, []byte{11})
_, _ = store.InsertBlock(ctx, 200, 1, "h2", []byte{2}, []byte{22})

rows, err := store.StreamBlockNonces(ctx)
if err != nil {
t.Fatalf("StreamBlockNonces: %v", err)
}
defer rows.Close()

var slots []uint64
for rows.Next() {
_, slot, _, _, err := rows.Scan()
if err != nil {
t.Fatalf("Scan: %v", err)
}
slots = append(slots, slot)
}
if err := rows.Err(); err != nil {
t.Fatalf("Err: %v", err)
}

// Should be ordered by slot ASC
if len(slots) != 3 {
t.Fatalf("expected 3, got %d", len(slots))
}
if slots[0] != 100 || slots[1] != 200 || slots[2] != 300 {
t.Fatalf("slots not in order: %v", slots)
}
}

func TestStoreStreamBlockVrfOutputs(t *testing.T) {
store := newTestStore(t)
Expand Down
6 changes: 6 additions & 0 deletions config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ leaderlog:
timezone: "America/New_York"
timeFormat: "12h" # "12h" or "24h"

# Advanced (full mode only):
# nonceIntegrityCheck: false # After nonce backfill, verify computed nonces against Koios
# # and log any mismatches. Safe to enable; adds ~1 API call/epoch.
# backfillSchedules: false # After nonce backfill, calculate and store leader schedules
# # for all historical epochs where nonce data is available.

# Database configuration (only used when leaderlog.enabled is true)
database:
driver: "sqlite" # "sqlite" (default) or "postgres"
Expand Down
Loading
Loading