Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions cmd/mithril/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,40 @@ postBootstrap:
NearTipPollMs: blockNearTipPollMs,
NearTipLookahead: blockNearTipLookahead,
}
result := runReplayWithRecovery(ctx, accountsDb, accountsPath, manifest, resumeState, uint64(startSlot), liveEndSlot, rpcEndpoints, blockstorePath, int(txParallelism), true, useLightbringer, dbgOpts, metricsWriter, rpcServer, mithrilState, blockFetchOpts, replayStartTime)
// Build consensus options from config
consensusMaxDepth := config.GetInt("consensus.skip_path_max_depth")
if consensusMaxDepth <= 0 {
consensusMaxDepth = 64
}
consensusPolicy := config.GetString("consensus.unresolved_policy")
if consensusPolicy == "" {
consensusPolicy = "halt"
}
switch consensusPolicy {
case "halt", "warn":
// valid
default:
mlog.Log.Errorf("invalid consensus.unresolved_policy %q (must be \"halt\" or \"warn\"), defaulting to \"halt\"", consensusPolicy)
consensusPolicy = "halt"
}
consensusEnforceSource := config.GetString("consensus.enforce_on_source")
if consensusEnforceSource == "" {
consensusEnforceSource = "lightbringer"
}
switch consensusEnforceSource {
case "lightbringer", "all":
// valid
default:
mlog.Log.Errorf("invalid consensus.enforce_on_source %q (must be \"lightbringer\" or \"all\"), defaulting to \"lightbringer\"", consensusEnforceSource)
consensusEnforceSource = "lightbringer"
}
consensusOpts := &replay.ConsensusOpts{
SkipPathMaxDepth: consensusMaxDepth,
UnresolvedPolicy: consensusPolicy,
EnforceOnSource: consensusEnforceSource,
}

result := runReplayWithRecovery(ctx, accountsDb, accountsPath, manifest, resumeState, uint64(startSlot), liveEndSlot, rpcEndpoints, blockstorePath, int(txParallelism), true, useLightbringer, dbgOpts, metricsWriter, rpcServer, mithrilState, blockFetchOpts, consensusOpts, replayStartTime)

// Update state file with last persisted slot and shutdown context
// Skip if already written during cancellation (eliminates timing window)
Expand Down Expand Up @@ -2099,6 +2132,7 @@ func runReplayWithRecovery(
rpcServer *rpcserver.RpcServer,
mithrilState *state.MithrilState,
blockFetchOpts *replay.BlockFetchOpts,
consensusOpts *replay.ConsensusOpts,
replayStartTime time.Time, // Start time for resume context
) *replay.ReplayResult {
var result *replay.ReplayResult
Expand Down Expand Up @@ -2210,6 +2244,6 @@ func runReplayWithRecovery(
}
}()

result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState)
result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, consensusOpts, onCancelWriteState)
return result
}
29 changes: 29 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,35 @@ name = "mithril"
# num_slots = 0
# end_slot = -1

# ============================================================================
# [consensus] - Vote-Anchored Consensus
# ============================================================================
#
# Controls how Mithril uses on-chain vote data to verify block correctness.
# The fork choice service accumulates vote stake per slot and determines
# which bank hash has reached 2/3 supermajority.
#
# When using Lightbringer as the block source, the consensus coordinator
# resolves ambiguous slot ranges by finding a valid skip path that chains
# to the vote-confirmed hash. If no valid path exists, the configured
# policy determines behavior.

[consensus]
# Maximum depth (number of slots) the skip-path solver will explore.
# Larger values let the solver resolve longer ambiguous ranges but use more memory. The default of 64 covers roughly 26 seconds of slots.
skip_path_max_depth = 64

# What to do when a Lightbringer slot range cannot be resolved:
# "halt" - Graceful shutdown, write diagnostic artifact (recommended)
# "warn" - Log warning and continue (use only for debugging)
unresolved_policy = "halt"

