Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
9d6e7a9
blobreactor implementation
fridrik01 Sep 10, 2025
34a3d76
fix: store blobs per block hash to prevent race condition during bloc…
fridrik01 Sep 23, 2025
8d8e224
fix: handle peers returning 0 blobs during sync mode transition
fridrik01 Sep 23, 2025
8fad209
fix: off-by-one error when transitioning from sync to consensus mode
fridrik01 Sep 23, 2025
4413a9f
Implement concurrent request handling with per-request channels
fridrik01 Sep 24, 2025
46e1194
fix: cleanup in blobreactor
fridrik01 Sep 25, 2025
958a180
fix: try blob fetching async to prevent CometBFT blocking during sync…
fridrik01 Sep 25, 2025
d4ed6c3
revert off-by-one fix
fridrik01 Sep 25, 2025
cbd6e7a
add more logging
fridrik01 Sep 25, 2025
dc74b7d
Add persistent async blob fetcher with byzantine fault tolerance via …
fridrik01 Sep 26, 2025
01d8935
Simplify peer selection and add blob count validation
fridrik01 Sep 27, 2025
242ce3e
fix lint
fridrik01 Sep 27, 2025
25236e8
fix: panic on multiple calls to blobFetcher Close()
fridrik01 Sep 27, 2025
b595ba8
fix: use atomic file write to prevent reading partial json files
fridrik01 Sep 27, 2025
65b385d
Use math.Slot
fridrik01 Sep 30, 2025
1cafeb4
Add blob download retry logic, other refactorings
fridrik01 Oct 1, 2025
1407247
Move blob queue to blobs/download_queue and cleanup orphaned tmp file…
fridrik01 Oct 1, 2025
7239484
Add constants for timeouts
fridrik01 Oct 1, 2025
274c0a7
DELETE: Simulate blob fetching failures to test retries
fridrik01 Oct 1, 2025
6ad6706
DELETE: simulate failures
fridrik01 Oct 2, 2025
5813e71
refactor and split up blob fetcher into queue/executor + add test suite
fridrik01 Oct 2, 2025
e8d4f80
fix lint errors
fridrik01 Oct 2, 2025
7bdc0d8
Make blob fetcher timeouts configurable
fridrik01 Oct 3, 2025
62c46f8
Add integration test for blob_fetcher, refactor helpers shared by uni…
fridrik01 Oct 3, 2025
3209a7a
fix: corrupt json files wont stall processing + refactoring and minor…
fridrik01 Oct 3, 2025
672086d
fix: cleanup responseChans
fridrik01 Oct 6, 2025
c0a1d2b
improve ssz handling + remove error from response
fridrik01 Oct 6, 2025
d2947fd
add todo
fridrik01 Oct 6, 2025
eaf51e3
Add blobreactor/fetcher metrics
fridrik01 Oct 7, 2025
3508ac0
DELETE: testing error metrics in devnet
fridrik01 Oct 10, 2025
0478777
Revert the testing for failed blobs + don't log queue when 0)
fridrik01 Oct 13, 2025
d9c673c
Add kurtosis e2e test for syncing node requiring fetching blob from p…
fridrik01 Oct 13, 2025
bbb7799
Add IsBlobConsensusEnabledAtHeight helper to reduce code duplication
fridrik01 Oct 16, 2025
fe2d125
Use default blobreactor config if missing
fridrik01 Oct 24, 2025
a6e214f
Address review comments
fridrik01 Oct 28, 2025
0ef6906
Check for pending blobs in GetBlobSidecars Api endpoint
fridrik01 Oct 31, 2025
cc4ec8a
fix: address lint errors after rebase
fridrik01 Oct 31, 2025
935e407
Merge branch 'main' into blobreactor
abi87 Nov 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions beacon/blockchain/blob_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// SPDX-License-Identifier: BUSL-1.1
//
// Copyright (C) 2025, Berachain Foundation. All rights reserved.
// Use of this software is governed by the Business Source License included
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
//
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
// VERSIONS OF THE LICENSED WORK.
//
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
//
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
// AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
// TITLE.

package blockchain

import (
"context"
"fmt"

datypes "github.com/berachain/beacon-kit/da/types"
"github.com/berachain/beacon-kit/log"
)

// blobFetchExecutor handles the Byzantine-critical blob fetch and verification logic.
// This is the core component that ensures we only accept valid blobs from peers.
type blobFetchExecutor struct {
blobProcessor BlobProcessor
blobRequester BlobRequester
storageBackend StorageBackend
logger log.Logger
}

