Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion cmd/mithril/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ var (
debugAcctWrites []string
cpuprofPath string

bankhashVerifyEndpoint string // URL of a reference mithril node for bankhash verification
bankhashVerifyMode string // "warn" or "panic"

paramArenaSizeMB uint64
borrowedAccountArenaSize uint64

Expand Down Expand Up @@ -133,6 +136,8 @@ func init() {
// [debug] section flags
Run.Flags().StringSliceVar(&debugTxs, "transaction-signatures", []string{}, "Pass tx signature strings to enable debug logging during that transaction's execution")
Run.Flags().StringSliceVar(&debugAcctWrites, "account-writes", []string{}, "Pass account pubkeys to enable debug logging of transactions that modify the account")
Run.Flags().StringVar(&bankhashVerifyEndpoint, "bankhash-verify-endpoint", "", "URL of a reference mithril node for bankhash verification (e.g. http://reference-node:8899)")
Run.Flags().StringVar(&bankhashVerifyMode, "bankhash-verify-mode", "warn", "Bankhash verification mode: 'warn' (log mismatch) or 'panic' (halt on mismatch)")

// Top-level flags
Run.Flags().StringVar(&scratchDirectory, "scratch-directory", "/tmp", "Path for downloads (e.g. snapshots) and other temp state")
Expand Down Expand Up @@ -443,6 +448,10 @@ func initConfigAndBindFlags(cmd *cobra.Command) error {
if len(debugAcctWrites) == 0 {
debugAcctWrites = getStringSlice("account-writes", "development.debug.account_writes")
}
bankhashVerifyEndpoint = getString("bankhash-verify-endpoint", "debug.bankhash_verify_endpoint")
if v := getString("bankhash-verify-mode", "debug.bankhash_verify_mode"); v != "" {
bankhashVerifyMode = v
}

// [tuning] section (with fallback to legacy [development])
paramArenaSizeMB = getUint64("param-arena-size-mb", "tuning.param_arena_size_mb")
Expand Down Expand Up @@ -2210,6 +2219,17 @@ func runReplayWithRecovery(
}
}()

result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState)
// Create bankhash verifier if configured
var bhVerifier *replay.BankhashVerifier
if bankhashVerifyEndpoint != "" {
mode := replay.BankhashVerifyWarn
if bankhashVerifyMode == "panic" {
mode = replay.BankhashVerifyPanic
}
bhVerifier = replay.NewBankhashVerifier(bankhashVerifyEndpoint, mode)
mlog.Log.Infof("Bankhash verification enabled: endpoint=%s mode=%s", bankhashVerifyEndpoint, bankhashVerifyMode)
}

result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState, bhVerifier)
return result
}
10 changes: 10 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,16 @@ name = "mithril"
# Account pubkeys to enable debug logging of transactions that modify them
# account_writes = ["pubkey1", "pubkey2"]

# Bankhash verification: compare computed bankhashes against a reference
# mithril node to detect state divergences early. The reference node must
# have its RPC server enabled (see [rpc] section).

# Endpoint of a trusted reference mithril node (empty = disabled)
# bankhash_verify_endpoint = "http://reference-node:8899"

# What to do on mismatch: "warn" (log and continue) or "panic" (halt).
# Any value other than "panic" is treated as "warn".
# bankhash_verify_mode = "warn"

# ============================================================================
# [reporting] - Metrics & Reporting
# ============================================================================
Expand Down
202 changes: 202 additions & 0 deletions pkg/replay/bankhash_verify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package replay

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/Overclock-Validator/mithril/pkg/base58"
"github.com/Overclock-Validator/mithril/pkg/mlog"
)

