Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1815cc7
Pass the new limit to the batch poster and use it in the batchSegment…
pmikolajczyk41 Nov 3, 2025
a4fc03f
Set the field for tests
pmikolajczyk41 Nov 3, 2025
6d128d9
Get rid of unnecessary usage of the limit
pmikolajczyk41 Nov 3, 2025
82172d7
Cover inbox usages and tests
pmikolajczyk41 Nov 3, 2025
b7aa16a
Cover MEL
pmikolajczyk41 Nov 3, 2025
94f18b1
Add accessor for the limit in the config
pmikolajczyk41 Nov 10, 2025
4730759
Pass chain config instead of arbitrum
pmikolajczyk41 Nov 10, 2025
75d679f
Avoid nil pointer dereference
pmikolajczyk41 Nov 10, 2025
5238ed6
Add rationale for nil
pmikolajczyk41 Nov 10, 2025
ab40ca2
update pin
pmikolajczyk41 Nov 10, 2025
75057a7
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Nov 10, 2025
69cab03
fix forkid
pmikolajczyk41 Nov 10, 2025
9a11649
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Nov 10, 2025
0a0d6af
Merge remote-tracking branch 'refs/remotes/origin/master' into pmikol…
pmikolajczyk41 Dec 5, 2025
776f1b5
Update fork id in test
pmikolajczyk41 Dec 5, 2025
82e8518
Leaving chain info JSON as before (consistent with on-chain info)
pmikolajczyk41 Dec 5, 2025
6ecd436
Add a test for configurable limit in batch poster
pmikolajczyk41 Dec 5, 2025
b078ad4
Add two tests for parsing
pmikolajczyk41 Dec 8, 2025
2e898fd
Lint
pmikolajczyk41 Dec 8, 2025
86125b0
ForkID
pmikolajczyk41 Dec 8, 2025
cdc8678
Merge branch 'master' into pmikolajczyk/nit-3121-uncompressed-batch-s…
pmikolajczyk41 Dec 8, 2025
1c4e943
Remove batch poster system test
pmikolajczyk41 Dec 9, 2025
dec7e98
Rename setup function
pmikolajczyk41 Dec 9, 2025
24a2d00
Make batch size limit arbos-version dependent
pmikolajczyk41 Dec 9, 2025
d8a5394
Use arbos version in batch poster
pmikolajczyk41 Dec 9, 2025
b4d5cce
Use arbos version in inbox and MEL
pmikolajczyk41 Dec 9, 2025
3da6e6f
Checkpoint
pmikolajczyk41 Dec 9, 2025
e4ed04f
checkpoint
pmikolajczyk41 Dec 9, 2025
03233dd
fix tests
pmikolajczyk41 Dec 9, 2025
fd6384f
nil protection
pmikolajczyk41 Dec 9, 2025
c3da851
update pin
pmikolajczyk41 Dec 9, 2025
c6856f1
Merge branch 'master' into pmikolajczyk/nit-3121-uncompressed-batch-s…
pmikolajczyk41 Dec 9, 2025
883a3e1
forbid changing the limit in the chain config
pmikolajczyk41 Dec 10, 2025
cc673af
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Dec 10, 2025
93cff8b
tiny refactor in replay
pmikolajczyk41 Dec 10, 2025
f6dd87e
pass non-nil arbos version getter
pmikolajczyk41 Dec 10, 2025
449b987
Use the previous message index
pmikolajczyk41 Dec 10, 2025
a9d33d8
Merge remote-tracking branch 'origin/master' into pmikolajczyk/nit-31…
pmikolajczyk41 Dec 11, 2025
f1872b4
Fixed implementation
pmikolajczyk41 Dec 11, 2025
072f95e
Set correct version in test
pmikolajczyk41 Dec 11, 2025
c491e88
lint
pmikolajczyk41 Dec 11, 2025
4a1b615
Be more resilient
pmikolajczyk41 Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2022, Offchain Labs, Inc.
// Copyright 2021-2025, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package arbnode
Expand Down Expand Up @@ -104,6 +104,7 @@ type BatchPoster struct {
inbox *InboxTracker
streamer *TransactionStreamer
arbOSVersionGetter execution.ArbOSVersionGetter
chainConfig *params.ChainConfig
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
syncMonitor *SyncMonitor
Expand Down Expand Up @@ -359,6 +360,7 @@ type BatchPosterOpts struct {
DAPWriters []daprovider.Writer
ParentChainID *big.Int
DAPReaders *daprovider.DAProviderRegistry
ChainConfig *params.ChainConfig
}

func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, error) {
Expand Down Expand Up @@ -405,6 +407,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
inbox: opts.Inbox,
streamer: opts.Streamer,
arbOSVersionGetter: opts.VersionGetter,
chainConfig: opts.ChainConfig,
syncMonitor: opts.SyncMonitor,
config: opts.Config,
seqInbox: seqInbox,
Expand Down Expand Up @@ -903,6 +906,7 @@ type batchSegments struct {
recompressionLevel int
newUncompressedSize int
totalUncompressedSize int
maxUncompressedSize int
lastCompressedSize int
trailingHeaders int // how many trailing segments are headers
isDone bool
Expand All @@ -920,7 +924,7 @@ type buildingBatch struct {
firstUsefulMsg *arbostypes.MessageWithMetadata
}

func (b *BatchPoster) newBatchSegments(ctx context.Context, firstDelayed uint64, use4844 bool, usingAltDA bool) (*batchSegments, error) {
func (b *BatchPoster) newBatchSegments(ctx context.Context, firstDelayed uint64, use4844 bool, usingAltDA bool, arbosVersion uint64) (*batchSegments, error) {
config := b.config()
var maxSize int

Expand Down Expand Up @@ -983,12 +987,13 @@ func (b *BatchPoster) newBatchSegments(ctx context.Context, firstDelayed uint64,
recompressionLevel = compressionLevel
}
return &batchSegments{
compressedBuffer: compressedBuffer,
compressedWriter: brotli.NewWriterLevel(compressedBuffer, compressionLevel),
sizeLimit: maxSize,
recompressionLevel: recompressionLevel,
rawSegments: make([][]byte, 0, 128),
delayedMsg: firstDelayed,
compressedBuffer: compressedBuffer,
compressedWriter: brotli.NewWriterLevel(compressedBuffer, compressionLevel),
sizeLimit: maxSize,
recompressionLevel: recompressionLevel,
rawSegments: make([][]byte, 0, 128),
delayedMsg: firstDelayed,
maxUncompressedSize: int(b.chainConfig.MaxUncompressedBatchSize(arbosVersion)), // #nosec G115
}, nil
}

Expand All @@ -1003,8 +1008,8 @@ func (s *batchSegments) recompressAll() error {
return err
}
}
if s.totalUncompressedSize > arbstate.MaxDecompressedLen {
return fmt.Errorf("batch size %v exceeds maximum decompressed length %v", s.totalUncompressedSize, arbstate.MaxDecompressedLen)
if s.totalUncompressedSize > s.maxUncompressedSize {
return fmt.Errorf("batch size %v exceeds maximum uncompressed length %v", s.totalUncompressedSize, s.maxUncompressedSize)
}
if len(s.rawSegments) >= arbstate.MaxSegmentsPerSequencerMessage {
return fmt.Errorf("number of raw segments %v excees maximum number %v", len(s.rawSegments), arbstate.MaxSegmentsPerSequencerMessage)
Expand All @@ -1014,10 +1019,10 @@ func (s *batchSegments) recompressAll() error {

func (s *batchSegments) testForOverflow(isHeader bool) (bool, error) {
// we've reached the max decompressed size
if s.totalUncompressedSize > arbstate.MaxDecompressedLen {
log.Info("Batch full: max decompressed length exceeded",
if s.totalUncompressedSize > s.maxUncompressedSize {
log.Info("Batch full: max uncompressed length exceeded",
"current", s.totalUncompressedSize,
"max", arbstate.MaxDecompressedLen,
"max", s.maxUncompressedSize,
"isHeader", isHeader)
return true, nil
}
Expand Down Expand Up @@ -1410,6 +1415,11 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
if dbBatchCount > batchPosition.NextSeqNum {
return false, fmt.Errorf("attempting to post batch %v, but the local inbox tracker database already has %v batches", batchPosition.NextSeqNum, dbBatchCount)
}

arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageIndex(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1))).Await(ctx)
if err != nil {
return false, err
}
if b.building == nil || b.building.startMsgCount != batchPosition.MessageCount {
latestHeader, err := b.l1Reader.LastHeader(ctx)
if err != nil {
Expand All @@ -1423,10 +1433,6 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
config.Post4844Blobs &&
latestHeader.ExcessBlobGas != nil &&
latestHeader.BlobGasUsed != nil {
arbOSVersion, err := b.arbOSVersionGetter.ArbOSVersionForMessageIndex(arbutil.MessageIndex(arbmath.SaturatingUSub(uint64(batchPosition.MessageCount), 1))).Await(ctx)
if err != nil {
return false, err
}
if arbOSVersion >= params.ArbosVersion_20 {
if config.IgnoreBlobPrice {
use4844 = true
Expand Down Expand Up @@ -1483,7 +1489,7 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
// Only use 4844 batching when posting to EthDA
use4844 = use4844 && buildingForEthDA
usingAltDA := !buildingForEthDA
segments, err := b.newBatchSegments(ctx, batchPosition.DelayedMessageCount, use4844, usingAltDA)
segments, err := b.newBatchSegments(ctx, batchPosition.DelayedMessageCount, use4844, usingAltDA, arbOSVersion)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -1928,7 +1934,7 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
b.building.muxBackend.seqMsg = seqMsg
b.building.muxBackend.delayedInboxStart = batchPosition.DelayedMessageCount
b.building.muxBackend.SetPositionWithinMessage(0)
simMux := arbstate.NewInboxMultiplexer(b.building.muxBackend, batchPosition.DelayedMessageCount, dapReaders, daprovider.KeysetValidate)
simMux := arbstate.NewInboxMultiplexer(b.building.muxBackend, batchPosition.DelayedMessageCount, dapReaders, daprovider.KeysetValidate, b.chainConfig, arbOSVersion)
log.Debug("Begin checking the correctness of batch against inbox multiplexer", "startMsgSeqNum", batchPosition.MessageCount, "endMsgSeqNum", b.building.msgCount-1)
for i := batchPosition.MessageCount; i < b.building.msgCount; i++ {
msg, err := simMux.Pop(ctx)
Expand Down
11 changes: 9 additions & 2 deletions arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,22 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
)

const arbosVersion = params.ArbosVersion_50

func TestSequencerReorgFromDelayed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, ctx, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
arbosVersionGetter := execution.ConstArbosVersionGetter{Version: arbosVersion}
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig, &arbosVersionGetter, arbosVersion)
Require(t, err)

err = streamer.Start(ctx)
Expand Down Expand Up @@ -220,7 +225,9 @@ func TestSequencerReorgFromLastDelayedMsg(t *testing.T) {
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, ctx, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
arbosVersionGetter := execution.ConstArbosVersionGetter{Version: arbosVersion}

tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig, &arbosVersionGetter, arbosVersion)
Require(t, err)

err = streamer.Start(ctx)
Expand Down
47 changes: 33 additions & 14 deletions arbnode/inbox_tracker.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2021-2022, Offchain Labs, Inc.
// Copyright 2021-2025, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md

package arbnode
Expand All @@ -25,7 +25,9 @@ import (
"github.com/offchainlabs/nitro/broadcaster"
"github.com/offchainlabs/nitro/broadcaster/message"
"github.com/offchainlabs/nitro/daprovider"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/staker"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/containers"
)

Expand All @@ -35,24 +37,28 @@ var (
)

type InboxTracker struct {
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
dapReaders *daprovider.DAProviderRegistry
snapSyncConfig SnapSyncConfig
db ethdb.Database
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
dapReaders *daprovider.DAProviderRegistry
snapSyncConfig SnapSyncConfig
arbosVersionGetter execution.ArbOSVersionGetter
lastSeenArbosVersion uint64

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders *daprovider.DAProviderRegistry, snapSyncConfig SnapSyncConfig) (*InboxTracker, error) {
func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, dapReaders *daprovider.DAProviderRegistry, snapSyncConfig SnapSyncConfig, arbosVersionGetter execution.ArbOSVersionGetter, initialArbosVersion uint64) (*InboxTracker, error) {
tracker := &InboxTracker{
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
snapSyncConfig: snapSyncConfig,
db: db,
txStreamer: txStreamer,
dapReaders: dapReaders,
batchMeta: containers.NewLruCache[uint64, BatchMetadata](1000),
snapSyncConfig: snapSyncConfig,
arbosVersionGetter: arbosVersionGetter,
lastSeenArbosVersion: initialArbosVersion,
}
return tracker, nil
}
Expand Down Expand Up @@ -781,7 +787,20 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client *ethclien
ctx: ctx,
client: client,
}
multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, t.dapReaders, daprovider.KeysetValidate)
recentArbosVersion, err := t.arbosVersionGetter.ArbOSVersionForMessageIndex(arbmath.SaturatingUSub(prevbatchmeta.MessageCount, 1)).Await(ctx)
if err != nil {
recentArbosVersion = t.lastSeenArbosVersion
} else {
t.lastSeenArbosVersion = recentArbosVersion
}
multiplexer := arbstate.NewInboxMultiplexer(
backend,
prevbatchmeta.DelayedMessageCount,
t.dapReaders,
daprovider.KeysetValidate,
t.txStreamer.chainConfig,
recentArbosVersion,
)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentPos := prevbatchmeta.MessageCount + 1
for {
Expand Down
16 changes: 16 additions & 0 deletions arbnode/mel/extraction/message_extraction_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/daprovider"
"github.com/offchainlabs/nitro/execution"
)

// Defines a method that can read a delayed message from an external database.
Expand Down Expand Up @@ -55,11 +58,14 @@ func ExtractMessages(
delayedMsgDatabase DelayedMessageDatabase,
receiptFetcher ReceiptFetcher,
txsFetcher TransactionsFetcher,
chainConfig *params.ChainConfig,
arbosVersionGetter execution.ArbOSVersionGetter,
) (*mel.State, []*arbostypes.MessageWithMetadata, []*mel.DelayedInboxMessage, error) {
return extractMessagesImpl(
ctx,
inputState,
parentChainHeader,
chainConfig,
dataProviders,
delayedMsgDatabase,
txsFetcher,
Expand All @@ -71,6 +77,7 @@ func ExtractMessages(
messagesFromBatchSegments,
arbstate.ParseSequencerMessage,
arbostypes.ParseBatchPostingReportMessageFields,
arbosVersionGetter,
)
}

Expand All @@ -81,6 +88,7 @@ func extractMessagesImpl(
ctx context.Context,
inputState *mel.State,
parentChainHeader *types.Header,
chainConfig *params.ChainConfig,
dataProviders *daprovider.DAProviderRegistry,
delayedMsgDatabase DelayedMessageDatabase,
txsFetcher TransactionsFetcher,
Expand All @@ -92,6 +100,7 @@ func extractMessagesImpl(
extractBatchMessages batchMsgExtractionFunc,
parseSequencerMessage sequencerMessageParserFunc,
parseBatchPostingReport batchPostingReportParserFunc,
arbosVersionGetter execution.ArbOSVersionGetter,
) (*mel.State, []*arbostypes.MessageWithMetadata, []*mel.DelayedInboxMessage, error) {

state := inputState.Clone()
Expand Down Expand Up @@ -193,13 +202,20 @@ func extractMessagesImpl(
return nil, nil, nil, errors.New("encountered initialize message that is not the first delayed message and the first batch ")
}

arbosVersion, err := arbosVersionGetter.ArbOSVersionForMessageIndex(arbutil.MessageIndex(state.MsgCount)).Await(ctx)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get Arbos version for message index %d: %w", state.MsgCount, err)
}

rawSequencerMsg, err := parseSequencerMessage(
ctx,
batch.SequenceNumber,
batch.BlockHash,
serialized,
dataProviders,
daprovider.KeysetValidate,
chainConfig,
arbosVersion,
)
if err != nil {
return nil, nil, nil, err
Expand Down
13 changes: 12 additions & 1 deletion arbnode/mel/extraction/message_extraction_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"

"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/daprovider"
"github.com/offchainlabs/nitro/execution"
)

func TestExtractMessages(t *testing.T) {
Expand All @@ -31,7 +34,7 @@ func TestExtractMessages(t *testing.T) {
lookupDelayedMsgs func(context.Context, *mel.State, *types.Header, ReceiptFetcher, TransactionsFetcher) ([]*mel.DelayedInboxMessage, error)
serializer func(context.Context, *mel.SequencerInboxBatch, *types.Transaction, uint, ReceiptFetcher) ([]byte, error)
parseReport func(io.Reader) (*big.Int, common.Address, common.Hash, uint64, *big.Int, uint64, error)
parseSequencerMsg func(context.Context, uint64, common.Hash, []byte, arbstate.DapReaderSource, daprovider.KeysetValidationMode) (*arbstate.SequencerMessage, error)
parseSequencerMsg func(context.Context, uint64, common.Hash, []byte, arbstate.DapReaderSource, daprovider.KeysetValidationMode, *params.ChainConfig, uint64) (*arbstate.SequencerMessage, error)
extractBatchMessages func(context.Context, *mel.State, *arbstate.SequencerMessage, DelayedMessageDatabase) ([]*arbostypes.MessageWithMetadata, error)
expectedError string
expectedMsgCount uint64
Expand Down Expand Up @@ -150,13 +153,16 @@ func TestExtractMessages(t *testing.T) {
nil,
nil,
txsFetcher,
chaininfo.ArbitrumDevTestChainConfig(),
&execution.ConstArbosVersionGetter{Version: params.ArbosVersion_50},
)
} else {
// Test the internal extractMessagesImpl function
postState, messages, delayedMessages, err = extractMessagesImpl(
ctx,
melState,
header,
chaininfo.ArbitrumDevTestChainConfig(),
nil,
nil,
txsFetcher,
Expand All @@ -168,6 +174,7 @@ func TestExtractMessages(t *testing.T) {
tt.extractBatchMessages,
tt.parseSequencerMsg,
tt.parseReport,
&execution.ConstArbosVersionGetter{Version: params.ArbosVersion_50},
)
}

Expand Down Expand Up @@ -321,6 +328,8 @@ func successfulParseSequencerMsg(
data []byte,
dapReaders arbstate.DapReaderSource,
keysetValidationMode daprovider.KeysetValidationMode,
chainConfig *params.ChainConfig,
arbosVersion uint64,
) (*arbstate.SequencerMessage, error) {
return nil, nil
}
Expand All @@ -332,6 +341,8 @@ func failingParseSequencerMsg(
data []byte,
dapReaders arbstate.DapReaderSource,
keysetValidationMode daprovider.KeysetValidationMode,
chainConfig *params.ChainConfig,
arbosVersion uint64,
) (*arbstate.SequencerMessage, error) {
return nil, errors.New("failed to parse sequencer message")
}
Expand Down
Loading
Loading