
Commit 099d5ee

fiber refactor block package for performance and simplified sync (#3324)
* refactor block package for performance and simplified sync
* reduce diff
* reduce diff
* tidy
1 parent 34c08b0 commit 099d5ee

40 files changed

Lines changed: 635 additions & 3766 deletions

block/DIVERGENCE.md

Lines changed: 72 additions & 0 deletions
# Divergence from Main Branch

This branch (`perf/block-optimization`) introduces breaking changes to maximize performance in the `block/` package. It is **not wire-compatible** with the main branch.

## 1. Combined Header+Data Blobs

**Main**: Headers and data are submitted as separate blobs to separate DA namespaces (`HeaderNamespace`, `DataNamespace`). On retrieval, the DA retriever fetches from both namespaces, decodes headers and data independently, then matches them by block height.

**This branch**: Headers and data are combined into a single blob using a custom binary encoding (`common.MarshalBlockBlob`/`UnmarshalBlockBlob`). Each blob contains the proto-encoded header, proto-encoded data, and the envelope signature, separated by length prefixes with a magic number prefix (`0x45564E44`). Only the `HeaderNamespace` is used.

### Why

- Eliminates matching overhead (no separate header/data pending maps)
- Halves DA submission round trips (one blob per block instead of two)
- Simplifies DA inclusion tracking (single check per block)
- Removes the `DAHeaderEnvelope` protobuf wrapper and the separate `SignedData` protobuf wrapper

## 2. Custom Binary Blob Encoding

**Main**: DA blobs use protobuf encoding (`DAHeaderEnvelope` for headers, `SignedData` for data). Each involves allocating proto message structs, converting Go types to proto types, and calling `proto.Marshal`.

**This branch**: The combined blob wrapper uses a custom binary format: `[magic 4B][header_len 4B][header_bytes][data_len 4B][data_bytes][sig_len 4B][sig_bytes]`. Individual header and data fields are still proto-encoded internally (hash computation requires it), but the envelope wrapper avoids all proto overhead.

### Why

- Zero allocation for the blob wrapper (direct length-prefixed binary)
- No proto message pool management for the envelope
- No `ToProto`/`FromProto` conversion for the DA envelope or signed data
- Simpler and faster encode/decode path
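The length-prefixed layout can be sketched as below. This is a minimal illustration, not the actual `common.MarshalBlockBlob`/`UnmarshalBlockBlob` code: big-endian length prefixes and the exact error handling are assumptions; only the field order and the `0x45564E44` magic come from this document.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// blobMagic is the magic prefix from the format description above.
const blobMagic uint32 = 0x45564E44

// marshalBlockBlob packs header, data, and signature bytes into the
// [magic 4B][header_len 4B][header][data_len 4B][data][sig_len 4B][sig]
// layout. Sketch only: big-endian prefixes are an assumption.
func marshalBlockBlob(header, data, sig []byte) []byte {
	buf := make([]byte, 0, 16+len(header)+len(data)+len(sig))
	buf = binary.BigEndian.AppendUint32(buf, blobMagic)
	for _, field := range [][]byte{header, data, sig} {
		buf = binary.BigEndian.AppendUint32(buf, uint32(len(field)))
		buf = append(buf, field...)
	}
	return buf
}

// unmarshalBlockBlob reverses marshalBlockBlob, validating the magic
// number and each length prefix before slicing the three fields out.
func unmarshalBlockBlob(blob []byte) (header, data, sig []byte, err error) {
	if len(blob) < 4 || binary.BigEndian.Uint32(blob[:4]) != blobMagic {
		return nil, nil, nil, errors.New("bad magic")
	}
	rest := blob[4:]
	fields := make([][]byte, 3)
	for i := range fields {
		if len(rest) < 4 {
			return nil, nil, nil, errors.New("truncated length prefix")
		}
		n := binary.BigEndian.Uint32(rest[:4])
		rest = rest[4:]
		if uint32(len(rest)) < n {
			return nil, nil, nil, errors.New("truncated field")
		}
		fields[i], rest = rest[:n], rest[n:]
	}
	return fields[0], fields[1], fields[2], nil
}

func main() {
	blob := marshalBlockBlob([]byte("hdr"), []byte("data"), []byte("sig"))
	h, d, s, err := unmarshalBlockBlob(blob)
	fmt.Println(string(h), string(d), string(s), err) // hdr data sig <nil>
}
```

Note how decoding touches no proto machinery: each field is a direct slice of the blob, which is what makes the wrapper zero-allocation on the encode side and cheap on decode.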
## 3. P2P Sync Removed

**Main**: Full nodes sync from both P2P (via `go-header` `HeaderSyncService`/`DataSyncService`) and DA. The executor broadcasts produced blocks to P2P peers. P2P events include DA height hints for targeted DA retrieval. The syncer runs a P2P worker loop alongside the DA follower.

**This branch**: All P2P sync is removed. Nodes sync exclusively from DA. No P2P broadcasting, no P2P stores, no P2P handler, no DA height hints.

### Removed code

- `syncing/p2p_handler.go` — entire file deleted
- `syncing/syncer_mock.go` — P2P handler mock deleted
- `common/expected_interfaces.go` — `HeaderP2PBroadcaster`/`DataP2PBroadcaster` types removed
- P2P broadcasting in `executing/executor.go` removed
- P2P worker loop in syncer removed
- `SourceP2P` event source removed
- `DaHeightHints` field removed from `DAHeightEvent`
- `headerStore`/`dataStore` parameters removed from `NewSyncer` and component constructors
- `headerSyncService`/`dataSyncService` parameters removed from aggregator component constructors
- `DAHintAppender` interface removed from DA submitter
- Separate `SubmitHeaders`/`SubmitData` replaced with single `SubmitBlocks`

### Why

- Removes network overhead from P2P gossip
- Eliminates the complexity of two sync sources competing
- DA is the single source of truth, reducing consistency issues
- Removes libp2p dependency from the block package's hot path
- Simplifies the syncer from 3 worker loops to 2 (process loop + pending worker)

## 4. DA Submitter Simplified

**Main**: `DASubmitterAPI` has two methods: `SubmitHeaders` and `SubmitData`, each with separate batching strategies, mutex locks, and retry loops.

**This branch**: `DASubmitterAPI` has a single `SubmitBlocks` method that takes headers and data together, creates combined blobs, signs them, and submits to a single namespace. One batching strategy, one mutex, one retry loop.

### Why

- Halves the submission loop complexity
- Eliminates the envelope cache (no more retry-signing concern)
- Single retry loop instead of two
- Combined blobs submitted atomically — no partial header-without-data states
## Migration Notes

- Existing DA data from main branch is **not readable** by this branch (different blob format)
- This branch requires a fresh start or a migration tool
- The `P2PSignedHeader` and `P2PData` types still exist in `types/` but are no longer used by the block package
- External consumers of `NewSyncComponents` and `NewAggregatorWithCatchupComponents` must update their call sites

block/components.go

Lines changed: 13 additions & 30 deletions
```diff
@@ -5,7 +5,6 @@ import (
 	"errors"
 	"fmt"
 
-	"github.com/celestiaorg/go-header"
 	"github.com/rs/zerolog"
 
 	"github.com/evstack/ev-node/block/internal/cache"
@@ -22,9 +21,7 @@ import (
 	"github.com/evstack/ev-node/pkg/genesis"
 	"github.com/evstack/ev-node/pkg/signer"
 	"github.com/evstack/ev-node/pkg/store"
-	"github.com/evstack/ev-node/pkg/sync"
 	"github.com/evstack/ev-node/pkg/telemetry"
-	"github.com/evstack/ev-node/types"
 )
 
 // Components represents the block-related components
@@ -133,18 +130,14 @@ func (bc *Components) Stop() error {
 }
 
 // NewSyncComponents creates components for a non-aggregator full node that can only sync blocks.
-// Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA.
+// Non-aggregator full nodes can sync from DA but cannot produce blocks or submit to DA.
 // They have more sync capabilities than light nodes but no block production. No signer required.
 func NewSyncComponents(
 	config config.Config,
 	genesis genesis.Genesis,
 	store store.Store,
 	exec coreexecutor.Executor,
 	daClient da.Client,
-	headerStore header.Store[*types.P2PSignedHeader],
-	dataStore header.Store[*types.P2PData],
-	headerDAHintAppender submitting.DAHintAppender,
-	dataDAHintAppender submitting.DAHintAppender,
 	logger zerolog.Logger,
 	metrics *Metrics,
 	blockOpts BlockOptions,
@@ -166,8 +159,6 @@ func NewSyncComponents(
 		metrics,
 		config,
 		genesis,
-		headerStore,
-		dataStore,
 		logger,
 		blockOpts,
 		errorCh,
@@ -182,10 +173,10 @@
 	if p, ok := exec.(coreexecutor.ExecPruner); ok {
 		execPruner = p
 	}
-	pruner := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
+	prunerObj := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
 
 	// Create submitter for sync nodes (no signer, only DA inclusion processing)
-	var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerDAHintAppender, dataDAHintAppender)
+	var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
 	if config.Instrumentation.IsTracingEnabled() {
 		daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
 	}
@@ -207,13 +198,13 @@
 		Syncer:    syncer,
 		Submitter: submitter,
 		Cache:     cacheManager,
-		Pruner:    pruner,
+		Pruner:    prunerObj,
 		errorCh:   errorCh,
 	}, nil
 }
 
 // newAggregatorComponents creates components for an aggregator full node that can produce and sync blocks.
-// Aggregator nodes have full capabilities - they can produce blocks, sync from P2P and DA,
+// Aggregator nodes have full capabilities - they can produce blocks, sync from DA,
 // and submit headers/data to DA. Requires a signer for block production and DA submission.
 func newAggregatorComponents(
 	config config.Config,
@@ -223,8 +214,6 @@ func newAggregatorComponents(
 	sequencer coresequencer.Sequencer,
 	daClient da.Client,
 	signer signer.Signer,
-	headerSyncService *sync.HeaderSyncService,
-	dataSyncService *sync.DataSyncService,
 	logger zerolog.Logger,
 	metrics *Metrics,
 	blockOpts BlockOptions,
@@ -252,8 +241,6 @@ func newAggregatorComponents(
 		metrics,
 		config,
 		genesis,
-		headerSyncService,
-		dataSyncService,
 		logger,
 		blockOpts,
 		errorCh,
@@ -271,7 +258,7 @@ func newAggregatorComponents(
 	if p, ok := exec.(coreexecutor.ExecPruner); ok {
 		execPruner = p
 	}
-	pruner := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
+	prunerObj := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
 
 	reaper, err := reaping.NewReaper(
 		exec,
@@ -286,17 +273,17 @@ func newAggregatorComponents(
 		return nil, fmt.Errorf("failed to create reaper: %w", err)
 	}
 
-	if config.Node.BasedSequencer { // no submissions needed for bases sequencer
+	if config.Node.BasedSequencer { // no submissions needed for based sequencer
 		return &Components{
 			Executor: executor,
-			Pruner:   pruner,
+			Pruner:   prunerObj,
 			Reaper:   reaper,
 			Cache:    cacheManager,
 			errorCh:  errorCh,
 		}, nil
 	}
 
-	var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerSyncService, dataSyncService)
+	var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger)
 	if config.Instrumentation.IsTracingEnabled() {
 		daSubmitter = submitting.WithTracingDASubmitter(daSubmitter)
 	}
@@ -316,7 +303,7 @@ func newAggregatorComponents(
 
 	return &Components{
 		Executor:  executor,
-		Pruner:    pruner,
+		Pruner:    prunerObj,
 		Reaper:    reaper,
 		Submitter: submitter,
 		Cache:     cacheManager,
@@ -325,10 +312,10 @@
 }
 
 // NewAggregatorWithCatchupComponents creates aggregator components that include a Syncer
-// for DA/P2P catchup before block production begins.
+// for DA catchup before block production begins.
 //
 // The caller should:
-// 1. Start the Syncer and wait for DA head + P2P catchup
+// 1. Start the Syncer and wait for DA head catchup
 // 2. Stop the Syncer and set Components.Syncer = nil
 // 3. Call Components.Start() — which will start the Executor and other components
 func NewAggregatorWithCatchupComponents(
@@ -339,16 +326,14 @@ func NewAggregatorWithCatchupComponents(
 	sequencer coresequencer.Sequencer,
 	daClient da.Client,
 	signer signer.Signer,
-	headerSyncService *sync.HeaderSyncService,
-	dataSyncService *sync.DataSyncService,
 	logger zerolog.Logger,
 	metrics *Metrics,
 	blockOpts BlockOptions,
 	raftNode common.RaftNode,
 ) (*Components, error) {
 	bc, err := newAggregatorComponents(
 		config, genesis, store, exec, sequencer, daClient, signer,
-		headerSyncService, dataSyncService, logger, metrics, blockOpts, raftNode,
+		logger, metrics, blockOpts, raftNode,
 	)
 	if err != nil {
 		return nil, err
@@ -364,8 +349,6 @@ func NewAggregatorWithCatchupComponents(
 		metrics,
 		config,
 		genesis,
-		headerSyncService.Store(),
-		dataSyncService.Store(),
 		logger,
 		blockOpts,
 		catchupErrCh,
```

block/components_test.go

Lines changed: 7 additions & 35 deletions
```diff
@@ -23,20 +23,10 @@ import (
 	"github.com/evstack/ev-node/pkg/signer/noop"
 	"github.com/evstack/ev-node/pkg/store"
 	testmocks "github.com/evstack/ev-node/test/mocks"
-	extmocks "github.com/evstack/ev-node/test/mocks/external"
-	"github.com/evstack/ev-node/types"
 )
 
-// noopDAHintAppender is a no-op implementation of DAHintAppender for testing
-type noopDAHintAppender struct{}
-
-func (n noopDAHintAppender) AppendDAHint(ctx context.Context, daHeight uint64, heights ...uint64) error {
-	return nil
-}
-
+// Test the error channel mechanism works as intended
 func TestBlockComponents_ExecutionClientFailure_StopsNode(t *testing.T) {
-	// Test the error channel mechanism works as intended
-
 	// Create a mock component that simulates execution client failure
 	errorCh := make(chan error, 1)
 	criticalError := errors.New("execution client connection lost")
@@ -62,13 +52,13 @@ func TestBlockComponents_ExecutionClientFailure_StopsNode(t *testing.T) {
 	assert.Contains(t, err.Error(), "execution client connection lost")
 }
 
+// Simple lifecycle test without creating full components
 func TestBlockComponents_StartStop_Lifecycle(t *testing.T) {
-	// Simple lifecycle test without creating full components
+	// Test that Start and Stop work without hanging
 	bc := &Components{
 		errorCh: make(chan error, 1),
 	}
 
-	// Test that Start and Stop work without hanging
 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
 	defer cancel()
 
@@ -96,26 +86,12 @@ func TestNewSyncComponents_Creation(t *testing.T) {
 	daClient.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe()
 	daClient.On("HasForcedInclusionNamespace").Return(false).Maybe()
 
-	// Create mock P2P stores
-	mockHeaderStore := extmocks.NewMockStore[*types.P2PSignedHeader](t)
-	mockDataStore := extmocks.NewMockStore[*types.P2PData](t)
-
-	// Create noop DAHintAppenders for testing
-	headerHintAppender := noopDAHintAppender{}
-	dataHintAppender := noopDAHintAppender{}
-
-	// Just test that the constructor doesn't panic - don't start the components
-	// to avoid P2P store dependencies
 	components, err := NewSyncComponents(
 		cfg,
 		gen,
 		memStore,
 		mockExec,
 		daClient,
-		mockHeaderStore,
-		mockDataStore,
-		headerHintAppender,
-		dataHintAppender,
 		zerolog.Nop(),
 		NopMetrics(),
 		DefaultBlockOptions(),
@@ -171,12 +147,10 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
 		mockSeq,
 		daClient,
 		mockSigner,
-		nil, // header broadcaster
-		nil, // data broadcaster
 		zerolog.Nop(),
 		NopMetrics(),
 		DefaultBlockOptions(),
-		nil, // raftNode
+		nil,
 	)
 
 	require.NoError(t, err)
@@ -189,9 +163,9 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
 	assert.Nil(t, components.Syncer) // Aggregator nodes currently don't create syncers in this constructor
 }
 
+// This test verifies that when the executor's execution client calls fail,
+// the error is properly propagated through the error channel and stops the node
 func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
-	// This test verifies that when the executor's execution client calls fail,
-	// the error is properly propagated through the error channel and stops the node
 	synctest.Test(t, func(t *testing.T) {
 		ds := sync.MutexWrap(datastore.NewMapDatastore())
 		memStore := store.New(ds)
@@ -255,12 +229,10 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
 		mockSeq,
 		daClient,
 		testSigner,
-		nil, // header broadcaster
-		nil, // data broadcaster
 		zerolog.Nop(),
 		NopMetrics(),
 		DefaultBlockOptions(),
-		nil, // raftNode
+		nil,
 	)
 	require.NoError(t, err)
 
```
