Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 54 additions & 60 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type (
anteHandler sdk.AnteHandler

/** Concurrency **/
mtx sync.Mutex
mu sync.Mutex

eventBus *cmttypes.EventBus

Expand Down Expand Up @@ -255,58 +255,56 @@ func (m *ExperimentalEVMMempool) GetTxPool() *txpool.TxPool {
// EVM transactions are routed to the EVM transaction pool, while all other
// transactions are inserted into the Cosmos sdkmempool.
func (m *ExperimentalEVMMempool) Insert(ctx context.Context, tx sdk.Tx) error {
m.logger.Debug("inserting transaction into mempool")

ethMsg, err := m.getEVMMessage(tx)
if err == nil {
return m.insertEVMTx(ctx, ethMsg.AsTransaction(), true)
}
return m.insertCosmosTx(ctx, tx)
return m.insert(ctx, tx, true)
}

// Insert adds a transaction to the appropriate mempool (EVM or Cosmos). EVM
// InsertAsync adds a transaction to the appropriate mempool (EVM or Cosmos). EVM
// transactions are routed to the EVM transaction pool, while all other
// transactions are inserted into the Cosmos sdkmempool. EVM transactions are
// inserted async, i.e. they are scheduled for promotion only, we do not wait
// for it to complete.
func (m *ExperimentalEVMMempool) InsertAsync(ctx context.Context, tx sdk.Tx) error {
m.logger.Debug("inserting transaction into mempool async")
return m.insert(ctx, tx, false)
}

func (m *ExperimentalEVMMempool) insert(ctx context.Context, tx sdk.Tx, sync bool) error {
ethMsg, err := evmTxFromCosmosTx(tx)

ethMsg, err := m.getEVMMessage(tx)
if err == nil {
ethTx := ethMsg.AsTransaction()
return m.insertEVMTx(ctx, ethTx, false)
switch {
case err == nil:
return m.insertEVM(ethMsg.AsTransaction(), sync)
case errors.Is(err, ErrNotEVMTransaction):
return m.insertCosmosTx(ctx, tx)
default:
return err
}
return m.insertCosmosTx(ctx, tx)
}

// insertEVMTx inserts a EVM tx into the legacypool (EVM) mempool This does not
// perform a CheckTx (anteHandler) on the tx, so this tx may be invalid.
// Checking the tx is the responsibility of the legacypool and it will drop the
// tx if it is found to be invalid (now or at a later point).
func (m *ExperimentalEVMMempool) insertEVMTx(_ context.Context, tx *ethtypes.Transaction, sync bool) error {
m.mtx.Lock()
defer m.mtx.Unlock()

// insertEVM inserts a EVM tx into the legacypool (EVM) mempool.
// CheckTx (anteHandler) is performed on the tx in async matter inside evm mempool.
// the lock is held inside evm mempool.
func (m *ExperimentalEVMMempool) insertEVM(tx *ethtypes.Transaction, sync bool) error {
hash := tx.Hash()
m.logger.Debug("inserting EVM transaction", "tx_hash", hash)

errs := m.txPool.Add([]*ethtypes.Transaction{tx}, sync)
if len(errs) != 1 {
switch {
case len(errs) != 1:
panic(fmt.Errorf("expected a single error when compacting evm tx add errors"))
case errs[0] != nil:
m.logger.Error("Failed to insert EVM transaction", "tx_hash", hash, "err", errs[0])
fallthrough
default:
return errs[0]
}
if errs[0] != nil {
m.logger.Error("failed to insert EVM transaction", "tx_hash", hash, "err", errs[0])
}

return errs[0]
}

// insertCosmosTx inserts a cosmos tx into the cosmos mempool. This also
// performs a CheckTx (anteHandler) call in the hot path.
func (m *ExperimentalEVMMempool) insertCosmosTx(goCtx context.Context, tx sdk.Tx) error {
m.mtx.Lock()
defer m.mtx.Unlock()
// we have to process cosmos txs serially.
m.mu.Lock()
defer m.mu.Unlock()

ctx := sdk.UnwrapSDKContext(goCtx)

Expand Down Expand Up @@ -402,19 +400,13 @@ func (m *ExperimentalEVMMempool) ReapNewValidTxs(maxBytes uint64, maxGas uint64)
// The iterator prioritizes transactions based on their fees and manages proper
// sequencing. The i parameter contains transaction hashes to exclude from selection.
func (m *ExperimentalEVMMempool) Select(goCtx context.Context, i [][]byte) sdkmempool.Iterator {
m.mtx.Lock()
defer m.mtx.Unlock()

return m.buildIterator(goCtx, i)
}

// SelectBy iterates through transactions until the provided filter function returns false.
// It uses the same unified iterator as Select but allows early termination based on
// custom criteria defined by the filter function.
func (m *ExperimentalEVMMempool) SelectBy(goCtx context.Context, txs [][]byte, filter func(sdk.Tx) bool) {
m.mtx.Lock()
defer m.mtx.Unlock()

iter := m.buildIterator(goCtx, txs)

for iter != nil && filter(iter.Tx()) {
Expand Down Expand Up @@ -454,18 +446,17 @@ func (m *ExperimentalEVMMempool) Remove(tx sdk.Tx) error {
})
}

// Remove removes a transaction from the appropriate sdkmempool.
// RemoveWithReason removes a transaction from the appropriate sdkmempool.
// For EVM transactions, removal is typically handled automatically by the pool
// based on nonce progression. Cosmos transactions are removed from the Cosmos pool.
func (m *ExperimentalEVMMempool) RemoveWithReason(ctx context.Context, tx sdk.Tx, reason sdkmempool.RemoveReason) error {
m.mtx.Lock()
defer m.mtx.Unlock()

if m.blockchain.latestCtx.BlockHeight() == 0 {
chainCtx, err := m.blockchain.GetLatestContext()
if err != nil || chainCtx.BlockHeight() == 0 {
m.logger.Warn("Failed to get latest context, skipping removal")
return nil
}

msgEthereumTx, err := m.getEVMMessage(tx)
msgEthereumTx, err := evmTxFromCosmosTx(tx)
switch {
case errors.Is(err, ErrNoMessages):
return err
Expand Down Expand Up @@ -579,23 +570,6 @@ func (m *ExperimentalEVMMempool) Close() error {
return errors.Join(errs...)
}

// getEVMMessage validates that the transaction contains exactly one message and returns it if it's an EVM message.
// Returns an error if the transaction has no messages, multiple messages, or the single message is not an EVM transaction.
func (m *ExperimentalEVMMempool) getEVMMessage(tx sdk.Tx) (*evmtypes.MsgEthereumTx, error) {
msgs := tx.GetMsgs()
if len(msgs) == 0 {
return nil, ErrNoMessages
}
if len(msgs) != 1 {
return nil, fmt.Errorf("%w, got %d", ErrExpectedOneMessage, len(msgs))
}
ethMsg, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return nil, ErrNotEVMTransaction
}
return ethMsg, nil
}

// getIterators prepares iterators over pending EVM and Cosmos transactions.
// It configures EVM transactions with proper base fee filtering and priority ordering,
// while setting up the Cosmos iterator with the provided exclusion list.
Expand All @@ -607,6 +581,7 @@ func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, txs [][]byt
baseFeeUint = uint256.MustFromBig(baseFee)
}

// this is a thread-safe operation.
evmPendingTxs := m.txPool.Pending(txpool.PendingFilter{
MinTip: m.minTip,
BaseFee: baseFeeUint,
Expand All @@ -616,6 +591,8 @@ func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, txs [][]byt
})

evmIterator := miner.NewTransactionsByPriceAndNonce(nil, evmPendingTxs, baseFee)

// this is also a thread-safe operation.
cosmosIterator := m.cosmosPool.Select(ctx, txs)

return evmIterator, cosmosIterator
Expand All @@ -624,3 +601,20 @@ func (m *ExperimentalEVMMempool) getIterators(goCtx context.Context, txs [][]byt
func (m *ExperimentalEVMMempool) TrackTx(hash common.Hash) error {
return m.txTracker.Track(hash)
}

// getEVMMessage validates that the transaction contains exactly one message and returns it if it's an EVM message.
// Returns an error if the transaction has no messages, multiple messages, or the single message is not an EVM transaction.
func evmTxFromCosmosTx(tx sdk.Tx) (*evmtypes.MsgEthereumTx, error) {
msgs := tx.GetMsgs()
if len(msgs) == 0 {
return nil, ErrNoMessages
}
if len(msgs) != 1 {
return nil, fmt.Errorf("%w, got %d", ErrExpectedOneMessage, len(msgs))
}
ethMsg, ok := msgs[0].(*evmtypes.MsgEthereumTx)
if !ok {
return nil, ErrNotEVMTransaction
}
return ethMsg, nil
}
Loading