// FetchBlobsAndVerify fetches, verifies, and stores blobs for a single request.
// It creates a verifier function that the BlobRequester uses to validate blobs.
// If verification fails, the BlobRequester will automatically try the next peer.
func (e *blobFetchExecutor) FetchBlobsAndVerify(ctx context.Context, req BlobFetchRequest) error {
e.logger.Info("Fetching blobs from peers", "slot", req.Header.Slot.Unwrap(), "expected_blobs", len(req.Commitments))

select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Create a verifier function that validates blobs against the stored header and commitments.
// This is the Byzantine fault tolerance mechanism - if a peer sends invalid blobs,
// verification will fail and BlobRequester will try the next peer.
verifier := func(sidecars datypes.BlobSidecars) error {
return e.blobProcessor.VerifySidecars(ctx, sidecars, req.Header, req.Commitments)
}

// Request blobs with verification - will try multiple peers if verification fails
fetchedBlobs, err := e.blobRequester.RequestBlobs(ctx, req.Header.Slot, verifier)
if err != nil {
return fmt.Errorf("failed to request valid blobs for slot %d: %w", req.Header.Slot.Unwrap(), err)
}

// Process and store the validated blobs
err = e.blobProcessor.ProcessSidecars(e.storageBackend.AvailabilityStore(), fetchedBlobs)
if err != nil {
return fmt.Errorf("failed to process blobs for slot %d: %w", req.Header.Slot.Unwrap(), err)
}

e.logger.Info("Successfully fetched and stored blobs", "slot", req.Header.Slot.Unwrap(), "count", len(fetchedBlobs))
return nil
}
121 changes: 121 additions & 0 deletions beacon/blockchain/blob_executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// SPDX-License-Identifier: BUSL-1.1
//
// Copyright (C) 2025, Berachain Foundation. All rights reserved.
// Use of this software is governed by the Business Source License included
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
//
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
// VERSIONS OF THE LICENSED WORK.
//
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
// AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
// TITLE.

//nolint:testpackage // Testing internal components
package blockchain

import (
"testing"

"cosmossdk.io/log"
"github.com/berachain/beacon-kit/beacon/blockchain/testhelpers"
"github.com/berachain/beacon-kit/da/blobreactor"
dastore "github.com/berachain/beacon-kit/da/store"
datypes "github.com/berachain/beacon-kit/da/types"
"github.com/berachain/beacon-kit/errors"
"github.com/berachain/beacon-kit/primitives/math"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

// Test when peer sends invalid blobs (verification should reject them)
func TestBlobFetchExecutor_ByzantineBlobs_Rejected(t *testing.T) {
t.Parallel()
ctx := t.Context()
mockProcessor := &testhelpers.SimpleBlobProcessor{}
mockRequester := &testhelpers.SimpleBlobRequester{}
mockStorage := testhelpers.NewSimpleStorageBackend(&dastore.Store{})

executor := &blobFetchExecutor{
blobProcessor: mockProcessor,
blobRequester: mockRequester,
storageBackend: mockStorage,
logger: log.NewNopLogger(),
}

request := createTestBlobRequest(math.Slot(100), 2)
invalidBlobs := []*datypes.BlobSidecar{{Index: 0}, {Index: 1}}

// Byzantine peer returns invalid blobs - KZG proof verification fails
verifyErr := errors.New("KZG proof verification failed")
mockProcessor.On("VerifySidecars", ctx, mock.Anything, request.Header, request.Commitments).Return(verifyErr)
mockRequester.On("RequestBlobs", ctx, math.Slot(100), mock.Anything).Return(invalidBlobs, verifyErr)

err := executor.FetchBlobsAndVerify(ctx, request)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to request valid blobs")

// ProcessSidecars must NOT be called with invalid blobs
mockProcessor.AssertNotCalled(t, "ProcessSidecars")
}

// Test Verifier function is called (Byzantine protection mechanism)
func TestBlobFetchExecutor_VerifierCalled(t *testing.T) {
t.Parallel()
ctx := t.Context()
mockProcessor := &testhelpers.SimpleBlobProcessor{}
mockRequester := &testhelpers.SimpleBlobRequester{}
mockStorage := testhelpers.NewSimpleStorageBackend(&dastore.Store{})

executor := &blobFetchExecutor{
blobProcessor: mockProcessor,
blobRequester: mockRequester,
storageBackend: mockStorage,
logger: log.NewNopLogger(),
}

request := createTestBlobRequest(math.Slot(100), 1)
validBlobs := []*datypes.BlobSidecar{{Index: 0}}

verifierCalled := false
mockProcessor.On("VerifySidecars", ctx, mock.Anything, request.Header, request.Commitments).
Run(func(_ mock.Arguments) { verifierCalled = true }).
Return(nil)
mockRequester.On("RequestBlobs", ctx, math.Slot(100), mock.Anything).Return(validBlobs, nil)
mockProcessor.On("ProcessSidecars", mockStorage.AvailabilityStore(), mock.Anything).Return(nil)

err := executor.FetchBlobsAndVerify(ctx, request)
require.NoError(t, err)
require.True(t, verifierCalled, "Verifier must be called for Byzantine protection")
}

// Test when all peers fail - no valid blobs available
func TestBlobFetchExecutor_AllPeersFailed(t *testing.T) {
t.Parallel()
ctx := t.Context()
mockProcessor := &testhelpers.SimpleBlobProcessor{}
mockRequester := &testhelpers.SimpleBlobRequester{}
mockStorage := testhelpers.NewSimpleStorageBackend(&dastore.Store{})

executor := &blobFetchExecutor{
blobProcessor: mockProcessor,
blobRequester: mockRequester,
storageBackend: mockStorage,
logger: log.NewNopLogger(),
}

request := createTestBlobRequest(math.Slot(100), 2)

// All peers failed (timeout, byzantine, offline, etc.)
mockRequester.On("RequestBlobs", ctx, math.Slot(100), mock.Anything).Return(nil, blobreactor.ErrAllPeersFailed)

err := executor.FetchBlobsAndVerify(ctx, request)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to request valid blobs")

// No blobs should be stored
mockProcessor.AssertNotCalled(t, "ProcessSidecars")
}
Loading
Loading