Skip to content

Commit 9743396

Browse files
authored
fix: flashblock benchmark scheduling (#157)
1 parent 149daf7 commit 9743396

File tree

7 files changed

+96
-98
lines changed

7 files changed

+96
-98
lines changed

runner/clients/reth/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (r *RethClient) Run(ctx context.Context, cfg *types.RuntimeConfig) error {
9494
args = append(args, "--engine.state-provider-metrics")
9595
args = append(args, "--disable-discovery")
9696
args = append(args, "--port", fmt.Sprintf("%d", r.p2pPort))
97-
args = append(args, "-vvv")
97+
args = append(args, "-vv")
9898

9999
// increase mempool size
100100
args = append(args, "--txpool.pending-max-count", "100000000")

runner/network/consensus/validator_consensus.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,23 +64,34 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex
6464
}
6565

6666
// Start starts the fake consensus client.
67-
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64) error {
67+
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64, startedBlockSignal chan uint64) error {
6868
f.log.Info("Starting sync benchmark", "num_payloads", len(payloads))
6969
m := metrics.NewBlockMetrics()
70+
7071
for i := 0; i < len(payloads); i++ {
71-
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock))))
72+
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock)+1)))
7273
f.log.Info("Proposing payload", "payload_index", i)
74+
75+
startTime := time.Now()
7376
err := f.propose(ctx, &payloads[i], m)
7477
if err != nil {
7578
return err
7679
}
7780

81+
select {
82+
case startedBlockSignal <- payloads[i].Number + 1:
83+
default:
84+
}
85+
86+
time.Sleep(time.Until(startTime.Add(f.options.BlockTime)))
87+
7888
if payloads[i].Number >= firstTestBlock {
7989
err = metricsCollector.Collect(ctx, m)
8090
if err != nil {
8191
f.log.Error("Failed to collect metrics", "error", err)
8292
}
8393
}
94+
8495
}
8596
return nil
8697
}

runner/network/flashblocks/replay_server.go

