@@ -18,7 +18,6 @@ package builderstages
1818
1919import (
2020 context0 "context"
21- "errors"
2221 "fmt"
2322 "sync/atomic"
2423 "time"
@@ -28,7 +27,6 @@ import (
2827
2928 "github.com/erigontech/erigon/common"
3029 "github.com/erigontech/erigon/common/dbg"
31- "github.com/erigontech/erigon/common/empty"
3230 "github.com/erigontech/erigon/common/log/v3"
3331 "github.com/erigontech/erigon/db/kv"
3432 "github.com/erigontech/erigon/db/kv/membatchwithdb"
@@ -40,7 +38,6 @@ import (
4038 "github.com/erigontech/erigon/execution/exec"
4139 "github.com/erigontech/erigon/execution/metrics"
4240 "github.com/erigontech/erigon/execution/protocol"
43- "github.com/erigontech/erigon/execution/protocol/aa"
4441 "github.com/erigontech/erigon/execution/protocol/params"
4542 "github.com/erigontech/erigon/execution/protocol/rules"
4643 "github.com/erigontech/erigon/execution/stagedsync"
@@ -49,7 +46,6 @@ import (
4946 "github.com/erigontech/erigon/execution/types"
5047 "github.com/erigontech/erigon/execution/types/accounts"
5148 "github.com/erigontech/erigon/execution/vm"
52- "github.com/erigontech/erigon/execution/vm/evmtypes"
5349 "github.com/erigontech/erigon/txnprovider"
5450)
5551
@@ -100,24 +96,23 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
10096 chainID , _ := uint256 .FromBig (cfg .chainConfig .ChainID )
10197 logPrefix := s .LogPrefix ()
10298 current := cfg .builderState .BuiltBlock
103- needBAL := execCfg .ChainConfig ().IsAmsterdam (current .Header .Time ) || execCfg .IsExperimentalBAL ()
10499
105100 stateReader := state .NewReaderV3 (sd .AsGetter (tx ))
106101 ibs := state .New (stateReader )
107102 defer ibs .Release (false )
108103 ibs .SetTxContext (current .Header .Number .Uint64 (), - 1 )
109- var balIO * state.VersionedIO
110- var systemReads state.ReadSet
111- var systemWrites state.VersionedWrites
112- var systemAccess state.AccessSet
113- if needBAL {
104+
105+ ba := exec .NewBlockAssembler (exec.AssemblerCfg {
106+ ChainConfig : cfg .chainConfig ,
107+ Engine : cfg .engine ,
108+ BlockReader : cfg .blockReader ,
109+ ExperimentalBAL : execCfg .IsExperimentalBAL (),
110+ }, cfg .payloadId , current .ParentHeaderTime , current .Header , current .Uncles , current .Withdrawals )
111+
112+ if ba .HasBAL () {
114113 ibs .SetVersionMap (state .NewVersionMap (nil ))
115- balIO = & state.VersionedIO {}
116114 }
117- // Clique consensus needs forced author in the evm context
118- //if cfg.chainConfig.Consensus == chain.CliqueConsensus {
119- // execCfg.author = &cfg.builderState.BuilderConfig.Etherbase
120- //}
115+
121116 execCfg = execCfg .WithAuthor (accounts .InternAddress (cfg .builderState .BuilderConfig .Etherbase ))
122117
123118 getHeader := func (hash common.Hash , number uint64 ) (* types.Header , error ) {
@@ -135,19 +130,13 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
135130 }
136131 defer simSd .Close ()
137132
138- chainReader := exec .NewChainReader (cfg .chainConfig , tx , cfg .blockReader , logger )
139-
140133 txNum , _ , err := sd .SeekCommitment (ctx , tx )
141134 if err != nil {
142135 return err
143136 }
144137
145- protocol .InitializeBlockExecution (cfg .engine , chainReader , current .Header , cfg .chainConfig , ibs , & state.NoopWriter {}, logger , nil )
146- if needBAL {
147- systemReads = stagedsync .MergeReadSets (systemReads , ibs .VersionedReads ())
148- systemWrites = stagedsync .MergeVersionedWrites (systemWrites , ibs .VersionedWrites (false ))
149- systemAccess = systemAccess .Merge (ibs .AccessedAddresses ())
150- ibs .ResetVersionedIO ()
138+ if err := ba .Initialize (ibs , tx , logger ); err != nil {
139+ return err
151140 }
152141
153142 coinbase := accounts .InternAddress (cfg .builderState .BuilderConfig .Etherbase )
@@ -170,7 +159,7 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
170159 }
171160
172161 if len (txns ) > 0 {
173- logs , stop , err := addTransactionsToBlock (ctx , logPrefix , current , cfg . chainConfig , cfg .vmConfig , getHeader , cfg . engine , txns , coinbase , ibs , balIO , interrupt , cfg . payloadId , logger )
162+ logs , stop , err := ba . AddTransactions (ctx , getHeader , txns , coinbase , cfg .vmConfig , ibs , interrupt , logPrefix , logger )
174163 if err != nil {
175164 return err
176165 }
@@ -194,29 +183,27 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
194183
195184 metrics .UpdateBlockProducerProductionDelay (current .ParentHeaderTime , current .Header .Number .Uint64 (), logger )
196185
197- logger .Debug ("SpawnBuilderExecStage" , "block" , current .Header .Number , "txn" , current .Txns .Len (), "payload" , cfg .payloadId )
198- if current .Uncles == nil {
199- current .Uncles = []* types.Header {}
186+ logger .Debug ("SpawnBuilderExecStage" , "block" , current .Header .Number , "txn" , ba .Txns .Len (), "payload" , cfg .payloadId )
187+ if ba .Uncles == nil {
188+ ba .Uncles = []* types.Header {}
200189 }
201- if current .Txns == nil {
202- current .Txns = []types.Transaction {}
190+ if ba .Txns == nil {
191+ ba .Txns = []types.Transaction {}
203192 }
204- if current .Receipts == nil {
205- current .Receipts = types.Receipts {}
193+ if ba .Receipts == nil {
194+ ba .Receipts = types.Receipts {}
206195 }
207196
208- if err := cfg .engine .Prepare (chainReader , current .Header , ibs ); err != nil {
197+ block , err := ba .AssembleBlock (stateReader , ibs , tx , logger )
198+ if err != nil {
209199 return err
210200 }
211201
212- var block * types.Block
213- if needBAL {
214- ibs .ResetVersionedIO ()
215- }
216- block , current .Requests , err = protocol .FinalizeBlockExecution (cfg .engine , stateReader , current .Header , current .Txns , current .Uncles , & state.NoopWriter {}, cfg .chainConfig , ibs , current .Receipts , current .Withdrawals , chainReader , true , logger , nil )
217- if err != nil {
218- return fmt .Errorf ("cannot finalize block execution: %s" , err )
219- }
202+ // Copy results back to BuiltBlock
203+ current .Txns = ba .Txns
204+ current .Receipts = ba .Receipts
205+ current .Requests = ba .Requests
206+ current .BlockAccessList = ba .BlockAccessList
220207
221208 // Note: This gets reset in BuilderFinish - but we need it here to
222209 // process execv3 - when we remove that this becomes redundant
@@ -231,33 +218,15 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
231218 }
232219
233220 blockHeight := block .NumberU64 ()
234- if needBAL {
235- systemReads = stagedsync .MergeReadSets (systemReads , ibs .VersionedReads ())
236- systemWrites = stagedsync .MergeVersionedWrites (systemWrites , ibs .VersionedWrites (false ))
237- systemAccess = systemAccess .Merge (ibs .AccessedAddresses ())
238- ibs .ResetVersionedIO ()
239-
240- systemVersion := state.Version {BlockNum : blockHeight , TxIndex : - 1 }
241- balIO .RecordReads (systemVersion , systemReads )
242- balIO .RecordWrites (systemVersion , systemWrites )
243- balIO .RecordAccesses (systemVersion , systemAccess )
244- current .BlockAccessList = stagedsync .CreateBAL (blockHeight , balIO , execCfg .DirsDataDir ())
245- // Note: This gets reset in BuilderFinish - but we need it here to
246- // process execv3 - when we remove that this becomes redundant
247- hash := current .BlockAccessList .Hash ()
248- header .BlockAccessListHash = & hash
249- } else {
250- // Note: This gets reset in BuilderFinish - but we need it here to
251- // process execv3 - when we remove that this becomes redundant
252- if execCfg .ChainConfig ().IsAmsterdam (current .Header .Time ) {
253- header .BlockAccessListHash = & empty .BlockAccessListHash
254- }
255- }
256221
257222 writeBlockForExecution := func (rwTx kv.TemporalRwTx ) error {
258223 if err = rawdb .WriteHeader (rwTx , block .Header ()); err != nil {
259224 return fmt .Errorf ("cannot write header: %s" , err )
260225 }
226+ // Verify header round-trips correctly through RLP marshaling
227+ if readBack := rawdb .ReadHeader (rwTx , block .Hash (), blockHeight ); readBack == nil {
228+ return fmt .Errorf ("header round-trip failed: written header for block %d (hash %x) not readable" , blockHeight , block .Hash ())
229+ }
261230 if err = rawdb .WriteCanonicalHash (rwTx , block .Hash (), blockHeight ); err != nil {
262231 return fmt .Errorf ("cannot write canonical hash: %s" , err )
263232 }
@@ -290,7 +259,7 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
290259
291260 // This flag will skip checking the state root
292261 execS := & stagedsync.StageState {State : s .State , ID : stages .Execution , BlockNumber : blockHeight - 1 }
293- forceParallel := dbg .Exec3Parallel /* || cfg.chainConfig.IsAmsterdam(current.Header.Time)*/ // TODO Re-enable after bals testing
262+ forceParallel := dbg .Exec3Parallel || cfg .chainConfig .IsAmsterdam (current .Header .Time )
294263 execTx := tx
295264 execSd := sd
296265 var execCleanup func ()
@@ -547,199 +516,6 @@ func filterBadTransactions(transactions []types.Transaction, chainID *uint256.In
547516 return filtered , nil
548517}
549518
550- func addTransactionsToBlock (
551- ctx context0.Context ,
552- logPrefix string ,
553- current * BuiltBlock ,
554- chainConfig * chain.Config ,
555- vmConfig * vm.Config ,
556- getHeader func (hash common.Hash , number uint64 ) (* types.Header , error ),
557- engine rules.Engine ,
558- txns types.Transactions ,
559- coinbase accounts.Address ,
560- ibs * state.IntraBlockState ,
561- balIO * state.VersionedIO ,
562- interrupt * atomic.Bool ,
563- payloadId uint64 ,
564- logger log.Logger ,
565- ) (types.Logs , bool , error ) {
566- header := current .Header
567- txnIdx := ibs .TxnIndex () + 1
568- gasPool := new (protocol.GasPool ).AddGas (header .GasLimit - header .GasUsed )
569- if header .BlobGasUsed != nil {
570- gasPool .AddBlobGas (chainConfig .GetMaxBlobGasPerBlock (header .Time ) - * header .BlobGasUsed )
571- }
572- signer := types .MakeSigner (chainConfig , header .Number .Uint64 (), header .Time )
573-
574- var coalescedLogs types.Logs
575- noop := state .NewNoopWriter ()
576- recordTxIO := func () {
577- if balIO == nil {
578- return
579- }
580- version := ibs .Version ()
581- balIO .RecordReads (version , ibs .VersionedReads ())
582- balIO .RecordWrites (version , ibs .VersionedWrites (false ))
583- balIO .RecordAccesses (version , ibs .AccessedAddresses ())
584- ibs .ResetVersionedIO ()
585- }
586- clearTxIO := func () {
587- if balIO == nil {
588- return
589- }
590- ibs .AccessedAddresses ()
591- ibs .ResetVersionedIO ()
592- }
593-
594- var builderCommitTx = func (txn types.Transaction , coinbase accounts.Address , vmConfig * vm.Config , chainConfig * chain.Config , ibs * state.IntraBlockState , current * BuiltBlock ) ([]* types.Log , error ) {
595- ibs .SetTxContext (current .Header .Number .Uint64 (), txnIdx )
596- gasSnap := gasPool .Gas ()
597- blobGasSnap := gasPool .BlobGas ()
598- snap := ibs .PushSnapshot ()
599- defer ibs .PopSnapshot (snap )
600-
601- if txn .Type () == types .AccountAbstractionTxType {
602- aaTxn := txn .(* types.AccountAbstractionTransaction )
603- blockContext := protocol .NewEVMBlockContext (header , protocol .GetHashFn (header , getHeader ), engine , coinbase , chainConfig )
604- evm := vm .NewEVM (blockContext , evmtypes.TxContext {}, ibs , chainConfig , * vmConfig )
605- paymasterContext , validationGasUsed , err := aa .ValidateAATransaction (aaTxn , ibs , gasPool , header , evm , chainConfig )
606- if err != nil {
607- ibs .RevertToSnapshot (snap , err )
608- gasPool = new (protocol.GasPool ).AddGas (gasSnap ).AddBlobGas (blobGasSnap ) // restore gasPool as well as ibs
609- return nil , err
610- }
611-
612- status , gasUsed , err := aa .ExecuteAATransaction (aaTxn , paymasterContext , validationGasUsed , gasPool , evm , header , ibs )
613- if err != nil {
614- ibs .RevertToSnapshot (snap , err )
615- gasPool = new (protocol.GasPool ).AddGas (gasSnap ).AddBlobGas (blobGasSnap ) // restore gasPool as well as ibs
616- return nil , err
617- }
618-
619- header .GasUsed += gasUsed
620- logs := ibs .GetLogs (ibs .TxnIndex (), txn .Hash (), header .Number .Uint64 (), header .Hash ())
621- receipt := aa .CreateAAReceipt (txn .Hash (), status , gasUsed , header .GasUsed , header .Number .Uint64 (), uint64 (ibs .TxnIndex ()), logs )
622-
623- current .AddTxn (txn )
624- current .Receipts = append (current .Receipts , receipt )
625- return receipt .Logs , nil
626- }
627-
628- gasUsed := protocol .NewGasUsed (header , current .Receipts .CumulativeGasUsed ())
629- receipt , err := protocol .ApplyTransaction (chainConfig , protocol .GetHashFn (header , getHeader ), engine , coinbase , gasPool , ibs , noop , header , txn , gasUsed , * vmConfig )
630- if err != nil {
631- ibs .RevertToSnapshot (snap , err )
632- gasPool = new (protocol.GasPool ).AddGas (gasSnap ).AddBlobGas (blobGasSnap ) // restore gasPool as well as ibs
633- return nil , err
634- }
635- protocol .SetGasUsed (header , gasUsed )
636- current .AddTxn (txn )
637- current .Receipts = append (current .Receipts , receipt )
638- return receipt .Logs , nil
639- }
640-
641- var stopped * time.Ticker
642- defer func () {
643- if stopped != nil {
644- stopped .Stop ()
645- }
646- }()
647-
648- done := false
649-
650- LOOP:
651- for _ , txn := range txns {
652- // see if we need to stop now
653- if stopped != nil {
654- select {
655- case <- stopped .C :
656- done = true
657- break LOOP
658- default :
659- }
660- }
661-
662- if err := common .Stopped (ctx .Done ()); err != nil {
663- return nil , true , err
664- }
665-
666- if interrupt != nil && interrupt .Load () && stopped == nil {
667- logger .Debug ("Transaction adding was requested to stop" , "payload" , payloadId )
668- // ensure we run for at least 500ms after the request to stop comes in from GetPayload
669- stopped = time .NewTicker (500 * time .Millisecond )
670- }
671- // If we don't have enough gas for any further transactions then we're done
672- if gasPool .Gas () < params .TxGas {
673- logger .Debug (fmt .Sprintf ("[%s] Not enough gas for further transactions" , logPrefix ), "have" , gasPool , "want" , params .TxGas )
674- done = true
675- break
676- }
677-
678- rlpSpacePostTxn := current .AvailableRlpSpace (chainConfig , txn )
679- if rlpSpacePostTxn < 0 {
680- rlpSpacePreTxn := current .AvailableRlpSpace (chainConfig )
681- logger .Debug (
682- fmt .Sprintf ("[%s] Skipping transaction since it does not fit in available rlp space" , logPrefix ),
683- "hash" , txn .Hash (),
684- "pre" , rlpSpacePreTxn ,
685- "post" , rlpSpacePostTxn ,
686- )
687- continue
688- }
689-
690- // We use the eip155 signer regardless of the env hf.
691- from , err := txn .Sender (* signer )
692- if err != nil {
693- logger .Warn (fmt .Sprintf ("[%s] Could not recover transaction sender" , logPrefix ), "hash" , txn .Hash (), "err" , err )
694- continue
695- }
696-
697- // Check whether the txn is replay protected. If we're not in the EIP155 (Spurious Dragon) hf
698- // phase, start ignoring the sender until we do.
699- if txn .Protected () && ! chainConfig .IsSpuriousDragon (header .Number .Uint64 ()) {
700- logger .Debug (fmt .Sprintf ("[%s] Ignoring replay protected transaction" , logPrefix ), "hash" , txn .Hash (), "eip155" , chainConfig .SpuriousDragonBlock )
701- continue
702- }
703-
704- // Start executing the transaction
705- logs , err := builderCommitTx (txn , coinbase , vmConfig , chainConfig , ibs , current )
706- if err == nil {
707- recordTxIO ()
708- } else {
709- clearTxIO ()
710- }
711- if errors .Is (err , protocol .ErrGasLimitReached ) {
712- // Skip the env out-of-gas transaction
713- logger .Debug (fmt .Sprintf ("[%s] Gas limit exceeded for env block" , logPrefix ), "hash" , txn .Hash (), "sender" , from )
714- } else if errors .Is (err , protocol .ErrNonceTooLow ) {
715- // New head notification data race between the transaction pool and builder, skip
716- logger .Debug (fmt .Sprintf ("[%s] Skipping transaction with low nonce" , logPrefix ), "hash" , txn .Hash (), "sender" , from , "nonce" , txn .GetNonce (), "err" , err )
717- } else if errors .Is (err , protocol .ErrNonceTooHigh ) {
718- // Reorg notification data race between the transaction pool and builder, skip
719- logger .Debug (fmt .Sprintf ("[%s] Skipping transaction with high nonce" , logPrefix ), "hash" , txn .Hash (), "sender" , from , "nonce" , txn .GetNonce ())
720- } else if err == nil {
721- // Everything ok, collect the logs and proceed to the next transaction
722- logger .Trace (fmt .Sprintf ("[%s] Added transaction" , logPrefix ), "hash" , txn .Hash (), "sender" , from , "nonce" , txn .GetNonce (), "payload" , payloadId )
723- coalescedLogs = append (coalescedLogs , logs ... )
724- txnIdx ++
725- } else {
726- // Strange error, discard the transaction and get the next in line (note, the
727- // nonce-too-high clause will prevent us from executing in vain).
728- logger .Debug (fmt .Sprintf ("[%s] Skipping transaction" , logPrefix ), "hash" , txn .Hash (), "sender" , from , "err" , err )
729- }
730- }
731-
732- /*
733- // Notify resubmit loop to decrease resubmitting interval if env interval is larger
734- // than the user-specified one.
735- if interrupt != nil {
736- w.resubmitAdjustCh <- &intervalAdjust{inc: false}
737- }
738- */
739- return coalescedLogs , done , nil
740-
741- }
742-
743519func NotifyPendingLogs (logPrefix string , notifier stagedsync.ChainEventNotifier , logs types.Logs , logger log.Logger ) {
744520 if len (logs ) == 0 {
745521 return
0 commit comments