Skip to content

Commit fba6bdb

Browse files
authored
Merge pull request #504 from OffchainLabs/l1reader_reduce_calls
l1reader: reduce unnecessary calls
2 parents 75bf757 + a33a39d commit fba6bdb

File tree

4 files changed

+58
-29
lines changed

4 files changed

+58
-29
lines changed

arbnode/delayed_sequencer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (d *DelayedSequencer) update(ctx context.Context, lastBlockHeader *types.He
154154
}
155155

156156
func (d *DelayedSequencer) run(ctx context.Context) {
157-
headerChan, cancel := d.l1Reader.Subscribe()
157+
headerChan, cancel := d.l1Reader.Subscribe(false)
158158
defer cancel()
159159

160160
for {

arbnode/l1reader.go

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,17 @@ import (
1717

1818
type L1Reader struct {
1919
util.StopWaiter
20-
config L1ReaderConfig
21-
client arbutil.L1Interface
22-
outChannels map[chan<- *types.Header]struct{}
23-
outChannelsBehind map[chan<- *types.Header]struct{}
24-
chanMutex sync.Mutex
25-
lastBroadcastHash common.Hash
26-
lastBroadcastHeader *types.Header
27-
lastPendingBlockNr uint64
20+
config L1ReaderConfig
21+
client arbutil.L1Interface
22+
23+
chanMutex sync.Mutex
24+
// All fields below require the chanMutex
25+
outChannels map[chan<- *types.Header]struct{}
26+
outChannelsBehind map[chan<- *types.Header]struct{}
27+
lastBroadcastHash common.Hash
28+
lastBroadcastHeader *types.Header
29+
lastPendingCallBlockNr uint64
30+
requiresPendingCallUpdates int
2831
}
2932

3033
type L1ReaderConfig struct {
@@ -67,20 +70,30 @@ func NewL1Reader(client arbutil.L1Interface, config L1ReaderConfig) *L1Reader {
6770
}
6871

6972
// Subscribers are notified when there is a change.
70-
func (s *L1Reader) Subscribe() (<-chan *types.Header, func()) {
73+
// Channel could be missing headers and have duplicates.
74+
// Listening to the channel will make sure listenere is notified when header changes.
75+
func (s *L1Reader) Subscribe(requireBlockNrUpdates bool) (<-chan *types.Header, func()) {
7176
s.chanMutex.Lock()
7277
defer s.chanMutex.Unlock()
7378

79+
if requireBlockNrUpdates {
80+
s.requiresPendingCallUpdates++
81+
}
7482
result := make(chan *types.Header)
7583
outchannel := (chan<- *types.Header)(result)
7684
s.outChannelsBehind[outchannel] = struct{}{}
77-
unsubscribeFunc := func() { s.unsubscribe(outchannel) }
85+
unsubscribeFunc := func() { s.unsubscribe(requireBlockNrUpdates, outchannel) }
7886
return result, unsubscribeFunc
7987
}
8088

81-
func (s *L1Reader) unsubscribe(from chan<- *types.Header) {
89+
func (s *L1Reader) unsubscribe(requireBlockNrUpdates bool, from chan<- *types.Header) {
8290
s.chanMutex.Lock()
8391
defer s.chanMutex.Unlock()
92+
93+
if requireBlockNrUpdates {
94+
s.requiresPendingCallUpdates--
95+
}
96+
8497
if _, ok := s.outChannels[from]; ok {
8598
delete(s.outChannels, from)
8699
close(from)
@@ -95,6 +108,8 @@ func (s *L1Reader) closeAll() {
95108
s.chanMutex.Lock()
96109
defer s.chanMutex.Unlock()
97110

111+
s.requiresPendingCallUpdates = 0
112+
98113
for ch := range s.outChannels {
99114
delete(s.outChannels, ch)
100115
close(ch)
@@ -118,15 +133,17 @@ func (s *L1Reader) possiblyBroadcast(h *types.Header) {
118133
s.lastBroadcastHeader = h
119134
}
120135

121-
pendingBlockNr, err := arbutil.GetPendingCallBlockNumber(s.GetContext(), s.client)
122-
if err == nil && pendingBlockNr.IsUint64() {
123-
pendingU64 := pendingBlockNr.Uint64()
124-
if pendingU64 > s.lastPendingBlockNr {
125-
broadcastThis = true
126-
s.lastPendingBlockNr = pendingU64
136+
if s.requiresPendingCallUpdates > 0 {
137+
pendingCallBlockNr, err := arbutil.GetPendingCallBlockNumber(s.GetContext(), s.client)
138+
if err == nil && pendingCallBlockNr.IsUint64() {
139+
pendingU64 := pendingCallBlockNr.Uint64()
140+
if pendingU64 > s.lastPendingCallBlockNr {
141+
broadcastThis = true
142+
s.lastPendingCallBlockNr = pendingU64
143+
}
144+
} else {
145+
log.Warn("GetPendingCallBlockNr: bad result", "err", err, "number", pendingCallBlockNr)
127146
}
128-
} else {
129-
log.Warn("GetPendingBlockNr: bad result", "err", err, "number", pendingBlockNr)
130147
}
131148

132149
if broadcastThis {
@@ -203,19 +220,17 @@ func (s *L1Reader) broadcastLoop(ctx context.Context) {
203220
}
204221

205222
func (s *L1Reader) WaitForTxApproval(ctxIn context.Context, tx *types.Transaction) (*types.Receipt, error) {
206-
headerchan, unsubscribe := s.Subscribe()
223+
headerchan, unsubscribe := s.Subscribe(true)
207224
defer unsubscribe()
208225
ctx, cancel := context.WithTimeout(ctxIn, s.config.TxTimeout)
209226
defer cancel()
210227
txHash := tx.Hash()
211228
for {
212229
receipt, err := s.client.TransactionReceipt(ctx, txHash)
213-
if err == nil {
214-
callBlockNr, err := arbutil.GetPendingCallBlockNumber(ctx, s.client)
215-
if err != nil {
216-
return nil, err
217-
}
218-
if callBlockNr.Cmp(receipt.BlockNumber) > 0 {
230+
if err == nil && receipt.BlockNumber.IsUint64() {
231+
receiptBlockNr := receipt.BlockNumber.Uint64()
232+
callBlockNr := s.LastPendingCallBlockNr()
233+
if callBlockNr > receiptBlockNr {
219234
return receipt, arbutil.DetailTxError(ctx, s.client, tx, receipt)
220235
}
221236
}
@@ -240,6 +255,20 @@ func (s *L1Reader) LastHeader(ctx context.Context) (*types.Header, error) {
240255
return s.client.HeaderByNumber(ctx, nil)
241256
}
242257

258+
func (s *L1Reader) UpdatingPendingCallBlockNr() bool {
259+
s.chanMutex.Lock()
260+
defer s.chanMutex.Unlock()
261+
return s.requiresPendingCallUpdates > 0
262+
}
263+
264+
// blocknumber used by pending calls.
265+
// only updated if UpdatingPendingCallBlockNr returns true
266+
func (s *L1Reader) LastPendingCallBlockNr() uint64 {
267+
s.chanMutex.Lock()
268+
defer s.chanMutex.Unlock()
269+
return s.lastPendingCallBlockNr
270+
}
271+
243272
func (s *L1Reader) Client() arbutil.L1Interface {
244273
return s.client
245274
}

arbnode/sequencer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ func (s *Sequencer) Start(ctxIn context.Context) error {
303303
return errors.New("sequencer not initialized")
304304
}
305305

306-
headerChan, cancel := s.l1Reader.Subscribe()
306+
headerChan, cancel := s.l1Reader.Subscribe(false)
307307

308308
s.LaunchThread(func(ctx context.Context) {
309309
defer cancel()

validator/stateless_block_validator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type InboxReaderInterface interface {
5858

5959
type L1ReaderInterface interface {
6060
Client() arbutil.L1Interface
61-
Subscribe() (<-chan *types.Header, func())
61+
Subscribe(bool) (<-chan *types.Header, func())
6262
WaitForTxApproval(ctx context.Context, tx *types.Transaction) (*types.Receipt, error)
6363
}
6464

0 commit comments

Comments
 (0)