// BankhashVerifier compares locally computed bankhashes with the ones reported
// by a reference mithril node. Requests are queued by Submit and processed by
// a background worker goroutine, so the replay loop is never blocked on the
// network.
type BankhashVerifier struct {
	// Configuration, set once by NewBankhashVerifier.
	endpoint   string
	mode       BankhashVerifyMode
	httpClient *http.Client

	// Work queue and worker lifecycle.
	verifyCh chan verifyRequest // closed by Stop to end the worker
	done     chan struct{}      // closed by the worker when it exits

	// Statistics; mu guards every field below it.
	mu           sync.Mutex
	verified     int    // slots for which a reference hash was fetched
	mismatches   int    // slots whose hash differed from the reference
	errors       int    // fetch failures plus submissions dropped on a full queue
	lastVerified uint64 // slot of the most recent successful fetch
}

// BankhashVerifyMode selects how a BankhashVerifier reacts to a mismatch.
type BankhashVerifyMode int

const (
	// BankhashVerifyWarn logs the mismatch and lets replay continue.
	// It is the zero value of BankhashVerifyMode.
	BankhashVerifyWarn BankhashVerifyMode = iota

	// BankhashVerifyPanic halts by panicking on the first mismatch.
	BankhashVerifyPanic
)

// verifyRequest is one unit of work for the verification worker: the locally
// computed bankhash for a slot, to be compared with the reference node's.
type verifyRequest struct {
	computed []byte // private copy of the locally computed bankhash
	slot     uint64 // slot the hash belongs to
}

// Minimal JSON-RPC 2.0 envelope types used to talk to the reference node.
// Field order in jsonRPCRequest determines the order of keys in the encoded
// request body, so it is kept as-is.
type (
	// jsonRPCRequest is the body POSTed to the reference node.
	jsonRPCRequest struct {
		JSONRPC string `json:"jsonrpc"`
		ID      int    `json:"id"`
		Method  string `json:"method"`
		Params  []any  `json:"params"`
	}

	// jsonRPCResponse is the reply envelope. Result is kept raw so the caller
	// can decode it into whatever type the method returns.
	jsonRPCResponse struct {
		JSONRPC string          `json:"jsonrpc"`
		ID      int             `json:"id"`
		Result  json.RawMessage `json:"result"`
		Error   *jsonRPCError   `json:"error"`
	}

	// jsonRPCError is the error object of a failed call; it is nil in
	// jsonRPCResponse when the reply carries no error.
	jsonRPCError struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	}
)

// NewBankhashVerifier builds a verifier that queries endpoint and reacts to
// mismatches according to mode. The background worker goroutine is started
// before the function returns; call Stop to shut it down.
func NewBankhashVerifier(endpoint string, mode BankhashVerifyMode) *BankhashVerifier {
	client := &http.Client{Timeout: 5 * time.Second}

	verifier := new(BankhashVerifier)
	verifier.endpoint = endpoint
	verifier.mode = mode
	verifier.httpClient = client
	// Buffered so Submit can hand off up to 64 pending slots without blocking.
	verifier.verifyCh = make(chan verifyRequest, 64)
	verifier.done = make(chan struct{})

	go verifier.worker()
	return verifier
}

// Submit enqueues the bankhash computed for slot for asynchronous verification.
// It never blocks: when the queue is full the request is dropped and counted
// as an error.
func (v *BankhashVerifier) Submit(slot uint64, computedHash []byte) {
	// Detach from the caller's buffer so later reuse cannot alter the request.
	req := verifyRequest{
		slot:     slot,
		computed: append(make([]byte, 0, len(computedHash)), computedHash...),
	}

	select {
	case v.verifyCh <- req:
		return
	default:
	}

	// Queue full: skip this slot rather than stall replay.
	v.mu.Lock()
	v.errors++
	v.mu.Unlock()
}

// Stop closes the request queue, waits for the worker to finish the requests
// already queued, and logs a summary of the counters.
//
// NOTE: because it closes the queue channel, calling Stop a second time or
// calling Submit afterwards would panic.
func (v *BankhashVerifier) Stop() {
	close(v.verifyCh)
	<-v.done

	// Snapshot the counters under the lock, then log without holding it.
	v.mu.Lock()
	verified, mismatches, errCount, lastSlot := v.verified, v.mismatches, v.errors, v.lastVerified
	v.mu.Unlock()

	mlog.Log.Infof("bankhash verify shutdown: verified=%d mismatches=%d errors=%d last_slot=%d",
		verified, mismatches, errCount, lastSlot)
}

