Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 70 additions & 36 deletions beacon-chain/state/stategen/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"context"
"encoding/hex"
"fmt"
"slices"

"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
Expand Down Expand Up @@ -128,11 +130,19 @@ func (s *State) MigrateToCold(ctx context.Context, fRoot [32]byte) error {
func (s *State) migrateToColdHdiff(ctx context.Context, fRoot [32]byte) error {
s.finalizedInfo.lock.RLock()
oldFSlot := s.finalizedInfo.slot
oldFRoot := s.finalizedInfo.root
oldFState := s.finalizedInfo.state
s.finalizedInfo.lock.RUnlock()
fSlot, err := s.beaconDB.SlotByBlockRoot(ctx, fRoot)
if err != nil {
return errors.Wrap(err, "could not get slot by block root")
}

if oldFState == nil || oldFState.IsNil() {
return errors.New("finalized state is nil")
}

slotsToSave := make([]primitives.Slot, 0)
for slot := oldFSlot; slot < fSlot; slot++ {
if ctx.Err() != nil {
return ctx.Err()
Expand All @@ -148,58 +158,82 @@ func (s *State) migrateToColdHdiff(ctx context.Context, fRoot [32]byte) error {
if uint64(slot) == offset {
continue
}
// The state needs to be saved.
// Try the epoch boundary cache first.
cached, exists, err := s.epochBoundaryStateCache.getBySlot(slot)
slotsToSave = append(slotsToSave, slot)
}

if len(slotsToSave) == 0 {
// Update finalized info in memory.
fInfo, ok, err := s.epochBoundaryStateCache.getByBlockRoot(fRoot)
if err != nil {
log.WithError(err).Errorf("could not get epoch boundary state for slot %d", slot)
cached = nil
exists = false
return err
}
var aRoot [32]byte
var aState state.BeaconState
if exists {
aRoot = cached.root
aState = cached.state
} else {
_, roots, err := s.beaconDB.HighestRootsBelowSlot(ctx, slot)
if ok {
s.SaveFinalizedState(fSlot, fRoot, fInfo.state)
}
return nil
}

blocks, err := s.loadBlocks(ctx, oldFSlot+1, fSlot, fRoot)
if err != nil {
return errors.Wrap(err, "could not load blocks for hdiff migration")
}
slices.SortFunc(blocks, func(a, b interfaces.ReadOnlySignedBeaconBlock) int {
switch {
case a.Block().Slot() < b.Block().Slot():
return -1
case a.Block().Slot() > b.Block().Slot():
return 1
default:
return 0
}
})

currState := oldFState.Copy()
currRoot := oldFRoot
nextBlockIdx := 0

for _, slot := range slotsToSave {
if ctx.Err() != nil {
return ctx.Err()
}

// Replay all canonical blocks up to target slot.
for nextBlockIdx < len(blocks) && blocks[nextBlockIdx].Block().Slot() <= slot {
currState, err = executeStateTransitionStateGen(ctx, currState, blocks[nextBlockIdx])
if err != nil {
return err
return errors.Wrapf(err, "could not replay block at slot %d during hdiff migration", blocks[nextBlockIdx].Block().Slot())
}
// Given the block has been finalized, the db should not have more than one block in a given slot.
// We should error out when this happens.
if len(roots) != 1 {
return errUnknownBlock
}
aRoot = roots[0]
// Different than the legacy MigrateToCold, we need to always get the state even if
// the state exists in DB as part of the hot state db, because we need to process slots
// to the state diff tree slots.
aState, err = s.StateByRoot(ctx, aRoot)
currRoot, err = blocks[nextBlockIdx].Block().HashTreeRoot()
if err != nil {
return err
return errors.Wrap(err, "could not compute block root during hdiff migration")
}
nextBlockIdx++
}
if s.beaconDB.HasState(ctx, aRoot) {
s.migrateHotToCold(aRoot)
continue
}
// advance slots to the target slot
if aState.Slot() < slot {
aState, err = transition.ProcessSlots(ctx, aState, slot)

// If target slot is skipped, advance with process slots.
if currState.Slot() < slot {
currState, err = transition.ProcessSlots(ctx, currState, slot)
if err != nil {
return errors.Wrapf(err, "could not process slots to slot %d", slot)
return errors.Wrapf(err, "could not process slots to slot %d during hdiff migration", slot)
}
}
if err := s.beaconDB.SaveState(ctx, aState, aRoot); err != nil {
if currState.Slot() != slot {
return errors.Errorf("unexpected replay state slot %d while targeting %d", currState.Slot(), slot)
}

// Save to the finalized state-diff tree. This does not store unfinalized states in the diff tree
// because migration only runs after finalized checkpoint advancement.
if err := s.beaconDB.SaveState(ctx, currState, currRoot); err != nil {
return err
}
s.migrateHotToCold(currRoot)
log.WithFields(
logrus.Fields{
"slot": aState.Slot(),
"root": fmt.Sprintf("%#x", aRoot),
"slot": currState.Slot(),
"root": fmt.Sprintf("%#x", currRoot),
}).Info("Saved state in DB")
}

// Update finalized info in memory.
fInfo, ok, err := s.epochBoundaryStateCache.getByBlockRoot(fRoot)
if err != nil {
Expand Down
Loading