
Commit 6c295a6

Better streamlining of sender recovery (erigontech#18390)
## Summary

- Refactor the sender recovery stage to process individual transactions instead of blocks, for better parallelism.

## Sender Recovery Stage (`execution/stagedsync/stage_senders.go`)

- Change job granularity from block-level to transaction-level; this gives us parallelism at the chain tip.
- Each worker now processes individual transactions instead of entire blocks.
- Transactions from the same block can be processed concurrently across workers.
- Added `pendingBlock` tracking to aggregate results before writing to the collector.

Previously there was no parallelism in sender recovery at the chain tip.

Next step: query senders from the txpool when possible. This is marginal, but it could make sense in the future.
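Below is a minimal, self-contained Go sketch of the fan-out/aggregation pattern described above. It is not the Erigon code: `txJob`, `pendingBlock`, the worker count, and the fake `"sender-of-"` recovery are illustrative stand-ins for `senderRecoveryJob`, the stage's bookkeeping, and real ECDSA sender recovery.

```go
package main

import (
    "fmt"
    "sync"
)

// txJob is a per-transaction unit of work (loosely analogous to senderRecoveryJob).
type txJob struct {
    blockIndex int
    txIndex    int
    payload    string // stands in for the raw transaction
    sender     string // filled in by a worker
}

// pendingBlock aggregates recovered senders until every tx of a block is done.
type pendingBlock struct {
    senders  []string
    received int
}

func main() {
    blocks := [][]string{{"tx0", "tx1", "tx2"}, {"tx3"}, {}}

    jobs := make(chan *txJob, 16)
    out := make(chan *txJob, 16)

    // Workers: one transaction per job, so txs of the same block run in parallel.
    var wg sync.WaitGroup
    for w := 0; w < 4; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs {
                j.sender = "sender-of-" + j.payload // stand-in for ECDSA sender recovery
                out <- j
            }
        }()
    }

    // Shared bookkeeping guarded by a mutex (mirrors pendingMu in the commit).
    var mu sync.Mutex
    pending := make(map[int]*pendingBlock)

    // Collector: aggregate results per block and "flush" once a block is complete.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for j := range out {
            mu.Lock()
            // pb is never nil: the producer registers a block before sending its jobs.
            pb := pending[j.blockIndex]
            pb.senders[j.txIndex] = j.sender
            pb.received++
            if pb.received == len(pb.senders) {
                fmt.Printf("block %d complete: %v\n", j.blockIndex, pb.senders)
                delete(pending, j.blockIndex)
            }
            mu.Unlock()
        }
    }()

    // Producer: register each block, then emit one job per transaction.
    for bi, txs := range blocks {
        if len(txs) == 0 {
            fmt.Printf("block %d has no transactions, written as empty\n", bi)
            continue
        }
        mu.Lock()
        pending[bi] = &pendingBlock{senders: make([]string, len(txs))}
        mu.Unlock()
        for ti, tx := range txs {
            jobs <- &txJob{blockIndex: bi, txIndex: ti, payload: tx}
        }
    }
    close(jobs)
    wg.Wait()
    close(out)
    <-done
}
```

Each block still produces exactly one senders result, but the work that produces it is scheduled per transaction, so several workers can make progress on a single block at the chain tip.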
1 parent 5540ed3 commit 6c295a6

1 file changed

Lines changed: 89 additions & 42 deletions

File tree

execution/stagedsync/stage_senders.go

@@ -43,6 +43,7 @@ import (
     "github.com/erigontech/erigon/execution/stagedsync/headerdownload"
     "github.com/erigontech/erigon/execution/stagedsync/stages"
     "github.com/erigontech/erigon/execution/types"
+    "github.com/erigontech/erigon/execution/types/accounts"
     "github.com/erigontech/erigon/node/ethconfig"
 )

@@ -115,6 +116,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
     defer logEvery.Stop()

     startFrom := s.BlockNumber + 1
+    recoveryStart := time.Now()

     jobs := make(chan *senderRecoveryJob, cfg.batchSize)
     out := make(chan *senderRecoveryJob, cfg.batchSize)
@@ -136,6 +138,18 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
     collectorSenders.SortAndFlushInBackground(true)
     collectorSenders.LogLvl(log.LvlDebug)

+    // pendingBlocks tracks blocks that are being processed
+    // key: blockIndex, value: {senders slice, txCount, received count}
+    type pendingBlock struct {
+        senders  []byte
+        txCount  int
+        received int
+        hash     common.Hash
+    }
+    pendingBlocks := make(map[int]*pendingBlock)
+    var pendingMu sync.Mutex
+    var lastBlockIndex int
+
     errCh := make(chan senderRecoveryError)
     go func() {
         defer dbg.LogPanic()
@@ -147,12 +161,6 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
             select {
             case <-quitCh:
                 return
-            case <-logEvery.C:
-                n := s.BlockNumber
-                if j != nil {
-                    n += uint64(j.index)
-                }
-                logger.Info(fmt.Sprintf("[%s] Recovery", logPrefix), "block_number", n, "ch", fmt.Sprintf("%d/%d", len(jobs), cap(jobs)))
             case j, ok = <-out:
                 if !ok {
                     return
@@ -162,13 +170,31 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
                     return
                 }

-                k := make([]byte, 4)
-                binary.BigEndian.PutUint32(k, uint32(j.index))
-                index := int(binary.BigEndian.Uint32(k))
-                if err := collectorSenders.Collect(dbutils.BlockBodyKey(s.BlockNumber+uint64(index)+1, j.blockHash), j.senders); err != nil {
-                    errCh <- senderRecoveryError{err: j.err}
+                pendingMu.Lock()
+                pb := pendingBlocks[j.blockIndex]
+                if pb == nil {
+                    pendingMu.Unlock()
+                    errCh <- senderRecoveryError{err: fmt.Errorf("unexpected block index %d", j.blockIndex)}
                     return
                 }
+                // Copy sender address to the correct position
+                fromValue := j.from.Value()
+                copy(pb.senders[j.txIndex*length.Addr:], fromValue[:])
+                pb.received++
+
+                // Check if block is complete
+                if pb.received == pb.txCount {
+                    if err := collectorSenders.Collect(dbutils.BlockBodyKey(s.BlockNumber+uint64(j.blockIndex)+1, pb.hash), pb.senders); err != nil {
+                        pendingMu.Unlock()
+                        errCh <- senderRecoveryError{err: err}
+                        return
+                    }
+                    delete(pendingBlocks, j.blockIndex)
+                    if j.blockIndex > lastBlockIndex {
+                        lastBlockIndex = j.blockIndex
+                    }
+                }
+                pendingMu.Unlock()
             }
         }
     }()
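For context on the `j.txIndex*length.Addr` offset above: the per-block senders value handed to the collector is a flat byte slice with one 20-byte address per transaction, indexed by transaction position. A small, hypothetical helper (assuming `length.Addr` is 20 bytes; the function names are not from the codebase) illustrates the layout:

```go
// addrLen mirrors length.Addr: Ethereum addresses are 20 bytes.
const addrLen = 20

// senderAt returns the address slot for the transaction at txIndex in a flat
// per-block senders slice laid out as [addr0 | addr1 | addr2 | ...].
func senderAt(senders []byte, txIndex int) []byte {
    return senders[txIndex*addrLen : (txIndex+1)*addrLen]
}

// putSenderAt writes addr into the slot for txIndex, as the collector
// goroutine above does with copy().
func putSenderAt(senders []byte, txIndex int, addr [addrLen]byte) {
    copy(senders[txIndex*addrLen:], addr[:])
}
```

Because every result writes into a disjoint 20-byte slot, the block can be flushed as soon as `received == txCount`, with no ordering requirement on when individual transactions finish.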
@@ -237,26 +263,50 @@ Loop:
             continue
         }

-        j := &senderRecoveryJob{
-            body:        body,
-            blockNumber: blockNumber,
-            blockTime:   header.Time,
-            blockHash:   blockHash,
-            index:       int(blockNumber) - int(s.BlockNumber) - 1,
+        blockIndex := int(blockNumber) - int(s.BlockNumber) - 1
+        if blockIndex < 0 {
+            panic(blockIndex) //uint-underflow
         }
-        if j.index < 0 {
-            panic(j.index) //uint-underflow
+
+        // Skip blocks with no transactions
+        if len(body.Transactions) == 0 {
+            // Write empty senders for blocks with no transactions
+            if err := collectorSenders.Collect(dbutils.BlockBodyKey(s.BlockNumber+uint64(blockIndex)+1, blockHash), nil); err != nil {
+                return err
+            }
+            continue
         }
-        select {
-        case recoveryErr := <-errCh:
-            if recoveryErr.err != nil {
-                cancelWorkers()
-                if err := handleRecoverErr(recoveryErr); err != nil {
-                    return err
+
+        // Register pending block
+        pendingMu.Lock()
+        pendingBlocks[blockIndex] = &pendingBlock{
+            senders: make([]byte, len(body.Transactions)*length.Addr),
+            txCount: len(body.Transactions),
+            hash:    blockHash,
+        }
+        pendingMu.Unlock()
+
+        // Send individual transaction jobs
+        for txIdx, txn := range body.Transactions {
+            j := &senderRecoveryJob{
+                txn:         txn,
+                blockNumber: blockNumber,
+                blockTime:   header.Time,
+                blockHash:   blockHash,
+                blockIndex:  blockIndex,
+                txIndex:     txIdx,
+            }
+            select {
+            case recoveryErr := <-errCh:
+                if recoveryErr.err != nil {
+                    cancelWorkers()
+                    if err := handleRecoverErr(recoveryErr); err != nil {
+                        return err
+                    }
+                    break Loop
                 }
-                break Loop
+            case jobs <- j:
             }
-        case jobs <- j:
         }
     }

@@ -304,6 +354,7 @@ Loop:
         if err = s.Update(tx, to); err != nil {
             return err
         }
+        log.Debug(fmt.Sprintf("[%s] Recovery done", logPrefix), "from", startFrom, "to", to, "blocks", to-startFrom+1, "took", time.Since(recoveryStart))
     }

     if !useExternalTx {
@@ -321,12 +372,13 @@ type senderRecoveryError struct {
 }

 type senderRecoveryJob struct {
-    body    *types.Body
-    senders []byte
+    txn  types.Transaction
+    from accounts.Address
     blockHash   common.Hash
     blockNumber uint64
     blockTime   uint64
-    index       int
+    blockIndex int // index of the block relative to s.BlockNumber
+    txIndex    int // index of the tx within the block
     err error
 }

@@ -348,20 +400,15 @@ func recoverSenders(ctx context.Context, logPrefix string, cryptoContext *secp25
             return
         }

-        body := job.body
-        job.body = nil // reduce ram usage and help GC
         signer := types.MakeSigner(config, job.blockNumber, job.blockTime)
-        job.senders = make([]byte, len(body.Transactions)*length.Addr)
-        for i, txn := range body.Transactions {
-            from, err := signer.SenderWithContext(cryptoContext, txn)
-            if err != nil {
-                job.err = fmt.Errorf("%w: error recovering sender for tx=%x, %v",
-                    rules.ErrInvalidBlock, txn.Hash(), err)
-                break
-            }
-            fromValue := from.Value()
-            copy(job.senders[i*length.Addr:], fromValue[:])
+        from, err := signer.SenderWithContext(cryptoContext, job.txn)
+        if err != nil {
+            job.err = fmt.Errorf("%w: error recovering sender for tx=%x, %v",
+                rules.ErrInvalidBlock, job.txn.Hash(), err)
+        } else {
+            job.from = from
         }
+        job.txn = nil // reduce ram usage and help GC

         // prevent sending to close channel
         if err := common.Stopped(quit); err != nil {