// worker processes queued requests one at a time until verifyCh is closed and
// drained, then closes done to signal that it has exited.
func (v *BankhashVerifier) worker() {
	defer close(v.done)
	for {
		req, open := <-v.verifyCh
		if !open {
			return
		}
		v.verify(req)
	}
}

// verify fetches the reference bankhash for req.slot and compares it with the
// locally computed one. Fetch failures are counted and logged sparsely. A
// mismatch is counted and then either panics (BankhashVerifyPanic) or is
// logged as an error.
func (v *BankhashVerifier) verify(req verifyRequest) {
	want, fetchErr := v.fetchBankHash(req.slot)
	if fetchErr != nil {
		v.mu.Lock()
		v.errors++
		total := v.errors
		v.mu.Unlock()

		// Log only the 1st, 101st, 201st, ... error to avoid flooding the log
		// while the reference node is unreachable.
		if total%100 == 1 {
			mlog.Log.Infof("bankhash verify: fetch error for slot %d: %v (total errors: %d)", req.slot, fetchErr, total)
		}
		return
	}

	v.mu.Lock()
	v.lastVerified = req.slot
	v.verified++
	v.mu.Unlock()

	if bytes.Equal(req.computed, want) {
		return
	}

	v.mu.Lock()
	v.mismatches++
	v.mu.Unlock()

	got58, want58 := base58.Encode(req.computed), base58.Encode(want)
	if v.mode == BankhashVerifyPanic {
		panic(fmt.Sprintf("DIVERGENCE in slot %d: bankhash mismatch: computed=%s expected=%s", req.slot, got58, want58))
	}
	mlog.Log.Errorf("DIVERGENCE in slot %d: bankhash mismatch: computed=%s expected=%s", req.slot, got58, want58)
}

