Skip to content
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
3f2a32a
Implement exclusion
pmikolajczyk41 Oct 16, 2025
2c3fc15
nil check
pmikolajczyk41 Oct 16, 2025
02e3554
Fix BroadcastSingle
pmikolajczyk41 Oct 16, 2025
f1c13d8
remove unused function
pmikolajczyk41 Oct 16, 2025
b99fe2a
checkpoint
pmikolajczyk41 Oct 16, 2025
61cef7a
Rename interface for getting ArbOS version
pmikolajczyk41 Oct 16, 2025
c4c6fac
Incorporate ArbOSVersionGetter to ExecutionClient
pmikolajczyk41 Oct 16, 2025
77ce721
Fetch version earlier
pmikolajczyk41 Oct 16, 2025
209c94a
Fixes
pmikolajczyk41 Oct 16, 2025
b26bc07
another nil check
pmikolajczyk41 Oct 16, 2025
215920a
Last touch
pmikolajczyk41 Oct 16, 2025
eea4bbf
Test changes
pmikolajczyk41 Oct 16, 2025
980a789
Merge branch 'master' into pmikolajczyk/nit-4019
pmikolajczyk41 Oct 16, 2025
b217b7a
lint
pmikolajczyk41 Oct 16, 2025
dab6739
Make the method async
pmikolajczyk41 Oct 16, 2025
4ef9543
Use the right context
pmikolajczyk41 Oct 16, 2025
b2b40b9
Revert "Use the right context"
pmikolajczyk41 Oct 16, 2025
f5885b2
// lint:require-exhaustive-initialization
pmikolajczyk41 Oct 16, 2025
eb99672
Change API of the broadcaster
pmikolajczyk41 Oct 16, 2025
abfdc46
Adapt to it (1/2)
pmikolajczyk41 Oct 16, 2025
94f7fad
Adapt to it (2/2)
pmikolajczyk41 Oct 16, 2025
ca62807
lint
pmikolajczyk41 Oct 16, 2025
e9fb8a0
do not change geth
pmikolajczyk41 Oct 16, 2025
2f433eb
Merge branch 'master' into pmikolajczyk/nit-4019
pmikolajczyk41 Oct 16, 2025
33ade1b
Do not fail when there's no arbos version
pmikolajczyk41 Oct 16, 2025
3c8a7d8
Fill the new field
pmikolajczyk41 Oct 16, 2025
a3cbc86
lint
pmikolajczyk41 Oct 16, 2025
0f598ca
nil metadata
pmikolajczyk41 Oct 16, 2025
d87f9f5
aaaaaaaa
pmikolajczyk41 Oct 16, 2025
740871f
apply suggestions
pmikolajczyk41 Oct 16, 2025
81f3666
Merge branch 'master' into pmikolajczyk/nit-4019
joshuacolvin0 Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type BatchPoster struct {
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
arbOSVersionGetter execution.ExecutionBatchPoster
arbOSVersionGetter execution.ArbOSVersionGetter
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
syncMonitor *SyncMonitor
Expand Down Expand Up @@ -323,7 +323,7 @@ type BatchPosterOpts struct {
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
VersionGetter execution.ExecutionBatchPoster
VersionGetter execution.ArbOSVersionGetter
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
Expand Down Expand Up @@ -1367,7 +1367,7 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
var use4844 bool
config := b.config()
if config.Post4844Blobs && b.dapWriter == nil && latestHeader.ExcessBlobGas != nil && latestHeader.BlobGasUsed != nil {
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageIndex(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1)))
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageIndex(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1))).Await(ctx)
if err != nil {
return false, err
}
Expand Down
4 changes: 4 additions & 0 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func (w *execClientWrapper) BlockNumberToMessageIndex(blockNum uint64) container
return containers.NewReadyPromise(w.ExecutionEngine.BlockNumberToMessageIndex(blockNum))
}

