Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 27 additions & 257 deletions execution/builder/builderstages/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package builderstages

import (
context0 "context"
"errors"
"fmt"
"sync/atomic"
"time"
Expand All @@ -28,7 +27,6 @@ import (

"github.com/erigontech/erigon/common"
"github.com/erigontech/erigon/common/dbg"
"github.com/erigontech/erigon/common/empty"
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/kv/membatchwithdb"
Expand All @@ -40,7 +38,6 @@ import (
"github.com/erigontech/erigon/execution/exec"
"github.com/erigontech/erigon/execution/metrics"
"github.com/erigontech/erigon/execution/protocol"
"github.com/erigontech/erigon/execution/protocol/aa"
"github.com/erigontech/erigon/execution/protocol/params"
"github.com/erigontech/erigon/execution/protocol/rules"
"github.com/erigontech/erigon/execution/stagedsync"
Expand All @@ -49,7 +46,6 @@ import (
"github.com/erigontech/erigon/execution/types"
"github.com/erigontech/erigon/execution/types/accounts"
"github.com/erigontech/erigon/execution/vm"
"github.com/erigontech/erigon/execution/vm/evmtypes"
"github.com/erigontech/erigon/txnprovider"
)

Expand Down Expand Up @@ -100,24 +96,23 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID)
logPrefix := s.LogPrefix()
current := cfg.builderState.BuiltBlock
needBAL := execCfg.ChainConfig().IsAmsterdam(current.Header.Time) || execCfg.IsExperimentalBAL()

stateReader := state.NewReaderV3(sd.AsGetter(tx))
ibs := state.New(stateReader)
defer ibs.Release(false)
ibs.SetTxContext(current.Header.Number.Uint64(), -1)
var balIO *state.VersionedIO
var systemReads state.ReadSet
var systemWrites state.VersionedWrites
var systemAccess state.AccessSet
if needBAL {

ba := exec.NewBlockAssembler(exec.AssemblerCfg{
ChainConfig: cfg.chainConfig,
Engine: cfg.engine,
BlockReader: cfg.blockReader,
ExperimentalBAL: execCfg.IsExperimentalBAL(),
}, cfg.payloadId, current.ParentHeaderTime, current.Header, current.Uncles, current.Withdrawals)

if ba.HasBAL() {
ibs.SetVersionMap(state.NewVersionMap(nil))
balIO = &state.VersionedIO{}
}
// Clique consensus needs forced author in the evm context
//if cfg.chainConfig.Consensus == chain.CliqueConsensus {
// execCfg.author = &cfg.builderState.BuilderConfig.Etherbase
//}

execCfg = execCfg.WithAuthor(accounts.InternAddress(cfg.builderState.BuilderConfig.Etherbase))

getHeader := func(hash common.Hash, number uint64) (*types.Header, error) {
Expand All @@ -135,20 +130,12 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
}
defer simSd.Close()

chainReader := exec.NewChainReader(cfg.chainConfig, tx, cfg.blockReader, logger)

txNum, _, err := sd.SeekCommitment(ctx, tx)
if err != nil {
return err
}

protocol.InitializeBlockExecution(cfg.engine, chainReader, current.Header, cfg.chainConfig, ibs, &state.NoopWriter{}, logger, nil)
if needBAL {
systemReads = stagedsync.MergeReadSets(systemReads, ibs.VersionedReads())
systemWrites = stagedsync.MergeVersionedWrites(systemWrites, ibs.VersionedWrites(false))
systemAccess = systemAccess.Merge(ibs.AccessedAddresses())
ibs.ResetVersionedIO()
}
ba.Initialize(ibs, tx, logger)

coinbase := accounts.InternAddress(cfg.builderState.BuilderConfig.Etherbase)

Expand All @@ -170,7 +157,7 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
}

if len(txns) > 0 {
logs, stop, err := addTransactionsToBlock(ctx, logPrefix, current, cfg.chainConfig, cfg.vmConfig, getHeader, cfg.engine, txns, coinbase, ibs, balIO, interrupt, cfg.payloadId, logger)
logs, stop, err := ba.AddTransactions(ctx, getHeader, txns, coinbase, cfg.vmConfig, ibs, interrupt, logPrefix, logger)
if err != nil {
return err
}
Expand All @@ -194,29 +181,27 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e

metrics.UpdateBlockProducerProductionDelay(current.ParentHeaderTime, current.Header.Number.Uint64(), logger)

logger.Debug("SpawnBuilderExecStage", "block", current.Header.Number, "txn", current.Txns.Len(), "payload", cfg.payloadId)
if current.Uncles == nil {
current.Uncles = []*types.Header{}
logger.Debug("SpawnBuilderExecStage", "block", current.Header.Number, "txn", ba.Txns.Len(), "payload", cfg.payloadId)
if ba.Uncles == nil {
ba.Uncles = []*types.Header{}
}
if current.Txns == nil {
current.Txns = []types.Transaction{}
if ba.Txns == nil {
ba.Txns = []types.Transaction{}
}
if current.Receipts == nil {
current.Receipts = types.Receipts{}
if ba.Receipts == nil {
ba.Receipts = types.Receipts{}
}

if err := cfg.engine.Prepare(chainReader, current.Header, ibs); err != nil {
block, err := ba.AssembleBlock(stateReader, ibs, tx, logger)
if err != nil {
return err
}

var block *types.Block
if needBAL {
ibs.ResetVersionedIO()
}
block, current.Requests, err = protocol.FinalizeBlockExecution(cfg.engine, stateReader, current.Header, current.Txns, current.Uncles, &state.NoopWriter{}, cfg.chainConfig, ibs, current.Receipts, current.Withdrawals, chainReader, true, logger, nil)
if err != nil {
return fmt.Errorf("cannot finalize block execution: %s", err)
}
// Copy results back to BuiltBlock
current.Txns = ba.Txns
current.Receipts = ba.Receipts
current.Requests = ba.Requests
current.BlockAccessList = ba.BlockAccessList

// Note: This gets reset in BuilderFinish - but we need it here to
// process execv3 - when we remove that this becomes redundant
Expand All @@ -231,28 +216,6 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e
}

blockHeight := block.NumberU64()
if needBAL {
systemReads = stagedsync.MergeReadSets(systemReads, ibs.VersionedReads())
systemWrites = stagedsync.MergeVersionedWrites(systemWrites, ibs.VersionedWrites(false))
systemAccess = systemAccess.Merge(ibs.AccessedAddresses())
ibs.ResetVersionedIO()

systemVersion := state.Version{BlockNum: blockHeight, TxIndex: -1}
balIO.RecordReads(systemVersion, systemReads)
balIO.RecordWrites(systemVersion, systemWrites)
balIO.RecordAccesses(systemVersion, systemAccess)
current.BlockAccessList = stagedsync.CreateBAL(blockHeight, balIO, execCfg.DirsDataDir())
// Note: This gets reset in BuilderFinish - but we need it here to
// process execv3 - when we remove that this becomes redundant
hash := current.BlockAccessList.Hash()
header.BlockAccessListHash = &hash
} else {
// Note: This gets reset in BuilderFinish - but we need it here to
// process execv3 - when we remove that this becomes redundant
if execCfg.ChainConfig().IsAmsterdam(current.Header.Time) {
header.BlockAccessListHash = &empty.BlockAccessListHash
}
}

writeBlockForExecution := func(rwTx kv.TemporalRwTx) error {
if err = rawdb.WriteHeader(rwTx, block.Header()); err != nil {
Expand Down Expand Up @@ -290,7 +253,7 @@ func SpawnBuilderExecStage(ctx context0.Context, s *stagedsync.StageState, sd *e

// This flag will skip checking the state root
execS := &stagedsync.StageState{State: s.State, ID: stages.Execution, BlockNumber: blockHeight - 1}
forceParallel := dbg.Exec3Parallel /*|| cfg.chainConfig.IsAmsterdam(current.Header.Time)*/ // TODO Re-enable after bals testing
forceParallel := dbg.Exec3Parallel || cfg.chainConfig.IsAmsterdam(current.Header.Time)
execTx := tx
execSd := sd
var execCleanup func()
Expand Down Expand Up @@ -547,199 +510,6 @@ func filterBadTransactions(transactions []types.Transaction, chainID *uint256.In
return filtered, nil
}

func addTransactionsToBlock(
ctx context0.Context,
logPrefix string,
current *BuiltBlock,
chainConfig *chain.Config,
vmConfig *vm.Config,
getHeader func(hash common.Hash, number uint64) (*types.Header, error),
engine rules.Engine,
txns types.Transactions,
coinbase accounts.Address,
ibs *state.IntraBlockState,
balIO *state.VersionedIO,
interrupt *atomic.Bool,
payloadId uint64,
logger log.Logger,
) (types.Logs, bool, error) {
header := current.Header
txnIdx := ibs.TxnIndex() + 1
gasPool := new(protocol.GasPool).AddGas(header.GasLimit - header.GasUsed)
if header.BlobGasUsed != nil {
gasPool.AddBlobGas(chainConfig.GetMaxBlobGasPerBlock(header.Time) - *header.BlobGasUsed)
}
signer := types.MakeSigner(chainConfig, header.Number.Uint64(), header.Time)

var coalescedLogs types.Logs
noop := state.NewNoopWriter()
recordTxIO := func() {
if balIO == nil {
return
}
version := ibs.Version()
balIO.RecordReads(version, ibs.VersionedReads())
balIO.RecordWrites(version, ibs.VersionedWrites(false))
balIO.RecordAccesses(version, ibs.AccessedAddresses())
ibs.ResetVersionedIO()
}
clearTxIO := func() {
if balIO == nil {
return
}
ibs.AccessedAddresses()
ibs.ResetVersionedIO()
}

var builderCommitTx = func(txn types.Transaction, coinbase accounts.Address, vmConfig *vm.Config, chainConfig *chain.Config, ibs *state.IntraBlockState, current *BuiltBlock) ([]*types.Log, error) {
ibs.SetTxContext(current.Header.Number.Uint64(), txnIdx)
gasSnap := gasPool.Gas()
blobGasSnap := gasPool.BlobGas()
snap := ibs.PushSnapshot()
defer ibs.PopSnapshot(snap)

if txn.Type() == types.AccountAbstractionTxType {
aaTxn := txn.(*types.AccountAbstractionTransaction)
blockContext := protocol.NewEVMBlockContext(header, protocol.GetHashFn(header, getHeader), engine, coinbase, chainConfig)
evm := vm.NewEVM(blockContext, evmtypes.TxContext{}, ibs, chainConfig, *vmConfig)
paymasterContext, validationGasUsed, err := aa.ValidateAATransaction(aaTxn, ibs, gasPool, header, evm, chainConfig)
if err != nil {
ibs.RevertToSnapshot(snap, err)
gasPool = new(protocol.GasPool).AddGas(gasSnap).AddBlobGas(blobGasSnap) // restore gasPool as well as ibs
return nil, err
}

status, gasUsed, err := aa.ExecuteAATransaction(aaTxn, paymasterContext, validationGasUsed, gasPool, evm, header, ibs)
if err != nil {
ibs.RevertToSnapshot(snap, err)
gasPool = new(protocol.GasPool).AddGas(gasSnap).AddBlobGas(blobGasSnap) // restore gasPool as well as ibs
return nil, err
}

header.GasUsed += gasUsed
logs := ibs.GetLogs(ibs.TxnIndex(), txn.Hash(), header.Number.Uint64(), header.Hash())
receipt := aa.CreateAAReceipt(txn.Hash(), status, gasUsed, header.GasUsed, header.Number.Uint64(), uint64(ibs.TxnIndex()), logs)

current.AddTxn(txn)
current.Receipts = append(current.Receipts, receipt)
return receipt.Logs, nil
}

gasUsed := protocol.NewGasUsed(header, current.Receipts.CumulativeGasUsed())
receipt, err := protocol.ApplyTransaction(chainConfig, protocol.GetHashFn(header, getHeader), engine, coinbase, gasPool, ibs, noop, header, txn, gasUsed, *vmConfig)
if err != nil {
ibs.RevertToSnapshot(snap, err)
gasPool = new(protocol.GasPool).AddGas(gasSnap).AddBlobGas(blobGasSnap) // restore gasPool as well as ibs
return nil, err
}
protocol.SetGasUsed(header, gasUsed)
current.AddTxn(txn)
current.Receipts = append(current.Receipts, receipt)
return receipt.Logs, nil
}

var stopped *time.Ticker
defer func() {
if stopped != nil {
stopped.Stop()
}
}()

done := false

LOOP:
for _, txn := range txns {
// see if we need to stop now
if stopped != nil {
select {
case <-stopped.C:
done = true
break LOOP
default:
}
}

if err := common.Stopped(ctx.Done()); err != nil {
return nil, true, err
}

if interrupt != nil && interrupt.Load() && stopped == nil {
logger.Debug("Transaction adding was requested to stop", "payload", payloadId)
// ensure we run for at least 500ms after the request to stop comes in from GetPayload
stopped = time.NewTicker(500 * time.Millisecond)
}
// If we don't have enough gas for any further transactions then we're done
if gasPool.Gas() < params.TxGas {
logger.Debug(fmt.Sprintf("[%s] Not enough gas for further transactions", logPrefix), "have", gasPool, "want", params.TxGas)
done = true
break
}

rlpSpacePostTxn := current.AvailableRlpSpace(chainConfig, txn)
if rlpSpacePostTxn < 0 {
rlpSpacePreTxn := current.AvailableRlpSpace(chainConfig)
logger.Debug(
fmt.Sprintf("[%s] Skipping transaction since it does not fit in available rlp space", logPrefix),
"hash", txn.Hash(),
"pre", rlpSpacePreTxn,
"post", rlpSpacePostTxn,
)
continue
}

// We use the eip155 signer regardless of the env hf.
from, err := txn.Sender(*signer)
if err != nil {
logger.Warn(fmt.Sprintf("[%s] Could not recover transaction sender", logPrefix), "hash", txn.Hash(), "err", err)
continue
}

// Check whether the txn is replay protected. If we're not in the EIP155 (Spurious Dragon) hf
// phase, start ignoring the sender until we do.
if txn.Protected() && !chainConfig.IsSpuriousDragon(header.Number.Uint64()) {
logger.Debug(fmt.Sprintf("[%s] Ignoring replay protected transaction", logPrefix), "hash", txn.Hash(), "eip155", chainConfig.SpuriousDragonBlock)
continue
}

// Start executing the transaction
logs, err := builderCommitTx(txn, coinbase, vmConfig, chainConfig, ibs, current)
if err == nil {
recordTxIO()
} else {
clearTxIO()
}
if errors.Is(err, protocol.ErrGasLimitReached) {
// Skip the env out-of-gas transaction
logger.Debug(fmt.Sprintf("[%s] Gas limit exceeded for env block", logPrefix), "hash", txn.Hash(), "sender", from)
} else if errors.Is(err, protocol.ErrNonceTooLow) {
// New head notification data race between the transaction pool and builder, skip
logger.Debug(fmt.Sprintf("[%s] Skipping transaction with low nonce", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce(), "err", err)
} else if errors.Is(err, protocol.ErrNonceTooHigh) {
// Reorg notification data race between the transaction pool and builder, skip
logger.Debug(fmt.Sprintf("[%s] Skipping transaction with high nonce", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce())
} else if err == nil {
// Everything ok, collect the logs and proceed to the next transaction
logger.Trace(fmt.Sprintf("[%s] Added transaction", logPrefix), "hash", txn.Hash(), "sender", from, "nonce", txn.GetNonce(), "payload", payloadId)
coalescedLogs = append(coalescedLogs, logs...)
txnIdx++
} else {
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
logger.Debug(fmt.Sprintf("[%s] Skipping transaction", logPrefix), "hash", txn.Hash(), "sender", from, "err", err)
}
}

/*
// Notify resubmit loop to decrease resubmitting interval if env interval is larger
// than the user-specified one.
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
*/
return coalescedLogs, done, nil

}

func NotifyPendingLogs(logPrefix string, notifier stagedsync.ChainEventNotifier, logs types.Logs, logger log.Logger) {
if len(logs) == 0 {
return
Expand Down
5 changes: 4 additions & 1 deletion execution/builder/builderstages/finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ func SpawnBuilderFinishStage(s *stagedsync.StageState, sd *execctx.SharedDomains
//}

block := types.NewBlockForAsembling(current.Header, current.Txns, current.Uncles, current.Receipts, current.Withdrawals)
if current.BlockAccessList != nil {
// Only embed the BAL hash in the header for Amsterdam+ chains.
// For pre-Amsterdam chains with ExperimentalBAL, the BAL is computed
// and validated but NOT included in the block header.
if current.BlockAccessList != nil && cfg.chainConfig.IsAmsterdam(current.Header.Time) {
hash := current.BlockAccessList.Hash()
block.HeaderNoCopy().BlockAccessListHash = &hash
}
Expand Down
Loading
Loading