# Which block source to enforce consensus on:
# "lightbringer" - Only enforce on Lightbringer blocks (RPC blocks are trusted)
# "all" - Enforce on all block sources (not yet implemented)
enforce_on_source = "lightbringer"


# ============================================================================
# [rpc] - Mithril RPC Server
# ============================================================================
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ type LogConfig struct {
MaxBackups int `toml:"max_backups" mapstructure:"max_backups"` // Keep up to N old log files
}

// ConsensusConfig holds vote-anchored consensus configuration,
// mapped from the [consensus] section of the TOML config file.
type ConsensusConfig struct {
	SkipPathMaxDepth int    `toml:"skip_path_max_depth" mapstructure:"skip_path_max_depth"` // Max slots the skip-path solver explores (default: 64)
	UnresolvedPolicy string `toml:"unresolved_policy" mapstructure:"unresolved_policy"`     // "halt" or "warn" (default: "halt")
	EnforceOnSource  string `toml:"enforce_on_source" mapstructure:"enforce_on_source"`     // "lightbringer" or "all" (default: "lightbringer")
}

// Config holds all configuration options for Mithril (Firedancer-style hierarchy)
type Config struct {
// Top-level (matches Firedancer style)
Expand All @@ -152,6 +159,7 @@ type Config struct {
Rpc RpcConfig `toml:"rpc" mapstructure:"rpc"`
Replay ReplayConfig `toml:"replay" mapstructure:"replay"`
Block BlockConfig `toml:"block" mapstructure:"block"`
Consensus ConsensusConfig `toml:"consensus" mapstructure:"consensus"`
Snapshot SnapshotConfig `toml:"snapshot" mapstructure:"snapshot"`
Development DevelopmentConfig `toml:"development" mapstructure:"development"`
Reporting ReportingConfig `toml:"reporting" mapstructure:"reporting"`
Expand Down
10 changes: 10 additions & 0 deletions pkg/epochstakes/epoch_authorized_voters.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,13 @@ func (cache *EpochAuthorizedVotersCache) IsAuthorizedVoter(voteAcct solana.Publi
}
return false
}

// Entries returns the underlying map for serialization/persistence.
//
// NOTE: this exposes the cache's internal map directly (no defensive copy).
// Callers must treat the returned map as read-only; mutating it would
// corrupt the cache's state.
func (cache *EpochAuthorizedVotersCache) Entries() map[solana.PublicKey][]solana.PublicKey {
	return cache.authorizedVoters
}

// Len returns the number of vote accounts currently held in the cache
// (the number of keys in the authorized-voters map).
func (cache *EpochAuthorizedVotersCache) Len() int {
	return len(cache.authorizedVoters)
}
92 changes: 92 additions & 0 deletions pkg/forkchoice/consensus_coordinator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package forkchoice

import (
"errors"

"github.com/gagliardetto/solana-go"
)

// Sentinel errors returned by ResolveRange; callers should match them
// with errors.Is.
var (
	// ErrNeedWait means the vote landing window for the target slot has not
	// been reached yet; the caller should retry the range later.
	ErrNeedWait = errors.New("consensus: vote landing window not reached, need to wait")
	// ErrNoSupermajority means no bankhash accumulated a 2/3 stake
	// supermajority for the target slot.
	ErrNoSupermajority = errors.New("consensus: no hash reached supermajority for target slot")
)

// SlotDecision represents the resolved action for a single slot.
type SlotDecision struct {
	Slot     uint64 // absolute slot number this decision applies to
	UseBlock bool   // true = use the block, false = slot is empty/skipped
}

// ConsensusCoordinator bridges the ForkChoiceService (vote accumulation) and
// the SkipPath solver to verify bankhash-chain consistency for a slot range.
//
// For a given range [startSlot, endSlot], it:
//  1. Queries forkchoice for the vote-confirmed bankhash at endSlot
//  2. Runs the skipPath solver to find a valid chain from prevBankhash to that hash
//  3. Returns per-slot decisions (use block or skip)
type ConsensusCoordinator struct {
	forkChoice *ForkChoiceService // source of per-slot vote/supermajority state
	maxDepth   int                // max slots the SkipPath solver will explore
	policy     string             // "halt" = return error on unresolved, "warn" = log and continue
}

// NewConsensusCoordinator creates a coordinator with the given forkchoice service,
// maximum skipPath solver depth, and unresolved policy.
//
// Invalid inputs are normalized to the documented config defaults so that a
// misconfigured or zero-valued caller cannot produce a coordinator that can
// never resolve a range: a non-positive maxDepth becomes 64, and any policy
// other than "halt" or "warn" becomes "halt".
func NewConsensusCoordinator(fc *ForkChoiceService, maxDepth int, policy string) *ConsensusCoordinator {
	if maxDepth <= 0 {
		maxDepth = 64
	}
	if policy != "halt" && policy != "warn" {
		policy = "halt"
	}
	return &ConsensusCoordinator{
		forkChoice: fc,
		maxDepth:   maxDepth,
		policy:     policy,
	}
}

// ResolveRange attempts to determine which slots in [startSlot, endSlot] should
// use blocks vs be treated as skipped.
//
// prevBankhash is the parent bankhash at startSlot-1 (the last executed block's bankhash).
// candidates are executed slot candidates carrying computed bankhash + parent bankhash.
//
// Returns slot decisions or an error:
//   - ErrNeedWait: votes haven't landed yet, caller should retry later
//   - ErrNoSupermajority: no hash reached 2/3 threshold for endSlot
//   - ErrNoPath: solver couldn't find a valid chain to the target hash
//   - ErrDepthExceeded: range exceeds maxDepth
func (cc *ConsensusCoordinator) ResolveRange(
	startSlot, endSlot uint64,
	prevBankhash solana.Hash,
	candidates map[uint64]*SlotCandidate,
) ([]SlotDecision, error) {
	// Query forkchoice for the vote-confirmed hash at the end slot.
	targetHash, status := cc.forkChoice.GetSupermajorityHash(endSlot)

	switch status {
	case BankhashNeedWait:
		return nil, ErrNeedWait
	case BankhashNoSupermajority:
		return nil, ErrNoSupermajority
	case BankhashHasSupermajority:
		// Continue to the solver below.
	default:
		// Defensive: a status value added to the enum later must not be
		// silently treated as a confirmed supermajority.
		return nil, errors.New("consensus: unhandled bankhash status")
	}

	// Run the skipPath solver to find a valid chain from prevBankhash to
	// targetHash across the candidate blocks.
	result, err := SkipPath(startSlot, endSlot, prevBankhash, candidates, targetHash, cc.maxDepth)
	if err != nil {
		return nil, err
	}

	// Convert the solver's []bool path (index i => startSlot+i) into
	// per-slot decisions.
	decisions := make([]SlotDecision, len(result.Path))
	for i, useBlock := range result.Path {
		decisions[i] = SlotDecision{
			Slot:     startSlot + uint64(i),
			UseBlock: useBlock,
		}
	}

	return decisions, nil
}

// Policy returns the coordinator's unresolved policy ("halt" or "warn") as
// supplied at construction. "halt" means an unresolved range is treated as a
// hard error; "warn" means it is logged and execution continues.
func (cc *ConsensusCoordinator) Policy() string {
	return cc.policy
}
163 changes: 163 additions & 0 deletions pkg/forkchoice/consensus_coordinator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package forkchoice

import (
"testing"

"github.com/gagliardetto/solana-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// newTestForkChoiceState builds a bare ForkChoiceService around a fresh
// forkChoiceState with the given total epoch stake. The service is never
// started and no worker pool exists — tests poke the state directly.
func newTestForkChoiceState(totalStake uint64) *ForkChoiceService {
	return &ForkChoiceService{
		state: &forkChoiceState{
			voteStakeTotals: make(map[uint64]*slotVoteAccumulator),
			totalEpochStake: totalStake,
		},
	}
}

// injectSupermajority marks the given slot as vote-confirmed for winningHash
// with the given stake by writing the accumulator state directly, bypassing
// vote parsing and block submission so coordinator tests stay isolated.
func injectSupermajority(fc *ForkChoiceService, slot uint64, winningHash solana.Hash, stake uint64) {
	accumulator := newSlotVoteAccumulator(fc.state.totalEpochStake, slot)
	accumulator.trackers[winningHash] = &voteStakeTracker{
		voted: make(map[solana.PublicKey]struct{}),
		stake: stake,
	}
	accumulator.confirmed = true
	accumulator.confirmedHash = winningHash
	fc.state.voteStakeTotals[slot] = accumulator
}

// TestResolveRangeSuccess covers the happy path: a single slot with a block
// whose hash is vote-confirmed resolves to one "use block" decision.
func TestResolveRangeSuccess(t *testing.T) {
	fc := newTestForkChoiceState(100)

	prevBankhash := hash(0x01)
	blockHash := hash(0x02)

	// Simulate: slot 10 has a block, supermajority confirms blockHash.
	injectSupermajority(fc, 10, blockHash, 70)
	// Advance ingestion past the vote landing window so the coordinator
	// does not return ErrNeedWait.
	fc.state.latestSlotIngested = 10 + VoteConfirmationTimeoutSlots

	candidates := map[uint64]*SlotCandidate{
		10: {Slot: 10, HasBlock: true, Blockhash: blockHash, LastBlockhash: prevBankhash},
	}

	cc := NewConsensusCoordinator(fc, 64, "halt")
	decisions, err := cc.ResolveRange(10, 10, prevBankhash, candidates)
	require.NoError(t, err)
	require.Len(t, decisions, 1)
	assert.Equal(t, uint64(10), decisions[0].Slot)
	assert.True(t, decisions[0].UseBlock)
}

// TestResolveRangeNeedWait verifies ErrNeedWait when ingestion has not yet
// reached the vote landing window for the requested slot.
func TestResolveRangeNeedWait(t *testing.T) {
	svc := newTestForkChoiceState(100)
	svc.state.latestSlotIngested = 0 // far before the landing window for slot 10

	coordinator := NewConsensusCoordinator(svc, 64, "halt")
	_, err := coordinator.ResolveRange(10, 10, hash(0x01), nil)
	assert.ErrorIs(t, err, ErrNeedWait)
}

// TestResolveRangeNoSupermajority verifies ErrNoSupermajority when the landing
// window has elapsed but no votes were recorded for the target slot.
func TestResolveRangeNoSupermajority(t *testing.T) {
	svc := newTestForkChoiceState(100)
	svc.state.latestSlotIngested = 10 + VoteConfirmationTimeoutSlots // window passed, zero votes

	coordinator := NewConsensusCoordinator(svc, 64, "halt")
	_, err := coordinator.ResolveRange(10, 10, hash(0x01), nil)
	assert.ErrorIs(t, err, ErrNoSupermajority)
}

// TestResolveRangeNoPath verifies ErrNoPath when the vote-confirmed hash
// cannot be reached from the available candidate blocks.
func TestResolveRangeNoPath(t *testing.T) {
	fc := newTestForkChoiceState(100)

	prevBankhash := hash(0x01)
	targetHash := hash(0xFF)

	// Supermajority says targetHash won, but our candidates can't chain to it.
	injectSupermajority(fc, 10, targetHash, 70)
	fc.state.latestSlotIngested = 10 + VoteConfirmationTimeoutSlots

	// The only candidate produces hash(0x02), which differs from targetHash.
	candidates := map[uint64]*SlotCandidate{
		10: {Slot: 10, HasBlock: true, Blockhash: hash(0x02), LastBlockhash: prevBankhash},
	}

	cc := NewConsensusCoordinator(fc, 64, "halt")
	_, err := cc.ResolveRange(10, 10, prevBankhash, candidates)
	assert.ErrorIs(t, err, ErrNoPath)
}

// TestResolveRangeMultiSlotMixed verifies a range that mixes skipped and
// block-bearing slots: the solver must pick exactly the candidates whose
// parent hashes chain to the confirmed hash at the end slot.
func TestResolveRangeMultiSlotMixed(t *testing.T) {
	fc := newTestForkChoiceState(100)

	// Slots 10-13: 10(empty), 11(block), 12(empty), 13(block)
	prevBankhash := hash(0x01)
	block11Hash := hash(0x02)
	block13Hash := hash(0x03)

	// Only the end slot (13) is vote-confirmed; the chain must be inferred.
	injectSupermajority(fc, 13, block13Hash, 70)
	fc.state.latestSlotIngested = 13 + VoteConfirmationTimeoutSlots

	// Candidate 13 chains to candidate 11, which chains to prevBankhash;
	// slots 10 and 12 have no candidates at all.
	candidates := map[uint64]*SlotCandidate{
		11: {Slot: 11, HasBlock: true, Blockhash: block11Hash, LastBlockhash: prevBankhash},
		13: {Slot: 13, HasBlock: true, Blockhash: block13Hash, LastBlockhash: block11Hash},
	}

	cc := NewConsensusCoordinator(fc, 64, "halt")
	decisions, err := cc.ResolveRange(10, 13, prevBankhash, candidates)
	require.NoError(t, err)
	require.Len(t, decisions, 4)

	assert.Equal(t, SlotDecision{Slot: 10, UseBlock: false}, decisions[0])
	assert.Equal(t, SlotDecision{Slot: 11, UseBlock: true}, decisions[1])
	assert.Equal(t, SlotDecision{Slot: 12, UseBlock: false}, decisions[2])
	assert.Equal(t, SlotDecision{Slot: 13, UseBlock: true}, decisions[3])
}

// TestResolveRangeDepthExceeded verifies ErrDepthExceeded when the requested
// range is wider than the solver's configured depth cap.
func TestResolveRangeDepthExceeded(t *testing.T) {
	svc := newTestForkChoiceState(100)

	injectSupermajority(svc, 100, hash(0xAA), 70)
	svc.state.latestSlotIngested = 100 + VoteConfirmationTimeoutSlots

	// The range 0..100 spans 101 slots, but the solver is capped at 64.
	coordinator := NewConsensusCoordinator(svc, 64, "halt")
	_, err := coordinator.ResolveRange(0, 100, hash(0x01), nil)
	assert.ErrorIs(t, err, ErrDepthExceeded)
}

// TestResolveRangeAllEmpty verifies that when the confirmed hash at the end
// slot equals the previous bankhash, every slot in the range resolves to
// "skipped" (no blocks executed).
func TestResolveRangeAllEmpty(t *testing.T) {
	fc := newTestForkChoiceState(100)

	prevBankhash := hash(0x01)

	// Target hash equals prevBankhash — all slots skipped.
	injectSupermajority(fc, 12, prevBankhash, 70)
	fc.state.latestSlotIngested = 12 + VoteConfirmationTimeoutSlots

	// No candidates are needed: the empty chain already reaches the target.
	cc := NewConsensusCoordinator(fc, 64, "halt")
	decisions, err := cc.ResolveRange(10, 12, prevBankhash, nil)
	require.NoError(t, err)
	require.Len(t, decisions, 3)
	for _, d := range decisions {
		assert.False(t, d.UseBlock)
	}
}

// TestCoordinatorPolicy verifies that Policy echoes back whichever policy
// string the coordinator was constructed with.
func TestCoordinatorPolicy(t *testing.T) {
	svc := newTestForkChoiceState(100)
	for _, want := range []string{"halt", "warn"} {
		coordinator := NewConsensusCoordinator(svc, 64, want)
		assert.Equal(t, want, coordinator.Policy())
	}
}
Loading
Loading