Skip to content

Commit af6b551

Browse files
[r30] mapReduce: final txn must be executed in reducer because it requires "all receipts of block" (#15206)
pick #15205 - also added blocks read-ahead at non-chain-tip (and added accesslist use there)
1 parent 737cc31 commit af6b551

File tree

3 files changed

+73
-17
lines changed

3 files changed

+73
-17
lines changed

eth/stagedsync/exec3.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -433,16 +433,14 @@ func ExecV3(ctx context.Context,
433433
agg.BuildFilesInBackground(outputTxNum.Load())
434434

435435
var readAhead chan uint64
436-
if !parallel {
436+
if !isMining && !inMemExec && execStage.CurrentSyncCycle.IsInitialCycle {
437437
// snapshots are often stored on cheaper drives. don't expect low read latency; manually read ahead instead.
438438
// can't use OS-level ReadAhead - because Data >> RAM
439439
// it also warms up state a bit - by touching senders/coinbase accounts and code
440-
if !execStage.CurrentSyncCycle.IsInitialCycle {
441-
var clean func()
440+
var clean func()
442441

443-
readAhead, clean = blocksReadAhead(ctx, &cfg, 4, true)
444-
defer clean()
445-
}
442+
readAhead, clean = blocksReadAhead(ctx, &cfg, 4)
443+
defer clean()
446444
}
447445

448446
var b *types.Block

eth/stagedsync/stage_execute.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"time"
2525

2626
"github.com/c2h5oh/datasize"
27-
"github.com/erigontech/erigon/execution/exec3"
2827
"golang.org/x/sync/errgroup"
2928

3029
"github.com/erigontech/erigon-lib/chain"
@@ -38,7 +37,6 @@ import (
3837
"github.com/erigontech/erigon-lib/log/v3"
3938
libstate "github.com/erigontech/erigon-lib/state"
4039
"github.com/erigontech/erigon-lib/wrap"
41-
4240
"github.com/erigontech/erigon/consensus"
4341
"github.com/erigontech/erigon/core/rawdb"
4442
"github.com/erigontech/erigon/core/rawdb/rawdbhelpers"
@@ -48,6 +46,7 @@ import (
4846
"github.com/erigontech/erigon/eth/ethconfig"
4947
"github.com/erigontech/erigon/eth/stagedsync/stages"
5048
"github.com/erigontech/erigon/ethdb/prune"
49+
"github.com/erigontech/erigon/execution/exec3"
5150
"github.com/erigontech/erigon/turbo/services"
5251
"github.com/erigontech/erigon/turbo/shards"
5352
"github.com/erigontech/erigon/turbo/silkworm"
@@ -261,7 +260,7 @@ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, to
261260
return nil
262261
}
263262

264-
func blocksReadAhead(ctx context.Context, cfg *ExecuteBlockCfg, workers int, histV3 bool) (chan uint64, context.CancelFunc) {
263+
func blocksReadAhead(ctx context.Context, cfg *ExecuteBlockCfg, workers int) (chan uint64, context.CancelFunc) {
265264
const readAheadBlocks = 100
266265
readAhead := make(chan uint64, readAheadBlocks)
267266
g, gCtx := errgroup.WithContext(ctx)
@@ -274,6 +273,9 @@ func blocksReadAhead(ctx context.Context, cfg *ExecuteBlockCfg, workers int, his
274273
if tx != nil {
275274
tx.Rollback()
276275
}
276+
if rec := recover(); rec != nil {
277+
err = fmt.Errorf("%s, %s", rec, dbg.Stack())
278+
}
277279
}()
278280

279281
for i := 0; ; i++ {
@@ -296,7 +298,7 @@ func blocksReadAhead(ctx context.Context, cfg *ExecuteBlockCfg, workers int, his
296298
}
297299
}
298300

299-
if err := blocksReadAheadFunc(gCtx, tx, cfg, bn+readAheadBlocks, histV3); err != nil {
301+
if err := blocksReadAheadFunc(gCtx, tx, cfg, bn+readAheadBlocks); err != nil {
300302
return err
301303
}
302304
}
@@ -307,7 +309,7 @@ func blocksReadAhead(ctx context.Context, cfg *ExecuteBlockCfg, workers int, his
307309
_ = g.Wait()
308310
}
309311
}
310-
func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, blockNum uint64, histV3 bool) error {
312+
func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, blockNum uint64) error {
311313
block, err := cfg.blockReader.BlockByNumber(ctx, tx, blockNum)
312314
if err != nil {
313315
return err
@@ -316,9 +318,55 @@ func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, bl
316318
return nil
317319
}
318320
_, _ = cfg.engine.Author(block.HeaderNoCopy()) // Bor consensus: this calc is heavy and has cache
319-
if histV3 {
321+
322+
ttx, ok := tx.(kv.TemporalTx)
323+
if !ok {
320324
return nil
321325
}
326+
327+
stateReader := state.NewReaderV3(ttx)
328+
senders := block.Body().SendersFromTxs()
329+
330+
for _, sender := range senders {
331+
a, _ := stateReader.ReadAccountData(sender)
332+
if a == nil {
333+
continue
334+
}
335+
336+
//Code domain using .bt index - means no false-positives
337+
if code, _ := stateReader.ReadAccountCode(sender, 0); len(code) > 0 {
338+
_, _ = code[0], code[len(code)-1]
339+
}
340+
}
341+
342+
for _, txn := range block.Transactions() {
343+
to := txn.GetTo()
344+
if to != nil {
345+
a, _ := stateReader.ReadAccountData(*to)
346+
if a == nil {
347+
continue
348+
}
349+
//if account != nil && !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
350+
// reader.Code(*tx.To(), common.BytesToHash(account.CodeHash))
351+
//}
352+
if code, _ := stateReader.ReadAccountCode(*to, 0); len(code) > 0 {
353+
_, _ = code[0], code[len(code)-1]
354+
}
355+
356+
for _, list := range txn.GetAccessList() {
357+
stateReader.ReadAccountData(list.Address)
358+
if len(list.StorageKeys) > 0 {
359+
for _, slot := range list.StorageKeys {
360+
stateReader.ReadAccountStorage(list.Address, 0, &slot)
361+
}
362+
}
363+
}
364+
//TODO: exec txn and pre-fetch commitment keys. see also: `func (p *statePrefetcher) Prefetch` in geth
365+
}
366+
367+
}
368+
_, _ = stateReader.ReadAccountData(block.Coinbase())
369+
322370
return nil
323371
}
324372

execution/exec3/historical_trace_worker.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,12 +280,15 @@ func NewHistoricalTraceWorkers(consumer TraceConsumer, cfg *ExecArgs, ctx contex
280280
log.Warn("[StageCustomTrace]", "err", err)
281281
}
282282
}()
283-
return doHistoryReduce(consumer, cfg.ChainDB, ctx, toTxNum, outputTxNum, rws)
283+
return doHistoryReduce(consumer, cfg, ctx, toTxNum, outputTxNum, rws, logger)
284284
})
285285
return g
286286
}
287287

288-
func doHistoryReduce(consumer TraceConsumer, db kv.TemporalRoDB, ctx context.Context, toTxNum uint64, outputTxNum *atomic.Uint64, rws *state.ResultsQueue) error {
288+
func doHistoryReduce(consumer TraceConsumer, cfg *ExecArgs, ctx context.Context, toTxNum uint64, outputTxNum *atomic.Uint64, rws *state.ResultsQueue, logger log.Logger) error {
289+
db := cfg.ChainDB
290+
applyWorker := NewHistoricalTraceWorker(consumer, nil, rws, true, ctx, cfg, log.New())
291+
289292
tx, err := db.BeginTemporalRo(ctx)
290293
if err != nil {
291294
return err
@@ -298,7 +301,7 @@ func doHistoryReduce(consumer TraceConsumer, db kv.TemporalRoDB, ctx context.Con
298301
return err
299302
}
300303

301-
processedTxNum, _, err := processResultQueueHistorical(consumer, rws, outputTxNum.Load(), tx, true)
304+
processedTxNum, _, err := processResultQueueHistorical(consumer, rws, outputTxNum.Load(), tx, true, applyWorker)
302305
if err != nil {
303306
return fmt.Errorf("processResultQueueHistorical: %w", err)
304307
}
@@ -332,7 +335,7 @@ func doHistoryMap(consumer TraceConsumer, cfg *ExecArgs, ctx context.Context, in
332335
return mapGroup.Wait()
333336
}
334337

335-
func processResultQueueHistorical(consumer TraceConsumer, rws *state.ResultsQueue, outputTxNumIn uint64, tx kv.TemporalTx, forceStopAtBlockEnd bool) (outputTxNum uint64, stopedAtBlockEnd bool, err error) {
338+
func processResultQueueHistorical(consumer TraceConsumer, rws *state.ResultsQueue, outputTxNumIn uint64, tx kv.TemporalTx, forceStopAtBlockEnd bool, applyWorker *HistoricalTraceWorker) (outputTxNum uint64, stopedAtBlockEnd bool, err error) {
336339
rwsIt := rws.Iter()
337340
defer rwsIt.Close()
338341

@@ -342,11 +345,15 @@ func processResultQueueHistorical(consumer TraceConsumer, rws *state.ResultsQueu
342345
outputTxNum++
343346
stopedAtBlockEnd = txTask.Final
344347

348+
if txTask.Final { // final txn must be executed here, because `consensus.Finalize` requires "all receipts of block" to be available
349+
applyWorker.RunTxTaskNoLock(txTask.Reset())
350+
}
345351
if txTask.Error != nil {
346352
return outputTxNum, false, txTask.Error
347353
}
348354

349355
txTask.CreateReceipt(tx)
356+
350357
if err := consumer.Reduce(txTask, tx); err != nil {
351358
return outputTxNum, false, err
352359
}
@@ -412,12 +419,15 @@ func CustomTraceMapReduce(fromBlock, toBlock uint64, consumer TraceConsumer, ctx
412419
outTxNum := &atomic.Uint64{}
413420
outTxNum.Store(fromTxNum)
414421

422+
ctx, cancleCtx := context.WithCancel(ctx)
415423
workers := NewHistoricalTraceWorkers(consumer, cfg, ctx, toTxNum, in, WorkerCount, outTxNum, logger)
416424
defer workers.Wait()
417425

418426
workersExited := &atomic.Bool{}
419427
go func() {
420-
workers.Wait()
428+
if err := workers.Wait(); err != nil {
429+
cancleCtx()
430+
}
421431
workersExited.Store(true)
422432
}()
423433

0 commit comments

Comments
 (0)