Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
Expand Down Expand Up @@ -749,3 +750,77 @@ func (r *InboxReader) GetDelayBlocks() uint64 {
func (r *InboxReader) SupportsPushingFinalityData() bool {
return true
}

type ParentChainDataSource interface {
BatchDataProvider
staker.InboxTrackerInterface
staker.InboxReaderInterface
GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, error)
GetBatchParentChainBlock(seqNum uint64) (uint64, error)
GetDelayedCount() (uint64, error)
}

func (r *InboxReader) GetParentChainDataSource() ParentChainDataSource {
return &batchDataProviderImpl{r}
}

type batchDataProviderImpl struct {
r *InboxReader
}

func (b *batchDataProviderImpl) GetBatchCount() (uint64, error) {
return b.r.tracker.GetBatchCount()
}

func (b *batchDataProviderImpl) GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex, error) {
return b.r.tracker.GetBatchMessageCount(seqNum)
}

func (b *batchDataProviderImpl) GetDelayedAcc(seqNum uint64) (common.Hash, error) {
return b.r.tracker.GetDelayedAcc(seqNum)
}

func (b *batchDataProviderImpl) GetSequencerMessageBytes(ctx context.Context, seqNum uint64) ([]byte, common.Hash, error) {
return b.r.GetSequencerMessageBytes(ctx, seqNum)
}

func (b *batchDataProviderImpl) GetSequencerMessageBytesForParentBlock(ctx context.Context, seqNum uint64, parentChainBlock uint64) ([]byte, common.Hash, error) {
return b.r.GetSequencerMessageBytesForParentBlock(ctx, seqNum, parentChainBlock)
}

func (b *batchDataProviderImpl) FindParentChainBlockContainingDelayed(ctx context.Context, index uint64) (uint64, error) {
_, _, localParentChainBlockNumber, err := b.r.tracker.getRawDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, index)
return localParentChainBlockNumber, err
}

func (b *batchDataProviderImpl) GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, error) {
return b.r.tracker.GetBatchMetadata(seqNum)
}

func (b *batchDataProviderImpl) GetBatchParentChainBlock(seqNum uint64) (uint64, error) {
return b.r.tracker.GetBatchParentChainBlock(seqNum)
}

func (b *batchDataProviderImpl) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) {
return b.r.tracker.FindInboxBatchContainingMessage(pos)
}

func (b *batchDataProviderImpl) GetDelayedCount() (uint64, error) {
return b.r.tracker.GetDelayedCount()
}

func (b *batchDataProviderImpl) SetBlockValidator(validator *staker.BlockValidator) {
b.r.tracker.SetBlockValidator(validator)
}

func (b *batchDataProviderImpl) GetDelayedMessageBytes(ctx context.Context, seqNum uint64) ([]byte, error) {
return b.r.tracker.GetDelayedMessageBytes(ctx, seqNum)
}

func (b *batchDataProviderImpl) GetBatchAcc(seqNum uint64) (common.Hash, error) {
return b.r.tracker.GetBatchAcc(seqNum)
}

func (b *batchDataProviderImpl) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
return b.r.GetFinalizedMsgCount(ctx)
}
65 changes: 3 additions & 62 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/daprovider"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/containers"
Expand Down Expand Up @@ -270,63 +268,6 @@ func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex)
}
}

func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcaster) error {
batchCount, err := t.GetBatchCount()
if err != nil {
return fmt.Errorf("error getting batch count: %w", err)
}
var startMessage arbutil.MessageIndex
if batchCount >= 2 {
// As in AddSequencerBatches, we want to keep the most recent batch's messages.
// This prevents issues if a user's L1 is a bit behind or an L1 reorg occurs.
// `batchCount - 2` is the index of the batch before the last batch.
batchIndex := batchCount - 2
startMessage, err = t.GetBatchMessageCount(batchIndex)
if err != nil {
return fmt.Errorf("error getting batch %v message count: %w", batchIndex, err)
}
}

if t.txStreamer == nil {
return errors.New("txStreamer is nil")
}

messageCount, err := t.txStreamer.GetMessageCount()
if err != nil {
return fmt.Errorf("error getting tx streamer message count: %w", err)
}
var feedMessages []*message.BroadcastFeedMessage
for seqNum := startMessage; seqNum < messageCount; seqNum++ {
message, err := t.txStreamer.GetMessage(seqNum)
if err != nil {
return fmt.Errorf("error getting message %v: %w", seqNum, err)
}

msgResult, err := t.txStreamer.ResultAtMessageIndex(seqNum)
var blockHash *common.Hash
if err == nil {
blockHash = &msgResult.BlockHash
}

blockMetadata, err := t.txStreamer.BlockMetadataAtMessageIndex(seqNum)
if err != nil {
log.Warn("error getting blockMetadata byte array from tx streamer", "err", err)
}

messageWithInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: *message,
BlockHash: blockHash,
BlockMetadata: blockMetadata,
}
feedMessage, err := broadcastServer.NewBroadcastFeedMessage(messageWithInfo, seqNum)
if err != nil {
return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err)
}
feedMessages = append(feedMessages, feedMessage)
}
return broadcastServer.PopulateFeedBacklog(feedMessages)
}