Lines changed: 32 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
type ReplayServer struct {
1818
log log.Logger
1919
port uint64
20-
flashblocks []types.FlashblocksPayloadV1
20+
flashblocks map[uint64][]types.FlashblocksPayloadV1
2121
blockTime time.Duration
2222

2323
server *http.Server
@@ -30,7 +30,7 @@ type ReplayServer struct {
3030
stopOnce sync.Once
3131
}
3232

33-
func NewReplayServer(log log.Logger, port uint64, flashblocks []types.FlashblocksPayloadV1, blockTime time.Duration) *ReplayServer {
33+
func NewReplayServer(log log.Logger, port uint64, flashblocks map[uint64][]types.FlashblocksPayloadV1, blockTime time.Duration) *ReplayServer {
3434
return &ReplayServer{
3535
log: log,
3636
port: port,
@@ -117,100 +117,53 @@ func (s *ReplayServer) removeConnection(conn *websocket.Conn) {
117117
}
118118

119119
// ReplayFlashblocks replays flashblocks to connected clients at evenly spaced intervals.
120-
func (s *ReplayServer) ReplayFlashblocks(ctx context.Context) error {
120+
func (s *ReplayServer) ReplayFlashblock(ctx context.Context, blockNumber uint64) error {
121121
if len(s.flashblocks) == 0 {
122122
s.log.Info("No flashblocks to replay")
123123
return nil
124124
}
125125

126-
blockGroups := s.groupFlashblocksByBlock()
126+
flashblocks, ok := s.flashblocks[blockNumber]
127+
if !ok {
128+
s.log.Info("No flashblocks to replay for block", "block_number", blockNumber)
129+
return nil
130+
}
127131

128132
s.log.Info("Starting flashblock replay",
129-
"total_flashblocks", len(s.flashblocks),
130-
"num_blocks", len(blockGroups),
133+
"flashblocks", len(flashblocks),
134+
)
135+
136+
numIntervals := 1
137+
if len(flashblocks) > 1 {
138+
numIntervals = len(flashblocks)
139+
}
140+
141+
interval := s.blockTime / time.Duration(numIntervals)
142+
143+
s.log.Debug("Replaying flashblocks for block",
144+
"block_number", blockNumber,
145+
"num_flashblocks", len(flashblocks),
146+
"interval", interval,
131147
)
132148

133-
for blockNum, flashblocks := range blockGroups {
149+
for i, flashblock := range flashblocks {
134150
select {
135151
case <-ctx.Done():
136152
return ctx.Err()
137153
default:
138154
}
139155

140-
if len(flashblocks) == 0 {
141-
continue
142-
}
143-
144-
interval := s.blockTime / time.Duration(len(flashblocks)+1)
145-
146-
s.log.Debug("Replaying flashblocks for block",
147-
"block_number", blockNum,
148-
"num_flashblocks", len(flashblocks),
149-
"interval", interval,
150-
)
151-
152-
for i, flashblock := range flashblocks {
153-
select {
154-
case <-ctx.Done():
155-
return ctx.Err()
156-
default:
157-
}
158-
159-
time.Sleep(interval)
160-
161-
if err := s.broadcastFlashblock(flashblock); err != nil {
162-
s.log.Warn("Error broadcasting flashblock", "err", err, "index", i)
163-
}
156+
if err := s.broadcastFlashblock(flashblock); err != nil {
157+
s.log.Warn("Error broadcasting flashblock", "err", err, "index", i)
164158
}
165159

166-
remainingTime := s.blockTime - interval*time.Duration(len(flashblocks))
167-
if remainingTime > 0 {
168-
time.Sleep(remainingTime)
169-
}
160+
time.Sleep(interval)
170161
}
171162

172163
s.log.Info("Flashblock replay complete")
173164
return nil
174165
}
175166

176-
// groupFlashblocksByBlock groups flashblocks by block number, sorted by index.
177-
func (s *ReplayServer) groupFlashblocksByBlock() map[uint64][]types.FlashblocksPayloadV1 {
178-
groups := make(map[uint64][]types.FlashblocksPayloadV1)
179-
180-
// Build PayloadID -> blockNum mapping from flashblocks with Base
181-
payloadIDToBlockNum := make(map[types.PayloadID]uint64)
182-
for _, fb := range s.flashblocks {
183-
if fb.Base != nil {
184-
payloadIDToBlockNum[fb.PayloadID] = uint64(fb.Base.BlockNumber)
185-
}
186-
}
187-
188-
for _, fb := range s.flashblocks {
189-
var blockNum uint64
190-
if fb.Base != nil {
191-
blockNum = uint64(fb.Base.BlockNumber)
192-
} else if bn, ok := payloadIDToBlockNum[fb.PayloadID]; ok {
193-
blockNum = bn
194-
}
195-
groups[blockNum] = append(groups[blockNum], fb)
196-
}
197-
198-
for blockNum := range groups {
199-
sortByIndex(groups[blockNum])
200-
}
201-
202-
return groups
203-
}
204-
205-
func sortByIndex(flashblocks []types.FlashblocksPayloadV1) {
206-
for i := 1; i < len(flashblocks); i++ {
207-
j := i
208-
for j > 0 && flashblocks[j-1].Index > flashblocks[j].Index {
209-
flashblocks[j-1], flashblocks[j] = flashblocks[j], flashblocks[j-1]
210-
j--
211-
}
212-
}
213-
}
214167

215168
func (s *ReplayServer) broadcastFlashblock(flashblock types.FlashblocksPayloadV1) error {
216169
data, err := json.Marshal(flashblock)
@@ -232,9 +185,15 @@ func (s *ReplayServer) broadcastFlashblock(flashblock types.FlashblocksPayloadV1
232185
}
233186
}
234187

188+
blockNumber := 0
189+
if flashblock.Base != nil {
190+
blockNumber = int(flashblock.Base.BlockNumber)
191+
}
192+
235193
s.log.Debug("Broadcasted flashblock",
236194
"payload_id", fmt.Sprintf("%x", flashblock.PayloadID),
237195
"index", flashblock.Index,
196+
"block_number", blockNumber,
238197
"num_clients", len(connections),
239198
)
240199

runner/network/network_benchmark.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,10 @@ func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloadResul
183183

184184
nb.log.Info("Validator header", "number", validatorHeader.Number.Uint64(), "lastSetupBlock", lastSetupBlock)
185185

186-
if validatorHeader.Number.Cmp(big.NewInt(int64(lastSetupBlock))) < 0 {
186+
if validatorHeader.Number.Cmp(big.NewInt(int64(lastSetupBlock)-1)) < 0 {
187187
nb.log.Info("Validator is behind first test block, catching up", "validator_block", validatorHeader.Number.Uint64(), "last_setup_block", lastSetupBlock)
188188
// fetch all blocks the validator node is missing
189-
for i := validatorHeader.Number.Uint64() + 1; i <= lastSetupBlock; i++ {
189+
for i := validatorHeader.Number.Uint64() + 1; i < lastSetupBlock; i++ {
190190
block, err := sequencerClient.Client().BlockByNumber(ctx, big.NewInt(int64(i)))
191191
if err != nil {
192192
sequencerClient.Stop()

runner/network/sequencer_benchmark.go

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
154154
flashblocksClient := sequencerClient.FlashblocksClient()
155155
if flashblocksClient != nil {
156156
nb.log.Info("Starting flashblocks collection")
157-
flashblockCollector = newFlashblockCollector()
157+
flashblockCollector = newFlashblockCollector(nb.log)
158158
flashblocksClient.AddListener(flashblockCollector)
159159

160160
if err := flashblocksClient.Start(benchmarkCtx); err != nil {
@@ -204,23 +204,26 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
204204

205205
go func() {
206206
consensusClient := consensus.NewSequencerConsensusClient(nb.log, sequencerClient.Client(), sequencerClient.AuthClient(), mempool, consensus.ConsensusClientOptions{
207-
BlockTime: params.BlockTime,
208-
GasLimit: params.GasLimit,
209-
GasLimitSetup: 1e9, // 1G gas
207+
BlockTime: params.BlockTime,
208+
GasLimit: params.GasLimit,
209+
GasLimitSetup: 1e9, // 1G gas
210210
ParallelTxBatches: nb.config.Config.ParallelTxBatches(),
211211
}, headBlockHash, headBlockNumber, l1Chain, nb.config.BatcherAddr())
212212

213213
payloads := make([]engine.ExecutableData, 0)
214214

215+
var lastSetupPayload *engine.ExecutableData
215216
setupLoop:
216217
for {
217218
_blockMetrics := metrics.NewBlockMetrics()
218-
_, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true)
219+
setupPayload, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true)
219220
if err != nil {
220221
errChan <- err
221222
return
222223
}
223224

225+
lastSetupPayload = setupPayload
226+
224227
select {
225228
case <-setupComplete:
226229
break setupLoop
@@ -231,6 +234,8 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
231234

232235
}
233236

237+
payloads = append(payloads, *lastSetupPayload)
238+
234239
blockMetrics := metrics.NewBlockMetrics()
235240

236241
// run for a few blocks
@@ -276,7 +281,7 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
276281
return nil, 0, err
277282
case payloads := <-payloadResult:
278283
// Collect flashblocks if available
279-
var flashblocks []types.FlashblocksPayloadV1
284+
var flashblocks map[uint64][]types.FlashblocksPayloadV1
280285
if flashblockCollector != nil {
281286
flashblocks = flashblockCollector.GetFlashblocks()
282287
nb.log.Info("Collected flashblocks", "count", len(flashblocks))
@@ -286,37 +291,50 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
286291
ExecutablePayloads: payloads,
287292
Flashblocks: flashblocks,
288293
}
289-
return result, payloads[0].Number - 1, nil
294+
return result, payloads[0].Number, nil
290295
}
291296
}
292297

293298
// flashblockCollector implements FlashblockListener to collect flashblocks.
294299
type flashblockCollector struct {
295-
flashblocks []types.FlashblocksPayloadV1
296-
mu sync.Mutex
300+
log log.Logger
301+
flashblocks map[uint64][]types.FlashblocksPayloadV1
302+
currentBaseBlock *uint64
303+
mu sync.Mutex
297304
}
298305

299306
// newFlashblockCollector creates a new flashblock collector.
300-
func newFlashblockCollector() *flashblockCollector {
307+
func newFlashblockCollector(log log.Logger) *flashblockCollector {
301308
return &flashblockCollector{
302-
flashblocks: make([]types.FlashblocksPayloadV1, 0),
309+
flashblocks: make(map[uint64][]types.FlashblocksPayloadV1),
310+
log: log,
303311
}
304312
}
305313

306314
// OnFlashblock implements FlashblockListener.
307315
func (c *flashblockCollector) OnFlashblock(flashblock types.FlashblocksPayloadV1) {
308316
c.mu.Lock()
309317
defer c.mu.Unlock()
310-
c.flashblocks = append(c.flashblocks, flashblock)
318+
if flashblock.Base != nil {
319+
baseBlock := uint64(flashblock.Base.BlockNumber)
320+
c.currentBaseBlock = &baseBlock
321+
} else if c.currentBaseBlock == nil {
322+
c.log.Warn("received flashblock without base block number")
323+
return
324+
}
325+
c.log.Info("Collected flashblock", "block_number", *c.currentBaseBlock, "index", flashblock.Index, "tx_count", len(flashblock.Diff.Transactions))
326+
c.flashblocks[*c.currentBaseBlock] = append(c.flashblocks[*c.currentBaseBlock], flashblock)
311327
}
312328

313329
// GetFlashblocks returns all collected flashblocks.
314-
func (c *flashblockCollector) GetFlashblocks() []types.FlashblocksPayloadV1 {
330+
func (c *flashblockCollector) GetFlashblocks() map[uint64][]types.FlashblocksPayloadV1 {
315331
c.mu.Lock()
316332
defer c.mu.Unlock()
317333

318334
// Return a copy to avoid race conditions
319-
result := make([]types.FlashblocksPayloadV1, len(c.flashblocks))
320-
copy(result, c.flashblocks)
335+
result := make(map[uint64][]types.FlashblocksPayloadV1)
336+
for blockNumber, flashblocks := range c.flashblocks {
337+
result[blockNumber] = append([]types.FlashblocksPayloadV1{}, flashblocks...)
338+
}
321339
return result
322340
}

runner/network/types/payload_result.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type PayloadResult struct {
1212
ExecutablePayloads []engine.ExecutableData
1313

1414
// Flashblocks are the flashblock payloads collected during the benchmark (if available)
15-
Flashblocks []clientTypes.FlashblocksPayloadV1
15+
Flashblocks map[uint64][]clientTypes.FlashblocksPayloadV1
1616
}
1717

1818
// HasFlashblocks returns true if flashblock payloads were collected.

runner/network/validator_benchmark.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa
6767
headBlockHash := headBlockHeader.Hash()
6868
headBlockNumber := headBlockHeader.Number.Uint64()
6969

70+
startedBlockSignal := make(chan uint64)
71+
7072
// If flashblock server is available and client supports flashblocks, wait for connection
7173
// and start replaying flashblocks in the background
7274
if vb.flashblockServer != nil && vb.validatorClient.SupportsFlashblocks() {
@@ -80,9 +82,17 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa
8082

8183
// Start replaying flashblocks in a goroutine
8284
go func() {
83-
if err := vb.flashblockServer.ReplayFlashblocks(ctx); err != nil {
84-
if !errors.Is(err, context.Canceled) {
85-
vb.log.Warn("Error replaying flashblocks", "err", err)
85+
for {
86+
select {
87+
case <-ctx.Done():
88+
return
89+
case blockNumber := <-startedBlockSignal:
90+
vb.log.Info("Replaying flashblocks for block", "block_number", blockNumber)
91+
if err := vb.flashblockServer.ReplayFlashblock(ctx, blockNumber); err != nil {
92+
if !errors.Is(err, context.Canceled) {
93+
vb.log.Warn("Error replaying flashblocks", "err", err)
94+
}
95+
}
8696
}
8797
}
8898
}()
@@ -93,7 +103,7 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa
93103
BlockTime: vb.config.Params.BlockTime,
94104
}, headBlockHash, headBlockNumber)
95105

96-
err = consensusClient.Start(ctx, payloads, metricsCollector, lastSetupBlock)
106+
err = consensusClient.Start(ctx, payloads, metricsCollector, lastSetupBlock + 1, startedBlockSignal)
97107
if err != nil {
98108
if errors.Is(err, context.Canceled) {
99109
return err

0 commit comments

Comments
 (0)