func (w *execClientWrapper) ArbOSVersionForMessageIndex(msgIdx arbutil.MessageIndex) containers.PromiseInterface[uint64] {
return w.ExecutionEngine.ArbOSVersionForMessageIndex(msgIdx)
}

func (w *execClientWrapper) StopAndWait() {
}

Expand Down
23 changes: 20 additions & 3 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (t *InboxTracker) FindInboxBatchContainingMessage(pos arbutil.MessageIndex)
}
}

func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcaster) error {
func (t *InboxTracker) PopulateFeedBacklog(ctx context.Context, broadcastServer *broadcaster.Broadcaster) error {
batchCount, err := t.GetBatchCount()
if err != nil {
return fmt.Errorf("error getting batch count: %w", err)
Expand All @@ -290,6 +290,11 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas
return fmt.Errorf("error getting batch %v message count: %w", batchIndex, err)
}
}

if t.txStreamer == nil {
return errors.New("txStreamer is nil")
}

messageCount, err := t.txStreamer.GetMessageCount()
if err != nil {
return fmt.Errorf("error getting tx streamer message count: %w", err)
Expand All @@ -309,10 +314,22 @@ func (t *InboxTracker) PopulateFeedBacklog(broadcastServer *broadcaster.Broadcas

blockMetadata, err := t.txStreamer.BlockMetadataAtMessageIndex(seqNum)
if err != nil {
log.Warn("Error getting blockMetadata byte array from tx streamer", "err", err)
log.Warn("error getting blockMetadata byte array from tx streamer", "err", err)
}

arbOSVersion, err := t.txStreamer.exec.ArbOSVersionForMessageIndex(seqNum).Await(ctx)
if err != nil {
log.Warn("error getting ArbOS version for message %v: %w", seqNum, err)
}

messageWithInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: *message,
BlockHash: blockHash,
BlockMetadata: blockMetadata,
ArbOSVersion: arbOSVersion,
}

feedMessage, err := broadcastServer.NewBroadcastFeedMessage(*message, seqNum, blockHash, blockMetadata)
feedMessage, err := broadcastServer.NewBroadcastFeedMessage(messageWithInfo, seqNum)
if err != nil {
return fmt.Errorf("error creating broadcast feed message %v: %w", seqNum, err)
}
Expand Down
24 changes: 12 additions & 12 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ func getBatchPoster(
l1Reader *headerreader.HeaderReader,
inboxTracker *InboxTracker,
txStreamer *TransactionStreamer,
exec execution.ExecutionBatchPoster,
arbOSVersionGetter execution.ArbOSVersionGetter,
arbDb ethdb.Database,
syncMonitor *SyncMonitor,
deployInfo *chaininfo.RollupAddresses,
Expand All @@ -933,8 +933,8 @@ func getBatchPoster(
) (*BatchPoster, error) {
var batchPoster *BatchPoster
if config.BatchPoster.Enable {
if exec == nil {
return nil, errors.New("batch poster requires an execution batch poster")
if arbOSVersionGetter == nil {
return nil, errors.New("batch poster requires ArbOS version getter")
}

if txOptsBatchPoster == nil && config.BatchPoster.DataPoster.ExternalSigner.URL == "" {
Expand All @@ -949,7 +949,7 @@ func getBatchPoster(
L1Reader: l1Reader,
Inbox: inboxTracker,
Streamer: txStreamer,
VersionGetter: exec,
VersionGetter: arbOSVersionGetter,
SyncMonitor: syncMonitor,
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
Expand Down Expand Up @@ -1056,7 +1056,7 @@ func createNodeImpl(
executionClient execution.ExecutionClient,
executionSequencer execution.ExecutionSequencer,
executionRecorder execution.ExecutionRecorder,
executionBatchPoster execution.ExecutionBatchPoster,
arbOSVersionGetter execution.ArbOSVersionGetter,
arbDb ethdb.Database,
configFetcher ConfigFetcher,
l2Config *params.ChainConfig,
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func createNodeImpl(
return nil, err
}

batchPoster, err := getBatchPoster(ctx, config, configFetcher, txOptsBatchPoster, dapWriter, l1Reader, inboxTracker, txStreamer, executionBatchPoster, arbDb, syncMonitor, deployInfo, parentChainID, dapReaders, stakerAddr)
batchPoster, err := getBatchPoster(ctx, config, configFetcher, txOptsBatchPoster, dapWriter, l1Reader, inboxTracker, txStreamer, arbOSVersionGetter, arbDb, syncMonitor, deployInfo, parentChainID, dapReaders, stakerAddr)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1308,7 +1308,7 @@ func CreateNodeFullExecutionClient(
executionClient execution.ExecutionClient,
executionSequencer execution.ExecutionSequencer,
executionRecorder execution.ExecutionRecorder,
executionBatchPoster execution.ExecutionBatchPoster,
arbOSVersionGetter execution.ArbOSVersionGetter,
arbDb ethdb.Database,
configFetcher ConfigFetcher,
l2Config *params.ChainConfig,
Expand All @@ -1322,10 +1322,10 @@ func CreateNodeFullExecutionClient(
blobReader daprovider.BlobReader,
latestWasmModuleRoot common.Hash,
) (*Node, error) {
if (executionClient == nil) || (executionSequencer == nil) || (executionRecorder == nil) || (executionBatchPoster == nil) {
if (executionClient == nil) || (executionSequencer == nil) || (executionRecorder == nil) || (arbOSVersionGetter == nil) {
return nil, errors.New("execution client, sequencer, recorder, and batch poster must be non-nil")
}
currentNode, err := createNodeImpl(ctx, stack, executionClient, executionSequencer, executionRecorder, executionBatchPoster, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader, latestWasmModuleRoot)
currentNode, err := createNodeImpl(ctx, stack, executionClient, executionSequencer, executionRecorder, arbOSVersionGetter, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader, latestWasmModuleRoot)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1386,7 +1386,7 @@ func (n *Node) Start(ctx context.Context) error {
if n.InboxTracker != nil && n.BroadcastServer != nil {
// Even if the sequencer coordinator will populate this backlog,
// we want to make sure it's populated before any clients connect.
err = n.InboxTracker.PopulateFeedBacklog(n.BroadcastServer)
err = n.InboxTracker.PopulateFeedBacklog(ctx, n.BroadcastServer)
if err != nil {
return fmt.Errorf("error populating feed backlog on startup: %w", err)
}
Expand Down Expand Up @@ -1556,8 +1556,8 @@ func (n *Node) GetBatchParentChainBlock(seqNum uint64) containers.PromiseInterfa
return containers.NewReadyPromise(n.InboxTracker.GetBatchParentChainBlock(seqNum))
}

func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithMeta arbostypes.MessageWithMetadata, msgResult execution.MessageResult, blockMetadata common.BlockMetadata) containers.PromiseInterface[struct{}] {
err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithMeta, msgResult, blockMetadata)
func (n *Node) WriteMessageFromSequencer(pos arbutil.MessageIndex, msgWithInfo arbostypes.MessageWithMetadataAndBlockInfo) containers.PromiseInterface[struct{}] {
err := n.TxStreamer.WriteMessageFromSequencer(pos, msgWithInfo)
return containers.NewReadyPromise(struct{}{}, err)
}

Expand Down
46 changes: 26 additions & 20 deletions arbnode/transaction_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,18 @@ func (s *TransactionStreamer) addMessagesAndReorg(batch ethdb.Batch, msgIdxOfFir

messagesWithComputedBlockHash := make([]arbostypes.MessageWithMetadataAndBlockInfo, 0, len(messagesResults))
for i := 0; i < len(messagesResults); i++ {
// #nosec G115
arbOSVersion, err := s.exec.ArbOSVersionForMessageIndex(msgIdxOfFirstMsgToAdd + arbutil.MessageIndex(i)).Await(s.GetContext())
if err != nil {
// #nosec G115
log.Warn("error getting arbOS version for message", "msgIdx", msgIdxOfFirstMsgToAdd+arbutil.MessageIndex(i), "err", err)
}

messagesWithComputedBlockHash = append(messagesWithComputedBlockHash, arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: newMessages[i].MessageWithMeta,
BlockHash: &messagesResults[i].BlockHash,
BlockMetadata: nil,
ArbOSVersion: arbOSVersion,
})
}
s.broadcastMessages(messagesWithComputedBlockHash, msgIdxOfFirstMsgToAdd)
Expand Down Expand Up @@ -525,10 +534,16 @@ func (s *TransactionStreamer) getMessageWithMetadataAndBlockInfo(msgIdx arbutil.
return nil, err
}

arbOSVersion, err := s.exec.ArbOSVersionForMessageIndex(msgIdx).Await(s.GetContext())
if err != nil {
log.Warn("Failed to get ArbOS version for message", "msgIdx", msgIdx, "err", err)
}

msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: *msg,
BlockHash: blockHash,
BlockMetadata: blockMetadata,
ArbOSVersion: arbOSVersion,
}
return &msgWithBlockInfo, nil
}
Expand Down Expand Up @@ -610,6 +625,7 @@ func (s *TransactionStreamer) AddBroadcastMessages(feedMessages []*message.Broad
MessageWithMeta: feedMessage.Message,
BlockHash: feedMessage.BlockHash,
BlockMetadata: feedMessage.BlockMetadata,
ArbOSVersion: feedMessage.ArbOSVersion,
}
messages = append(messages, msgWithBlockInfo)
expectedMsgIdx++
Expand Down Expand Up @@ -736,6 +752,9 @@ func (s *TransactionStreamer) AddMessagesAndEndBatch(firstMsgIdx arbutil.Message
for _, message := range messages {
messagesWithBlockInfo = append(messagesWithBlockInfo, arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: message,
BlockHash: nil,
BlockMetadata: nil,
ArbOSVersion: 0,
})
}

Expand Down Expand Up @@ -1032,9 +1051,7 @@ func (s *TransactionStreamer) ExpectChosenSequencer() error {

func (s *TransactionStreamer) WriteMessageFromSequencer(
msgIdx arbutil.MessageIndex,
msgWithMeta arbostypes.MessageWithMetadata,
msgResult execution.MessageResult,
blockMetadata common.BlockMetadata,
msgWithInfo arbostypes.MessageWithMetadataAndBlockInfo,
) error {
if err := s.ExpectChosenSequencer(); err != nil {
return err
Expand Down Expand Up @@ -1101,24 +1118,18 @@ func (s *TransactionStreamer) WriteMessageFromSequencer(
}

if s.coordinator != nil {
if err := s.coordinator.SequencingMessage(msgIdx, &msgWithMeta, blockMetadata); err != nil {
if err := s.coordinator.SequencingMessage(msgIdx, &msgWithInfo.MessageWithMeta, msgWithInfo.BlockMetadata); err != nil {
return err
}
}

msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: msgWithMeta,
BlockHash: &msgResult.BlockHash,
BlockMetadata: blockMetadata,
}

if err := s.writeMessages(msgIdx, []arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, nil); err != nil {
if err := s.writeMessages(msgIdx, []arbostypes.MessageWithMetadataAndBlockInfo{msgWithInfo}, nil); err != nil {
return err
}
if s.trackBlockMetadataFrom == 0 || msgIdx < s.trackBlockMetadataFrom {
msgWithBlockInfo.BlockMetadata = nil
msgWithInfo.BlockMetadata = nil
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdx)
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithInfo}, msgIdx)

return nil
}
Expand All @@ -1136,7 +1147,7 @@ func (s *TransactionStreamer) PopulateFeedBacklog() error {
if s.broadcastServer == nil || s.inboxReader == nil {
return nil
}
return s.inboxReader.tracker.PopulateFeedBacklog(s.broadcastServer)
return s.inboxReader.tracker.PopulateFeedBacklog(s.GetContext(), s.broadcastServer)
}

func (s *TransactionStreamer) writeMessage(msgIdx arbutil.MessageIndex, msg arbostypes.MessageWithMetadataAndBlockInfo, batch ethdb.Batch) error {
Expand Down Expand Up @@ -1400,12 +1411,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
return false
}

msgWithBlockInfo := arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: msgAndBlockInfo.MessageWithMeta,
BlockHash: &msgResult.BlockHash,
BlockMetadata: msgAndBlockInfo.BlockMetadata,
}
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute)
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{*msgAndBlockInfo}, msgIdxToExecute)

return msgIdxToExecute+1 <= consensusHeadMsgIdx
}
Expand Down
2 changes: 2 additions & 0 deletions arbos/arbostypes/messagewithmeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ type MessageWithMetadata struct {
DelayedMessagesRead uint64 `json:"delayedMessagesRead"`
}

// lint:require-exhaustive-initialization
type MessageWithMetadataAndBlockInfo struct {
MessageWithMeta MessageWithMetadata
BlockHash *common.Hash
BlockMetadata common.BlockMetadata
ArbOSVersion uint64
}

var EmptyTestMessageWithMetadata = MessageWithMetadata{
Expand Down
21 changes: 15 additions & 6 deletions broadcastclient/broadcastclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ func TestReceiveMessages(t *testing.T) {
})
}

func testMessage() arbostypes.MessageWithMetadataAndBlockInfo {
return arbostypes.MessageWithMetadataAndBlockInfo{
MessageWithMeta: arbostypes.TestMessageWithMetadataAndRequestId,
BlockHash: nil,
BlockMetadata: nil,
ArbOSVersion: 0,
}
}

func testReceiveMessages(t *testing.T, clientCompression bool, serverCompression bool, serverRequire bool, expectNoMessagesReceived bool) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -93,7 +102,7 @@ func testReceiveMessages(t *testing.T, clientCompression bool, serverCompression
go func() {
for i := 0; i < messageCount; i++ {
// #nosec G115
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil, nil))
Require(t, b.BroadcastSingle(testMessage(), arbutil.MessageIndex(i)))
}
}()

Expand Down Expand Up @@ -150,7 +159,7 @@ func TestInvalidSignature(t *testing.T) {
go func() {
for i := 0; i < messageCount; i++ {
// #nosec G115
Require(t, b.BroadcastSingle(arbostypes.TestMessageWithMetadataAndRequestId, arbutil.MessageIndex(i), nil, nil))
Require(t, b.BroadcastSingle(testMessage(), arbutil.MessageIndex(i)))
}
}()

Expand Down Expand Up @@ -313,7 +322,7 @@ func TestServerClientDisconnect(t *testing.T) {
broadcastClient.Start(ctx)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil, nil))
Require(t, b.BroadcastSingle(testMessage(), 0))

// Wait for client to receive batch to ensure it is connected
timer := time.NewTimer(5 * time.Second)
Expand Down Expand Up @@ -385,7 +394,7 @@ func TestBroadcastClientConfirmedMessage(t *testing.T) {
broadcastClient.Start(ctx)

t.Log("broadcasting seq 0 message")
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil, nil))
Require(t, b.BroadcastSingle(testMessage(), 0))

// Wait for client to receive batch to ensure it is connected
timer := time.NewTimer(5 * time.Second)
Expand Down Expand Up @@ -727,8 +736,8 @@ func TestBroadcasterSendsCachedMessagesOnClientConnect(t *testing.T) {
Require(t, b.Start(ctx))
defer b.StopAndWait()

Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 0, nil, nil))
Require(t, b.BroadcastSingle(arbostypes.EmptyTestMessageWithMetadata, 1, nil, nil))
Require(t, b.BroadcastSingle(testMessage(), 0))
Require(t, b.BroadcastSingle(testMessage(), 1))

var wg sync.WaitGroup
for i := 0; i < 2; i++ {
Expand Down
Loading
Loading