diff --git a/arbnode/inbox_reader.go b/arbnode/inbox_reader.go index f542db0023..a7844b8769 100644 --- a/arbnode/inbox_reader.go +++ b/arbnode/inbox_reader.go @@ -22,6 +22,7 @@ import ( "github.com/offchainlabs/nitro/arbnode/mel" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" + "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/arbmath" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/stopwaiter" @@ -749,3 +750,77 @@ func (r *InboxReader) GetDelayBlocks() uint64 { func (r *InboxReader) SupportsPushingFinalityData() bool { return true } + +type ParentChainDataSource interface { + BatchDataProvider + staker.InboxTrackerInterface + staker.InboxReaderInterface + GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, error) + GetBatchParentChainBlock(seqNum uint64) (uint64, error) + GetDelayedCount() (uint64, error) +} + +func (r *InboxReader) GetParentChainDataSource() ParentChainDataSource { + return &batchDataProviderImpl{r} +} + +type batchDataProviderImpl struct { + r *InboxReader +} + +func (b *batchDataProviderImpl) GetBatchCount() (uint64, error) { + return b.r.tracker.GetBatchCount() +} + +func (b *batchDataProviderImpl) GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex, error) { + return b.r.tracker.GetBatchMessageCount(seqNum) +} + +func (b *batchDataProviderImpl) GetDelayedAcc(seqNum uint64) (common.Hash, error) { + return b.r.tracker.GetDelayedAcc(seqNum) +} + +func (b *batchDataProviderImpl) GetSequencerMessageBytes(ctx context.Context, seqNum uint64) ([]byte, common.Hash, error) { + return b.r.GetSequencerMessageBytes(ctx, seqNum) +} + +func (b *batchDataProviderImpl) GetSequencerMessageBytesForParentBlock(ctx context.Context, seqNum uint64, parentChainBlock uint64) ([]byte, common.Hash, error) { + return b.r.GetSequencerMessageBytesForParentBlock(ctx, seqNum, parentChainBlock) +} + +func (b *batchDataProviderImpl) 
FindParentChainBlockContainingDelayed(ctx context.Context, index uint64) (uint64, error) { + _, _, localParentChainBlockNumber, err := b.r.tracker.getRawDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, index) + return localParentChainBlockNumber, err +} + +func (b *batchDataProviderImpl) GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, error) { + return b.r.tracker.GetBatchMetadata(seqNum) +} + +func (b *batchDataProviderImpl) GetBatchParentChainBlock(seqNum uint64) (uint64, error) { + return b.r.tracker.GetBatchParentChainBlock(seqNum) +} + +func (b *batchDataProviderImpl) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) (uint64, bool, error) { + return b.r.tracker.FindInboxBatchContainingMessage(pos) +} + +func (b *batchDataProviderImpl) GetDelayedCount() (uint64, error) { + return b.r.tracker.GetDelayedCount() +} + +func (b *batchDataProviderImpl) SetBlockValidator(validator *staker.BlockValidator) { + b.r.tracker.SetBlockValidator(validator) +} + +func (b *batchDataProviderImpl) GetDelayedMessageBytes(ctx context.Context, seqNum uint64) ([]byte, error) { + return b.r.tracker.GetDelayedMessageBytes(ctx, seqNum) +} + +func (b *batchDataProviderImpl) GetBatchAcc(seqNum uint64) (common.Hash, error) { + return b.r.tracker.GetBatchAcc(seqNum) +} + +func (b *batchDataProviderImpl) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { + return b.r.GetFinalizedMsgCount(ctx) +} diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 679ec57521..feaabf23d7 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -24,8 +24,6 @@ import ( "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbstate" "github.com/offchainlabs/nitro/arbutil" - "github.com/offchainlabs/nitro/broadcaster" - "github.com/offchainlabs/nitro/broadcaster/message" "github.com/offchainlabs/nitro/daprovider" "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/containers" @@ -270,63 
+268,6 @@ func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) } } -func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcaster) error { - batchCount, err := t.GetBatchCount() - if err != nil { - return fmt.Errorf("error getting batch count: %w", err) - } - var startMessage arbutil.MessageIndex - if batchCount >= 2 { - // As in AddSequencerBatches, we want to keep the most recent batch's messages. - // This prevents issues if a user's L1 is a bit behind or an L1 reorg occurs. - // `batchCount - 2` is the index of the batch before the last batch. - batchIndex := batchCount - 2 - startMessage, err = t.GetBatchMessageCount(batchIndex) - if err != nil { - return fmt.Errorf("error getting batch %v message count: %w", batchIndex, err) - } - } - - if t.txStreamer == nil { - return errors.New("txStreamer is nil") - } - - messageCount, err := t.txStreamer.GetMessageCount() - if err != nil { - return fmt.Errorf("error getting tx streamer message count: %w", err) - } - var feedMessages []*message.BroadcastFeedMessage - for seqNum := startMessage; seqNum < messageCount; seqNum++ { - message, err := t.txStreamer.GetMessage(seqNum) - if err != nil { - return fmt.Errorf("error getting message %v: %w", seqNum, err) - } - - msgResult, err := t.txStreamer.ResultAtMessageIndex(seqNum) - var blockHash *common.Hash - if err == nil { - blockHash = &msgResult.BlockHash - } - - blockMetadata, err := t.txStreamer.BlockMetadataAtMessageIndex(seqNum) - if err != nil { - log.Warn("error getting blockMetadata byte array from tx streamer", "err", err) - } - - messageWithInfo := arbostypes.MessageWithMetadataAndBlockInfo{ - MessageWithMeta: *message, - BlockHash: blockHash, - BlockMetadata: blockMetadata, - } - feedMessage, err := broadcastServer.NewBroadcastFeedMessage(messageWithInfo, seqNum) - if err != nil { - return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err) - } - feedMessages = append(feedMessages, 
feedMessage) - } - return broadcastServer.PopulateFeedBacklog(feedMessages) -} - func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(ctx context.Context, seqNum uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) { key := dbKey(schema.LegacyDelayedMessagePrefix, seqNum) data, err := t.db.Get(key) @@ -344,7 +285,7 @@ func (t *InboxTracker) legacyGetDelayedMessageAndAccumulator(ctx context.Context } err = msg.FillInBatchGasFields(func(batchNum uint64) ([]byte, error) { - data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytes(ctx, batchNum) + data, _, err := t.txStreamer.batchDataProvider.GetSequencerMessageBytes(ctx, batchNum) return data, err }) @@ -357,7 +298,7 @@ func (t *InboxTracker) GetDelayedMessageAccumulatorAndParentChainBlockNumber(ctx return msg, acc, parentChainBlockNumber, err } err = msg.FillInBatchGasFields(func(batchNum uint64) ([]byte, error) { - data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytesForParentBlock(ctx, batchNum, parentChainBlockNumber) + data, _, err := t.txStreamer.batchDataProvider.GetSequencerMessageBytesForParentBlock(ctx, batchNum, parentChainBlockNumber) return data, err }) return msg, acc, parentChainBlockNumber, err @@ -941,7 +882,7 @@ func (t *InboxTracker) FinalizedDelayedMessageAtPosition( } } err = msg.FillInBatchGasFields(func(batchNum uint64) ([]byte, error) { - data, _, err := t.txStreamer.inboxReader.GetSequencerMessageBytesForParentBlock( + data, _, err := t.txStreamer.batchDataProvider.GetSequencerMessageBytesForParentBlock( ctx, batchNum, parentChainBlockNumber, ) return data, err diff --git a/arbnode/mel/runner/mel.go b/arbnode/mel/runner/mel.go index d221243391..f2d760cbf4 100644 --- a/arbnode/mel/runner/mel.go +++ b/arbnode/mel/runner/mel.go @@ -23,12 +23,13 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/offchainlabs/nitro/arbnode/mel" - melextraction "github.com/offchainlabs/nitro/arbnode/mel/extraction" + 
"github.com/offchainlabs/nitro/arbnode/mel/extraction" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/bold/containers/fsm" "github.com/offchainlabs/nitro/cmd/chaininfo" "github.com/offchainlabs/nitro/daprovider" + "github.com/offchainlabs/nitro/staker" "github.com/offchainlabs/nitro/util/headerreader" "github.com/offchainlabs/nitro/util/stopwaiter" ) @@ -117,6 +118,8 @@ type MessageExtractor struct { reorgEventsNotifier chan uint64 seqBatchCounter SequencerBatchCountFetcher l1Reader *headerreader.HeaderReader + + blockValidator *staker.BlockValidator // TODO: remove post MEL block validation } // Creates a message extractor instance with the specified parameters, @@ -224,22 +227,39 @@ func (m *MessageExtractor) CurrentFSMState() FSMState { return m.fsm.Current().State } +// getStateByRPCBlockNum currently supports fetching of respective state for safe and finalized parent chain blocks func (m *MessageExtractor) getStateByRPCBlockNum(ctx context.Context, blockNum rpc.BlockNumber) (*mel.State, error) { - blk, err := m.parentChainReader.HeaderByNumber(ctx, big.NewInt(blockNum.Int64())) - if err != nil { - return nil, err + var blk uint64 + var err error + switch blockNum { + case rpc.SafeBlockNumber: + blk, err = m.l1Reader.LatestSafeBlockNr(ctx) + if err != nil { + return nil, err + } + case rpc.FinalizedBlockNumber: + blk, err = m.l1Reader.LatestFinalizedBlockNr(ctx) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("getStateByRPCBlockNum requested with unknown blockNum: %v", blockNum) } headMelStateBlockNum, err := m.melDB.GetHeadMelStateBlockNum() if err != nil { return nil, err } - state, err := m.melDB.State(min(headMelStateBlockNum, blk.Number.Uint64())) + state, err := m.melDB.State(min(headMelStateBlockNum, blk)) if err != nil { return nil, err } return state, nil } +func (m *MessageExtractor) SetBlockValidator(blockValidator *staker.BlockValidator) { + 
m.blockValidator = blockValidator +} + func (m *MessageExtractor) GetSafeMsgCount(ctx context.Context) (arbutil.MessageIndex, error) { state, err := m.getStateByRPCBlockNum(ctx, rpc.SafeBlockNumber) if err != nil { @@ -288,7 +308,6 @@ func (m *MessageExtractor) GetL1Reader() *headerreader.HeaderReader { return m.l1Reader } -// GetFinalizedDelayedMessagesRead uses MessageExtractor's context for calls to parentChainReader func (m *MessageExtractor) GetFinalizedDelayedMessagesRead() (uint64, error) { ctx, err := m.GetContextSafe() if err != nil { @@ -332,6 +351,14 @@ func (m *MessageExtractor) GetDelayedMessage(index uint64) (*mel.DelayedInboxMes return m.melDB.FetchDelayedMessage(index) } +func (m *MessageExtractor) GetDelayedMessageBytes(ctx context.Context, seqNum uint64) ([]byte, error) { + delayedMsg, err := m.GetDelayedMessage(seqNum) + if err != nil { + return nil, err + } + return delayedMsg.Message.Serialize() +} + func (m *MessageExtractor) GetDelayedAcc(seqNum uint64) (common.Hash, error) { delayedMsg, err := m.GetDelayedMessage(seqNum) if err != nil { @@ -340,9 +367,6 @@ func (m *MessageExtractor) GetDelayedAcc(seqNum uint64) (common.Hash, error) { return delayedMsg.AfterInboxAcc(), nil } -// GetDelayedCountAtParentChainBlock uses the caller-provided ctx (not m.GetContext()) -// because it is called from FinalizedDelayedMessageAtPosition, which receives its -// context from the DelayedSequencer — a running component that supplies a valid context. 
func (m *MessageExtractor) GetDelayedCountAtParentChainBlock(ctx context.Context, parentChainBlockNum uint64) (uint64, error) { state, err := m.melDB.State(parentChainBlockNum) if err != nil { @@ -359,6 +383,12 @@ func (m *MessageExtractor) GetDelayedCount() (uint64, error) { return state.DelayedMessagesSeen, nil } +// FindParentChainBlockContainingDelayed is only relevant and invoked by txstreamer when batch gas cost data is nil for a +// batchpostingreport- but this should never be possible as ExtractMessages function would fill in the cost data during message extraction +func (m *MessageExtractor) FindParentChainBlockContainingDelayed(context.Context, uint64) (uint64, error) { + return 0, errors.New("FindParentChainBlockContainingDelayed is not implemented by MEL as batch gas cost data is already filled in during extraction") +} + func (m *MessageExtractor) GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, error) { headState, err := m.melDB.GetHeadMelState() if err != nil { @@ -375,7 +405,7 @@ func (m *MessageExtractor) GetBatchMetadata(seqNum uint64) (mel.BatchMetadata, e } func (m *MessageExtractor) SupportsPushingFinalityData() bool { - return false + return true } // FinalizedDelayedMessageAtPosition returns the delayed message at the @@ -418,12 +448,16 @@ func (m *MessageExtractor) GetSequencerMessageBytes(ctx context.Context, seqNum if err != nil { return nil, common.Hash{}, err } + return m.GetSequencerMessageBytesForParentBlock(ctx, seqNum, metadata.ParentChainBlock) +} + +func (m *MessageExtractor) GetSequencerMessageBytesForParentBlock(ctx context.Context, seqNum uint64, parentChainBlock uint64) ([]byte, common.Hash, error) { // No need to specify a max headers to fetch, as we are using the logs fetcher only, so we can pass in a 0. 
logsFetcher := newLogsAndHeadersFetcher(m.parentChainReader, 0) - if err = logsFetcher.fetchSequencerBatchLogs(ctx, metadata.ParentChainBlock, metadata.ParentChainBlock); err != nil { + if err := logsFetcher.fetchSequencerBatchLogs(ctx, parentChainBlock, parentChainBlock); err != nil { return nil, common.Hash{}, err } - parentChainHeader, err := m.parentChainReader.HeaderByNumber(ctx, new(big.Int).SetUint64(metadata.ParentChainBlock)) + parentChainHeader, err := m.parentChainReader.HeaderByNumber(ctx, new(big.Int).SetUint64(parentChainBlock)) if err != nil { return nil, common.Hash{}, err } @@ -439,7 +473,7 @@ func (m *MessageExtractor) GetSequencerMessageBytes(ctx context.Context, seqNum } seenBatches = append(seenBatches, batch.SequenceNumber) } - return nil, common.Hash{}, fmt.Errorf("sequencer batch %v not found in L1 block %v (found batches %v)", seqNum, metadata.ParentChainBlock, seenBatches) + return nil, common.Hash{}, fmt.Errorf("sequencer batch %v not found in L1 block %v (found batches %v)", seqNum, parentChainBlock, seenBatches) } // ReorgTo, when reorgEventsNotifier is set, should only be called after the readers of the channel are started as this is a blocking operation. 
To be only @@ -458,6 +492,11 @@ func (m *MessageExtractor) ReorgTo(parentChainBlockNumber uint64) error { return nil } +func (m *MessageExtractor) GetBatchAcc(seqNum uint64) (common.Hash, error) { + batchMetadata, err := m.GetBatchMetadata(seqNum) + return batchMetadata.Accumulator, err +} + func (m *MessageExtractor) GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex, error) { metadata, err := m.GetBatchMetadata(seqNum) return metadata.MessageCount, err diff --git a/arbnode/mel/runner/process_next_block.go b/arbnode/mel/runner/process_next_block.go index dc9641fc86..406df917ae 100644 --- a/arbnode/mel/runner/process_next_block.go +++ b/arbnode/mel/runner/process_next_block.go @@ -76,6 +76,9 @@ func (m *MessageExtractor) processNextBlock(ctx context.Context, current *fsm.Cu if m.reorgEventsNotifier != nil { m.reorgEventsNotifier <- preState.ParentChainBlockNumber } + if m.blockValidator != nil { + m.blockValidator.ReorgToBatchCount(preState.BatchCount) + } } // Conditionally prefetch headers and logs for upcoming block/s if err = m.logsAndHeadersPreFetcher.fetch(ctx, preState); err != nil { diff --git a/arbnode/message_pruner.go b/arbnode/message_pruner.go index 2f47e6aee3..c8f169d962 100644 --- a/arbnode/message_pruner.go +++ b/arbnode/message_pruner.go @@ -25,8 +25,9 @@ import ( type MessagePruner struct { stopwaiter.StopWaiter + consensusDB ethdb.Database transactionStreamer *TransactionStreamer - inboxTracker *InboxTracker + batchMetaFetcher BatchMetadataFetcher config MessagePrunerConfigFetcher pruningLock sync.Mutex lastPruneDone time.Time @@ -55,10 +56,11 @@ func MessagePrunerConfigAddOptions(prefix string, f *pflag.FlagSet) { f.Uint64(prefix+".min-batches-left", DefaultMessagePrunerConfig.MinBatchesLeft, "min number of batches not pruned") } -func NewMessagePruner(transactionStreamer *TransactionStreamer, inboxTracker *InboxTracker, config MessagePrunerConfigFetcher) *MessagePruner { +func NewMessagePruner(consensusDB ethdb.Database, 
transactionStreamer *TransactionStreamer, batchMetaFetcher BatchMetadataFetcher, config MessagePrunerConfigFetcher) *MessagePruner { return &MessagePruner{ + consensusDB: consensusDB, transactionStreamer: transactionStreamer, - inboxTracker: inboxTracker, + batchMetaFetcher: batchMetaFetcher, config: config, } } @@ -93,7 +95,7 @@ func (m *MessagePruner) UpdateLatestConfirmed(count arbutil.MessageIndex, global func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, globalState validator.GoGlobalState) error { trimBatchCount := globalState.Batch minBatchesLeft := m.config().MinBatchesLeft - batchCount, err := m.inboxTracker.GetBatchCount() + batchCount, err := m.batchMetaFetcher.GetBatchCount() if err != nil { return err } @@ -106,7 +108,7 @@ func (m *MessagePruner) prune(ctx context.Context, count arbutil.MessageIndex, g if trimBatchCount < 1 { return nil } - endBatchMetadata, err := m.inboxTracker.GetBatchMetadata(trimBatchCount - 1) + endBatchMetadata, err := m.batchMetaFetcher.GetBatchMetadata(trimBatchCount - 1) if err != nil { return err } @@ -125,7 +127,7 @@ func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCoun m.cachedPrunedMessages = fetchLastPrunedKey(m.transactionStreamer.db, schema.LastPrunedMessageKey) } if m.cachedPrunedDelayedMessages == 0 { - m.cachedPrunedDelayedMessages = fetchLastPrunedKey(m.inboxTracker.db, schema.LastPrunedDelayedMessageKey) + m.cachedPrunedDelayedMessages = fetchLastPrunedKey(m.consensusDB, schema.LastPrunedDelayedMessageKey) } prunedKeysRange, _, err := deleteFromLastPrunedUptoEndKey(ctx, m.transactionStreamer.db, schema.MessageResultPrefix, m.cachedPrunedMessages, uint64(messageCount)) if err != nil { @@ -153,14 +155,14 @@ func (m *MessagePruner) deleteOldMessagesFromDB(ctx context.Context, messageCoun insertLastPrunedKey(m.transactionStreamer.db, schema.LastPrunedMessageKey, lastPrunedMessage) m.cachedPrunedMessages = lastPrunedMessage - prunedKeysRange, 
lastPrunedDelayedMessage, err := deleteFromLastPrunedUptoEndKey(ctx, m.inboxTracker.db, schema.RlpDelayedMessagePrefix, m.cachedPrunedDelayedMessages, delayedMessageCount) + prunedKeysRange, lastPrunedDelayedMessage, err := deleteFromLastPrunedUptoEndKey(ctx, m.consensusDB, schema.RlpDelayedMessagePrefix, m.cachedPrunedDelayedMessages, delayedMessageCount) if err != nil { return fmt.Errorf("error deleting last batch delayed messages: %w", err) } if len(prunedKeysRange) > 0 { log.Info("Pruned last batch delayed messages:", "first pruned key", prunedKeysRange[0], "last pruned key", prunedKeysRange[len(prunedKeysRange)-1]) } - insertLastPrunedKey(m.inboxTracker.db, schema.LastPrunedDelayedMessageKey, lastPrunedDelayedMessage) + insertLastPrunedKey(m.consensusDB, schema.LastPrunedDelayedMessageKey, lastPrunedDelayedMessage) m.cachedPrunedDelayedMessages = lastPrunedDelayedMessage return nil } diff --git a/arbnode/message_pruner_test.go b/arbnode/message_pruner_test.go index 374c7d5e14..10e28b6dd3 100644 --- a/arbnode/message_pruner_test.go +++ b/arbnode/message_pruner_test.go @@ -98,7 +98,8 @@ func setupDatabase(t *testing.T, messageCount, delayedMessageCount uint64) (ethd return inboxTrackerDb, transactionStreamerDb, &MessagePruner{ transactionStreamer: &TransactionStreamer{db: transactionStreamerDb}, - inboxTracker: &InboxTracker{db: inboxTrackerDb}, + consensusDB: inboxTrackerDb, + batchMetaFetcher: &InboxTracker{db: inboxTrackerDb}, } } diff --git a/arbnode/node.go b/arbnode/node.go index 85ae889378..bd093a22bc 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -767,10 +767,6 @@ func getInboxTrackerAndReader( if err != nil { return nil, nil, err } - if err := txStreamer.SetInboxReaders(inboxReader, delayedBridge); err != nil { - return nil, nil, err - } - return inboxTracker, inboxReader, nil } @@ -899,7 +895,7 @@ func getBlockValidator( config *Config, configFetcher ConfigFetcher, statelessBlockValidator *staker.StatelessBlockValidator, - inboxTracker 
*InboxTracker, + inboxTracker staker.InboxTrackerInterface, txStreamer *TransactionStreamer, fatalErrChan chan error, ) (*staker.BlockValidator, error) { @@ -932,8 +928,9 @@ func getStaker( l1client *ethclient.Client, deployInfo *chaininfo.RollupAddresses, txStreamer *TransactionStreamer, - inboxTracker *InboxTracker, - inboxReader *InboxReader, + inboxReader staker.InboxReaderInterface, + inboxTracker staker.InboxTrackerInterface, + batchMetaFetcher BatchMetadataFetcher, stack *node.Node, fatalErrChan chan error, statelessBlockValidator *staker.StatelessBlockValidator, @@ -990,7 +987,7 @@ func getStaker( var confirmedNotifiers []legacystaker.LatestConfirmedNotifier if config.MessagePruner.Enable { - messagePruner = NewMessagePruner(txStreamer, inboxTracker, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner }) + messagePruner = NewMessagePruner(consensusDB, txStreamer, batchMetaFetcher, func() *MessagePrunerConfig { return &configFetcher.Get().MessagePruner }) confirmedNotifiers = append(confirmedNotifiers, messagePruner) } @@ -1062,8 +1059,8 @@ func getSeqCoordinator( func getStatelessBlockValidator( config *Config, configFetcher ConfigFetcher, - inboxReader *InboxReader, - inboxTracker *InboxTracker, + inboxReader staker.InboxReaderInterface, + inboxTracker staker.InboxTrackerInterface, txStreamer *TransactionStreamer, exec execution.ExecutionRecorder, consensusDB ethdb.Database, @@ -1167,8 +1164,7 @@ func getBatchPoster( func getDelayedSequencer( l1Reader *headerreader.HeaderReader, - inboxTracker *InboxTracker, - msgExtractor *melrunner.MessageExtractor, + delayedMessageFetcher DelayedMessageFetcher, delayedBridge *DelayedBridge, exec execution.ExecutionSequencer, configFetcher ConfigFetcher, @@ -1178,14 +1174,7 @@ func getDelayedSequencer( // No ExecutionSequencer means delayed messages cannot be sequenced. 
return nil, nil } - // Convert typed nil *MessageExtractor to untyped nil so the interface parameter - // in NewDelayedSequencer is properly nil (Go nil-interface semantics). - var delayedMessageFetcher DelayedMessageFetcher - if inboxTracker != nil { - delayedMessageFetcher = inboxTracker - } else if msgExtractor != nil { - delayedMessageFetcher = msgExtractor - } else { + if delayedMessageFetcher == nil { return nil, errors.New("delayed sequencer requires either an inbox tracker or a message extractor") } // always create DelayedSequencer if exec is non nil, it won't do anything if it is disabled @@ -1345,25 +1334,40 @@ func createNodeImpl( return nil, err } if messageExtractor != nil { - if err := txStreamer.SetMsgExtractor(messageExtractor); err != nil { + if err := messageExtractor.SetMessageConsumer(txStreamer); err != nil { return nil, err } - if err := messageExtractor.SetMessageConsumer(txStreamer); err != nil { + } + + var batchDataProvider BatchDataProvider + if inboxReader != nil && inboxTracker != nil { + batchDataProvider = inboxReader.GetParentChainDataSource() + } else if messageExtractor != nil { + batchDataProvider = messageExtractor + } + if batchDataProvider != nil { + if err := txStreamer.SetBatchDataProvider(batchDataProvider, delayedBridge); err != nil { return nil, err } } - statelessBlockValidator, err := getStatelessBlockValidator(config, configFetcher, inboxReader, inboxTracker, txStreamer, executionRecorder, consensusDB, dapRegistry, stack, latestWasmModuleRoot) - if err != nil { - return nil, err + // TODO: rename staker.InboxReaderInterface and staker.InboxTrackerInterface to a better name + var validatorInboxReader staker.InboxReaderInterface + var validatorInboxTracker staker.InboxTrackerInterface + if messageExtractor != nil { + validatorInboxReader = messageExtractor + validatorInboxTracker = messageExtractor + } else { + validatorInboxReader = inboxReader + validatorInboxTracker = inboxTracker } - blockValidator, err := 
getBlockValidator(config, configFetcher, statelessBlockValidator, inboxTracker, txStreamer, fatalErrChan) + statelessBlockValidator, err := getStatelessBlockValidator(config, configFetcher, validatorInboxReader, validatorInboxTracker, txStreamer, executionRecorder, consensusDB, dapRegistry, stack, latestWasmModuleRoot) if err != nil { return nil, err } - stakerObj, messagePruner, stakerAddr, err := getStaker(ctx, config, configFetcher, consensusDB, l1Reader, txOptsValidator, syncMonitor, parentChain, l1client, deployInfo, txStreamer, inboxTracker, inboxReader, stack, fatalErrChan, statelessBlockValidator, blockValidator, dapRegistry) + blockValidator, err := getBlockValidator(config, configFetcher, statelessBlockValidator, validatorInboxTracker, txStreamer, fatalErrChan) if err != nil { return nil, err } @@ -1374,12 +1378,26 @@ func createNodeImpl( } else if messageExtractor != nil { batchMetaFetcher = messageExtractor } + + stakerObj, messagePruner, stakerAddr, err := getStaker(ctx, config, configFetcher, consensusDB, l1Reader, txOptsValidator, syncMonitor, parentChain, l1client, deployInfo, txStreamer, validatorInboxReader, validatorInboxTracker, batchMetaFetcher, stack, fatalErrChan, statelessBlockValidator, blockValidator, dapRegistry) + if err != nil { + return nil, err + } + batchPoster, err := getBatchPoster(ctx, config, configFetcher, l2Config, txOptsBatchPoster, dapWriters, l1Reader, batchMetaFetcher, txStreamer, arbOSVersionGetter, consensusDB, syncMonitor, deployInfo, parentChain, dapRegistry, stakerAddr) if err != nil { return nil, err } - delayedSequencer, err := getDelayedSequencer(l1Reader, inboxTracker, messageExtractor, delayedBridge, executionSequencer, configFetcher, coordinator) + // Convert typed nil *MessageExtractor to untyped nil so the interface parameter + // in NewDelayedSequencer is properly nil (Go nil-interface semantics). 
+ var delayedMessageFetcher DelayedMessageFetcher + if inboxTracker != nil { + delayedMessageFetcher = inboxTracker + } else if messageExtractor != nil { + delayedMessageFetcher = messageExtractor + } + delayedSequencer, err := getDelayedSequencer(l1Reader, delayedMessageFetcher, delayedBridge, executionSequencer, configFetcher, coordinator) if err != nil { return nil, err } @@ -1795,32 +1813,28 @@ func (n *Node) BlockMetadataAtMessageIndex(msgIdx arbutil.MessageIndex) containe return containers.NewReadyPromise(n.TxStreamer.BlockMetadataAtMessageIndex(msgIdx)) } +func (n *Node) GetParentChainDataSource() ParentChainDataSource { + if n.MessageExtractor != nil { + return n.MessageExtractor + } + return n.InboxReader.GetParentChainDataSource() +} + func (n *Node) GetL1Confirmations(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] { if n.L1Reader == nil { return containers.NewReadyPromise(uint64(0), nil) } // batches not yet posted have 0 confirmations but no error - var batchNum uint64 - var found bool - var err error - if n.MessageExtractor != nil { - batchNum, found, err = n.MessageExtractor.FindInboxBatchContainingMessage(msgIdx) - } else { - batchNum, found, err = n.InboxTracker.FindInboxBatchContainingMessage(msgIdx) - } + pcds := n.GetParentChainDataSource() + batchNum, found, err := pcds.FindInboxBatchContainingMessage(msgIdx) if err != nil { return containers.NewReadyPromise(uint64(0), err) } if !found { return containers.NewReadyPromise(uint64(0), nil) } - var parentChainBlockNum uint64 - if n.MessageExtractor != nil { - parentChainBlockNum, err = n.MessageExtractor.GetBatchParentChainBlock(batchNum) - } else { - parentChainBlockNum, err = n.InboxTracker.GetBatchParentChainBlock(batchNum) - } + parentChainBlockNum, err := pcds.GetBatchParentChainBlock(batchNum) if err != nil { return containers.NewReadyPromise(uint64(0), err) } @@ -1873,14 +1887,7 @@ func (n *Node) GetL1Confirmations(msgIdx arbutil.MessageIndex) containers.Promis } func (n 
*Node) FindBatchContainingMessage(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] { - var batchNum uint64 - var found bool - var err error - if n.MessageExtractor != nil { - batchNum, found, err = n.MessageExtractor.FindInboxBatchContainingMessage(msgIdx) - } else { - batchNum, found, err = n.InboxTracker.FindInboxBatchContainingMessage(msgIdx) - } + batchNum, found, err := n.GetParentChainDataSource().FindInboxBatchContainingMessage(msgIdx) if err == nil && !found { return containers.NewReadyPromise(uint64(0), errors.New("block not yet found on any batch")) } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 6f05d405a3..8b061e0dad 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/offchainlabs/nitro/arbnode/db/schema" - "github.com/offchainlabs/nitro/arbnode/mel/runner" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/broadcastclient" @@ -46,6 +45,15 @@ var ( messageTimer = metrics.NewRegisteredHistogram("arb/txstreamer/message/duration", nil, metrics.NewBoundedHistogramSample()) ) +type BatchDataProvider interface { + GetBatchCount() (uint64, error) + GetBatchMessageCount(seqNum uint64) (arbutil.MessageIndex, error) + GetDelayedAcc(seqNum uint64) (common.Hash, error) + GetSequencerMessageBytes(ctx context.Context, seqNum uint64) ([]byte, common.Hash, error) + GetSequencerMessageBytesForParentBlock(ctx context.Context, seqNum uint64, parentChainBlock uint64) ([]byte, common.Hash, error) + FindParentChainBlockContainingDelayed(ctx context.Context, index uint64) (uint64, error) +} + // TransactionStreamer produces blocks from a node's L1 messages, storing the results in the blockchain and recording their positions // The streamer is notified when there's new batches to process type TransactionStreamer struct { @@ -70,11 +78,10 @@ type 
TransactionStreamer struct { broadcasterQueuedMessagesFirstMsgIdx atomic.Uint64 broadcasterQueuedMessagesActiveReorg bool - coordinator *SeqCoordinator - broadcastServer *broadcaster.Broadcaster - inboxReader *InboxReader - msgExtractor *melrunner.MessageExtractor - delayedBridge *DelayedBridge + coordinator *SeqCoordinator + broadcastServer *broadcaster.Broadcaster + batchDataProvider BatchDataProvider + delayedBridge *DelayedBridge trackBlockMetadataFrom arbutil.MessageIndex syncTillMessage arbutil.MessageIndex @@ -188,35 +195,18 @@ func (s *TransactionStreamer) SetSeqCoordinator(coordinator *SeqCoordinator) err return nil } -func (s *TransactionStreamer) SetInboxReaders(inboxReader *InboxReader, delayedBridge *DelayedBridge) error { +func (s *TransactionStreamer) SetBatchDataProvider(provider BatchDataProvider, delayedBridge *DelayedBridge) error { if s.Started() { - return errors.New("trying to set inbox reader after start") + return errors.New("trying to set batch data provider after start") } - if s.inboxReader != nil || s.delayedBridge != nil { - return errors.New("trying to set inbox reader when already set") + if s.batchDataProvider != nil || s.delayedBridge != nil { + return errors.New("trying to set batch data provider when already set") } - if s.msgExtractor != nil { - return errors.New("cannot set inbox reader: message extractor is already set") - } - s.inboxReader = inboxReader + s.batchDataProvider = provider s.delayedBridge = delayedBridge return nil } -func (s *TransactionStreamer) SetMsgExtractor(msgExtractor *melrunner.MessageExtractor) error { - if s.Started() { - return errors.New("trying to set message extractor after start") - } - if s.msgExtractor != nil { - return errors.New("trying to set message extractor when already set") - } - if s.inboxReader != nil { - return errors.New("cannot set message extractor: inbox reader is already set") - } - s.msgExtractor = msgExtractor - return nil -} - func (s *TransactionStreamer) ChainConfig() 
*params.ChainConfig { return s.chainConfig } @@ -362,19 +352,16 @@ func (s *TransactionStreamer) addMessagesAndReorg(batch ethdb.Batch, msgIdxOfFir log.Error("delayed message header RequestId doesn't match database DelayedMessagesRead", "header", oldMessage.Message.Header, "delayedMessagesRead", oldMessage.DelayedMessagesRead) continue } - if s.msgExtractor != nil { - continue - } if delayedMsgIdx != lastDelayedMsgIdx { // This is the wrong position for the delayed message continue } - if s.inboxReader != nil { + if s.batchDataProvider != nil && s.delayedBridge != nil { // this is a delayed message. Should be resequenced if all 3 agree: // oldMessage, accumulator stored in tracker, and the message re-read from l1 - expectedAcc, err := s.inboxReader.tracker.GetDelayedAcc(delayedMsgIdx) + expectedAcc, err := s.batchDataProvider.GetDelayedAcc(delayedMsgIdx) if err != nil { if !strings.Contains(err.Error(), "not found") { log.Error("reorg-resequence: failed to read expected accumulator", "err", err) @@ -506,8 +493,8 @@ func (s *TransactionStreamer) GetMessage(msgIdx arbutil.MessageIndex) (*arbostyp } var parentChainBlockNumber *uint64 - if message.DelayedMessagesRead != 0 && s.inboxReader != nil && s.inboxReader.tracker != nil { - _, _, localParentChainBlockNumber, err := s.inboxReader.tracker.getRawDelayedMessageAccumulatorAndParentChainBlockNumber(ctx, message.DelayedMessagesRead-1) + if message.DelayedMessagesRead != 0 && s.batchDataProvider != nil { + localParentChainBlockNumber, err := s.batchDataProvider.FindParentChainBlockContainingDelayed(ctx, message.DelayedMessagesRead-1) if err != nil { log.Warn("Failed to fetch parent chain block number for delayed message. 
Will fall back to BatchMetadata", "idx", message.DelayedMessagesRead-1) } else { @@ -515,7 +502,7 @@ func (s *TransactionStreamer) GetMessage(msgIdx arbutil.MessageIndex) (*arbostyp } } - if s.inboxReader != nil { + if s.batchDataProvider != nil { err = message.Message.FillInBatchGasFields(func(batchNum uint64) ([]byte, error) { ctx, err := s.GetContextSafe() if err != nil { @@ -524,9 +511,9 @@ func (s *TransactionStreamer) GetMessage(msgIdx arbutil.MessageIndex) (*arbostyp var data []byte if parentChainBlockNumber != nil { - data, _, err = s.inboxReader.GetSequencerMessageBytesForParentBlock(ctx, batchNum, *parentChainBlockNumber) + data, _, err = s.batchDataProvider.GetSequencerMessageBytesForParentBlock(ctx, batchNum, *parentChainBlockNumber) } else { - data, _, err = s.inboxReader.GetSequencerMessageBytes(ctx, batchNum) + data, _, err = s.batchDataProvider.GetSequencerMessageBytes(ctx, batchNum) } if err != nil { return nil, err @@ -1184,16 +1171,10 @@ func (s *TransactionStreamer) ResumeReorgs() { } func (s *TransactionStreamer) PopulateFeedBacklog(ctx context.Context) error { - if s.broadcastServer == nil { - return nil - } - if s.inboxReader != nil { - return s.inboxReader.tracker.PopulateFeedBacklog(s.broadcastServer) - } - if s.msgExtractor == nil { + if s.broadcastServer == nil || s.batchDataProvider == nil { return nil } - batchCount, err := s.msgExtractor.GetBatchCount() + batchCount, err := s.batchDataProvider.GetBatchCount() if err != nil { return fmt.Errorf("error getting batch count: %w", err) } @@ -1203,7 +1184,7 @@ func (s *TransactionStreamer) PopulateFeedBacklog(ctx context.Context) error { // This prevents issues if a user's L1 is a bit behind or an L1 reorg occurs. // `batchCount - 2` is the index of the batch before the last batch. 
batchIndex := batchCount - 2 - startMessage, err = s.msgExtractor.GetBatchMessageCount(batchIndex) + startMessage, err = s.batchDataProvider.GetBatchMessageCount(batchIndex) if err != nil { return fmt.Errorf("error getting batch %v message count: %w", batchIndex, err) } diff --git a/changelog/ganeshvanahalli-nit-4753.md b/changelog/ganeshvanahalli-nit-4753.md new file mode 100644 index 0000000000..c86b14bc4f --- /dev/null +++ b/changelog/ganeshvanahalli-nit-4753.md @@ -0,0 +1,2 @@ +### Changed + - Replace the InboxReader and InboxTracker implementations with the message extractor code diff --git a/system_tests/batch_poster_test.go b/system_tests/batch_poster_test.go index 13bdfe6451..fc71c63520 100644 --- a/system_tests/batch_poster_test.go +++ b/system_tests/batch_poster_test.go @@ -145,12 +145,7 @@ func testBatchPosterParallel(t *testing.T, useRedis bool, useRedisLock bool) { if err != nil { t.Fatalf("Failed to get parent chain id: %v", err) } - var batchMetaFetcher arbnode.BatchMetadataFetcher - if builder.L2.ConsensusNode.InboxTracker != nil { - batchMetaFetcher = builder.L2.ConsensusNode.InboxTracker - } else if builder.L2.ConsensusNode.MessageExtractor != nil { - batchMetaFetcher = builder.L2.ConsensusNode.MessageExtractor - } + batchMetaFetcher := builder.L2.ConsensusNode.GetParentChainDataSource() for i := 0; i < parallelBatchPosters; i++ { // Make a copy of the batch poster config so NewBatchPoster calling Validate() on it doesn't race batchPosterConfig := builder.nodeConfig.BatchPoster @@ -291,12 +286,7 @@ func TestRedisBatchPosterHandoff(t *testing.T) { if err != nil { t.Fatalf("Failed to get parent chain id: %v", err) } - var batchMetaFetcher arbnode.BatchMetadataFetcher - if builder.L2.ConsensusNode.InboxTracker != nil { - batchMetaFetcher = builder.L2.ConsensusNode.InboxTracker - } else if builder.L2.ConsensusNode.MessageExtractor != nil { - batchMetaFetcher = builder.L2.ConsensusNode.MessageExtractor - } + batchMetaFetcher := 
builder.L2.ConsensusNode.GetParentChainDataSource() newBatchPoster := func() *arbnode.BatchPoster { // Make a copy of the batch poster config so NewBatchPoster calling Validate() on it doesn't race batchPosterConfig := builder.nodeConfig.BatchPoster @@ -444,9 +434,9 @@ func TestBatchPosterKeepsUp(t *testing.T) { start := time.Now() for { time.Sleep(time.Second) - batches, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + batches, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) - postedMessages, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMessageCount(batches - 1) + postedMessages, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchMessageCount(batches - 1) Require(t, err) haveMessages, err := builder.L2.ConsensusNode.TxStreamer.GetMessageCount() Require(t, err) @@ -836,7 +826,7 @@ func TestBatchPosterL1SurplusMatchesBatchGasFlaky(t *testing.T) { var batchNum uint64 for { var found bool - batchNum, found, err = builder.L2.ConsensusNode.InboxTracker.FindInboxBatchContainingMessage(arbutil.MessageIndex(l2Block.NumberU64())) + batchNum, found, err = builder.L2.ConsensusNode.GetParentChainDataSource().FindInboxBatchContainingMessage(arbutil.MessageIndex(l2Block.NumberU64())) if err == nil && found { break } @@ -946,11 +936,7 @@ func TestBatchPosterActuallyPostsBlobsToL1(t *testing.T) { Require(t, err) var melBatchCount uint64 for range 10 { - if builder.L2.ConsensusNode.MessageExtractor != nil { - melBatchCount, err = builder.L2.ConsensusNode.MessageExtractor.GetBatchCount() - } else { - melBatchCount, err = builder.L2.ConsensusNode.InboxTracker.GetBatchCount() - } + melBatchCount, err = builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) if melBatchCount == batchCount { break @@ -967,11 +953,7 @@ func TestBatchPosterActuallyPostsBlobsToL1(t *testing.T) { var sequencerMessageBytes []byte retryUntilFound(t, ctx, 30, 100*time.Millisecond, 
fmt.Sprintf("GetSequencerMessageBytes(seq %d)", sequenceNum), "not found in L1 block", func() error { var getErr error - if builder.L2.ConsensusNode.MessageExtractor != nil { - sequencerMessageBytes, _, err = builder.L2.ConsensusNode.MessageExtractor.GetSequencerMessageBytes(ctx, sequenceNum) - } else { - sequencerMessageBytes, _, err = builder.L2.ConsensusNode.InboxReader.GetSequencerMessageBytes(ctx, sequenceNum) - } + sequencerMessageBytes, _, getErr = builder.L2.ConsensusNode.GetParentChainDataSource().GetSequencerMessageBytes(ctx, sequenceNum) return getErr }) diff --git a/system_tests/batch_size_limit_test.go b/system_tests/batch_size_limit_test.go index e33c4bd861..920918eaed 100644 --- a/system_tests/batch_size_limit_test.go +++ b/system_tests/batch_size_limit_test.go @@ -149,12 +149,7 @@ func checkReceiverAccountBalance(t *testing.T, ctx context.Context, builder *Nod // ensureBatchWasProcessed waits until a particular batch has been processed by the L2 node. func ensureBatchWasProcessed(t *testing.T, builder *NodeBuilder, batchNum uint64) { require.Eventuallyf(t, func() bool { - var err error - if builder.L2.ConsensusNode.MessageExtractor != nil { - _, err = builder.L2.ConsensusNode.MessageExtractor.GetBatchMetadata(batchNum) - } else { - _, err = builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(batchNum) - } + _, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchMetadata(batchNum) return err == nil }, 5*time.Second, time.Second, "Batch %d was not processed in time", batchNum) } diff --git a/system_tests/bold_challenge_protocol_test.go b/system_tests/bold_challenge_protocol_test.go index a1e558dcea..159f765754 100644 --- a/system_tests/bold_challenge_protocol_test.go +++ b/system_tests/bold_challenge_protocol_test.go @@ -174,9 +174,10 @@ func testChallengeProtocolBOLD(t *gotesting.T, useExternalSigner bool, useRedis blockValidatorConfig.RedisValidationClientConfig.RedisURL = redisURL locator, err := 
server_common.NewMachineLocator("") Require(t, err) + pcdsA := l2nodeA.GetParentChainDataSource() statelessA, err := staker.NewStatelessBlockValidator( - l2nodeA.InboxReader, - l2nodeA.InboxTracker, + pcdsA, + pcdsA, l2nodeA.TxStreamer, l2nodeA.ExecutionRecorder, l2nodeA.ConsensusDB, @@ -192,9 +193,10 @@ func testChallengeProtocolBOLD(t *gotesting.T, useExternalSigner bool, useRedis valCfg.UseJit = false _, valStackB := createTestValidationNode(t, ctx, &valCfg, spawnerOpts...) + pcdsB := l2nodeB.GetParentChainDataSource() statelessB, err := staker.NewStatelessBlockValidator( - l2nodeB.InboxReader, - l2nodeB.InboxTracker, + pcdsB, + pcdsB, l2nodeB.TxStreamer, l2nodeB.ExecutionRecorder, l2nodeB.ConsensusDB, @@ -209,7 +211,7 @@ func testChallengeProtocolBOLD(t *gotesting.T, useExternalSigner bool, useRedis blockValidatorA, err := staker.NewBlockValidator( statelessA, - l2nodeA.InboxTracker, + pcdsA, l2nodeA.TxStreamer, StaticFetcherFrom(t, &blockValidatorConfig), nil, @@ -220,7 +222,7 @@ func testChallengeProtocolBOLD(t *gotesting.T, useExternalSigner bool, useRedis blockValidatorB, err := staker.NewBlockValidator( statelessB, - l2nodeB.InboxTracker, + pcdsB, l2nodeB.TxStreamer, StaticFetcherFrom(t, &blockValidatorConfig), nil, @@ -239,9 +241,9 @@ func testChallengeProtocolBOLD(t *gotesting.T, useExternalSigner bool, useRedis CheckBatchFinality: false, }, goodDir, - l2nodeA.InboxTracker, + pcdsA, l2nodeA.TxStreamer, - l2nodeA.InboxReader, + pcdsA, nil, ) Require(t, err) @@ -256,9 +258,9 @@ func testChallengeProtocolBOLD(t *gotesting.T, useExternalSigner bool, useRedis CheckBatchFinality: false, }, evilDir, - l2nodeB.InboxTracker, + pcdsB, l2nodeB.TxStreamer, - l2nodeB.InboxReader, + pcdsB, nil, ) Require(t, err) @@ -350,13 +352,13 @@ func testChallengeProtocolBOLD(t *gotesting.T, useExternalSigner bool, useRedis makeBoldBatch(t, l2nodeB, l2info, l1client, &sequencerTxOpts, evilSeqInboxBinding, evilSeqInbox, numMessagesPerBatch, divergeAt) totalMessagesPosted += 
numMessagesPerBatch - bcA, err := l2nodeA.InboxTracker.GetBatchCount() + bcA, err := l2nodeA.GetParentChainDataSource().GetBatchCount() Require(t, err) - bcB, err := l2nodeB.InboxTracker.GetBatchCount() + bcB, err := l2nodeB.GetParentChainDataSource().GetBatchCount() Require(t, err) - msgA, err := l2nodeA.InboxTracker.GetBatchMessageCount(bcA - 1) + msgA, err := l2nodeA.GetParentChainDataSource().GetBatchMessageCount(bcA - 1) Require(t, err) - msgB, err := l2nodeB.InboxTracker.GetBatchMessageCount(bcB - 1) + msgB, err := l2nodeB.GetParentChainDataSource().GetBatchMessageCount(bcB - 1) Require(t, err) t.Logf("Node A batch count %d, msgs %d", bcA, msgA) @@ -711,7 +713,7 @@ func syncBatchToNode( Require(t, err) // Optional: log batch metadata - batchMetaData, err := l2Node.InboxTracker.GetBatchMetadata(batches[0].SequenceNumber) + batchMetaData, err := l2Node.GetParentChainDataSource().GetBatchMetadata(batches[0].SequenceNumber) log.Info("Batch metadata", "md", batchMetaData) Require(t, err, "failed to get batch metadata after adding batch:") } diff --git a/system_tests/bold_customda_challenge_test.go b/system_tests/bold_customda_challenge_test.go index f3ce26b0a5..860a3d2e1c 100644 --- a/system_tests/bold_customda_challenge_test.go +++ b/system_tests/bold_customda_challenge_test.go @@ -398,9 +398,10 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, err = dapReadersB.SetupDACertificateReader(daClientB, daClientB) Require(t, err) + pcdsA := l2nodeA.GetParentChainDataSource() statelessA, err := staker.NewStatelessBlockValidator( - l2nodeA.InboxReader, - l2nodeA.InboxTracker, + pcdsA, + pcdsA, l2nodeA.TxStreamer, l2nodeA.ExecutionRecorder, l2nodeA.ConsensusDB, @@ -414,9 +415,10 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, Require(t, err) _, valStackB := createTestValidationNode(t, ctx, &valCfg, spawnerOpts...) 
+ pcdsB := l2nodeB.GetParentChainDataSource() statelessB, err := staker.NewStatelessBlockValidator( - l2nodeB.InboxReader, - l2nodeB.InboxTracker, + pcdsB, + pcdsB, l2nodeB.TxStreamer, l2nodeB.ExecutionRecorder, l2nodeB.ConsensusDB, @@ -431,7 +433,7 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, blockValidatorA, err := staker.NewBlockValidator( statelessA, - l2nodeA.InboxTracker, + pcdsA, l2nodeA.TxStreamer, StaticFetcherFrom(t, &blockValidatorConfig), nil, @@ -442,7 +444,7 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, blockValidatorB, err := staker.NewBlockValidator( statelessB, - l2nodeB.InboxTracker, + pcdsB, l2nodeB.TxStreamer, StaticFetcherFrom(t, &blockValidatorConfig), nil, @@ -453,14 +455,14 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, // Create ProofEnhancers from DA validators proofEnhancerA := proofenhancement.NewProofEnhancementManager() - customDAEnhancerA := proofenhancement.NewReadPreimageProofEnhancer(dapReadersA, l2nodeA.InboxTracker, l2nodeA.InboxReader) + customDAEnhancerA := proofenhancement.NewReadPreimageProofEnhancer(dapReadersA, pcdsA, pcdsA) proofEnhancerA.RegisterEnhancer(proofenhancement.MarkerCustomDAReadPreimage, customDAEnhancerA) - validateCertificateEnhancerA := proofenhancement.NewValidateCertificateProofEnhancer(dapReadersA, l2nodeA.InboxTracker, l2nodeA.InboxReader) + validateCertificateEnhancerA := proofenhancement.NewValidateCertificateProofEnhancer(dapReadersA, pcdsA, pcdsA) proofEnhancerA.RegisterEnhancer(proofenhancement.MarkerCustomDAValidateCertificate, validateCertificateEnhancerA) proofEnhancerB := proofenhancement.NewProofEnhancementManager() - customDAEnhancerB := proofenhancement.NewReadPreimageProofEnhancer(dapReadersB, l2nodeB.InboxTracker, l2nodeB.InboxReader) - validateCertificateEnhancerB := proofenhancement.NewValidateCertificateProofEnhancer(dapReadersB, l2nodeB.InboxTracker, l2nodeB.InboxReader) + 
customDAEnhancerB := proofenhancement.NewReadPreimageProofEnhancer(dapReadersB, pcdsB, pcdsB) + validateCertificateEnhancerB := proofenhancement.NewValidateCertificateProofEnhancer(dapReadersB, pcdsB, pcdsB) proofEnhancerB.RegisterEnhancer(proofenhancement.MarkerCustomDAValidateCertificate, validateCertificateEnhancerB) // For EvilDataEvilCert strategy, wrap the enhancer to inject evil certificates @@ -482,9 +484,9 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, CheckBatchFinality: false, }, goodDir, - l2nodeA.InboxTracker, + pcdsA, l2nodeA.TxStreamer, - l2nodeA.InboxReader, + pcdsA, proofEnhancerA, ) Require(t, err) @@ -499,9 +501,9 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, CheckBatchFinality: false, }, evilDir, - l2nodeB.InboxTracker, + pcdsB, l2nodeB.TxStreamer, - l2nodeB.InboxReader, + pcdsB, proofEnhancerB, ) Require(t, err) @@ -584,14 +586,14 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, time.Sleep(100 * time.Millisecond) // Get and log batch 0 from both nodes - msgA0, _, err := l2nodeA.InboxReader.GetSequencerMessageBytes(ctx, 0) + msgA0, _, err := l2nodeA.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 0) if err != nil { t.Logf("Error getting batch 0 from node A: %v", err) } else { PrintSequencerInboxMessage(t, "Node A - Batch 0", msgA0) } - msgB0, _, err := l2nodeB.InboxReader.GetSequencerMessageBytes(ctx, 0) + msgB0, _, err := l2nodeB.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 0) if err != nil { t.Logf("Error getting batch 0 from node B: %v", err) } @@ -678,14 +680,14 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, time.Sleep(100 * time.Millisecond) // Get and log batch 1 from both nodes - msgA1, _, err := l2nodeA.InboxReader.GetSequencerMessageBytes(ctx, 1) + msgA1, _, err := l2nodeA.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 1) if err != nil { t.Logf("Error getting batch 
1 from node A: %v", err) } else { PrintSequencerInboxMessage(t, "Node A - Batch 1", msgA1) } - msgB1, _, err := l2nodeB.InboxReader.GetSequencerMessageBytes(ctx, 1) + msgB1, _, err := l2nodeB.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 1) if err != nil { t.Logf("Error getting batch 1 from node B: %v", err) } @@ -702,7 +704,7 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, // Log third batch messages (batch 2 - second CustomDA batch with divergence) t.Logf("\n======== BATCH 2 (second CustomDA batch - WITH DIVERGENCE) ========") // Get and log batch 2 from both nodes - msgA2, _, err := l2nodeA.InboxReader.GetSequencerMessageBytes(ctx, 2) + msgA2, _, err := l2nodeA.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 2) if err != nil { t.Logf("Error getting batch 2 from node A: %v", err) } else { @@ -714,7 +716,7 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, } } - msgB2, _, err := l2nodeB.InboxReader.GetSequencerMessageBytes(ctx, 2) + msgB2, _, err := l2nodeB.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 2) if err != nil { t.Logf("Error getting batch 2 from node B: %v", err) } else { @@ -737,18 +739,18 @@ func testChallengeProtocolBOLDCustomDA(t *testing.T, evilStrategy EvilStrategy, } } - bcA, err := l2nodeA.InboxTracker.GetBatchCount() + bcA, err := l2nodeA.GetParentChainDataSource().GetBatchCount() Require(t, err) - bcB, err := l2nodeB.InboxTracker.GetBatchCount() + bcB, err := l2nodeB.GetParentChainDataSource().GetBatchCount() Require(t, err) if bcA != bcB { t.Fatalf("FATAL: Expected Node A batch count %d to be equal to Node B batch count %d", bcA, bcB) } - msgA, err := l2nodeA.InboxTracker.GetBatchMessageCount(bcA - 1) + msgA, err := l2nodeA.GetParentChainDataSource().GetBatchMessageCount(bcA - 1) Require(t, err) - msgB, err := l2nodeB.InboxTracker.GetBatchMessageCount(bcB - 1) + msgB, err := l2nodeB.GetParentChainDataSource().GetBatchMessageCount(bcB - 1) 
Require(t, err) t.Logf("Node A batch count %d, msgs %d", bcA, msgA) diff --git a/system_tests/bold_l3_support_test.go b/system_tests/bold_l3_support_test.go index 7a9a072826..3eaaef6b4f 100644 --- a/system_tests/bold_l3_support_test.go +++ b/system_tests/bold_l3_support_test.go @@ -212,9 +212,9 @@ func startL3BoldChallengeManager(t *testing.T, ctx context.Context, builder *Nod CheckBatchFinality: false, }, cacheDir, - node.ConsensusNode.InboxTracker, + node.ConsensusNode.GetParentChainDataSource(), node.ConsensusNode.TxStreamer, - node.ConsensusNode.InboxReader, + node.ConsensusNode.GetParentChainDataSource(), nil, ) Require(t, err) diff --git a/system_tests/bold_new_challenge_test.go b/system_tests/bold_new_challenge_test.go index 4ec4cc630f..ebacd2818c 100644 --- a/system_tests/bold_new_challenge_test.go +++ b/system_tests/bold_new_challenge_test.go @@ -293,9 +293,9 @@ func startBoldChallengeManager(t *testing.T, ctx context.Context, builder *NodeB CheckBatchFinality: false, }, cacheDir, - node.ConsensusNode.InboxTracker, + node.ConsensusNode.GetParentChainDataSource(), node.ConsensusNode.TxStreamer, - node.ConsensusNode.InboxReader, + node.ConsensusNode.GetParentChainDataSource(), nil, ) Require(t, err) diff --git a/system_tests/bold_state_provider_test.go b/system_tests/bold_state_provider_test.go index f625c1f979..90798d14a0 100644 --- a/system_tests/bold_state_provider_test.go +++ b/system_tests/bold_state_provider_test.go @@ -82,7 +82,7 @@ func TestChallengeProtocolBOLD_Bisections(t *testing.T) { totalBatchesBig, err := bridgeBinding.SequencerMessageCount(&bind.CallOpts{Context: ctx}) Require(t, err) totalBatches := totalBatchesBig.Uint64() - totalMessageCount, err := l2node.InboxTracker.GetBatchMessageCount(totalBatches - 1) + totalMessageCount, err := l2node.GetParentChainDataSource().GetBatchMessageCount(totalBatches - 1) Require(t, err) log.Info("Status", "totalBatches", totalBatches, "totalMessageCount", totalMessageCount) t.Logf("totalBatches: %v, 
totalMessageCount: %v\n", totalBatches, totalMessageCount) @@ -100,7 +100,7 @@ func TestChallengeProtocolBOLD_Bisections(t *testing.T) { if lastInfo.GlobalState.Batch >= totalBatches { return true } - batchMsgCount, err := l2node.InboxTracker.GetBatchMessageCount(lastInfo.GlobalState.Batch) + batchMsgCount, err := l2node.GetParentChainDataSource().GetBatchMessageCount(lastInfo.GlobalState.Batch) if err != nil { t.Logf("GetBatchMessageCount error (will retry): %v", err) return false @@ -199,7 +199,7 @@ func TestChallengeProtocolBOLD_StateProvider(t *testing.T) { totalBatchesBig, err := bridgeBinding.SequencerMessageCount(&bind.CallOpts{Context: ctx}) Require(t, err) totalBatches := totalBatchesBig.Uint64() - totalMessageCount, err := l2node.InboxTracker.GetBatchMessageCount(totalBatches - 1) + totalMessageCount, err := l2node.GetParentChainDataSource().GetBatchMessageCount(totalBatches - 1) Require(t, err) // Wait until the validator has validated the batches. @@ -382,9 +382,10 @@ func setupBoldStateProvider(t *testing.T, ctx context.Context, blockChallengeHei locator, err := server_common.NewMachineLocator(valnode.TestValidationConfig.Wasm.RootPath) Require(t, err) + pcds := l2node.GetParentChainDataSource() stateless, err := staker.NewStatelessBlockValidator( - l2node.InboxReader, - l2node.InboxTracker, + pcds, + pcds, l2node.TxStreamer, l2node.ExecutionRecorder, l2node.ConsensusDB, @@ -398,7 +399,7 @@ func setupBoldStateProvider(t *testing.T, ctx context.Context, blockChallengeHei blockValidator, err := staker.NewBlockValidator( stateless, - l2node.InboxTracker, + pcds, l2node.TxStreamer, StaticFetcherFrom(t, &blockValidatorConfig), nil, @@ -418,9 +419,9 @@ func setupBoldStateProvider(t *testing.T, ctx context.Context, blockChallengeHei CheckBatchFinality: false, }, dir, - l2node.InboxTracker, + pcds, l2node.TxStreamer, - l2node.InboxReader, + pcds, nil, ) Require(t, err) diff --git a/system_tests/common_test.go b/system_tests/common_test.go index 
3e47e77003..6ab01ad346 100644 --- a/system_tests/common_test.go +++ b/system_tests/common_test.go @@ -751,9 +751,6 @@ func (b *NodeBuilder) CheckConfig(t *testing.T) { b.execConfig.Caching.StateHistory = gethexec.GetStateHistory(gethexec.DefaultSequencerConfig.MaxBlockSpeed) } } - if b.nodeConfig.BlockValidator.Enable { - b.nodeConfig.MessageExtraction.Enable = false // Skip running in MEL mode for block validator tests - } } func (b *NodeBuilder) BuildL1(t *testing.T) { @@ -1199,6 +1196,7 @@ func build2ndNode( ) (*TestClient, func()) { if params.nodeConfig == nil { params.nodeConfig = arbnode.ConfigDefaultL1NonSequencerTest() + params.nodeConfig.MessageExtraction.Enable = firstNodeNodeConfig.MessageExtraction.Enable } if params.anyTrustConfig != nil { params.nodeConfig.DA.AnyTrust = *params.anyTrustConfig @@ -2816,9 +2814,9 @@ func recordBlock(t *testing.T, block uint64, builder *NodeBuilder, targets ...ra inboxPos := arbutil.MessageIndex(block) for { time.Sleep(250 * time.Millisecond) - batches, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + batches, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) - haveMessages, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMessageCount(batches - 1) + haveMessages, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchMessageCount(batches - 1) Require(t, err) if haveMessages >= inboxPos { break diff --git a/system_tests/consensus_rpc_api_test.go b/system_tests/consensus_rpc_api_test.go index 62604c0610..a311a037a3 100644 --- a/system_tests/consensus_rpc_api_test.go +++ b/system_tests/consensus_rpc_api_test.go @@ -226,7 +226,7 @@ func TestFindBatch(t *testing.T) { if expBatchNum != gotBatchNum { Fatal(t, "wrong result from findBatchContainingBlock. 
blocknum ", blockNum, " expected ", expBatchNum, " got ", gotBatchNum) } - batchL1Block, err := builder.L2.ConsensusNode.InboxTracker.GetBatchParentChainBlock(gotBatchNum) + batchL1Block, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchParentChainBlock(gotBatchNum) Require(t, err) blockHeader, err := builder.L2.Client.HeaderByNumber(ctx, new(big.Int).SetUint64(blockNum)) Require(t, err) diff --git a/system_tests/fast_confirm_test.go b/system_tests/fast_confirm_test.go index 03e8c9a256..cb08ef54b1 100644 --- a/system_tests/fast_confirm_test.go +++ b/system_tests/fast_confirm_test.go @@ -267,9 +267,10 @@ func setupFastConfirmation(ctx context.Context, t *testing.T) (*NodeBuilder, *le locator, err := server_common.NewMachineLocator(valnode.TestValidationConfig.Wasm.RootPath) Require(t, err) + pcds := l2node.GetParentChainDataSource() stateless, err := staker.NewStatelessBlockValidator( - l2node.InboxReader, - l2node.InboxTracker, + pcds, + pcds, l2node.TxStreamer, execNode, l2node.ConsensusDB, @@ -294,9 +295,9 @@ func setupFastConfirmation(ctx context.Context, t *testing.T) (*NodeBuilder, *le nil, l2node.DeployInfo.ValidatorUtils, l2node.DeployInfo.Rollup, - l2node.InboxTracker, + pcds, l2node.TxStreamer, - l2node.InboxReader, + pcds, nil, ) Require(t, err) @@ -463,9 +464,10 @@ func TestFastConfirmationWithSafe(t *testing.T) { locator, err := server_common.NewMachineLocator(valnode.TestValidationConfig.Wasm.RootPath) Require(t, err) + pcdsA := l2nodeA.GetParentChainDataSource() statelessA, err := staker.NewStatelessBlockValidator( - l2nodeA.InboxReader, - l2nodeA.InboxTracker, + pcdsA, + pcdsA, l2nodeA.TxStreamer, execNodeA, l2nodeA.ConsensusDB, @@ -490,9 +492,9 @@ func TestFastConfirmationWithSafe(t *testing.T) { nil, l2nodeA.DeployInfo.ValidatorUtils, l2nodeA.DeployInfo.Rollup, - l2nodeA.InboxTracker, + pcdsA, l2nodeA.TxStreamer, - l2nodeA.InboxReader, + pcdsA, nil, ) Require(t, err) @@ -520,9 +522,10 @@ func TestFastConfirmationWithSafe(t 
*testing.T) { valConfigB := legacystaker.TestL1ValidatorConfig valConfigB.EnableFastConfirmation = true valConfigB.Strategy = "watchtower" + pcdsB := l2nodeB.GetParentChainDataSource() statelessB, err := staker.NewStatelessBlockValidator( - l2nodeB.InboxReader, - l2nodeB.InboxTracker, + pcdsB, + pcdsB, l2nodeB.TxStreamer, execNodeB, l2nodeB.ConsensusDB, @@ -547,9 +550,9 @@ func TestFastConfirmationWithSafe(t *testing.T) { nil, l2nodeB.DeployInfo.ValidatorUtils, l2nodeB.DeployInfo.Rollup, - l2nodeB.InboxTracker, + pcdsB, l2nodeB.TxStreamer, - l2nodeB.InboxReader, + pcdsB, nil, ) Require(t, err) diff --git a/system_tests/fees_test.go b/system_tests/fees_test.go index ffcd4a6b89..dc72302251 100644 --- a/system_tests/fees_test.go +++ b/system_tests/fees_test.go @@ -168,12 +168,7 @@ func testSequencerPriceAdjustsFrom(t *testing.T, initialEstimate uint64) { Require(t, err) lastEstimate, err := arbGasInfo.GetL1BaseFeeEstimate(&bind.CallOpts{Context: ctx}) Require(t, err) - var lastBatchCount uint64 - if builder.L2.ConsensusNode.MessageExtractor != nil { - lastBatchCount, err = builder.L2.ConsensusNode.MessageExtractor.GetBatchCount() - } else { - lastBatchCount, err = builder.L2.ConsensusNode.InboxTracker.GetBatchCount() - } + lastBatchCount, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) l1Header, err := builder.L1.Client.HeaderByNumber(ctx, nil) Require(t, err) @@ -247,12 +242,7 @@ func testSequencerPriceAdjustsFrom(t *testing.T, initialEstimate uint64) { // Wait for the batch poster to post a new batch. Under -race // each poll cycle is significantly slower, so allow more retries. 
for j := 50; j > 0; j-- { - var newBatchCount uint64 - if builder.L2.ConsensusNode.MessageExtractor != nil { - newBatchCount, err = builder.L2.ConsensusNode.MessageExtractor.GetBatchCount() - } else { - newBatchCount, err = builder.L2.ConsensusNode.InboxTracker.GetBatchCount() - } + newBatchCount, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) if newBatchCount > lastBatchCount { colors.PrintGrey("posted new batch ", newBatchCount) diff --git a/system_tests/full_challenge_impl_test.go b/system_tests/full_challenge_impl_test.go index d7a0744b6f..f52590bb6b 100644 --- a/system_tests/full_challenge_impl_test.go +++ b/system_tests/full_challenge_impl_test.go @@ -172,7 +172,7 @@ func makeBatch(t *testing.T, l2Node *arbnode.Node, l2Info *BlockchainTestInfo, b } err = l2Node.InboxTracker.AddSequencerBatches(ctx, backend, batches) Require(t, err) - _, err = l2Node.InboxTracker.GetBatchMetadata(0) + _, err = l2Node.GetParentChainDataSource().GetBatchMetadata(0) Require(t, err, "failed to get batch metadata after adding batch:") } @@ -381,7 +381,8 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall locator, err := server_common.NewMachineLocator(builder.valnodeConfig.Wasm.RootPath) Require(t, err) - asserterValidator, err := staker.NewStatelessBlockValidator(asserterL2.InboxReader, asserterL2.InboxTracker, asserterL2.TxStreamer, asserterExec, asserterL2.ConsensusDB, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack, locator.LatestWasmModuleRoot()) + asserterPcds := asserterL2.GetParentChainDataSource() + asserterValidator, err := staker.NewStatelessBlockValidator(asserterPcds, asserterPcds, asserterL2.TxStreamer, asserterExec, asserterL2.ConsensusDB, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack, locator.LatestWasmModuleRoot()) if err != nil { Fatal(t, err) } @@ -394,11 +395,12 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall Fatal(t, err) } defer 
asserterValidator.Stop() - asserterManager, err := legacystaker.NewChallengeManager(ctx, l1Backend, &asserterTxOpts, asserterTxOpts.From, challengeManagerAddr, 1, asserterValidator, 0, 0, asserterL2.InboxTracker, asserterL2.TxStreamer) + asserterManager, err := legacystaker.NewChallengeManager(ctx, l1Backend, &asserterTxOpts, asserterTxOpts.From, challengeManagerAddr, 1, asserterValidator, 0, 0, asserterPcds, asserterL2.TxStreamer) if err != nil { Fatal(t, err) } - challengerValidator, err := staker.NewStatelessBlockValidator(challengerL2.InboxReader, challengerL2.InboxTracker, challengerL2.TxStreamer, challengerExec, challengerL2.ConsensusDB, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack, locator.LatestWasmModuleRoot()) + challengerPcds := challengerL2.GetParentChainDataSource() + challengerValidator, err := staker.NewStatelessBlockValidator(challengerPcds, challengerPcds, challengerL2.TxStreamer, challengerExec, challengerL2.ConsensusDB, nil, StaticFetcherFrom(t, &conf.BlockValidator), valStack, locator.LatestWasmModuleRoot()) if err != nil { Fatal(t, err) } @@ -411,7 +413,7 @@ func RunChallengeTest(t *testing.T, asserterIsCorrect bool, useStubs bool, chall Fatal(t, err) } defer challengerValidator.Stop() - challengerManager, err := legacystaker.NewChallengeManager(ctx, l1Backend, &challengerTxOpts, challengerTxOpts.From, challengeManagerAddr, 1, challengerValidator, 0, 0, challengerL2.InboxTracker, challengerL2.TxStreamer) + challengerManager, err := legacystaker.NewChallengeManager(ctx, l1Backend, &challengerTxOpts, challengerTxOpts.From, challengeManagerAddr, 1, challengerValidator, 0, 0, challengerPcds, challengerL2.TxStreamer) if err != nil { Fatal(t, err) } diff --git a/system_tests/inbox_blob_failure_test.go b/system_tests/inbox_blob_failure_test.go index 0c9ffb330f..12a9367d87 100644 --- a/system_tests/inbox_blob_failure_test.go +++ b/system_tests/inbox_blob_failure_test.go @@ -89,11 +89,7 @@ func TestInboxReaderBlobFailureWithDelayedMessage(t 
*testing.T) { var batchNum uint64 for i := 0; i < 30; i++ { var found bool - if builder.L2.ConsensusNode.MessageExtractor != nil { - batchNum, found, err = builder.L2.ConsensusNode.MessageExtractor.FindInboxBatchContainingMessage(arbutil.MessageIndex(l2Block.NumberU64())) - } else { - batchNum, found, err = builder.L2.ConsensusNode.InboxTracker.FindInboxBatchContainingMessage(arbutil.MessageIndex(l2Block.NumberU64())) - } + batchNum, found, err = builder.L2.ConsensusNode.GetParentChainDataSource().FindInboxBatchContainingMessage(arbutil.MessageIndex(l2Block.NumberU64())) Require(t, err) if found { break @@ -106,19 +102,9 @@ func TestInboxReaderBlobFailureWithDelayedMessage(t *testing.T) { time.Sleep(time.Second) // Record sequencer state before starting follower - var seqDelayed uint64 - if builder.L2.ConsensusNode.MessageExtractor != nil { - seqDelayed, err = builder.L2.ConsensusNode.MessageExtractor.GetDelayedCount() - } else { - seqDelayed, err = builder.L2.ConsensusNode.InboxTracker.GetDelayedCount() - } + seqDelayed, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetDelayedCount() Require(t, err) - var seqBatch uint64 - if builder.L2.ConsensusNode.MessageExtractor != nil { - seqBatch, err = builder.L2.ConsensusNode.MessageExtractor.GetBatchCount() - } else { - seqBatch, err = builder.L2.ConsensusNode.InboxTracker.GetBatchCount() - } + seqBatch, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) // Build follower with failing blob reader @@ -138,19 +124,9 @@ func TestInboxReaderBlobFailureWithDelayedMessage(t *testing.T) { time.Sleep(2 * time.Second) // Check if follower is out of sync - var follDelayed uint64 - if testClientB.ConsensusNode.MessageExtractor != nil { - follDelayed, err = testClientB.ConsensusNode.MessageExtractor.GetDelayedCount() - } else { - follDelayed, err = testClientB.ConsensusNode.InboxTracker.GetDelayedCount() - } + follDelayed, err := 
testClientB.ConsensusNode.GetParentChainDataSource().GetDelayedCount() Require(t, err) - var follBatch uint64 - if testClientB.ConsensusNode.MessageExtractor != nil { - follBatch, err = testClientB.ConsensusNode.MessageExtractor.GetBatchCount() - } else { - follBatch, err = testClientB.ConsensusNode.InboxTracker.GetBatchCount() - } + follBatch, err := testClientB.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) if follDelayed == seqDelayed && follBatch < seqBatch { @@ -194,18 +170,9 @@ func TestInboxReaderBlobFailureWithDelayedMessage(t *testing.T) { t.Logf(" Batch-posting-report for batch %d", batchNum) // Check if this batch exists in our database - if testClientB.ConsensusNode.MessageExtractor != nil { - _, err := testClientB.ConsensusNode.MessageExtractor.GetBatchMetadata(batchNum) - if err != nil { - // TODO After we have fixed the issue, this can be changed back to log.Fatalf - t.Logf("CORRUPTION DETECTED: Delayed message %d is a batch-posting-report for batch %d, but batch %d doesn't exist in database! Error: %v", i, batchNum, batchNum, err) - } - } else { - _, err := testClientB.ConsensusNode.InboxTracker.GetBatchMetadata(batchNum) - if err != nil { - // TODO After we have fixed the issue, this can be changed back to log.Fatalf - t.Logf("CORRUPTION DETECTED: Delayed message %d is a batch-posting-report for batch %d, but batch %d doesn't exist in database! Error: %v", i, batchNum, batchNum, err) - } + if _, err := testClientB.ConsensusNode.GetParentChainDataSource().GetBatchMetadata(batchNum); err != nil { + // TODO After we have fixed the issue, this can be changed back to log.Fatalf + t.Logf("CORRUPTION DETECTED: Delayed message %d is a batch-posting-report for batch %d, but batch %d doesn't exist in database! 
Error: %v", i, batchNum, batchNum, err) } } } @@ -232,12 +199,7 @@ func TestInboxReaderBlobFailureWithDelayedMessage(t *testing.T) { verifyReceipt, _ := builder.L2.Client.TransactionReceipt(ctx, verifyTx.Hash()) if verifyReceipt != nil { verifyBlock, _ := builder.L2.Client.BlockByHash(ctx, verifyReceipt.BlockHash) - var found bool - if builder.L2.ConsensusNode.MessageExtractor != nil { - _, found, err = builder.L2.ConsensusNode.MessageExtractor.FindInboxBatchContainingMessage(arbutil.MessageIndex(verifyBlock.NumberU64())) - } else { - _, found, err = builder.L2.ConsensusNode.InboxTracker.FindInboxBatchContainingMessage(arbutil.MessageIndex(verifyBlock.NumberU64())) - } + _, found, err := builder.L2.ConsensusNode.GetParentChainDataSource().FindInboxBatchContainingMessage(arbutil.MessageIndex(verifyBlock.NumberU64())) if err == nil && found { break } diff --git a/system_tests/meaningless_reorg_test.go b/system_tests/meaningless_reorg_test.go index 53d790025b..9fe6c69425 100644 --- a/system_tests/meaningless_reorg_test.go +++ b/system_tests/meaningless_reorg_test.go @@ -46,7 +46,7 @@ func TestMeaninglessBatchReorg(t *testing.T) { } time.Sleep(10 * time.Millisecond) } - metadata, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(1) + metadata, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchMetadata(1) Require(t, err) originalBatchBlock := batchReceipt.BlockNumber.Uint64() if metadata.ParentChainBlock != originalBatchBlock { @@ -88,7 +88,7 @@ func TestMeaninglessBatchReorg(t *testing.T) { if i >= 500 { Fatal(t, "Failed to read batch reorg from L1") } - metadata, err = builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(1) + metadata, err = builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchMetadata(1) Require(t, err) if metadata.ParentChainBlock == newBatchBlock { break @@ -98,7 +98,7 @@ func TestMeaninglessBatchReorg(t *testing.T) { time.Sleep(10 * time.Millisecond) } - _, _, err = 
builder.L2.ConsensusNode.InboxReader.GetSequencerMessageBytes(ctx, 1) + _, _, err = builder.L2.ConsensusNode.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 1) Require(t, err) l2Header, err := builder.L2.Client.HeaderByNumber(ctx, l2Receipt.BlockNumber) diff --git a/system_tests/overflow_assertions_test.go b/system_tests/overflow_assertions_test.go index c5957e30fe..34e5afbe28 100644 --- a/system_tests/overflow_assertions_test.go +++ b/system_tests/overflow_assertions_test.go @@ -97,9 +97,10 @@ func TestOverflowAssertions(t *testing.T) { locator, err := server_common.NewMachineLocator(valCfg.Wasm.RootPath) Require(t, err) + pcds := l2node.GetParentChainDataSource() stateless, err := staker.NewStatelessBlockValidator( - l2node.InboxReader, - l2node.InboxTracker, + pcds, + pcds, l2node.TxStreamer, l2node.ExecutionRecorder, l2node.ConsensusDB, @@ -114,7 +115,7 @@ func TestOverflowAssertions(t *testing.T) { blockValidator, err := staker.NewBlockValidator( stateless, - l2node.InboxTracker, + pcds, l2node.TxStreamer, StaticFetcherFrom(t, &blockValidatorConfig), nil, @@ -133,9 +134,9 @@ func TestOverflowAssertions(t *testing.T) { CheckBatchFinality: false, }, goodDir, - l2node.InboxTracker, + pcds, l2node.TxStreamer, - l2node.InboxReader, + pcds, nil, ) Require(t, err) @@ -177,9 +178,9 @@ func TestOverflowAssertions(t *testing.T) { makeBoldBatch(t, l2node, l2info, l1client, &sequencerTxOpts, honestSeqInboxBinding, honestSeqInbox, numMessagesPerBatch, divergeAt) totalMessagesPosted += numMessagesPerBatch - bc, err := l2node.InboxTracker.GetBatchCount() + bc, err := l2node.GetParentChainDataSource().GetBatchCount() Require(t, err) - msgs, err := l2node.InboxTracker.GetBatchMessageCount(bc - 1) + msgs, err := l2node.GetParentChainDataSource().GetBatchMessageCount(bc - 1) Require(t, err) t.Logf("Node batch count %d, msgs %d", bc, msgs) diff --git a/system_tests/program_test.go b/system_tests/program_test.go index 61e03fc682..223767246e 100644 --- 
a/system_tests/program_test.go +++ b/system_tests/program_test.go @@ -1694,7 +1694,6 @@ func setupProgramTest(t *testing.T, jit bool, builderOpts ...func(*NodeBuilder)) ctx, cancel := context.WithCancel(context.Background()) builder := NewNodeBuilder(ctx).DefaultConfig(t, true).WithPreBoldDeployment() - builder.nodeConfig.MessageExtraction.Enable = false for _, opt := range builderOpts { opt(builder) @@ -1914,9 +1913,9 @@ func waitForSequencer(t *testing.T, builder *NodeBuilder, block uint64) { Require(t, err) msgCount := msgIndex + 1 doUntil(t, 20*time.Millisecond, 500, func() bool { - batchCount, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + batchCount, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) - meta, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMetadata(batchCount - 1) + meta, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchMetadata(batchCount - 1) Require(t, err) msgExecuted, err := builder.L2.ExecNode.ExecEngine.HeadMessageIndex() Require(t, err) diff --git a/system_tests/revalidation_test.go b/system_tests/revalidation_test.go index e1cc8f063f..0929e8d911 100644 --- a/system_tests/revalidation_test.go +++ b/system_tests/revalidation_test.go @@ -112,7 +112,7 @@ func createTransactionTillBatchCount(ctx context.Context, t *testing.T, builder Require(t, err) _, err = builder.L2.EnsureTxSucceeded(tx) Require(t, err) - count, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + count, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) if count > finalCount { return diff --git a/system_tests/rust_validation_test.go b/system_tests/rust_validation_test.go index 53d6b0155e..461b84609e 100644 --- a/system_tests/rust_validation_test.go +++ b/system_tests/rust_validation_test.go @@ -255,7 +255,7 @@ func waitForMessageIndex(t *testing.T, ctx context.Context, builder *NodeBuilder t.Helper() AdvanceL1(t, ctx, builder.L1.Client, 
builder.L1Info, 30) doUntil(t, 250*time.Millisecond, 20, func() bool { - _, found, err := builder.L2.ConsensusNode.InboxTracker.FindInboxBatchContainingMessage(pos) + _, found, err := builder.L2.ConsensusNode.GetParentChainDataSource().FindInboxBatchContainingMessage(pos) Require(t, err) return found }) diff --git a/system_tests/seqfeed_test.go b/system_tests/seqfeed_test.go index 7eb8db0f07..8a7d1cbd93 100644 --- a/system_tests/seqfeed_test.go +++ b/system_tests/seqfeed_test.go @@ -519,12 +519,7 @@ func TestRegressionInPopulateFeedBacklog(t *testing.T) { Require(t, err) // sub in correct batch hash - var batchData []byte - if builder.L2.ConsensusNode.MessageExtractor != nil { - batchData, _, err = builder.L2.ConsensusNode.MessageExtractor.GetSequencerMessageBytes(ctx, 0) - } else { - batchData, _, err = builder.L2.ConsensusNode.InboxReader.GetSequencerMessageBytes(ctx, 0) - } + batchData, _, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetSequencerMessageBytes(ctx, 0) Require(t, err) expectedBatchHash := crypto.Keccak256Hash(batchData) copy(data[52:52+32], expectedBatchHash[:]) diff --git a/system_tests/staker_test.go b/system_tests/staker_test.go index 5cfa6bf111..1083dd5d40 100644 --- a/system_tests/staker_test.go +++ b/system_tests/staker_test.go @@ -196,9 +196,10 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) locator, err := server_common.NewMachineLocator(valnode.TestValidationConfig.Wasm.RootPath) Require(t, err) + pcdsA := l2nodeA.GetParentChainDataSource() statelessA, err := staker.NewStatelessBlockValidator( - l2nodeA.InboxReader, - l2nodeA.InboxTracker, + pcdsA, + pcdsA, l2nodeA.TxStreamer, execNodeA, l2nodeA.ConsensusDB, @@ -221,9 +222,9 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, l2nodeA.DeployInfo.ValidatorUtils, l2nodeA.DeployInfo.Rollup, - l2nodeA.InboxTracker, + pcdsA, l2nodeA.TxStreamer, - l2nodeA.InboxReader, + pcdsA, nil, ) Require(t, err) @@ -255,9 
+256,10 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) Require(t, err) valConfigB := legacystaker.TestL1ValidatorConfig valConfigB.Strategy = "MakeNodes" + pcdsB := l2nodeB.GetParentChainDataSource() statelessB, err := staker.NewStatelessBlockValidator( - l2nodeB.InboxReader, - l2nodeB.InboxTracker, + pcdsB, + pcdsB, l2nodeB.TxStreamer, execNodeB, l2nodeB.ConsensusDB, @@ -280,9 +282,9 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, l2nodeB.DeployInfo.ValidatorUtils, l2nodeB.DeployInfo.Rollup, - l2nodeB.InboxTracker, + pcdsB, l2nodeB.TxStreamer, - l2nodeB.InboxReader, + pcdsB, nil, ) Require(t, err) @@ -306,9 +308,9 @@ func stakerTestImpl(t *testing.T, faultyStaker bool, honestStakerInactive bool) nil, l2nodeA.DeployInfo.ValidatorUtils, l2nodeA.DeployInfo.Rollup, - l2nodeA.InboxTracker, + pcdsA, l2nodeA.TxStreamer, - l2nodeA.InboxReader, + pcdsA, nil, ) Require(t, err) diff --git a/system_tests/validation_inputs_at_test.go b/system_tests/validation_inputs_at_test.go index 6c134fb620..d55d85cfb2 100644 --- a/system_tests/validation_inputs_at_test.go +++ b/system_tests/validation_inputs_at_test.go @@ -57,9 +57,9 @@ func TestValidationInputsAtWithWasmTarget(t *testing.T) { inboxPos := arbutil.MessageIndex(receipt.BlockNumber.Uint64()) for range 40 { time.Sleep(250 * time.Millisecond) - batches, err := builder.L2.ConsensusNode.InboxTracker.GetBatchCount() + batches, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchCount() Require(t, err) - haveMessages, err := builder.L2.ConsensusNode.InboxTracker.GetBatchMessageCount(batches - 1) + haveMessages, err := builder.L2.ConsensusNode.GetParentChainDataSource().GetBatchMessageCount(batches - 1) Require(t, err) if haveMessages >= inboxPos { break