Skip to content

Commit 4c7f7f6

Browse files
address PR comments
1 parent a9583f7 commit 4c7f7f6

File tree

5 files changed

+28
-42
lines changed

5 files changed

+28
-42
lines changed

arbnode/db-schema/schema.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ var (
1515
SequencerBatchMetaPrefix []byte = []byte("s") // maps a batch sequence number to BatchMetadata
1616
DelayedSequencedPrefix []byte = []byte("a") // maps a delayed message count to the first sequencer batch sequence number with this delayed count
1717
MelStatePrefix []byte = []byte("l") // maps a parent chain block number to its computed MEL state
18-
MelDelayedMessagePrefix []byte = []byte("y") // maps a delayed sequence number to an accumulator and an RLP encoded message [TODO: might need to replace or be replaced by RlpDelayedMessagePrefix]
19-
MelSequencerBatchMetaPrefix []byte = []byte("q") // maps a batch sequence number to BatchMetadata [TODO: might need to replace or be replaced by SequencerBatchMetaPrefix
18+
MelDelayedMessagePrefix []byte = []byte("y") // maps a delayed sequence number to an accumulator and an RLP encoded message [TODO(NIT-4209): might need to replace or be replaced by RlpDelayedMessagePrefix]
19+
MelSequencerBatchMetaPrefix []byte = []byte("q") // maps a batch sequence number to BatchMetadata [TODO(NIT-4209): might need to replace or be replaced by SequencerBatchMetaPrefix]
2020

2121
MessageCountKey []byte = []byte("_messageCount") // contains the current message count
2222
LastPrunedMessageKey []byte = []byte("_lastPrunedMessageKey") // contains the last pruned message key

arbnode/mel/delayed_message_backlog.go

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package mel
22

33
import (
4-
"context"
54
"errors"
65
"fmt"
76

@@ -21,40 +20,30 @@ type DelayedMessageBacklogEntry struct {
2120
// also contains compact witnesses of a Merkle tree representing all seen delayed messages. To prove that a delayed message is part of
2221
// this Merkle tree, this data structure can be used to verify Merkle proofs against the MEL state.
2322
type DelayedMessageBacklog struct {
24-
ctx context.Context
25-
capacity int
23+
targetBufferSize int
2624
entries []*DelayedMessageBacklogEntry
2725
dirtiesStartPos int // represents the starting point of dirties in the entries list, items added while processing a state
2826
initMessage *DelayedInboxMessage
29-
finalizedAndReadIndexFetcher func(context.Context) (uint64, error)
27+
finalizedAndReadIndexFetcher func() (uint64, error)
3028
}
3129

32-
func NewDelayedMessageBacklog(ctx context.Context, capacity int, finalizedAndReadIndexFetcher func(context.Context) (uint64, error), opts ...func(*DelayedMessageBacklog)) (*DelayedMessageBacklog, error) {
33-
if capacity == 0 {
34-
return nil, fmt.Errorf("capacity of DelayedMessageBacklog cannot be zero")
30+
func NewDelayedMessageBacklog(targetBufferSize int, finalizedAndReadIndexFetcher func() (uint64, error)) (*DelayedMessageBacklog, error) {
31+
if targetBufferSize == 0 {
32+
return nil, fmt.Errorf("targetBufferSize of DelayedMessageBacklog cannot be zero")
3533
}
3634
if finalizedAndReadIndexFetcher == nil {
3735
return nil, fmt.Errorf("finalizedAndReadIndexFetcher of DelayedMessageBacklog cannot be nil")
3836
}
3937
backlog := &DelayedMessageBacklog{
40-
ctx: ctx,
41-
capacity: capacity,
38+
targetBufferSize: targetBufferSize,
4239
entries: make([]*DelayedMessageBacklogEntry, 0),
4340
initMessage: nil,
4441
finalizedAndReadIndexFetcher: finalizedAndReadIndexFetcher,
4542
}
46-
for _, opt := range opts {
47-
opt(backlog)
48-
}
4943
return backlog, nil
5044
}
5145

52-
func WithUnboundedCapacity(d *DelayedMessageBacklog) {
53-
d.capacity = 0
54-
d.finalizedAndReadIndexFetcher = nil
55-
}
56-
57-
// Add takes values of a DelayedMessageBacklogEntry and adds it to the backlog given the entry succeeds validation. It also attempts trimming of backlog if capacity is reached
46+
// Add takes values of a DelayedMessageBacklogEntry and adds it to the backlog given the entry succeeds validation. It also attempts trimming of backlog if targetBufferSize is reached
5847
func (d *DelayedMessageBacklog) Add(entry *DelayedMessageBacklogEntry) error {
5948
if len(d.entries) > 0 {
6049
expectedIndex := d.entries[0].Index + uint64(len(d.entries))
@@ -63,7 +52,11 @@ func (d *DelayedMessageBacklog) Add(entry *DelayedMessageBacklogEntry) error {
6352
}
6453
}
6554
d.entries = append(d.entries, entry)
66-
return d.clear()
55+
if len(d.entries) <= d.targetBufferSize {
56+
return nil
57+
}
58+
d.trimFinalizedAndReadEntries()
59+
return nil
6760
}
6861

6962
func (d *DelayedMessageBacklog) Get(index uint64) (*DelayedMessageBacklogEntry, error) {
@@ -86,27 +79,24 @@ func (d *DelayedMessageBacklog) Len() int { return le
8679
func (d *DelayedMessageBacklog) GetInitMsg() *DelayedInboxMessage { return d.initMessage }
8780
func (d *DelayedMessageBacklog) setInitMsg(msg *DelayedInboxMessage) { d.initMessage = msg }
8881

89-
// clear removes from backlog (if exceeds capacity) the entries that correspond to the delayed messages that are both READ and belong to finalized parent chain blocks
90-
func (d *DelayedMessageBacklog) clear() error {
91-
if len(d.entries) <= d.capacity {
92-
return nil
93-
}
82+
// trimFinalizedAndReadEntries removes from backlog (if exceeds targetBufferSize) the entries that correspond to the delayed messages that are both READ
83+
// and belong to finalized parent chain blocks. We should not interrupt delayed messages accumulation if we cannot trim the backlog, since its not high priority
84+
func (d *DelayedMessageBacklog) trimFinalizedAndReadEntries() {
9485
if d.finalizedAndReadIndexFetcher != nil && d.dirtiesStartPos > 0 { // if all entries are currently dirty we dont trim the finalized ones
95-
finalizedDelayedMessagesRead, err := d.finalizedAndReadIndexFetcher(d.ctx)
86+
finalizedDelayedMessagesRead, err := d.finalizedAndReadIndexFetcher()
9687
if err != nil {
9788
log.Error("Unable to trim finalized and read delayed messages from DelayedMessageBacklog, will be retried later", "err", err)
98-
return nil // we should not interrupt delayed messages accumulation if we cannot trim the backlog, since its not high priority
89+
return
9990
}
10091
if finalizedDelayedMessagesRead > d.entries[0].Index {
10192
leftTrimPos := min(finalizedDelayedMessagesRead-d.entries[0].Index, uint64(len(d.entries)))
10293
// #nosec G115
103-
leftTrimPos = min(leftTrimPos, uint64(d.dirtiesStartPos)) // cannot clear dirties yet, they will be cleared out in the next attempt
94+
leftTrimPos = min(leftTrimPos, uint64(d.dirtiesStartPos)) // cannot trim dirties yet, they will be trimmed out in the next attempt
10495
d.entries = d.entries[leftTrimPos:]
10596
// #nosec G115
10697
d.dirtiesStartPos -= int(leftTrimPos) // adjust start position of dirties
10798
}
10899
}
109-
return nil
110100
}
111101

112102
// Reorg removes from backlog the entries that corresponded to the reorged out parent chain blocks
@@ -120,7 +110,7 @@ func (d *DelayedMessageBacklog) reorg(newDelayedMessagedSeen uint64) error {
120110
if newDelayedMessagedSeen >= d.entries[0].Index {
121111
rightTrimPos := newDelayedMessagedSeen - d.entries[0].Index
122112
if rightTrimPos > uint64(len(d.entries)) {
123-
return fmt.Errorf("newDelayedMessagedSeen: %d durign a reorg is greater (by more than 1) than the greatest delayed message index stored in backlog: %d", newDelayedMessagedSeen, d.entries[len(d.entries)-1].Index)
113+
return fmt.Errorf("newDelayedMessagedSeen: %d during a reorg is greater (by more than 1) than the greatest delayed message index stored in backlog: %d", newDelayedMessagedSeen, d.entries[len(d.entries)-1].Index)
124114
}
125115
d.entries = d.entries[:rightTrimPos]
126116
} else {

arbnode/mel/delayed_message_backlog_test.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package mel
22

33
import (
4-
"context"
54
"reflect"
65
"strings"
76
"testing"
@@ -10,10 +9,7 @@ import (
109
)
1110

1211
func TestDelayedMessageBacklog(t *testing.T) {
13-
ctx, cancel := context.WithCancel(context.Background())
14-
defer cancel()
15-
16-
backlog, err := NewDelayedMessageBacklog(ctx, 1, func(ctx context.Context) (uint64, error) { return 0, nil }, WithUnboundedCapacity)
12+
backlog, err := NewDelayedMessageBacklog(100, func() (uint64, error) { return 0, nil })
1713
require.NoError(t, err)
1814

1915
// Verify handling of dirties
@@ -64,8 +60,8 @@ func TestDelayedMessageBacklog(t *testing.T) {
6460

6561
// Verify that advancing the finalizedAndRead will trim the delayedMessageBacklogEntry while keeping the unread ones
6662
finalizedAndRead := uint64(7)
67-
backlog.finalizedAndReadIndexFetcher = func(context.Context) (uint64, error) { return finalizedAndRead, nil }
68-
require.NoError(t, backlog.clear())
63+
backlog.finalizedAndReadIndexFetcher = func() (uint64, error) { return finalizedAndRead, nil }
64+
backlog.trimFinalizedAndReadEntries()
6965
require.True(t, len(backlog.entries) == int(numEntries-finalizedAndRead)) // #nosec G115
7066
require.True(t, backlog.entries[0].Index == finalizedAndRead)
7167

arbnode/mel/runner/backlog.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99

1010
// InitializeDelayedMessageBacklog is to be only called by the Start fsm step of MEL. This function fills the backlog based on the seen and read count from the given mel state
1111
func InitializeDelayedMessageBacklog(ctx context.Context, d *mel.DelayedMessageBacklog, db *Database, state *mel.State, finalizedAndReadIndexFetcher func(context.Context) (uint64, error)) error {
12-
if state.DelayedMessagesSeen == 0 && state.DelayedMessagesRead == 0 { // this is the first mel state so no need to initialize backlog even if the state isnt finalized yet
12+
if state.DelayedMessagesSeen == 0 && state.DelayedMessagesRead == 0 { // this is the first mel state so no need to initialize backlog even if the state isn't finalized yet
1313
return nil
1414
}
1515
finalizedDelayedMessagesRead := state.DelayedMessagesRead // Assume to be finalized, then update if needed
@@ -31,7 +31,7 @@ func InitializeDelayedMessageBacklog(ctx context.Context, d *mel.DelayedMessageB
3131
return err
3232
}
3333
if uint64(len(delayedMsgIndexToParentChainBlockNum)) < state.DelayedMessagesSeen-targetDelayedMessagesRead {
34-
return fmt.Errorf("number of mappings from index to ParentChainBlockNum: %d are insufficient, needed atleast: %d", uint64(len(delayedMsgIndexToParentChainBlockNum)), state.DelayedMessagesSeen-targetDelayedMessagesRead)
34+
return fmt.Errorf("number of mappings from index to ParentChainBlockNum: %d are insufficient, needed atleast: %d", len(delayedMsgIndexToParentChainBlockNum), state.DelayedMessagesSeen-targetDelayedMessagesRead)
3535
}
3636

3737
// Create DelayedMessageBacklogEntry for all the delayed messages that are seen but not read

arbnode/mel/runner/backlog_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestDelayedMessageBacklogInitialization(t *testing.T) {
7373

7474
require.True(t, state.DelayedMessagesSeen == uint64(numMelStates)*5+1) // #nosec G115
7575
require.True(t, state.DelayedMessagesRead == 1)
76-
delayedMessageBacklog, err := mel.NewDelayedMessageBacklog(ctx, 1, func(ctx context.Context) (uint64, error) { return 0, nil }, mel.WithUnboundedCapacity)
76+
delayedMessageBacklog, err := mel.NewDelayedMessageBacklog(100, func() (uint64, error) { return 0, nil })
7777
require.NoError(t, err)
7878
require.NoError(t, InitializeDelayedMessageBacklog(ctx, delayedMessageBacklog, melDb, state, nil))
7979
require.True(t, delayedMessageBacklog.Len() == 25)
@@ -108,7 +108,7 @@ func TestDelayedMessageBacklogInitialization(t *testing.T) {
108108
// delayedMessageBacklogEntry for indexes below the DelayedMessagesRead as those have not been finalized yet!
109109
newState, err := melDb.GetHeadMelState(ctx)
110110
require.NoError(t, err)
111-
newDelayedMessageBacklog, err := mel.NewDelayedMessageBacklog(ctx, 1, func(ctx context.Context) (uint64, error) { return 0, nil }, mel.WithUnboundedCapacity)
111+
newDelayedMessageBacklog, err := mel.NewDelayedMessageBacklog(100, func() (uint64, error) { return 0, nil })
112112
require.NoError(t, err)
113113
require.NoError(t, InitializeDelayedMessageBacklog(ctx, newDelayedMessageBacklog, melDb, newState, func(context.Context) (uint64, error) { return 7, nil }))
114114
// Notice that instead of having seenUnread list from delayed index 13 to 25 inclusive we will have it from 7 to 25 as only till block=7 the chain has finalized and that block has DelayedMessagesRead=7

0 commit comments

Comments
 (0)