Skip to content

Commit 5e9e4f3

Browse files
committed
Add kurtosis e2e test for syncing node requiring fetching blob from peers
1 parent 344ff20 commit 5e9e4f3

8 files changed

Lines changed: 374 additions & 8 deletions

File tree

config/spec/devnet.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ func DevnetChainSpecData() *chain.SpecData {
7777

7878
//nolint:mnd // ok for now
7979
specData.BlobConfig = blobreactor.Config{
80-
ConsensusUpdateHeight: 30,
81-
ConsensusEnableHeight: 31,
80+
ConsensusUpdateHeight: 1,
81+
ConsensusEnableHeight: 2,
8282
MaxBytes: 819200,
8383
}
8484

testing/e2e/config/defaults.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ const (
3535
ClientValidator2 = "cl-validator-beaconkit-2"
3636
ClientValidator3 = "cl-validator-beaconkit-3"
3737
ClientValidator4 = "cl-validator-beaconkit-4"
38+
39+
NumFullNodes = 4
40+
41+
ClientFullNode0 = "cl-full-beaconkit-0"
42+
ClientFullNode1 = "cl-full-beaconkit-1"
43+
ClientFullNode2 = "cl-full-beaconkit-2"
44+
ClientFullNode3 = "cl-full-beaconkit-3"
3845
)
3946

4047
// DefaultE2ETestConfig provides a default configuration for end-to-end tests,

testing/e2e/e2e_blob_sync_test.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
// SPDX-License-Identifier: BUSL-1.1
2+
//
3+
// Copyright (C) 2025, Berachain Foundation. All rights reserved.
4+
// Use of this software is governed by the Business Source License included
5+
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
6+
//
7+
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
8+
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
9+
// VERSIONS OF THE LICENSED WORK.
10+
//
11+
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
12+
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
13+
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
14+
//
15+
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
16+
// AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
17+
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
18+
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
19+
// TITLE.
20+
21+
package e2e_test
22+
23+
import (
24+
"bytes"
25+
"context"
26+
"encoding/binary"
27+
"math/big"
28+
"time"
29+
30+
"github.com/attestantio/go-eth2-client/api"
31+
"github.com/berachain/beacon-kit/testing/e2e/config"
32+
"github.com/berachain/beacon-kit/testing/e2e/suite"
33+
"github.com/berachain/beacon-kit/testing/e2e/suite/types/tx"
34+
"github.com/ethereum/go-ethereum/accounts/abi/bind"
35+
coretypes "github.com/ethereum/go-ethereum/core/types"
36+
)
37+
38+
const (
39+
// NumBlocksWithBlobs is the number of blocks with blob transactions to create before restarting the syncing node.
40+
NumBlocksWithBlobs = 10
41+
)
42+
43+
// TestBlobSync validates that a node can sync from behind and fetch blobs from other peers via the blob reactor.
44+
// This test does the following steps:
45+
// 1. Stop a full node to simulate it being offline
46+
// 2. Produce several blocks with blob transactions while the node is down
47+
// 3. Restart the full node so it needs to catch up
48+
// 4. Verify that the node successfully fetches blobs from peers via P2P
49+
func (s *BeaconKitE2ESuite) TestBlobSync() {
50+
ctx, cancel := context.WithTimeout(s.Ctx(), suite.DefaultE2ETestTimeout)
51+
defer cancel()
52+
53+
// We use full node 0 for this test
54+
fullNodeELService := "el-full-reth-0"
55+
fullNodeCLService := config.ClientFullNode0
56+
57+
// 1. Stop a full node to simulate it being offline
58+
//
59+
s.Logger().Info("Stopping full node to simulate being offline", "service", fullNodeCLService)
60+
err := s.StopService(ctx, fullNodeCLService)
61+
s.Require().NoError(err, "failed to stop full node consensus client")
62+
err = s.StopService(ctx, fullNodeELService)
63+
s.Require().NoError(err, "failed to stop full node execution client")
64+
s.Logger().Info("Full node stopped, now producing blocks with blobs while it's offline")
65+
66+
// Set up connection to a validator's consensus client to produce blocks with blobs
67+
// while the full node is offline. Lets use validator 0 for this.
68+
//
69+
client0 := s.ConsensusClients()[config.ClientValidator0]
70+
s.Require().NotNil(client0)
71+
s.Require().NoError(client0.Connect(ctx))
72+
73+
// Get initial block number before submitting blob transactions
74+
initialBlockNum, err := s.JSONRPCBalancer().BlockNumber(ctx)
75+
s.Require().NoError(err)
76+
s.Logger().Info("Initial block number", "block", initialBlockNum)
77+
78+
// Prepare transaction parameters
79+
sender := s.TestAccounts()[0]
80+
chainID, err := s.JSONRPCBalancer().ChainID(ctx)
81+
s.Require().NoError(err)
82+
tip, err := s.JSONRPCBalancer().SuggestGasTipCap(ctx)
83+
s.Require().NoError(err)
84+
gasFee, err := s.JSONRPCBalancer().SuggestGasPrice(ctx)
85+
s.Require().NoError(err)
86+
nonce, err := s.JSONRPCBalancer().NonceAt(ctx, sender.Address(), new(big.Int).SetUint64(initialBlockNum))
87+
s.Require().NoError(err)
88+
89+
// 2. Produce several blocks with blob transactions while the node is down
90+
//
91+
var (
92+
blobTxs = make([]*coretypes.Transaction, 0)
93+
receipts = make([]*coretypes.Receipt, 0)
94+
currentNonce = nonce
95+
lastBlobBlockNum uint64
96+
)
97+
for blockIdx := range NumBlocksWithBlobs {
98+
// Each block can have 1-6 blob sidecars
99+
numBlobsInBlock := uint64((blockIdx % 6) + 1)
100+
101+
s.Logger().Info("Creating block with blobs", "block_index", blockIdx, "num_blobs", numBlobsInBlock)
102+
for blobIdx := range numBlobsInBlock {
103+
// Create unique blob data for each transaction
104+
blobData := make([]byte, 8)
105+
binary.LittleEndian.PutUint64(blobData, currentNonce)
106+
107+
// Craft blob-carrying transaction
108+
blobTx := tx.New4844Tx(
109+
currentNonce, nil, 1000000,
110+
chainID, tip, gasFee, big.NewInt(0),
111+
[]byte{0x01, 0x02, 0x03, 0x04},
112+
big.NewInt(1), blobData,
113+
coretypes.AccessList{},
114+
)
115+
116+
// Sign and submit the transaction
117+
blobTx, err = sender.SignTx(chainID, blobTx)
118+
s.Require().NoError(err)
119+
s.Logger().Info("Submitting blob transaction",
120+
"tx_hash", blobTx.Hash().Hex(),
121+
"nonce", currentNonce,
122+
"block_index", blockIdx,
123+
"blob_index", blobIdx)
124+
125+
err = s.JSONRPCBalancer().SendTransaction(ctx, blobTx)
126+
s.Require().NoError(err)
127+
blobTxs = append(blobTxs, blobTx)
128+
129+
// Wait for this transaction to be mined
130+
receipt, errWait := bind.WaitMined(ctx, s.JSONRPCBalancer(), blobTx)
131+
s.Require().NoError(errWait)
132+
s.Require().Equal(coretypes.ReceiptStatusSuccessful, receipt.Status)
133+
receipts = append(receipts, receipt)
134+
135+
// Track the highest block number
136+
if receipt.BlockNumber.Uint64() > lastBlobBlockNum {
137+
lastBlobBlockNum = receipt.BlockNumber.Uint64()
138+
}
139+
140+
s.Logger().Info("Blob transaction mined",
141+
"tx_hash", blobTx.Hash().Hex(),
142+
"block", receipt.BlockNumber.Uint64(),
143+
"block_index", blockIdx,
144+
"blob_index", blobIdx)
145+
146+
currentNonce++
147+
}
148+
}
149+
150+
// 3. Restart the full node so it needs to catch up
151+
//
152+
err = s.StartService(ctx, fullNodeCLService)
153+
s.Require().NoError(err, "failed to start full node consensus client")
154+
err = s.StartService(ctx, fullNodeELService)
155+
s.Require().NoError(err, "failed to start full node execution client")
156+
s.Logger().Info("Full node restarted, waiting for it to sync to last blob block...", "last_blob_block", lastBlobBlockNum)
157+
s.Require().NoError(s.WaitForFinalizedBlockNumber(lastBlobBlockNum))
158+
159+
// After catching up, the full node may need to wait a bit more for the blob fetcher to detect missing blobs
160+
s.Logger().Info("Waiting for blob fetcher to process queued blob requests...")
161+
time.Sleep(20 * time.Second)
162+
163+
// 4. Verify that the node successfully fetches blobs from peers via P2P
164+
//
165+
s.Logger().Info("Setting up full node consensus client to verify blob sync...")
166+
err = s.SetupFullNodeConsensusClients()
167+
s.Require().NoError(err, "failed to setup full node consensus clients")
168+
fullNodeClient := s.FullNodeClients()[fullNodeCLService]
169+
s.Require().NotNil(fullNodeClient, "full node consensus client is nil")
170+
s.Logger().Info("Connecting to full node's consensus client...")
171+
err = fullNodeClient.Connect(ctx)
172+
s.Require().NoError(err, "failed to connect to full node consensus client")
173+
174+
// Verify all blobs are accessible from the full node's node-api
175+
for i, receipt := range receipts {
176+
s.Logger().Info("Verifying blob availability on full node",
177+
"block", receipt.BlockNumber.Uint64(),
178+
"tx_index", i,
179+
"full_node", fullNodeCLService)
180+
181+
// Fetch blob sidecars from the full node's node-api
182+
response, errAPI := fullNodeClient.BlobSidecars(ctx, &api.BlobSidecarsOpts{Block: receipt.BlockNumber.String()})
183+
s.Require().NoError(errAPI, "failed to fetch blob sidecars from full node for block %s", receipt.BlockNumber.String())
184+
s.Require().NotNil(response)
185+
s.Require().NotEmpty(response.Data, "no blob sidecars found on full node for block %s", receipt.BlockNumber.String())
186+
187+
// Verify each blob commitment matches what was originally submitted
188+
sidecar := blobTxs[i].BlobTxSidecar()
189+
s.Require().NotNil(sidecar, "blob transaction %d missing sidecar", i)
190+
for j, commitment := range sidecar.Commitments {
191+
found := false
192+
for _, blob := range response.Data {
193+
if bytes.Equal(blob.KZGCommitment[:], commitment[:]) {
194+
s.Require().Equal(sidecar.Blobs[j][:], blob.Blob[:], "blob data mismatch on full node for tx %d blob %d", i, j)
195+
found = true
196+
break
197+
}
198+
}
199+
s.Require().True(found, "blob commitment not found on full node for tx %d blob %d", i, j)
200+
}
201+
202+
s.Logger().Info("Blob verified successfully on full node",
203+
"block", receipt.BlockNumber.Uint64(),
204+
"tx_hash", blobTxs[i].Hash().Hex(),
205+
"num_blobs", len(sidecar.Commitments))
206+
}
207+
208+
s.Logger().Info("Blob sync test completed successfully - full node fetched all blobs via P2P",
209+
"blocks_with_blobs", NumBlocksWithBlobs,
210+
"total_transactions", len(receipts),
211+
"full_node_synced", fullNodeCLService)
212+
}

testing/e2e/e2e_inflation_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ func (s *BeaconKitE2ESuite) TestEVMInflation() {
3838
chainspec, err := spec.DevnetChainSpec()
3939
s.Require().NoError(err)
4040

41+
// Get the current finalized block to start from (in case other tests ran first)
42+
startBlock, err := s.JSONRPCBalancer().BlockNumber(s.Ctx())
43+
s.Require().NoError(err)
44+
4145
var (
4246
inflationPerBlock uint64
4347
inflationAddress common.ExecutionAddress
@@ -49,8 +53,9 @@ func (s *BeaconKitE2ESuite) TestEVMInflation() {
4953
forkSlot int64
5054
onceOnFork sync.Once
5155
)
52-
// Arbitrarily run test for 2 epochs.
53-
for blkNum := range int64(2 * chainspec.SlotsPerEpoch()) {
56+
// Arbitrarily run test for 2 epochs from the current block.
57+
for i := range int64(2 * chainspec.SlotsPerEpoch()) {
58+
blkNum := int64(startBlock) + i
5459
err = s.WaitForFinalizedBlockNumber(uint64(blkNum))
5560
s.Require().NoError(err)
5661
payload, errBlk := s.JSONRPCBalancer().BlockByNumber(s.Ctx(), big.NewInt(blkNum))

testing/e2e/suite/node_control.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// SPDX-License-Identifier: BUSL-1.1
2+
//
3+
// Copyright (C) 2025, Berachain Foundation. All rights reserved.
4+
// Use of this software is governed by the Business Source License included
5+
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
6+
//
7+
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
8+
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
9+
// VERSIONS OF THE LICENSED WORK.
10+
//
11+
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
12+
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
13+
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
14+
//
15+
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
16+
// AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
17+
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
18+
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
19+
// TITLE.
20+
21+
package suite
22+
23+
import (
24+
"context"
25+
"fmt"
26+
27+
"github.com/kurtosis-tech/kurtosis/api/golang/core/lib/starlark_run_config"
28+
)
29+
30+
// StopService stops a running service in the Kurtosis enclave.
31+
func (s *KurtosisE2ESuite) StopService(ctx context.Context, serviceName string) error {
32+
s.logger.Info("Stopping service", "service", serviceName)
33+
34+
script := fmt.Sprintf(`
35+
def run(plan):
36+
plan.stop_service("%s")
37+
`, serviceName)
38+
39+
result, err := s.enclave.RunStarlarkScriptBlocking(ctx, script, starlark_run_config.NewRunStarlarkConfig())
40+
if err != nil {
41+
return fmt.Errorf("failed to stop service %s: %w", serviceName, err)
42+
}
43+
44+
if result.ExecutionError != nil {
45+
return fmt.Errorf("error stopping service %s: %s", serviceName, result.ExecutionError.String())
46+
}
47+
48+
if len(result.ValidationErrors) > 0 {
49+
return fmt.Errorf("validation error stopping service %s: %s", serviceName, result.ValidationErrors[0].String())
50+
}
51+
52+
s.logger.Info("Service stopped successfully", "service", serviceName)
53+
return nil
54+
}
55+
56+
// StartService starts a stopped service in the Kurtosis enclave.
57+
func (s *KurtosisE2ESuite) StartService(ctx context.Context, serviceName string) error {
58+
s.logger.Info("Starting service", "service", serviceName)
59+
60+
script := fmt.Sprintf(`
61+
def run(plan):
62+
plan.start_service("%s")
63+
`, serviceName)
64+
65+
result, err := s.enclave.RunStarlarkScriptBlocking(ctx, script, starlark_run_config.NewRunStarlarkConfig())
66+
if err != nil {
67+
return fmt.Errorf("failed to start service %s: %w", serviceName, err)
68+
}
69+
70+
if result.ExecutionError != nil {
71+
return fmt.Errorf("error starting service %s: %s", serviceName, result.ExecutionError.String())
72+
}
73+
74+
if len(result.ValidationErrors) > 0 {
75+
return fmt.Errorf("validation error starting service %s: %s", serviceName, result.ValidationErrors[0].String())
76+
}
77+
78+
s.logger.Info("Service started successfully", "service", serviceName)
79+
return nil
80+
}
81+
82+
// RestartService stops and then starts a service.
83+
func (s *KurtosisE2ESuite) RestartService(ctx context.Context, serviceName string) error {
84+
if err := s.StopService(ctx, serviceName); err != nil {
85+
return err
86+
}
87+
88+
return s.StartService(ctx, serviceName)
89+
}

0 commit comments

Comments
 (0)