diff --git a/arbnode/batch_poster.go b/arbnode/batch_poster.go index 4a7a8c9e6e..08df91c36b 100644 --- a/arbnode/batch_poster.go +++ b/arbnode/batch_poster.go @@ -97,7 +97,7 @@ type BatchPoster struct { l1Reader *headerreader.HeaderReader inbox *InboxTracker streamer *TransactionStreamer - arbOSVersionGetter execution.ExecutionBatchPoster + arbOSVersionGetter execution.ArbOSVersionGetter config BatchPosterConfigFetcher seqInbox *bridgegen.SequencerInbox syncMonitor *SyncMonitor @@ -323,7 +323,7 @@ type BatchPosterOpts struct { L1Reader *headerreader.HeaderReader Inbox *InboxTracker Streamer *TransactionStreamer - VersionGetter execution.ExecutionBatchPoster + VersionGetter execution.ArbOSVersionGetter SyncMonitor *SyncMonitor Config BatchPosterConfigFetcher DeployInfo *chaininfo.RollupAddresses @@ -1367,7 +1367,7 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error) var use4844 bool config := b.config() if config.Post4844Blobs && b.dapWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil { - arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageIndex(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1))) + arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageIndex(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1))).Await(ctx) if err != nil { return false, err } diff --git a/arbnode/inbox_test.go b/arbnode/inbox_test.go index b0f5b871d6..eed708378a 100644 --- a/arbnode/inbox_test.go +++ b/arbnode/inbox_test.go @@ -120,6 +120,10 @@ func (w *execClientWrapper) BlockNumberToMessageIndex(blockNum uint64) container return containers.NewReadyPromise(w.ExecutionEngine.BlockNumberToMessageIndex(blockNum)) } +func (w *execClientWrapper) ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] { + return w.ExecutionEngine.ArbOSVersionForMessageIndex(msgIdx) +} + func (w *execClientWrapper) StopAndWait() { } diff --git a/arbnode/inbox_tracker.go b/arbnode/inbox_tracker.go index 5c742b1947..46d5c12ad4 100644 --- a/arbnode/inbox_tracker.go +++ b/arbnode/inbox_tracker.go @@ -274,7 +274,7 @@ func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex) } } -func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcaster) error { +func (t *InboxTracker) PopulateFeedBacklog(ctx context.Context, broadcastServer *broadcaster.Broadcaster) error { batchCount, err := t.GetBatchCount() if err != nil { return fmt.Errorf("error getting batch count: %w", err) @@ -290,6 +290,11 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas return fmt.Errorf("error getting batch %v message count: %w", batchIndex, err) } } + + if t.txStreamer == nil { + return errors.New("txStreamer is nil") + } + messageCount, err := t.txStreamer.GetMessageCount() if err != nil { return fmt.Errorf("error getting tx streamer message count: %w", err) @@ -309,10 +314,22 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas blockMetadata, err := t.txStreamer.BlockMetadataAtMessageIndex(seqNum) if err != nil { - log.Warn("Error getting blockMetadata byte array from tx streamer", "err", err) + log.Warn("error getting blockMetadata byte array from tx streamer", "err", err) + } + + arbOSVersion, err := t.txStreamer.exec.ArbOSVersionForMessageIndex(seqNum).Await(ctx) + if err != nil { + log.Warn("error getting ArbOS version for message %v: %w", seqNum, err) + } + + messageWithInfo := arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: *message, + BlockHash: blockHash, + BlockMetadata: blockMetadata, + ArbOSVersion: arbOSVersion, } - feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, blockHash, blockMetadata) + feedMessage, err := broadcastServer.NewBroadcastFeedMessage(messageWithInfo, seqNum) if err != nil { return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err) } diff --git a/arbnode/node.go b/arbnode/node.go index 5892247f8c..f428d21e51 100644 --- a/arbnode/node.go +++ b/arbnode/node.go @@ -923,7 +923,7 @@ func getBatchPoster( l1Reader *headerreader.HeaderReader, inboxTracker *InboxTracker, txStreamer *TransactionStreamer, - exec execution.ExecutionBatchPoster, + arbOSVersionGetter execution.ArbOSVersionGetter, arbDb ethdb.Database, syncMonitor *SyncMonitor, deployInfo *chaininfo.RollupAddresses, @@ -933,8 +933,8 @@ func getBatchPoster( ) (*BatchPoster, error) { var batchPoster *BatchPoster if config.BatchPoster.Enable { - if exec == nil { - return nil, errors.New("batch poster requires an execution batch poster") + if arbOSVersionGetter == nil { + return nil, errors.New("batch poster requires ArbOS version getter") } if txOptsBatchPoster == nil && config.BatchPoster.DataPoster.ExternalSigner.URL == "" { @@ -949,7 +949,7 @@ func getBatchPoster( L1Reader: l1Reader, Inbox: inboxTracker, Streamer: txStreamer, - VersionGetter: exec, + VersionGetter: arbOSVersionGetter, SyncMonitor: syncMonitor, Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster }, DeployInfo: deployInfo, @@ -1056,7 +1056,7 @@ func createNodeImpl( executionClient execution.ExecutionClient, executionSequencer execution.ExecutionSequencer, executionRecorder execution.ExecutionRecorder, - executionBatchPoster execution.ExecutionBatchPoster, + arbOSVersionGetter execution.ArbOSVersionGetter, arbDb ethdb.Database, configFetcher ConfigFetcher, l2Config *params.ChainConfig, @@ -1153,7 +1153,7 @@ func createNodeImpl( return nil, err } - batchPoster, err := getBatchPoster(ctx, config, configFetcher, txOptsBatchPoster, dapWriter, l1Reader, inboxTracker, txStreamer, executionBatchPoster, arbDb, syncMonitor, deployInfo, parentChainID, dapReaders, stakerAddr) + batchPoster, err := getBatchPoster(ctx, config, configFetcher, txOptsBatchPoster, dapWriter, l1Reader, inboxTracker, txStreamer, arbOSVersionGetter, arbDb, syncMonitor, deployInfo, parentChainID, dapReaders, stakerAddr) if err != nil { return nil, err } @@ -1308,7 +1308,7 @@ func CreateNodeFullExecutionClient( executionClient execution.ExecutionClient, executionSequencer execution.ExecutionSequencer, executionRecorder execution.ExecutionRecorder, - executionBatchPoster execution.ExecutionBatchPoster, + arbOSVersionGetter execution.ArbOSVersionGetter, arbDb ethdb.Database, configFetcher ConfigFetcher, l2Config *params.ChainConfig, @@ -1322,10 +1322,10 @@ func CreateNodeFullExecutionClient( blobReader daprovider.BlobReader, latestWasmModuleRoot common.Hash, ) (*Node, error) { - if (executionClient == nil) || (executionSequencer == nil) || (executionRecorder == nil) || (executionBatchPoster == nil) { - return nil, errors.New("execution client, sequencer, recorder, and batch poster must be non-nil") + if (executionClient == nil) || (executionSequencer == nil) || (executionRecorder == nil) || (arbOSVersionGetter == nil) { + return nil, errors.New("execution client, sequencer, recorder, and ArbOS version getter must be non-nil") } - currentNode, err := createNodeImpl(ctx, stack, executionClient, executionSequencer, executionRecorder, executionBatchPoster, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader, latestWasmModuleRoot) + currentNode, err := createNodeImpl(ctx, stack, executionClient, executionSequencer, executionRecorder, arbOSVersionGetter, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader, latestWasmModuleRoot) if err != nil { return nil, err } @@ -1386,7 +1386,7 @@ func (n *Node) Start(ctx context.Context) error { if n.InboxTracker != nil && n.BroadcastServer != nil { // Even if the sequencer coordinator will populate this backlog, // we want to make sure it's populated before any clients connect. - err = n.InboxTracker.PopulateFeedBacklog(n.BroadcastServer) + err = n.InboxTracker.PopulateFeedBacklog(ctx, n.BroadcastServer) if err != nil { return fmt.Errorf("error populating feed backlog on startup: %w", err) } @@ -1556,8 +1556,8 @@ func (n *Node) GetBatchParentChainBlock(seqNum uint64) containers.PromiseInterfa return containers.NewReadyPromise(n.InboxTracker.GetBatchParentChainBlock(seqNum)) } -func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] { - err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult, blockMetadata) +func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithInfo arbostypes.MessageWithMetadataAndBlockInfo) containers.PromiseInterface[struct{}] { + err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithInfo) return containers.NewReadyPromise(struct{}{}, err) } diff --git a/arbnode/transaction_streamer.go b/arbnode/transaction_streamer.go index 77a6b9f5f9..5ab449bfc7 100644 --- a/arbnode/transaction_streamer.go +++ b/arbnode/transaction_streamer.go @@ -401,9 +401,18 @@ func (s *TransactionStreamer) addMessagesAndReorg(batch ethdb.Batch, msgIdxOfFir messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockInfo, 0, len(messagesResults)) for i := 0; i < len(messagesResults); i++ { + // #nosec G115 -- message index arithmetic is safe within valid range + msgIdx := msgIdxOfFirstMsgToAdd + arbutil.MessageIndex(i) + arbOSVersion, err := s.exec.ArbOSVersionForMessageIndex(msgIdx).Await(s.GetContext()) + if err != nil { + log.Warn("error getting arbOS version for message", "msgIdx", msgIdx, "err", err) + } + messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockInfo{ MessageWithMeta: newMessages[i].MessageWithMeta, BlockHash: &messagesResults[i].BlockHash, + BlockMetadata: nil, + ArbOSVersion: arbOSVersion, }) } s.broadcastMessages(messagesWithComputedBlockHash, msgIdxOfFirstMsgToAdd) @@ -525,10 +534,16 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockInfo(msgIdx arbutil. return nil, err } + arbOSVersion, err := s.exec.ArbOSVersionForMessageIndex(msgIdx).Await(s.GetContext()) + if err != nil { + log.Warn("Failed to get ArbOS version for message", "msgIdx", msgIdx, "err", err) + } + msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{ MessageWithMeta: *msg, BlockHash: blockHash, BlockMetadata: blockMetadata, + ArbOSVersion: arbOSVersion, } return &msgWithBlockInfo, nil } @@ -610,6 +625,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*message.Broad MessageWithMeta: feedMessage.Message, BlockHash: feedMessage.BlockHash, BlockMetadata: feedMessage.BlockMetadata, + ArbOSVersion: feedMessage.ArbOSVersion, } messages = append(messages, msgWithBlockInfo) expectedMsgIdx++ @@ -736,6 +752,9 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(firstMsgIdx arbutil.Message for _, message := range messages { messagesWithBlockInfo = append(messagesWithBlockInfo, arbostypes.MessageWithMetadataAndBlockInfo{ MessageWithMeta: message, + BlockHash: nil, + BlockMetadata: nil, + ArbOSVersion: 0, }) } @@ -1032,9 +1051,7 @@ func (s *TransactionStreamer) ExpectChosenSequencer() error { func (s *TransactionStreamer) WriteMessageFromSequencer( msgIdx arbutil.MessageIndex, - msgWithMeta arbostypes.MessageWithMetadata, - msgResult execution.MessageResult, - blockMetadata common.BlockMetadata, + msgWithInfo arbostypes.MessageWithMetadataAndBlockInfo, ) error { if err := s.ExpectChosenSequencer(); err != nil { return err @@ -1101,24 +1118,18 @@ func (s *TransactionStreamer) WriteMessageFromSequencer( } if s.coordinator != nil { - if err := s.coordinator.SequencingMessage(msgIdx, &msgWithMeta, blockMetadata); err != nil { + if err := s.coordinator.SequencingMessage(msgIdx, &msgWithInfo.MessageWithMeta, msgWithInfo.BlockMetadata); err != nil { return err } } - msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{ - MessageWithMeta: msgWithMeta, - BlockHash: &msgResult.BlockHash, - BlockMetadata: blockMetadata, - } - - if err := s.writeMessages(msgIdx, []arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, nil); err != nil { + if err := s.writeMessages(msgIdx, []arbostypes.MessageWithMetadataAndBlockInfo{msgWithInfo}, nil); err != nil { return err } if s.trackBlockMetadataFrom == 0 || msgIdx < s.trackBlockMetadataFrom { - msgWithBlockInfo.BlockMetadata = nil + msgWithInfo.BlockMetadata = nil } - s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdx) + s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithInfo}, msgIdx) return nil } @@ -1136,7 +1147,7 @@ func (s *TransactionStreamer) PopulateFeedBacklog() error { if s.broadcastServer == nil || s.inboxReader == nil { return nil } - return s.inboxReader.tracker.PopulateFeedBacklog(s.broadcastServer) + return s.inboxReader.tracker.PopulateFeedBacklog(s.GetContext(), s.broadcastServer) } func (s *TransactionStreamer) writeMessage(msgIdx arbutil.MessageIndex, msg arbostypes.MessageWithMetadataAndBlockInfo, batch ethdb.Batch) error { @@ -1400,12 +1411,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool { return false } - msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{ - MessageWithMeta: msgAndBlockInfo.MessageWithMeta, - BlockHash: &msgResult.BlockHash, - BlockMetadata: msgAndBlockInfo.BlockMetadata, - } - s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute) + s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{*msgAndBlockInfo}, msgIdxToExecute) return msgIdxToExecute+1 <= consensusHeadMsgIdx } diff --git a/arbos/arbostypes/messagewithmeta.go b/arbos/arbostypes/messagewithmeta.go index c32c3cd795..6999c21e24 100644 --- a/arbos/arbostypes/messagewithmeta.go +++ b/arbos/arbostypes/messagewithmeta.go @@ -19,10 +19,12 @@ type MessageWithMetadata struct { DelayedMessagesRead uint64 `json:"delayedMessagesRead"` } +// lint:require-exhaustive-initialization type MessageWithMetadataAndBlockInfo struct { MessageWithMeta MessageWithMetadata BlockHash *common.Hash BlockMetadata common.BlockMetadata + ArbOSVersion uint64 } var EmptyTestMessageWithMetadata = MessageWithMetadata{ diff --git a/broadcastclient/broadcastclient_test.go b/broadcastclient/broadcastclient_test.go index e9195426e0..123d20db2e 100644 --- a/broadcastclient/broadcastclient_test.go +++ b/broadcastclient/broadcastclient_test.go @@ -52,6 +52,15 @@ func TestReceiveMessages(t *testing.T) { }) } +func testMessage() arbostypes.MessageWithMetadataAndBlockInfo { + return arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: arbostypes.TestMessageWithMetadataAndRequestId, + BlockHash: nil, + BlockMetadata: nil, + ArbOSVersion: 0, + } +} + func testReceiveMessages(t *testing.T, clientCompression bool, serverCompression bool, serverRequire bool, expectNoMessagesReceived bool) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) @@ -93,7 +102,7 @@ func testReceiveMessages(t *testing.T, clientCompression bool, serverCompression go func() { for i := 0; i < messageCount; i++ { // #nosec G115 - Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), arbutil.MessageIndex(i))) } }() @@ -150,7 +159,7 @@ func TestInvalidSignature(t *testing.T) { go func() { for i := 0; i < messageCount; i++ { // #nosec G115 - Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), arbutil.MessageIndex(i))) } }() @@ -313,7 +322,7 @@ func TestServerClientDisconnect(t *testing.T) { broadcastClient.Start(ctx) t.Log("broadcasting seq 0 message") - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 0)) // Wait for client to receive batch to ensure it is connected timer := time.NewTimer(5 * time.Second) @@ -385,7 +394,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) { broadcastClient.Start(ctx) t.Log("broadcasting seq 0 message") - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 0)) // Wait for client to receive batch to ensure it is connected timer := time.NewTimer(5 * time.Second) @@ -727,8 +736,8 @@ func TestBroadcasterSendsCachedMessagesOnClientConnect(t *testing.T) { Require(t, b.Start(ctx)) defer b.StopAndWait() - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil, nil)) - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 0)) + Require(t, b.BroadcastSingle(testMessage(), 1)) var wg sync.WaitGroup for i := 0; i < 2; i++ { diff --git a/broadcastclients/broadcastclients_test.go b/broadcastclients/broadcastclients_test.go index ab5c064290..91929cefda 100644 --- a/broadcastclients/broadcastclients_test.go +++ b/broadcastclients/broadcastclients_test.go @@ -47,6 +47,15 @@ func (ts *MockTransactionStreamer) AddBroadcastMessages(feedMessages []*message. return nil } +func testMessage() arbostypes.MessageWithMetadataAndBlockInfo { + return arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: arbostypes.EmptyTestMessageWithMetadata, + BlockHash: nil, + BlockMetadata: nil, + ArbOSVersion: 0, + } +} + // Test that a basic setup of broadcaster and BroadcastClients works func TestBasicBroadcastClientSetup(t *testing.T) { t.Parallel() @@ -135,7 +144,7 @@ func TestBasicBroadcastClientSetup(t *testing.T) { // Send messages with sequential sequence numbers for i := 0; i < messageCount; i++ { // #nosec G115 - Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), arbutil.MessageIndex(i))) } wg.Wait() @@ -284,7 +293,7 @@ func TestPrimaryToSecondaryFailover(t *testing.T) { for i := 0; i < initialMessageCount; i++ { // #nosec G115 seq := arbutil.MessageIndex(i) - err := primaryB.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, seq, nil, nil) + err := primaryB.BroadcastSingle(testMessage(), seq) Require(t, err) time.Sleep(50 * time.Millisecond) } @@ -320,7 +329,7 @@ func TestPrimaryToSecondaryFailover(t *testing.T) { for i := 0; i < secondaryMessageCount; i++ { // #nosec G115 seq := arbutil.MessageIndex(startSeq + i) - err := secondaryB.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, seq, nil, nil) + err := secondaryB.BroadcastSingle(testMessage(), seq) Require(t, err) time.Sleep(50 * time.Millisecond) } diff --git a/broadcaster/broadcaster.go b/broadcaster/broadcaster.go index e7d65f4824..b9cdde6a91 100644 --- a/broadcaster/broadcaster.go +++ b/broadcaster/broadcaster.go @@ -11,8 +11,8 @@ import ( "github.com/gobwas/ws" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" "github.com/offchainlabs/nitro/arbos/arbostypes" "github.com/offchainlabs/nitro/arbutil" @@ -40,11 +40,14 @@ func NewBroadcaster(config wsbroadcastserver.BroadcasterConfigFetcher, chainId u } func (b *Broadcaster) NewBroadcastFeedMessage( - message arbostypes.MessageWithMetadata, + messageWithInfo arbostypes.MessageWithMetadataAndBlockInfo, sequenceNumber arbutil.MessageIndex, - blockHash *common.Hash, - blockMetadata common.BlockMetadata, ) (*m.BroadcastFeedMessage, error) { + message := messageWithInfo.MessageWithMeta + if messageWithInfo.ArbOSVersion < params.ArbosVersion_50 && message.Message != nil { + message.Message.BatchDataStats = nil + } + var messageSignature []byte if b.dataSigner != nil { hash, err := message.Hash(sequenceNumber, b.chainId) @@ -60,17 +63,16 @@ func (b *Broadcaster) NewBroadcastFeedMessage( return &m.BroadcastFeedMessage{ SequenceNumber: sequenceNumber, Message: message, - BlockHash: blockHash, + BlockHash: messageWithInfo.BlockHash, Signature: messageSignature, - BlockMetadata: blockMetadata, + BlockMetadata: messageWithInfo.BlockMetadata, + ArbOSVersion: messageWithInfo.ArbOSVersion, }, nil } func (b *Broadcaster) BroadcastSingle( - msg arbostypes.MessageWithMetadata, + msg arbostypes.MessageWithMetadataAndBlockInfo, msgIdx arbutil.MessageIndex, - blockHash *common.Hash, - blockMetadata common.BlockMetadata, ) (err error) { defer func() { if r := recover(); r != nil { @@ -78,7 +80,7 @@ func (b *Broadcaster) BroadcastSingle( err = errors.New("panic in BroadcastSingle") } }() - bfm, err := b.NewBroadcastFeedMessage(msg, msgIdx, blockHash, blockMetadata) + bfm, err := b.NewBroadcastFeedMessage(msg, msgIdx) if err != nil { return err } @@ -107,8 +109,7 @@ func (b *Broadcaster) BroadcastMessages( }() var feedMessages []*m.BroadcastFeedMessage for i, msg := range messagesWithBlockInfo { - // #nosec G115 - bfm, err := b.NewBroadcastFeedMessage(msg.MessageWithMeta, firstMsgIdx+arbutil.MessageIndex(i), msg.BlockHash, msg.BlockMetadata) + bfm, err := b.NewBroadcastFeedMessage(msg, firstMsgIdx+arbutil.MessageIndex(i)) // #nosec G115 if err != nil { return err } diff --git a/broadcaster/broadcaster_test.go b/broadcaster/broadcaster_test.go index fe8d5e0d3c..1f8f5c0241 100644 --- a/broadcaster/broadcaster_test.go +++ b/broadcaster/broadcaster_test.go @@ -9,11 +9,20 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/util/signature" "github.com/offchainlabs/nitro/util/testhelpers" "github.com/offchainlabs/nitro/wsbroadcastserver" ) +const chainId = uint64(5555) + type predicate interface { Test() bool Error() string @@ -52,17 +61,18 @@ func (p *messageCountPredicate) Error() string { return fmt.Sprintf("Expected %d, was %d: %s", p.expected, p.was, p.contextMessage) } +func testMessage() arbostypes.MessageWithMetadataAndBlockInfo { + return arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: arbostypes.EmptyTestMessageWithMetadata, + BlockHash: nil, + BlockMetadata: nil, + ArbOSVersion: 0, + } +} + func TestBroadcasterMessagesRemovedOnConfirmation(t *testing.T) { - ctx, cancelFunc := context.WithCancel(context.Background()) + b, cancelFunc, _ := setup(t) defer cancelFunc() - - config := wsbroadcastserver.DefaultTestBroadcasterConfig - - chainId := uint64(5555) - feedErrChan := make(chan error, 10) - b := NewBroadcaster(func() *wsbroadcastserver.BroadcasterConfig { return &config }, chainId, feedErrChan, nil) - Require(t, b.Initialize()) - Require(t, b.Start(ctx)) defer b.StopAndWait() expectMessageCount := func(count int, contextMessage string) predicate { @@ -70,17 +80,17 @@ func TestBroadcasterMessagesRemovedOnConfirmation(t *testing.T) { } // Normal broadcasting and confirming - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 1)) waitUntilUpdated(t, expectMessageCount(1, "after 1 message")) - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 2, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 2)) waitUntilUpdated(t, expectMessageCount(2, "after 2 messages")) - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 3, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 3)) waitUntilUpdated(t, expectMessageCount(3, "after 3 messages")) - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 4, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 4)) waitUntilUpdated(t, expectMessageCount(4, "after 4 messages")) - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 5, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 5)) waitUntilUpdated(t, expectMessageCount(5, "after 4 messages")) - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 6, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 6)) waitUntilUpdated(t, expectMessageCount(6, "after 4 messages")) b.Confirm(4) @@ -96,7 +106,7 @@ func TestBroadcasterMessagesRemovedOnConfirmation(t *testing.T) { "nothing changed because confirmed sequence number before cache")) b.Confirm(5) - Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 7, nil, nil)) + Require(t, b.BroadcastSingle(testMessage(), 7)) waitUntilUpdated(t, expectMessageCount(2, "after 7 messages, 5 cleared by confirm")) @@ -111,7 +121,57 @@ func Require(t *testing.T, err error, printables ...interface{}) { testhelpers.RequireImpl(t, err, printables...) } -func Fail(t *testing.T, printables ...interface{}) { - t.Helper() - testhelpers.FailImpl(t, printables...) +func TestBatchDataStatsIsIncludedBasedOnArbOSVersion(t *testing.T) { + b, cancelFunc, signer := setup(t) + defer cancelFunc() + defer b.StopAndWait() + + sequenceNumber := arbutil.MessageIndex(0) + message := testMessage() + batchDataStats := &arbostypes.BatchDataStats{Length: 1, NonZeros: 2} + message.MessageWithMeta.Message.BatchDataStats = batchDataStats + + // For ArbOS versions >= 50, BatchDataStats should be preserved + message.ArbOSVersion = params.ArbosVersion_50 + feedMsg, err := b.NewBroadcastFeedMessage(message, sequenceNumber) + Require(t, err) + require.Equal(t, batchDataStats, feedMsg.Message.Message.BatchDataStats) + require.Equal(t, signMessage(t, message, sequenceNumber, signer), feedMsg.Signature) + + // For ArbOS versions < 50, BatchDataStats should be nil + message.ArbOSVersion = params.ArbosVersion_41 + feedMsg, err = b.NewBroadcastFeedMessage(message, sequenceNumber) + Require(t, err) + require.Nil(t, feedMsg.Message.Message.BatchDataStats) + + message.MessageWithMeta.Message.BatchDataStats = nil + require.Equal(t, signMessage(t, message, sequenceNumber, signer), feedMsg.Signature) +} + +func setup(t *testing.T) (*Broadcaster, context.CancelFunc, signature.DataSignerFunc) { + ctx, cancelFunc := context.WithCancel(context.Background()) + + config := wsbroadcastserver.DefaultTestBroadcasterConfig + + feedErrChan := make(chan error, 10) + signer := dataSigner(t) + b := NewBroadcaster(func() *wsbroadcastserver.BroadcasterConfig { return &config }, chainId, feedErrChan, signer) + Require(t, b.Initialize()) + Require(t, b.Start(ctx)) + + return b, cancelFunc, signer +} + +func dataSigner(t *testing.T) signature.DataSignerFunc { + testPrivateKey, err := crypto.GenerateKey() + testhelpers.RequireImpl(t, err) + return signature.DataSignerFromPrivateKey(testPrivateKey) +} + +func signMessage(t *testing.T, message arbostypes.MessageWithMetadataAndBlockInfo, sequenceNumber arbutil.MessageIndex, signer signature.DataSignerFunc) []byte { + hash, err := message.MessageWithMeta.Hash(sequenceNumber, chainId) + Require(t, err) + sig, err := signer(hash.Bytes()) + Require(t, err) + return sig } diff --git a/broadcaster/message/message.go b/broadcaster/message/message.go index 3790fe8dae..8366a15799 100644 --- a/broadcaster/message/message.go +++ b/broadcaster/message/message.go @@ -39,6 +39,7 @@ type BroadcastFeedMessage struct { BlockHash *common.Hash `json:"blockHash,omitempty"` Signature []byte `json:"signature"` BlockMetadata common.BlockMetadata `json:"blockMetadata,omitempty"` + ArbOSVersion uint64 `json:"arbOSVersion,omitempty"` CumulativeSumMsgSize uint64 `json:"-"` } diff --git a/execution/gethexec/executionengine.go b/execution/gethexec/executionengine.go index 0ce2b70b4e..2e9364ed9d 100644 --- a/execution/gethexec/executionengine.go +++ b/execution/gethexec/executionengine.go @@ -50,6 +50,7 @@ import ( "github.com/offchainlabs/nitro/arbutil" "github.com/offchainlabs/nitro/execution" "github.com/offchainlabs/nitro/util/arbmath" + "github.com/offchainlabs/nitro/util/containers" "github.com/offchainlabs/nitro/util/sharedmetrics" "github.com/offchainlabs/nitro/util/stopwaiter" ) @@ -643,7 +644,15 @@ func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes. } blockMetadata := s.blockMetadataFromBlock(block, timeboostedTxs) - _, err = s.consensus.WriteMessageFromSequencer(msgIdx, msgWithMeta, *msgResult, blockMetadata).Await(s.GetContext()) + + msgWithInfo := arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: msgWithMeta, + BlockHash: &msgResult.BlockHash, + BlockMetadata: blockMetadata, + ArbOSVersion: types.DeserializeHeaderExtraInformation(block.Header()).ArbOSFormatVersion, + } + + _, err = s.consensus.WriteMessageFromSequencer(msgIdx, msgWithInfo).Await(s.GetContext()) if err != nil { return nil, err } @@ -722,7 +731,14 @@ func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostyp return nil, err } - _, err = s.consensus.WriteMessageFromSequencer(msgIdx, messageWithMeta, *msgResult, s.blockMetadataFromBlock(block, nil)).Await(s.GetContext()) + msgWithInfo := arbostypes.MessageWithMetadataAndBlockInfo{ + MessageWithMeta: messageWithMeta, + BlockHash: &msgResult.BlockHash, + BlockMetadata: s.blockMetadataFromBlock(block, nil), + ArbOSVersion: types.DeserializeHeaderExtraInformation(block.Header()).ArbOSFormatVersion, + } + + _, err = s.consensus.WriteMessageFromSequencer(msgIdx, msgWithInfo).Await(s.GetContext()) if err != nil { return nil, err } @@ -1061,13 +1077,13 @@ func (s *ExecutionEngine) digestMessageWithBlockMutex(msgIdxToDigest arbutil.Mes return msgResult, nil } -func (s *ExecutionEngine) ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) (uint64, error) { +func (s *ExecutionEngine) ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] { block := s.bc.GetBlockByNumber(s.MessageIndexToBlockNumber(msgIdx)) if block == nil { - return 0, fmt.Errorf("couldn't find block for message index %d", msgIdx) + return containers.NewReadyPromise(uint64(0), fmt.Errorf("couldn't find block for message index %d", msgIdx)) } extra := types.DeserializeHeaderExtraInformation(block.Header()) - return extra.ArbOSFormatVersion, nil + return containers.NewReadyPromise(extra.ArbOSFormatVersion, nil) } func (s *ExecutionEngine) Start(ctx_in context.Context) { diff --git a/execution/gethexec/node.go b/execution/gethexec/node.go index 9b98330cd4..2359388add 100644 --- a/execution/gethexec/node.go +++ b/execution/gethexec/node.go @@ -488,7 +488,7 @@ func (n *ExecutionNode) SequenceDelayedMessage(message *arbostypes.L1IncomingMes func (n *ExecutionNode) ResultAtMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[*execution.MessageResult] { return containers.NewReadyPromise(n.ExecEngine.ResultAtMessageIndex(msgIdx)) } -func (n *ExecutionNode) ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) (uint64, error) { +func (n *ExecutionNode) ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] { return n.ExecEngine.ArbOSVersionForMessageIndex(msgIdx) } diff --git a/execution/interface.go b/execution/interface.go index 756b32e5ab..9e5b52bd7d 100644 --- a/execution/interface.go +++ b/execution/interface.go @@ -54,6 +54,7 @@ type ExecutionClient interface { ResultAtMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[*MessageResult] MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) containers.PromiseInterface[uint64] BlockNumberToMessageIndex(blockNum uint64) containers.PromiseInterface[arbutil.MessageIndex] + ArbOSVersionGetter SetFinalityData(ctx context.Context, safeFinalityData *arbutil.FinalityData, finalizedFinalityData *arbutil.FinalityData, validatedFinalityData *arbutil.FinalityData) containers.PromiseInterface[struct{}] SetConsensusSyncData(ctx context.Context, syncData *ConsensusSyncData) containers.PromiseInterface[struct{}] MarkFeedStart(to arbutil.MessageIndex) containers.PromiseInterface[struct{}] @@ -91,8 +92,8 @@ type ExecutionSequencer interface { } // needed for batch poster -type ExecutionBatchPoster interface { - ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) (uint64, error) +type ArbOSVersionGetter interface { + ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] } // not implemented in execution, used as input @@ -107,7 +108,7 @@ type ConsensusInfo interface { } type ConsensusSequencer interface { - WriteMessageFromSequencer(msgIdx arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] + WriteMessageFromSequencer(msgIdx arbutil.MessageIndex, msgWithInfo arbostypes.MessageWithMetadataAndBlockInfo) containers.PromiseInterface[struct{}] ExpectChosenSequencer() containers.PromiseInterface[struct{}] }