Skip to content

Commit 4536507

Browse files
committed
Address review comments
1 parent 51fdd75 commit 4536507

7 files changed

Lines changed: 67 additions & 43 deletions

File tree

beacon/blockchain/blob_fetcher.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ func (bf *blobFetcher) SetHeadSlot(slot math.Slot) {
125125
bf.executor.blobRequester.SetHeadSlot(slot)
126126
}
127127

128-
// QueueBlobRequest queues a request to fetch blobs for a specific slot.
129-
func (bf *blobFetcher) QueueBlobRequest(slot math.Slot, block *ctypes.BeaconBlock) error {
128+
// QueueBlobRequest queues a request to fetch blobs for a specific block.
129+
func (bf *blobFetcher) QueueBlobRequest(block *ctypes.BeaconBlock) error {
130130
// Don't queue if no blobs expected
131131
commitments := block.GetBody().GetBlobKzgCommitments()
132132
if len(commitments) == 0 {
@@ -139,6 +139,7 @@ func (bf *blobFetcher) QueueBlobRequest(slot math.Slot, block *ctypes.BeaconBloc
139139
Commitments: commitments,
140140
}
141141

142+
slot := block.GetHeader().Slot
142143
if err := bf.queue.Add(slot, request); err != nil {
143144
return err
144145
}

beacon/blockchain/finalize_block.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -120,37 +120,37 @@ func (s *Service) FinalizeSidecars(
120120
// caught up. We don't need to process sidecars unless they are within DA period.
121121
//
122122
//#nosec: G115 // SyncingToHeight will never be negative.
123-
if s.chainSpec.WithinDAPeriod(blk.GetSlot(), math.Slot(syncingToHeight)) {
124-
// If blobs were passed in (normal consensus mode), use them
123+
if !s.chainSpec.WithinDAPeriod(blk.GetSlot(), math.Slot(syncingToHeight)) {
124+
// Here outside Data Availability window. Just log if needed
125125
if len(blobs) > 0 {
126-
return processBlobsFunc(blobs)
126+
s.logger.Info(
127+
"Skipping blob processing outside of Data Availability Period",
128+
"slot", blk.GetSlot().Base10(), "head", syncingToHeight,
129+
)
127130
}
128-
129-
// Check the block to see if there should be blobs
130-
expectedBlobs := len(blk.GetBody().GetBlobKzgCommitments())
131-
if expectedBlobs == 0 {
132-
return nil // No blobs expected for this block
133-
}
134-
135-
// Queue the blob fetch request to be handled asynchronously
136-
err := s.blobFetcher.QueueBlobRequest(blk.GetSlot(), blk)
137-
if err != nil {
138-
// TODO: should we log and continue here instead of erroring out?
139-
return fmt.Errorf("failed to queue blob fetch request for slot %d: %w", blk.GetSlot().Unwrap(), err)
140-
}
141-
142-
// Return immediately without waiting for blobs
143-
// The background fetcher will handle retrieval and storage
144131
return nil
145132
}
146133

147-
// Here outside Data Availability window. Just log if needed
134+
// If blobs were passed in (normal consensus mode), use them
148135
if len(blobs) > 0 {
149-
s.logger.Info(
150-
"Skipping blob processing outside of Data Availability Period",
151-
"slot", blk.GetSlot().Base10(), "head", syncingToHeight,
152-
)
136+
return processBlobsFunc(blobs)
153137
}
138+
139+
// Check the block to see if there should be blobs
140+
expectedBlobs := len(blk.GetBody().GetBlobKzgCommitments())
141+
if expectedBlobs == 0 {
142+
return nil // No blobs expected for this block
143+
}
144+
145+
// Queue the blob fetch request to be handled asynchronously
146+
err := s.blobFetcher.QueueBlobRequest(blk)
147+
if err != nil {
148+
// TODO: should we log and continue here instead of erroring out?
149+
return fmt.Errorf("failed to queue blob fetch request for slot %d: %w", blk.GetSlot().Unwrap(), err)
150+
}
151+
152+
// Return immediately without waiting for blobs
153+
// The background fetcher will handle retrieval and storage
154154
return nil
155155
}
156156

beacon/blockchain/interfaces.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ type BlobFetcher interface {
199199
Start(ctx context.Context)
200200
// Stop gracefully shuts down the blob fetcher.
201201
Stop()
202-
// QueueBlobRequest queues a request to fetch blobs for a specific slot.
203-
QueueBlobRequest(slot math.Slot, block *ctypes.BeaconBlock) error
202+
// QueueBlobRequest queues a request to fetch blobs for a specific block.
203+
QueueBlobRequest(block *ctypes.BeaconBlock) error
204204
// SetHeadSlot updates the head slot for blob fetching.
205205
SetHeadSlot(slot math.Slot)
206206
}

consensus/cometbft/service/finalize_block.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,18 @@ func (s *Service) finalizeBlock(
5050
}
5151

5252
getBlobsFunc := func(cachedBlobData []byte) (datypes.BlobSidecars, error) {
53-
var sidecars datypes.BlobSidecars
54-
if !s.chainSpec.IsBlobConsensusEnabledAtHeight(req.Height) {
55-
var err error
56-
sidecars, err = encoding.UnmarshalBlobSidecarsFromABCIRequest(req, s.chainSpec)
57-
if err != nil {
58-
return nil, fmt.Errorf("finalize block: failed parsing blobs from request: %w", err)
59-
}
60-
} else if len(cachedBlobData) > 0 {
53+
if len(cachedBlobData) != 0 {
54+
var sidecars datypes.BlobSidecars
6155
if err := ssz.Unmarshal(cachedBlobData, &sidecars); err != nil {
6256
return nil, fmt.Errorf("finalize block: failed to unmarshal cached blob data: %w", err)
6357
}
58+
return sidecars, nil
59+
}
60+
61+
// not cached
62+
sidecars, err := encoding.UnmarshalBlobSidecarsFromABCIRequest(req, s.chainSpec)
63+
if err != nil {
64+
return nil, fmt.Errorf("finalize block: failed parsing blobs from request: %w", err)
6465
}
6566
return sidecars, nil
6667
}
@@ -150,8 +151,7 @@ func (s *Service) finalizeBlock(
150151
return nil, fmt.Errorf("failed checking cached final state, hash %s, height %d: %w", hash, req.Height, err)
151152
}
152153

153-
var cachedBlobData []byte
154-
sidecars, err := getBlobsFunc(cachedBlobData)
154+
sidecars, err := getBlobsFunc(nil)
155155
if err != nil {
156156
return nil, err
157157
}
@@ -166,7 +166,7 @@ func (s *Service) finalizeBlock(
166166
s.cachedStates.SetCached(hash, &cache.Element{
167167
State: finalState,
168168
ValUpdates: valUpdates,
169-
Blobs: cachedBlobData,
169+
Blobs: nil,
170170
})
171171
if err = s.cachedStates.MarkAsFinal(hash); err != nil {
172172
return nil, fmt.Errorf("failed marking state as final, hash %s, height %d: %w", hash, req.Height, err)

da/blobreactor/reactor.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,12 @@ func (br *BlobReactor) handleBlobRequest(peer p2p.Peer, req *BlobRequest) {
311311
return
312312
}
313313

314-
br.logger.Info("Sent blob response", "slot", req.Slot.Unwrap(), "request_id", req.RequestID, "peer", peer.ID(), "data_size", len(msgData))
314+
br.logger.Info("Sent blob response",
315+
"slot", req.Slot.Unwrap(),
316+
"request_id", req.RequestID,
317+
"peer", peer.ID(),
318+
"data_size", len(msgData),
319+
)
315320
}
316321

317322
// handleBlobResponse processes incoming blob responses
@@ -528,7 +533,12 @@ func (br *BlobReactor) requestBlobsFromPeer(ctx context.Context, peerID p2p.ID,
528533
}
529534

530535
if len(resp.SidecarData) > defaultRecvMessageCapacity {
531-
err = fmt.Errorf("peer %s sent oversized response: %d bytes (max %d)", peerID, len(resp.SidecarData), defaultRecvMessageCapacity)
536+
err = fmt.Errorf(
537+
"peer %s sent oversized response: %d bytes (max %d)",
538+
peerID,
539+
len(resp.SidecarData),
540+
defaultRecvMessageCapacity,
541+
)
532542
return nil, newBlobRequestError(err, statusInvalidResponse)
533543
}
534544

da/store/store.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package store
2222

2323
import (
2424
"context"
25+
"fmt"
2526

2627
ctypes "github.com/berachain/beacon-kit/consensus-types/types"
2728
"github.com/berachain/beacon-kit/da/types"
@@ -59,6 +60,11 @@ func (s *Store) IsDataAvailable(
5960
// We need to check each commitment with its corresponding index
6061
// Since commitments can be duplicated, we check by index order
6162
for i, commitment := range body.GetBlobKzgCommitments() {
63+
// Validate index is within byte range
64+
if i > 255 { //nolint:mnd // 255 is max value for byte
65+
s.logger.Error("Blob index exceeds maximum value of 255", "index", i)
66+
return false
67+
}
6268
// Check if the block data is available in the IndexDB with the index appended
6369
blockData, err := s.IndexDB.Has(slot.Unwrap(), append(commitment[:], byte(i)))
6470
if err != nil || !blockData {
@@ -97,13 +103,20 @@ func (s *Store) Persist(sidecars types.BlobSidecars) error {
97103
if sidecar == nil {
98104
return ErrAttemptedToStoreNilSidecar
99105
}
106+
107+
// Validate index is within byte range to prevent overflow
108+
index := sidecar.GetIndex()
109+
if index > 255 { //nolint:mnd // 255 is max value for byte
110+
return fmt.Errorf("blob index %d exceeds maximum value of 255", index)
111+
}
112+
100113
bz, err := sidecar.MarshalSSZ()
101114
if err != nil {
102115
return err
103116
}
104117
slot = sidecar.GetBeaconBlockHeader().GetSlot()
105118
// Include blob index in the key to prevent overwrites when KZG commitments are duplicated
106-
err = s.IndexDB.Set(slot.Unwrap(), append(sidecar.KzgCommitment[:], byte(sidecar.GetIndex())), bz)
119+
err = s.IndexDB.Set(slot.Unwrap(), append(sidecar.KzgCommitment[:], byte(index)), bz)
107120
if err != nil {
108121
return err
109122
}

testing/simulated/blob_fetcher_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (s *SimulatedSuite) TestBlobFetcher_MultiNodeFetch() {
124124
node1Fetcher.SetHeadSlot(testSlot + 10)
125125

126126
// Queue blob request, wait for it to be downloaded and validate
127-
s.Require().NoError(node1Fetcher.QueueBlobRequest(testSlot, block))
127+
s.Require().NoError(node1Fetcher.QueueBlobRequest(block))
128128
time.Sleep(200 * time.Millisecond)
129129
storedSidecars, err := node1Store.GetBlobSidecars(testSlot)
130130
s.Require().NoError(err)

0 commit comments

Comments
 (0)