Skip to content

Commit 16592fd

Browse files
sebastianstclaude
andauthored
bgpo: convert BTO from subscription to polling (#19095)
The Blob Tip Oracle (BTO) previously used ethclient.SubscribeNewHead() for header notifications, which only works over WebSocket connections. This change converts it to polling for new headers instead, allowing it to work with HTTP connections. The polling uses the existing PollRate configuration (2.5s default) to check for new headers at regular intervals. Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent b053ac1 commit 16592fd

File tree

2 files changed

+50
-85
lines changed

2 files changed

+50
-85
lines changed

op-service/bgpo/oracle.go

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bgpo
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"math/big"
78
"sort"
@@ -24,7 +25,6 @@ type BTOBackend interface {
2425
BlockNumber(ctx context.Context) (uint64, error)
2526
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
2627
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
27-
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
2828
}
2929

3030
// BlobTipOracle tracks blob base gas prices by subscribing to new block headers
@@ -49,7 +49,6 @@ type BlobTipOracle struct {
4949
ctx context.Context
5050
cancel context.CancelFunc
5151

52-
sub ethereum.Subscription
5352
loopDone chan struct{}
5453
}
5554

@@ -121,48 +120,49 @@ func NewBlobTipOracle(backend BTOBackend, chainConfig *params.ChainConfig, log l
121120
}
122121

123122
// Start starts the oracle's background processing. It returns after the cache is prepopulated and
124-
// the subscription is set up. To stop the background processing, call [BlobTipOracle.Close]. The
125-
// background processing will also stop if the subscription fails.
123+
// the polling loop is started. To stop the background processing, call [BlobTipOracle.Close].
126124
func (o *BlobTipOracle) Start() error {
127-
// Pre-populate cache with recent blocks before subscribing
125+
// Pre-populate cache with recent blocks before starting the polling loop
128126
if err := o.prePopulateCache(); err != nil {
129127
o.log.Warn("Failed to pre-populate cache, continuing anyway", "err", err)
130128
}
131129

132-
headers := make(chan *types.Header, 10)
133-
134-
sub, err := o.backend.SubscribeNewHead(o.ctx, headers)
135-
if err != nil {
136-
return err
137-
}
138-
o.sub = sub
139-
o.log.Info("Blob tip oracle started, subscribed to newHeads")
130+
o.log.Info("Blob tip oracle started, polling for new headers")
140131

141132
o.loopDone = make(chan struct{})
142-
go o.processHeaders(headers)
133+
go o.pollLoop()
143134
return nil
144135
}
145136

146-
func (o *BlobTipOracle) processHeaders(headers chan *types.Header) {
147-
defer o.log.Debug("Blob tip oracle header processing loop exited")
137+
func (o *BlobTipOracle) pollLoop() {
138+
defer o.log.Debug("Blob tip oracle polling loop exited")
148139
defer close(o.loopDone)
149140

150-
// Process headers as they arrive
141+
ticker := time.NewTicker(o.config.PollRate)
142+
defer ticker.Stop()
143+
151144
for {
152145
select {
153-
case header := <-headers:
154-
if err := o.processHeader(header); err != nil {
155-
o.log.Error("Error processing header", "err", err, "block", header.Number)
156-
}
157-
case err := <-o.sub.Err():
158-
if err != nil {
159-
o.log.Error("Subscription error", "err", err)
160-
return
161-
}
162-
return
163146
case <-o.ctx.Done():
164147
o.log.Info("Blob tip oracle context canceled")
165148
return
149+
case <-ticker.C:
150+
nextBlock := o.latestBlock + 1
151+
header, err := func() (*types.Header, error) {
152+
ctx, cancel := context.WithTimeout(o.ctx, o.config.NetworkTimeout)
153+
defer cancel()
154+
return o.backend.HeaderByNumber(ctx, big.NewInt(int64(nextBlock)))
155+
}()
156+
if errors.Is(err, ethereum.NotFound) {
157+
continue // Block not yet available
158+
}
159+
if err != nil {
160+
o.log.Warn("Failed to get header", "err", err, "block", nextBlock)
161+
continue
162+
}
163+
if err := o.processHeader(header); err != nil {
164+
o.log.Error("Error processing header", "err", err, "block", nextBlock)
165+
}
166166
}
167167
}
168168
}
@@ -379,9 +379,6 @@ func (o *BlobTipOracle) extractTipsForBlobTxs(block *types.Block, baseFee *big.I
379379
// Close stops the oracle and cleans up resources.
380380
func (o *BlobTipOracle) Close() {
381381
o.cancel()
382-
if o.sub != nil {
383-
o.sub.Unsubscribe()
384-
}
385382
if o.loopDone != nil {
386383
<-o.loopDone
387384
}

op-service/bgpo/oracle_test.go

Lines changed: 23 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ import (
1919
"github.com/ethereum-optimism/optimism/op-service/testlog"
2020
)
2121

22+
// mockBTOBackend mocks BTOBackend for testing.
23+
2224
type mockBTOBackend struct {
2325
mock.Mock
2426
}
@@ -44,38 +46,8 @@ func (m *mockBTOBackend) BlockByNumber(ctx context.Context, number *big.Int) (*t
4446
return args.Get(0).(*types.Block), args.Error(1)
4547
}
4648

47-
func (m *mockBTOBackend) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
48-
args := m.Called(ctx, ch)
49-
if args.Get(0) == nil {
50-
return nil, args.Error(1)
51-
}
52-
return args.Get(0).(ethereum.Subscription), args.Error(1)
53-
}
54-
5549
var _ BTOBackend = (*mockBTOBackend)(nil)
5650

57-
// mockSubscription implements ethereum.Subscription for testing.
58-
type mockSubscription struct {
59-
errCh chan error
60-
unsubbed bool
61-
}
62-
63-
func newMockSubscription() *mockSubscription {
64-
return &mockSubscription{
65-
errCh: make(chan error, 1),
66-
}
67-
}
68-
69-
func (s *mockSubscription) Unsubscribe() {
70-
if !s.unsubbed {
71-
s.unsubbed = true
72-
}
73-
}
74-
75-
func (s *mockSubscription) Err() <-chan error {
76-
return s.errCh
77-
}
78-
7951
func createHeader(blockNum uint64, excessBlobGas *uint64) *types.Header {
8052
header := &types.Header{
8153
Number: big.NewInt(int64(blockNum)),
@@ -433,19 +405,20 @@ func TestExtractBlobFeeCaps(t *testing.T) {
433405
}
434406

435407
func TestOracleLifecycle(t *testing.T) {
436-
mbackend := new(mockBTOBackend)
437408
chainConfig := params.MainnetChainConfig
438409
logger := testlog.Logger(t, log.LevelDebug)
439410

440-
oracle := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{
441-
PricesCacheSize: 10,
442-
BlockCacheSize: 10,
443-
MaxBlocks: 2,
444-
Percentile: 60,
445-
NetworkTimeout: time.Second,
446-
})
411+
t.Run("start and close with polling", func(t *testing.T) {
412+
mbackend := new(mockBTOBackend)
413+
oracle := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{
414+
PricesCacheSize: 10,
415+
BlockCacheSize: 10,
416+
MaxBlocks: 2,
417+
Percentile: 60,
418+
NetworkTimeout: time.Second,
419+
PollRate: 50 * time.Millisecond, // Fast polling for test
420+
})
447421

448-
t.Run("start and close", func(t *testing.T) {
449422
latestBlock := uint64(100)
450423

451424
// Mock pre-population calls
@@ -459,12 +432,15 @@ func TestOracleLifecycle(t *testing.T) {
459432
mbackend.On("BlockByNumber", mock.Anything, big.NewInt(int64(i))).Return(block, nil).Once()
460433
}
461434

462-
// Mock subscription
463-
sub := newMockSubscription()
464-
var headerCh chan<- *types.Header
465-
mbackend.On("SubscribeNewHead", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
466-
headerCh = args.Get(1).(chan<- *types.Header)
467-
}).Return(sub, nil).Once()
435+
// Mock polling: first return NotFound, then return a new header
436+
mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(101)).Return(nil, ethereum.NotFound).Once()
437+
newHeader := createHeader(101, &excessBlobGas)
438+
newBlock := createBlock(101, newHeader.BaseFee, []*types.Transaction{})
439+
mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(101)).Return(newHeader, nil).Once()
440+
mbackend.On("BlockByNumber", mock.Anything, big.NewInt(101)).Return(newBlock, nil).Once()
441+
442+
// After processing block 101, polling will try block 102 which doesn't exist
443+
mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(102)).Return(nil, ethereum.NotFound).Maybe()
468444

469445
// Start the oracle
470446
err := oracle.Start()
@@ -481,14 +457,7 @@ func TestOracleLifecycle(t *testing.T) {
481457
require.Equal(t, uint64(100), latestBlockNum)
482458
require.NotNil(t, fee)
483459

484-
// Send a new header through the subscription to verify processing works
485-
newHeader := createHeader(101, &excessBlobGas)
486-
newBlock := createBlock(101, newHeader.BaseFee, []*types.Transaction{})
487-
mbackend.On("BlockByNumber", mock.Anything, big.NewInt(101)).Return(newBlock, nil).Once()
488-
489-
headerCh <- newHeader
490-
491-
// Give the goroutine time to process
460+
// Wait for polling to pick up the new header
492461
require.Eventually(t, func() bool {
493462
latestBlockNum, _ = oracle.GetLatestBlobBaseFee()
494463
return latestBlockNum == 101
@@ -497,8 +466,6 @@ func TestOracleLifecycle(t *testing.T) {
497466
// Close the oracle
498467
oracle.Close()
499468

500-
// Verify subscription was unsubscribed
501-
require.True(t, sub.unsubbed, "subscription should be unsubscribed after Close")
502469
select {
503470
case <-oracle.loopDone:
504471
// Expect loop to have exited
@@ -510,6 +477,7 @@ func TestOracleLifecycle(t *testing.T) {
510477
})
511478

512479
t.Run("close before start is safe", func(t *testing.T) {
480+
mbackend := new(mockBTOBackend)
513481
oracle2 := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{
514482
PricesCacheSize: 10,
515483
BlockCacheSize: 10,

0 commit comments

Comments
 (0)