// fetchBankHash asks the reference node for the bankhash of slot using the
// JSON-RPC "getBankHash" method and returns the decoded hash bytes.
//
// An error is returned when the request cannot be built or sent, the HTTP
// status is not 2xx, the body is not a valid JSON-RPC response, the response
// carries an RPC error, the result is missing, null or empty, or the hash is
// not valid base58.
func (v *BankhashVerifier) fetchBankHash(slot uint64) ([]byte, error) {
	// Upper bound on how much of the response is buffered. A bankhash reply is
	// tiny; anything larger is truncated and then fails JSON decoding.
	const maxResponseBytes = 1 << 20

	reqBody := jsonRPCRequest{
		JSONRPC: "2.0",
		ID:      1,
		Method:  "getBankHash",
		Params:  []any{slot},
	}

	bodyBytes, err := json.Marshal(reqBody)
	if err != nil {
		return nil, fmt.Errorf("marshal request: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	httpReq, err := http.NewRequestWithContext(ctx, "POST", v.endpoint, bytes.NewReader(bodyBytes))
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")

	resp, err := v.httpClient.Do(httpReq)
	if err != nil {
		return nil, fmt.Errorf("http request: %w", err)
	}
	defer resp.Body.Close()

	respBytes, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
	if err != nil {
		return nil, fmt.Errorf("read response: %w", err)
	}

	// Client.Do does not treat non-2xx statuses as errors, so check explicitly
	// instead of trying to parse e.g. a proxy error page as JSON-RPC.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("unexpected http status: %s", resp.Status)
	}

	var rpcResp jsonRPCResponse
	if err := json.Unmarshal(respBytes, &rpcResp); err != nil {
		return nil, fmt.Errorf("unmarshal response: %w", err)
	}

	if rpcResp.Error != nil {
		return nil, fmt.Errorf("rpc error %d: %s", rpcResp.Error.Code, rpcResp.Error.Message)
	}

	// A JSON null unmarshals into a string without error and leaves it empty.
	// Treat "no result" as a fetch error so it can never be compared against
	// the computed hash and reported as a divergence.
	if len(rpcResp.Result) == 0 || string(rpcResp.Result) == "null" {
		return nil, fmt.Errorf("no bankhash in response for slot %d", slot)
	}

	var hashStr string
	if err := json.Unmarshal(rpcResp.Result, &hashStr); err != nil {
		return nil, fmt.Errorf("unmarshal result: %w", err)
	}
	if hashStr == "" {
		return nil, fmt.Errorf("empty bankhash in response for slot %d", slot)
	}

	hashBytes, err := base58.DecodeFromString(hashStr)
	if err != nil {
		return nil, fmt.Errorf("decode base58 hash: %w", err)
	}

	return hashBytes[:], nil
}

// Stats reports a consistent snapshot of the counters: slots verified,
// mismatches seen, and errors (fetch failures and dropped submissions).
func (v *BankhashVerifier) Stats() (verified, mismatches, errors int) {
	v.mu.Lock()
	verified, mismatches, errors = v.verified, v.mismatches, v.errors
	v.mu.Unlock()
	return verified, mismatches, errors
}
26 changes: 20 additions & 6 deletions pkg/replay/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,7 @@ func ReplayBlocks(
acctsDb *accountsdb.AccountsDb,
acctsDbPath string,
mithrilState *state.MithrilState, // State file with manifest_* seed fields
resumeState *ResumeState, // nil if not resuming, contains parent slot info when resuming
resumeState *ResumeState, // nil if not resuming, contains parent slot info when resuming
startSlot, endSlot uint64,
rpcEndpoints []string, // RPC endpoints in priority order (first = primary, rest = fallbacks)
blockDir string,
Expand All @@ -1201,6 +1201,7 @@ func ReplayBlocks(
rpcServer *rpcserver.RpcServer,
blockFetchOpts *BlockFetchOpts,
onCancelWriteState OnCancelWriteState, // callback to write state immediately on cancellation (can be nil)
bankhashVerifier *BankhashVerifier, // optional bankhash verifier (nil to disable)
) *ReplayResult {
result := &ReplayResult{}

Expand Down Expand Up @@ -1648,6 +1649,11 @@ func ReplayBlocks(
fmt.Fprintf(bankhashLogFile, "%d %s\n", block.Slot, base58.Encode(lastSlotCtx.FinalBankhash))
}

// Verify bankhash against reference node (async, non-blocking)
if bankhashVerifier != nil {
bankhashVerifier.Submit(block.Slot, lastSlotCtx.FinalBankhash)
}

statsd.Count(statsd.SlotReplays, 1, nil)
statsd.Timing(statsd.SlotReplayDurationMs, uint64(slotReplayDuration.Nanoseconds())/1e6, nil)
statsd.Gauge(statsd.Epoch, float64(block.Epoch), nil)
Expand Down Expand Up @@ -1918,6 +1924,11 @@ func ReplayBlocks(
// Serialize all epoch stakes for persistence
result.ComputedEpochStakes = serializeAllEpochStakes()

// Stop bankhash verifier and log summary
if bankhashVerifier != nil {
bankhashVerifier.Stop()
}

return result
}

Expand Down Expand Up @@ -2026,7 +2037,7 @@ func sequentialTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bl
}
txFeeInfo, txErr := ProcessTransaction(slotCtx, sigverifyWg, tx, txMeta, dbgOpts, nil)

if txErr != nil {
if txMeta != nil && txErr != nil {
if txMeta.Err == nil && tx.IsVote() {
mlog.Log.Errorf("[run:%s] DIVERGENCE in slot %d: vote tx %s failed locally but succeeded onchain => bankhash mismatch at parent slot %d",
CurrentRunID, block.Slot, tx.Signatures[0], block.ParentSlot)
Expand All @@ -2035,11 +2046,11 @@ func sequentialTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bl
}

// check for success-failure return value divergences
if txErr == nil && txMeta.Err != nil {
if txMeta != nil && txErr == nil && txMeta.Err != nil {
mlog.Log.Errorf("[run:%s] DIVERGENCE in slot %d: tx %s succeeded locally but failed onchain: %+v",
CurrentRunID, block.Slot, tx.Signatures[0], block.TxMetas[idx].Err)
panic(fmt.Sprintf("tx %s return value divergence: txErr was nil, but onchain err was %+v", tx.Signatures[0], block.TxMetas[idx].Err))
} else if txErr != nil && txMeta.Err == nil {
} else if txMeta != nil && txErr != nil && txMeta.Err == nil {
mlog.Log.Errorf("[run:%s] DIVERGENCE in slot %d: tx %s failed locally (%v) but succeeded onchain",
CurrentRunID, block.Slot, tx.Signatures[0], txErr)
panic(fmt.Sprintf("tx %s return value divergence: txErr was %+v (%s), but onchain err was nil", tx.Signatures[0], txErr, txErr))
Expand All @@ -2058,11 +2069,14 @@ func parallelTxLoop(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup, bloc

if rblock.FromLightbringer {
wg := &sync.WaitGroup{}
workerPool, _ := ants.NewPoolWithFunc(txParallelism, func(i interface{}) {
workerPool, poolErr := ants.NewPoolWithFunc(txParallelism, func(i interface{}) {
defer wg.Done()
idx := i.(uint64)
txFeeInfos[idx], errs[idx] = ProcessTransaction(slotCtx, sigverifyWg, rblock.Transactions[idx], nil, dbgOpts, nil)
})
if poolErr != nil {
panic(fmt.Sprintf("failed to create worker pool: %s", poolErr))
}

for _, entry := range rblock.Entries {
for _, txIdx := range entry.Indices {
Expand Down Expand Up @@ -2157,7 +2171,7 @@ func ProcessBlock(
for i := range block.Transactions {
unresolvedBlock.Transactions[i] = &solana.Transaction{}
*(unresolvedBlock.Transactions[i]) = *block.Transactions[i]
if unresolvedBlock.TxMetas != nil && !block.FromLightbringer {
if block.TxMetas != nil && !block.FromLightbringer {
unresolvedBlock.TxMetas[i] = &rpc.TransactionMeta{}
*(unresolvedBlock.TxMetas[i]) = *block.TxMetas[i]
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/replay/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup,
start = time.Now()
txFeeInfo, _, err := fees.CalculateAndDeductTxFees(tx, txMeta, instrs, &execCtx.TransactionContext.Accounts, computeBudgetLimits, slotCtx.Features)
if err != nil {
return txFeeInfo, nil
return &fees.TxFeeInfo{}, nil
}

metrics.GlobalBlockReplay.CalcAndDeductFees.AddTimingSince(start)
Expand Down Expand Up @@ -495,7 +495,7 @@ func ProcessTransaction(slotCtx *sealevel.SlotCtx, sigverifyWg *sync.WaitGroup,
}

// check for CU consumed divergences
if instrErr == nil && *txMeta.ComputeUnitsConsumed != execCtx.ComputeMeter.Used() {
if instrErr == nil && txMeta != nil && txMeta.ComputeUnitsConsumed != nil && *txMeta.ComputeUnitsConsumed != execCtx.ComputeMeter.Used() {
discrepancy := max(execCtx.ComputeMeter.Used(), *txMeta.ComputeUnitsConsumed) - min(execCtx.ComputeMeter.Used(), *txMeta.ComputeUnitsConsumed)
var sign byte
if execCtx.ComputeMeter.Used() > *txMeta.ComputeUnitsConsumed {
Expand Down
Loading
Loading