
Commit 32d3a70

Merge branch 'master' into fix-historical-feed-flood
2 parents 9814b5d + a7ff1b3

40 files changed: +428 -190 lines changed

arbnode/batch_poster.go

Lines changed: 8 additions & 4 deletions
@@ -310,6 +310,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
 	L1BlockBound:                   "",
 	L1BlockBoundBypass:             time.Hour,
 	UseAccessLists:                 true,
+	RedisLock:                      redislock.TestCfg,
 	GasEstimateBaseFeeMultipleBips: arbmath.OneInUBips * 3 / 2,
 	CheckBatchCorrectness:          true,
 	DelayBufferThresholdMargin:     0,
@@ -1647,12 +1648,14 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
 	}

 	sequencerMsg, err := b.building.segments.CloseAndGetBytes()
+	defer func() {
+		b.building = nil // a closed batchSegments can't be reused
+	}()
 	if err != nil {
 		return false, err
 	}
 	if sequencerMsg == nil {
 		log.Debug("BatchPoster: batch nil", "sequence nr.", batchPosition.NextSeqNum, "from", batchPosition.MessageCount, "prev delayed", batchPosition.DelayedMessageCount)
-		b.building = nil // a closed batchSegments can't be reused
 		return false, nil
 	}

@@ -1803,17 +1806,19 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
 				return false, fmt.Errorf("error getting message from simulated inbox multiplexer (Pop) when testing correctness of batch: %w", err)
 			}
 			if msg.DelayedMessagesRead != b.building.muxBackend.allMsgs[i].DelayedMessagesRead {
-				b.building = nil
 				return false, fmt.Errorf("simulated inbox multiplexer failed to produce correct delayedMessagesRead field for msg with seqNum: %d. Got: %d, Want: %d", i, msg.DelayedMessagesRead, b.building.muxBackend.allMsgs[i].DelayedMessagesRead)
 			}
 			if !msg.Message.Equals(b.building.muxBackend.allMsgs[i].Message) {
-				b.building = nil
 				return false, fmt.Errorf("simulated inbox multiplexer failed to produce correct message field for msg with seqNum: %d", i)
 			}
 		}
 		log.Debug("Successfully checked that the batch produces correct messages when ran through inbox multiplexer", "sequenceNumber", batchPosition.NextSeqNum)
 	}

+	if !b.redisLock.AttemptLock(ctx) {
+		return false, errAttemptLockFailed
+	}
+
 	tx, err := b.dataPoster.PostTransaction(ctx,
 		firstUsefulMsgTime,
 		nonce,
@@ -1888,7 +1893,6 @@ func (b *BatchPoster) MaybePostSequencerBatch(ctx context.Context) (bool, error)
 		backlog = 0
 	}
 	b.backlog.Store(backlog)
-	b.building = nil

 	// If we aren't queueing up transactions, wait for the receipt before moving on to the next batch.
 	if config.DataPoster.UseNoOpStorage {
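
Note: the batch-poster hunks above consolidate the scattered `b.building = nil` resets into a single `defer` placed right after `CloseAndGetBytes`, and gate posting on a Redis lock. A minimal standalone sketch of the defer-once cleanup pattern follows; the types and names are hypothetical, not nitro's:

package main

import "fmt"

type builder struct{}

// closeAndGetBytes stands in for segments.CloseAndGetBytes: after it runs,
// the builder must not be reused.
func (b *builder) closeAndGetBytes() ([]byte, error) {
	return []byte("batch"), nil
}

type poster struct{ building *builder }

func (p *poster) post() ([]byte, error) {
	msg, err := p.building.closeAndGetBytes()
	// Reset the field once, on every exit path, instead of before each return.
	defer func() { p.building = nil }()
	if err != nil {
		return nil, err
	}
	if msg == nil {
		return nil, nil
	}
	return msg, nil
}

func main() {
	p := &poster{building: &builder{}}
	out, _ := p.post()
	fmt.Printf("posted %q, building cleared: %v\n", out, p.building == nil)
}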

arbnode/dataposter/data_poster.go

Lines changed: 7 additions & 3 deletions
@@ -1213,7 +1213,6 @@ func (p *DataPoster) Start(ctxIn context.Context) {
 		} else {
 			log.Error("Failed to fetch latest confirmed tx from queue", "confirmedNonce", confirmedNonce, "err", err, "confirmedMeta", confirmedMeta)
 		}
-
 	}

 	for _, tx := range queueContents {
@@ -1226,9 +1225,14 @@
 			err := p.sendTx(ctx, tx, tx)
 			p.maybeLogError(err, tx, "failed to re-send transaction")
 		}
-		tx, err = p.queue.Get(ctx, tx.FullTx.Nonce())
+		nonce := tx.FullTx.Nonce()
+		tx, err = p.queue.Get(ctx, nonce)
 		if err != nil {
-			log.Error("Failed to fetch tx from queue to check updated status", "nonce", tx.FullTx.Nonce(), "err", err)
+			log.Error("Failed to fetch tx from queue to check updated status", "nonce", nonce, "err", err)
+			return minWait
+		}
+		if tx == nil {
+			log.Error("Failed to fetch tx from queue to check updated status, got tx == nil", "nonce", nonce)
 			return minWait
 		}
 		if nextCheck.After(tx.NextReplacement) {
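
Note: the data-poster hunk fixes two issues in the re-check loop: the nonce is captured before `tx` is overwritten by the queue lookup (so error logs report the right nonce), and a nil result from the queue now aborts the check instead of being dereferenced later. A standalone sketch of the same guard, with hypothetical types:

package main

import (
	"errors"
	"fmt"
)

type queuedTx struct{ nonce uint64 }

// get simulates a storage lookup that can legitimately return (nil, nil)
// when no entry exists for the nonce.
func get(nonce uint64) (*queuedTx, error) {
	if nonce%2 == 0 {
		return &queuedTx{nonce: nonce}, nil
	}
	return nil, nil
}

func checkStatus(tx *queuedTx) error {
	nonce := tx.nonce // capture before tx is reassigned below
	tx, err := get(nonce)
	if err != nil {
		return fmt.Errorf("fetching tx %d from queue: %w", nonce, err)
	}
	if tx == nil {
		return errors.New("tx missing from queue, stopping status check")
	}
	fmt.Println("re-fetched tx with nonce", tx.nonce)
	return nil
}

func main() {
	fmt.Println(checkStatus(&queuedTx{nonce: 2})) // found
	fmt.Println(checkStatus(&queuedTx{nonce: 3})) // simulated missing entry
}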

arbnode/dataposter/dbstorage/storage.go

Lines changed: 2 additions & 2 deletions
@@ -18,7 +18,7 @@ import (

 // Storage implements db based storage for batch poster.
 type Storage struct {
-	db     ethdb.Database
+	db     ethdb.KeyValueStore
 	encDec storage.EncoderDecoderF
 }

@@ -31,7 +31,7 @@ var (
 	countKey = []byte(".count_key")
 )

-func New(db ethdb.Database, enc storage.EncoderDecoderF) *Storage {
+func New(db ethdb.KeyValueStore, enc storage.EncoderDecoderF) *Storage {
 	return &Storage{db: db, encDec: enc}
 }
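
Note: this change narrows the constructor from the full `ethdb.Database` to `ethdb.KeyValueStore`, the smaller interface the batch-poster storage actually uses; a full database still satisfies it. A generic sketch of the interface-narrowing idea, with illustrative types rather than go-ethereum's:

package main

import "fmt"

// KeyValueStore is the narrow capability the storage needs.
type KeyValueStore interface {
	Put(key, value []byte) error
	Get(key []byte) ([]byte, error)
}

// memStore is a toy implementation; a full database type would satisfy the
// same interface and could still be passed to New.
type memStore struct{ m map[string][]byte }

func (s *memStore) Put(k, v []byte) error        { s.m[string(k)] = v; return nil }
func (s *memStore) Get(k []byte) ([]byte, error) { return s.m[string(k)], nil }

// Storage asks only for key/value access, not the wider database surface.
type Storage struct{ db KeyValueStore }

func New(db KeyValueStore) *Storage { return &Storage{db: db} }

func main() {
	s := New(&memStore{m: map[string][]byte{}})
	_ = s.db.Put([]byte("nonce"), []byte("42"))
	v, _ := s.db.Get([]byte("nonce"))
	fmt.Println(string(v))
}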

arbnode/dataposter/storage_test.go

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ func newLevelDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.St

 func newPebbleDBStorage(t *testing.T, encF storage.EncoderDecoderF) *dbstorage.Storage {
 	t.Helper()
-	db, err := node.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, true, conf.PersistentConfigDefault.Pebble.ExtraOptions("pebble"))
+	db, err := node.NewPebbleDBDatabase(path.Join(t.TempDir(), "pebble.db"), 0, 0, "default", false, conf.PersistentConfigDefault.Pebble.ExtraOptions("pebble"))
 	if err != nil {
 		t.Fatalf("NewPebbleDBDatabase() unexpected error: %v", err)
 	}

arbnode/inbox_test.go

Lines changed: 2 additions & 2 deletions
@@ -135,8 +135,8 @@ func NewTransactionStreamerForTest(t *testing.T, ctx context.Context, ownerAddre
 	arbDb := rawdb.NewMemoryDatabase()
 	initReader := statetransfer.NewMemoryInitDataReader(&initData)

-	cacheConfig := core.DefaultCacheConfigWithScheme(env.GetTestStateScheme())
-	bc, err := gethexec.WriteOrTestBlockChain(chainDb, cacheConfig, initReader, chainConfig, nil, nil, arbostypes.TestInitMessage, &gethexec.ConfigDefault.TxIndexer, 0)
+	options := core.DefaultConfig().WithStateScheme(env.GetTestStateScheme())
+	bc, err := gethexec.WriteOrTestBlockChain(chainDb, options, initReader, chainConfig, nil, nil, arbostypes.TestInitMessage, &gethexec.ConfigDefault.TxIndexer, 0)

 	if err != nil {
 		Fail(t, err)

arbnode/redislock/redis.go

Lines changed: 8 additions & 0 deletions
@@ -71,6 +71,14 @@ var DefaultCfg = SimpleCfg{
 	BackgroundLock:  false,
 }

+var TestCfg = SimpleCfg{
+	Enable:          true,
+	LockoutDuration: time.Second,
+	RefreshDuration: time.Second / 6,
+	Key:             "",
+	BackgroundLock:  false,
+}
+
 func (l *Simple) attemptLock(ctx context.Context) (bool, error) {
 	l.mutex.Lock()
 	defer l.mutex.Unlock()
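
Note: the new `TestCfg` refreshes the lock six times per lockout window (`RefreshDuration = LockoutDuration / 6`), so a test holder re-asserts the lock well before its claim would lapse. A tiny arithmetic sketch of that relationship; an illustration only, not the redislock implementation:

package main

import (
	"fmt"
	"time"
)

func main() {
	lockout := time.Second     // how long a claim stays valid
	refresh := time.Second / 6 // how often the holder re-asserts it

	// With these values the holder gets several refresh attempts per
	// lockout window, so transient delays don't immediately lose the lock.
	fmt.Printf("refreshes per lockout window: %d\n", int(lockout/refresh)) // 6
}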

arbos/arbosState/initialization_test.go

Lines changed: 3 additions & 3 deletions
@@ -65,10 +65,10 @@ func tryMarshalUnmarshal(input *statetransfer.ArbosInitializationInfo, t *testin
 	initReader := statetransfer.NewMemoryInitDataReader(&initData)
 	chainConfig := chaininfo.ArbitrumDevTestChainConfig()

-	cacheConfig := core.DefaultCacheConfigWithScheme(env.GetTestStateScheme())
-	stateroot, err := InitializeArbosInDatabase(raw, cacheConfig, initReader, chainConfig, nil, arbostypes.TestInitMessage, 0, 0)
+	options := core.DefaultConfig().WithStateScheme(env.GetTestStateScheme())
+	stateroot, err := InitializeArbosInDatabase(raw, options, initReader, chainConfig, nil, arbostypes.TestInitMessage, 0, 0)
 	Require(t, err)
-	triedbConfig := cacheConfig.TriedbConfig()
+	triedbConfig := options.TriedbConfig()
 	stateDb, err := state.New(stateroot, state.NewDatabase(triedb.NewDatabase(raw, triedbConfig), nil))
 	Require(t, err)

arbos/arbosState/initialize.go

Lines changed: 1 addition & 1 deletion
@@ -58,7 +58,7 @@ func MakeGenesisBlock(parentHash common.Hash, blockNumber uint64, timestamp uint
 	return types.NewBlock(head, nil, nil, trie.NewStackTrie(nil))
 }

-func InitializeArbosInDatabase(db ethdb.Database, cacheConfig *core.CacheConfig, initData statetransfer.InitDataReader, chainConfig *params.ChainConfig, genesisArbOSInit *params.ArbOSInit, initMessage *arbostypes.ParsedInitMessage, timestamp uint64, accountsPerSync uint) (root common.Hash, err error) {
+func InitializeArbosInDatabase(db ethdb.Database, cacheConfig *core.BlockChainConfig, initData statetransfer.InitDataReader, chainConfig *params.ChainConfig, genesisArbOSInit *params.ArbOSInit, initMessage *arbostypes.ParsedInitMessage, timestamp uint64, accountsPerSync uint) (root common.Hash, err error) {
 	triedbConfig := cacheConfig.TriedbConfig()
 	triedbConfig.Preimages = false
 	stateDatabase := state.NewDatabase(triedb.NewDatabase(db, triedbConfig), nil)
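
Note: the three hunks above migrate call sites from `core.DefaultCacheConfigWithScheme(...)` to `core.DefaultConfig().WithStateScheme(...)` and change the parameter type to `*core.BlockChainConfig`. A generic sketch of that builder-style config pattern, using illustrative types rather than go-ethereum's:

package main

import "fmt"

// BlockChainConfig stands in for the geth config type; fields are illustrative.
type BlockChainConfig struct {
	StateScheme    string
	TrieCleanLimit int
}

// DefaultConfig returns the baseline settings.
func DefaultConfig() BlockChainConfig {
	return BlockChainConfig{StateScheme: "hash", TrieCleanLimit: 256}
}

// WithStateScheme returns a modified copy, leaving the default untouched.
func (c BlockChainConfig) WithStateScheme(scheme string) BlockChainConfig {
	c.StateScheme = scheme
	return c
}

func main() {
	options := DefaultConfig().WithStateScheme("path")
	fmt.Println(options.StateScheme, DefaultConfig().StateScheme) // path hash
}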

broadcaster/backlog/backlog.go

Lines changed: 31 additions & 2 deletions
@@ -87,6 +87,29 @@ func (b *backlog) backlogSizeInBytes() (uint64, error) {
 	return size, nil
 }

+// deepCopyMessageForDebug creates a selective copy of the message for memory debugging.
+// It only deep copies the L2msg field which is typically the largest memory consumer.
+// This helps pprof show allocations at the Append point rather than at message creation.
+func deepCopyMessageForDebug(msg *m.BroadcastFeedMessage) *m.BroadcastFeedMessage {
+	if msg == nil || msg.Message.Message == nil || msg.Message.Message.L2msg == nil {
+		return msg
+	}
+
+	copied := *msg
+
+	l2msgCopy := make([]byte, len(msg.Message.Message.L2msg))
+	copy(l2msgCopy, msg.Message.Message.L2msg)
+
+	l1IncomingMessageCopy := *msg.Message.Message
+	l1IncomingMessageCopy.L2msg = l2msgCopy
+
+	messageWithMetadataCopy := msg.Message
+	messageWithMetadataCopy.Message = &l1IncomingMessageCopy
+	copied.Message = messageWithMetadataCopy
+
+	return &copied
+}
+
 // Append will add the given messages to the backlogSegment at head until
 // that segment reaches its limit. If messages remain to be added a new segment
 // will be created.
@@ -103,8 +126,14 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
 		}
 	}

+	enableDeepCopy := b.config().EnableBacklogDeepCopy
 	lookupByIndex := b.lookupByIndex.Load()
 	for _, msg := range bm.Messages {
+		// For memory debugging: deep copy L2msg to track allocations
+		msgToAppend := msg
+		if enableDeepCopy {
+			msgToAppend = deepCopyMessageForDebug(msg)
+		}
 		segment := b.tail.Load()
 		if segment == nil {
 			segment = newBacklogSegment()
@@ -124,7 +153,7 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
 		if segment.count() >= b.config().SegmentLimit {
 			segment.messagesLock.RLock()
 			if len(segment.messages) > 0 {
-				msg.CumulativeSumMsgSize = segment.messages[len(segment.messages)-1].CumulativeSumMsgSize
+				msgToAppend.CumulativeSumMsgSize = segment.messages[len(segment.messages)-1].CumulativeSumMsgSize
 			}
 			segment.messagesLock.RUnlock()

@@ -135,7 +164,7 @@ func (b *backlog) Append(bm *m.BroadcastMessage) error {
 			b.tail.Store(segment)
 		}

-		err := segment.append(prevMsgIdx, msg)
+		err := segment.append(prevMsgIdx, msgToAppend)
 		if errors.Is(err, errDropSegments) {
 			head := b.head.Load()
 			b.removeFromLookup(head.Start(), uint64(msg.SequenceNumber))
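
Note: with the enable-backlog-deep-copy option on, each appended message's `L2msg` bytes are freshly allocated inside `Append`, so heap profiles attribute that memory to the feed backlog rather than to wherever the message was first decoded. One way to look at such a profile is the standard-library pprof endpoint (the node may already expose its own; this is only a generic sketch):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers
)

func main() {
	// Inspect live heap allocations with:
	//   go tool pprof http://localhost:6060/debug/pprof/heap
	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}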

broadcaster/backlog/config.go

Lines changed: 7 additions & 3 deletions
@@ -7,18 +7,22 @@ import (
 type ConfigFetcher func() *Config

 type Config struct {
-	SegmentLimit int `koanf:"segment-limit" reload:"hot"`
+	SegmentLimit          int  `koanf:"segment-limit" reload:"hot"`
+	EnableBacklogDeepCopy bool `koanf:"enable-backlog-deep-copy" reload:"hot"`
 }

 func AddOptions(prefix string, f *flag.FlagSet) {
 	f.Int(prefix+".segment-limit", DefaultConfig.SegmentLimit, "the maximum number of messages each segment within the backlog can contain")
+	f.Bool(prefix+".enable-backlog-deep-copy", DefaultConfig.EnableBacklogDeepCopy, "enable deep copying of L2 messages for memory profiling (debug only)")
 }

 var (
 	DefaultConfig = Config{
-		SegmentLimit: 240,
+		SegmentLimit:          240,
+		EnableBacklogDeepCopy: false,
 	}
 	DefaultTestConfig = Config{
-		SegmentLimit: 3,
+		SegmentLimit:          3,
+		EnableBacklogDeepCopy: false,
 	}
 )
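
Note: the new backlog option is registered through `AddOptions` with whatever prefix the caller supplies. A standalone sketch of registering and parsing such a prefixed bool flag with the standard library, using a hypothetical prefix purely for illustration:

package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("node", flag.ContinueOnError)
	prefix := "feed.output.backlog" // hypothetical prefix, for illustration only
	enable := fs.Bool(prefix+".enable-backlog-deep-copy", false,
		"enable deep copying of L2 messages for memory profiling (debug only)")

	if err := fs.Parse([]string{"--" + prefix + ".enable-backlog-deep-copy=true"}); err != nil {
		panic(err)
	}
	fmt.Println(*enable) // true
}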