func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {
key := dbKey(schema.LegacyDelayedMessagePrefix, seqNum)
data, err := t.db.Get(key)
Expand All @@ -344,7 +285,7 @@ func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(ctx context.Context
}

err = msg.FillInBatchGasFields(func(batchNum uint64) ([]byte, error) {
data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytes(ctx, batchNum)
data, _, err := t.txStreamer.batchDataProvider.GetSequencerMessageBytes(ctx, batchNum)
return data, err
})

Expand All @@ -357,7 +298,7 @@ func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx
return msg, acc, parentChainBlockNumber, err
}
err = msg.FillInBatchGasFields(func(batchNum uint64) ([]byte, error) {
data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytesForParentBlock(ctx, batchNum, parentChainBlockNumber)
data, _, err := t.txStreamer.batchDataProvider.GetSequencerMessageBytesForParentBlock(ctx, batchNum, parentChainBlockNumber)
return data, err
})
return msg, acc, parentChainBlockNumber, err
Expand Down Expand Up @@ -941,7 +882,7 @@ func (t *InboxTracker) FinalizedDelayedMessageAtPosition(
}
}
err = msg.FillInBatchGasFields(func(batchNum uint64) ([]byte, error) {
data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytesForParentBlock(
data, _, err := t.txStreamer.batchDataProvider.GetSequencerMessageBytesForParentBlock(
ctx, batchNum, parentChainBlockNumber,
)
return data, err
Expand Down
65 changes: 52 additions & 13 deletions arbnode/mel/runner/mel.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
"github.com/ethereum/go-ethereum/rpc"

"github.com/offchainlabs/nitro/arbnode/mel"
melextraction "github.com/offchainlabs/nitro/arbnode/mel/extraction"
"github.com/offchainlabs/nitro/arbnode/mel/extraction"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/bold/containers/fsm"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/daprovider"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/headerreader"
"github.com/offchainlabs/nitro/util/stopwaiter"
)
Expand Down Expand Up @@ -117,6 +118,8 @@ type MessageExtractor struct {
reorgEventsNotifier chan uint64
seqBatchCounter SequencerBatchCountFetcher
l1Reader *headerreader.HeaderReader

blockValidator *staker.BlockValidator // TODO: remove post MEL block validation
}

// Creates a message extractor instance with the specified parameters,
Expand Down Expand Up @@ -224,22 +227,39 @@ func (m *MessageExtractor) CurrentFSMState() FSMState {
return m.fsm.Current().State
}

// getStateByRPCBlockNum currently supports fetching of respective state for safe and finalized parent chain blocks
func (m *MessageExtractor) getStateByRPCBlockNum(ctx context.Context, blockNum rpc.BlockNumber) (*mel.State, error) {
blk, err := m.parentChainReader.HeaderByNumber(ctx, big.NewInt(blockNum.Int64()))
if err != nil {
return nil, err
var blk uint64
var err error
switch blockNum {
case rpc.SafeBlockNumber:
blk, err = m.l1Reader.LatestSafeBlockNr(ctx)
if err != nil {
return nil, err
}
case rpc.FinalizedBlockNumber:
blk, err = m.l1Reader.LatestFinalizedBlockNr(ctx)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("getStateByRPCBlockNum requested with unknown blockNum: %v", blockNum)
}
headMelStateBlockNum, err := m.melDB.GetHeadMelStateBlockNum()
if err != nil {
return nil, err
}
state, err := m.melDB.State(min(headMelStateBlockNum, blk.Number.Uint64()))
state, err := m.melDB.State(min(headMelStateBlockNum, blk))
if err != nil {
return nil, err
}
return state, nil
}

func (m *MessageExtractor) SetBlockValidator(blockValidator *staker.BlockValidator) {
m.blockValidator = blockValidator
}

func (m *MessageExtractor) GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
state, err := m.getStateByRPCBlockNum(ctx, rpc.SafeBlockNumber)
if err != nil {
Expand Down Expand Up @@ -288,7 +308,6 @@ func (m *MessageExtractor) GetL1Reader() *headerreader.HeaderReader {
return m.l1Reader
}

// GetFinalizedDelayedMessagesRead uses MessageExtractor's context for calls to parentChainReader
func (m *MessageExtractor) GetFinalizedDelayedMessagesRead() (uint64, error) {
ctx, err := m.GetContextSafe()
if err != nil {
Expand Down Expand Up @@ -332,6 +351,14 @@ func (m *MessageExtractor) GetDelayedMessage(index uint64) (*mel.DelayedInboxMes
return m.melDB.FetchDelayedMessage(index)
}

func (m *MessageExtractor) GetDelayedMessageBytes(ctx context.Context, seqNum uint64) ([]byte, error) {
delayedMsg, err := m.GetDelayedMessage(seqNum)
if err != nil {
return nil, err
}
return delayedMsg.Message.Serialize()
}

func (m *MessageExtractor) GetDelayedAcc(seqNum uint64) (common.Hash, error) {
delayedMsg, err := m.GetDelayedMessage(seqNum)
if err != nil {
Expand All @@ -340,9 +367,6 @@ func (m *MessageExtractor) GetDelayedAcc(seqNum uint64) (common.Hash, error) {
return delayedMsg.AfterInboxAcc(), nil
}

// GetDelayedCountAtParentChainBlock uses the caller-provided ctx (not m.GetContext())
// because it is called from FinalizedDelayedMessageAtPosition, which receives its
// context from the DelayedSequencer — a running component that supplies a valid context.
func (m *MessageExtractor) GetDelayedCountAtParentChainBlock(ctx context.Context, parentChainBlockNum uint64) (uint64, error) {
state, err := m.melDB.State(parentChainBlockNum)
if err != nil {
Expand All @@ -359,6 +383,12 @@ func (m *MessageExtractor) GetDelayedCount() (uint64, error) {
return state.DelayedMessagesSeen, nil
}

// FindParentChainBlockContainingDelayed is only relevant and invoked by txstreamer when batch gas cost data is nil for a
// batchpostingreport- but this should never be possible as ExtractMessages function would fill in the cost data during message extraction
func (m *MessageExtractor) FindParentChainBlockContainingDelayed(context.Context, uint64) (uint64, error) {
return 0, errors.New("FindParentChainBlockContainingDelayed is not implemented by MEL as batch gas cost data is already filled in during extraction")
}

func (m *MessageExtractor) GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, error) {
headState, err := m.melDB.GetHeadMelState()
if err != nil {
Expand All @@ -375,7 +405,7 @@ func (m *MessageExtractor) GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, e
}

func (m *MessageExtractor) SupportsPushingFinalityData() bool {
return false
return true
}

// FinalizedDelayedMessageAtPosition returns the delayed message at the
Expand Down Expand Up @@ -418,12 +448,16 @@ func (m *MessageExtractor) GetSequencerMessageBytes(ctx context.Context, seqNum
if err != nil {
return nil, common.Hash{}, err
}
return m.GetSequencerMessageBytesForParentBlock(ctx, seqNum, metadata.ParentChainBlock)
}

func (m *MessageExtractor) GetSequencerMessageBytesForParentBlock(ctx context.Context, seqNum uint64, parentChainBlock uint64) ([]byte, common.Hash, error) {
// No need to specify a max headers to fetch, as we are using the logs fetcher only, so we can pass in a 0.
logsFetcher := newLogsAndHeadersFetcher(m.parentChainReader, 0)
if err = logsFetcher.fetchSequencerBatchLogs(ctx, metadata.ParentChainBlock, metadata.ParentChainBlock); err != nil {
if err := logsFetcher.fetchSequencerBatchLogs(ctx, parentChainBlock, parentChainBlock); err != nil {
return nil, common.Hash{}, err
}
parentChainHeader, err := m.parentChainReader.HeaderByNumber(ctx, new(big.Int).SetUint64(metadata.ParentChainBlock))
parentChainHeader, err := m.parentChainReader.HeaderByNumber(ctx, new(big.Int).SetUint64(parentChainBlock))
if err != nil {
return nil, common.Hash{}, err
}
Expand All @@ -439,7 +473,7 @@ func (m *MessageExtractor) GetSequencerMessageBytes(ctx context.Context, seqNum
}
seenBatches = append(seenBatches, batch.SequenceNumber)
}
return nil, common.Hash{}, fmt.Errorf("sequencer batch %v not found in L1 block %v (found batches %v)", seqNum, metadata.ParentChainBlock, seenBatches)
return nil, common.Hash{}, fmt.Errorf("sequencer batch %v not found in L1 block %v (found batches %v)", seqNum, parentChainBlock, seenBatches)
}

// ReorgTo, when reorgEventsNotifier is set, should only be called after the readers of the channel are started as this is a blocking operation. To be only
Expand All @@ -458,6 +492,11 @@ func (m *MessageExtractor) ReorgTo(parentChainBlockNumber uint64) error {
return nil
}

func (m *MessageExtractor) GetBatchAcc(seqNum uint64) (common.Hash, error) {
batchMetadata, err := m.GetBatchMetadata(seqNum)
return batchMetadata.Accumulator, err
}

func (m *MessageExtractor) GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex, error) {
metadata, err := m.GetBatchMetadata(seqNum)
return metadata.MessageCount, err
Expand Down
3 changes: 3 additions & 0 deletions arbnode/mel/runner/process_next_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu
if m.reorgEventsNotifier != nil {
m.reorgEventsNotifier <- preState.ParentChainBlockNumber
}
if m.blockValidator != nil {
m.blockValidator.ReorgToBatchCount(preState.BatchCount)
}
}
// Conditionally prefetch headers and logs for upcoming block/s
if err = m.logsAndHeadersPreFetcher.fetch(ctx, preState); err != nil {
Expand Down
Loading
Loading