Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
62deb8b
Reject transactions that have already been submitted to the tx pool
m-Peter Nov 24, 2025
c6b18b1
Locking should only protected operations on pooledTxs field
m-Peter Nov 28, 2025
856da05
Remove locking around account key fetching
m-Peter Nov 28, 2025
33d6624
Track submitted tx nonces instead of tx hashes
m-Peter Nov 28, 2025
3833d6e
Update trimming logic for tracked tx nonces
m-Peter Nov 29, 2025
70b0bba
Increase default maxTrackedTxNoncesPerEOA to 30
m-Peter Nov 29, 2025
85a2560
Silently skip already submitted transactions instead of returning an …
m-Peter Nov 29, 2025
7ac8f98
Remove dashes from log fields to comply with Grafana
m-Peter Nov 29, 2025
cb159ec
Remove redundant error logging from resolveBlockTag function
m-Peter Nov 29, 2025
80ce2ad
Improve logging & tracking of dropped transactions
m-Peter Nov 30, 2025
0b94bc1
Add lock-protection on writes to eoaActivityMetadata
m-Peter Nov 30, 2025
e103d57
Add retry mechanism on pooled transactions
m-Peter Nov 30, 2025
cc3bad3
Extract EOA activity metadata update to its own method
m-Peter Nov 30, 2025
41c7839
Improve locking on batch transaction submission to better handler con…
m-Peter Dec 4, 2025
6ccf8db
Improve tests for tx submission with nonce validation
m-Peter Dec 4, 2025
3582554
Use context.WithTimeout in submitSingleTransaction method
m-Peter Dec 4, 2025
0c74034
Set hardcoded end block for mainnet27
peterargue Dec 27, 2025
be9f5a1
fix lint
peterargue Dec 27, 2025
96ab645
update to allow handle case where current client is mainnet27
peterargue Dec 27, 2025
b0444fd
remove todo and improve godocs
peterargue Dec 27, 2025
a9243e2
use consistent mainnet27 check
peterargue Dec 27, 2025
05ce106
allow soft-finality flags
peterargue Dec 28, 2025
3586cce
fix lint
peterargue Dec 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ func resolveBlockTag(
if number, ok := blockNumberOrHash.Number(); ok {
height, err := resolveBlockNumber(number, blocksDB)
if err != nil {
logger.Error().Err(err).
Stringer("block_number", number).
Msg("failed to resolve block by number")
return 0, err
}
return height, nil
Expand All @@ -40,9 +37,6 @@ func resolveBlockTag(
if hash, ok := blockNumberOrHash.Hash(); ok {
height, err := blocksDB.GetHeightByID(hash)
if err != nil {
logger.Error().Err(err).
Stringer("block_hash", hash).
Msg("failed to resolve block by hash")
return 0, err
}
return height, nil
Expand Down
7 changes: 7 additions & 0 deletions cmd/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ func init() {
Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*10, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.")
Cmd.Flags().DurationVar(&cfg.RpcRequestTimeout, "rpc-request-timeout", time.Second*120, "Sets the maximum duration at which JSON-RPC requests should generate a response, before they timeout. The default is 120 seconds.")

// soft finality specific flags. include them as disabled to allow operators to run this image without any config changes
var experimentalSoftFinalityEnabled, experimentalSealingVerificationEnabled bool
Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "feature disabled in this release")
Cmd.Flags().BoolVar(&experimentalSealingVerificationEnabled, "experimental-sealing-verification-enabled", false, "feature disabled in this release")
_ = Cmd.Flags().MarkHidden("experimental-soft-finality-enabled")
_ = Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled")
Comment on lines +296 to +301
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Handle errors from MarkHidden() consistently with MarkDeprecated().

Lines 300-301 ignore errors from MarkHidden(), while lines 303-306 check errors from MarkDeprecated() and panic if present. Both methods can fail (e.g., if the flag doesn't exist), so error handling should be consistent to catch initialization issues early.

🔎 Proposed fix for consistent error handling
-	_ = Cmd.Flags().MarkHidden("experimental-soft-finality-enabled")
-	_ = Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled")
+	if err := Cmd.Flags().MarkHidden("experimental-soft-finality-enabled"); err != nil {
+		panic(err)
+	}
+	if err := Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled"); err != nil {
+		panic(err)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// soft finality specific flags. include them as disabled to allow operators to run this image without any config changes
var experimentalSoftFinalityEnabled, experimentalSealingVerificationEnabled bool
Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "feature disabled in this release")
Cmd.Flags().BoolVar(&experimentalSealingVerificationEnabled, "experimental-sealing-verification-enabled", false, "feature disabled in this release")
_ = Cmd.Flags().MarkHidden("experimental-soft-finality-enabled")
_ = Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled")
// soft finality specific flags. include them as disabled to allow operators to run this image without any config changes
var experimentalSoftFinalityEnabled, experimentalSealingVerificationEnabled bool
Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "feature disabled in this release")
Cmd.Flags().BoolVar(&experimentalSealingVerificationEnabled, "experimental-sealing-verification-enabled", false, "feature disabled in this release")
if err := Cmd.Flags().MarkHidden("experimental-soft-finality-enabled"); err != nil {
panic(err)
}
if err := Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled"); err != nil {
panic(err)
}
🤖 Prompt for AI Agents
In cmd/run/cmd.go around lines 296 to 301, the calls to
Cmd.Flags().MarkHidden(...) ignore returned errors while later
MarkDeprecated(...) errors are checked and panic on failure; make error handling
consistent by capturing the error return values from MarkHidden and panic (or
log + panic) if non-nil. Replace the ignored assignments with error checks
(e.g., if err := Cmd.Flags().MarkHidden("experimental-soft-finality-enabled");
err != nil { panic(fmt.Errorf("failed to mark flag hidden: %w", err)) }) and do
the same for "experimental-sealing-verification-enabled" so initialization fails
fast if the flags are missing or the operation errors.


err := Cmd.Flags().MarkDeprecated("init-cadence-height", "This flag is no longer necessary and will be removed in future version. The initial Cadence height is known for testnet/mainnet and this was only required for fresh deployments of EVM Gateway. Once the DB has been initialized, the latest index Cadence height will be used upon start-up.")
if err != nil {
panic(err)
Expand Down
5 changes: 2 additions & 3 deletions models/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ var (

// Transaction errors

ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool")
ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)

// Storage errors

Expand Down
16 changes: 16 additions & 0 deletions services/ingestion/event_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
return eventsChan
}

latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height)
if err != nil {
err = fmt.Errorf("failed to get latest height for spork: %w", err)
eventsChan <- models.NewBlockEventsError(err)
close(eventsChan)
return eventsChan
}

// if the resolved client for the height is the current spork client and the current spork client
// is for the mainnet 27 network, stop ingesting data from the stream at the hardcoded last height.
stopAtHardcodedLastHeight := latestHeight == requester.HardcodedMainnet27LastHeight

var blockEventsStream <-chan flow.BlockEvents
var errChan <-chan error

Expand Down Expand Up @@ -184,6 +196,10 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha
return
}

if stopAtHardcodedLastHeight && blockEvents.Height > requester.HardcodedMainnet27LastHeight {
continue // don't exit, otherwise the gateway will crash
}

evmEvents := models.NewSingleBlockEvents(blockEvents)
// if events contain an error, or we are in a recovery mode
if evmEvents.Err != nil || r.recovery {
Expand Down
200 changes: 141 additions & 59 deletions services/requester/batch_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,45 @@ import (
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/requester/keystore"
)

const eoaActivityCacheSize = 10_000
const (
eoaActivityCacheSize = 10_000
maxTrackedTxNoncesPerEOA = 30
)

type pooledEvmTx struct {
txPayload cadence.String
txHash gethCommon.Hash
nonce uint64
}

// BatchTxPool is a `TxPool` implementation that collects and groups
// transactions based on their EOA signer, and submits them for execution
// using a batch.
type eoaActivityMetadata struct {
lastSubmission time.Time
txNonces []uint64
}

// BatchTxPool is a `TxPool` implementation that groups incoming transactions
// based on their EOA signer, and submits them for execution using a batch.
//
// The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the
// `EVM.run` used in `SingleTxPool`.
//
// The main advantage of this implementation over the `SingleTxPool`, is the
// guarantee that transactions originated from the same EOA address, which
// arrive in a short time interval (about the same as Flow's block production rate),
// will be executed in the same order their arrived.
// This helps to reduce the nonce mismatch errors which mainly occur from the
// re-ordering of Cadence transactions that happens from Collection nodes.
// guarantee that transactions originating from the same EOA address, which
// arrive in a short time interval (configurable by the node operator),
// will be executed in the same order they arrived.
// This helps to reduce the execution errors which may occur from the
// re-ordering of Cadence transactions that happens on Collection nodes.
type BatchTxPool struct {
*SingleTxPool
pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivity *expirable.LRU[gethCommon.Address, time.Time]

pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata]
}

var _ TxPool = &BatchTxPool{}
var _ TxPool = (*BatchTxPool)(nil)

func NewBatchTxPool(
ctx context.Context,
Expand All @@ -77,16 +83,16 @@ func NewBatchTxPool(
return nil, err
}

eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time](
eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata](
eoaActivityCacheSize,
nil,
config.EOAActivityCacheTTL,
)
batchPool := &BatchTxPool{
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivity: eoaActivity,
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivityCache: eoaActivityCache,
}

go batchPool.processPooledTransactions(ctx)
Expand All @@ -104,11 +110,6 @@ func (t *BatchTxPool) Add(
) error {
t.txPublisher.Publish(tx) // publish pending transaction event

// tx adding should be blocking, so we don't have races when
// pooled transactions are being processed in the background.
t.txMux.Lock()
defer t.txMux.Unlock()

from, err := models.DeriveTxSender(tx)
if err != nil {
return err
Expand All @@ -123,6 +124,23 @@ func (t *BatchTxPool) Add(
return err
}

t.txMux.Lock()
defer t.txMux.Unlock()

eoaActivity, found := t.eoaActivityCache.Get(from)
nonce := tx.Nonce()

// Skip transactions that have been already submitted,
// as they are *likely* to fail.
if found && slices.Contains(eoaActivity.txNonces, nonce) {
t.logger.Info().
Str("evm_tx", tx.Hash().Hex()).
Str("from", from.Hex()).
Uint64("nonce", nonce).
Msg("tx with same nonce has been already submitted")
return nil
}

// Scenarios
// 1. EOA activity not found:
// => We send the transaction individually, without adding it
Expand All @@ -140,27 +158,41 @@ func (t *BatchTxPool) Add(
// For all 3 cases, we record the activity time for the next
// transactions that might come from the same EOA.
// [X] is equal to the configured `TxBatchInterval` duration.
lastActivityTime, found := t.eoaActivity.Get(from)

if !found {
// Case 1. EOA activity not found:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(lastActivityTime) > t.config.TxBatchInterval {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval {
// If the EOA has pooled transactions, which are not yet processed,
// due to congestion or anything, make sure to include the current
// tx on that batch.
hasBatch := len(t.pooledTxs[from]) > 0
if hasBatch {
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
}

// If it wasn't batched, submit individually
if !hasBatch {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
}
} else {
// Case 3. EOA activity found AND it was less than [X] seconds ago:
userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()}
// Prevent submission of duplicate transactions, based on their tx hash
if slices.Contains(t.pooledTxs[from], userTx) {
return errs.ErrDuplicateTransaction
}
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
}

t.eoaActivity.Add(from, time.Now())
if err != nil {
t.logger.Error().Err(err).Msgf(
"failed to submit single Flow transaction for EOA: %s",
from.Hex(),
)
return err
}

return err
t.updateEOAActivityMetadata(from, nonce)

return nil
}

func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
Expand Down Expand Up @@ -188,10 +220,14 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
)
if err != nil {
t.logger.Error().Err(err).Msgf(
"failed to submit Flow transaction from BatchTxPool for EOA: %s",
"failed to submit batch Flow transaction for EOA: %s",
address.Hex(),
)
continue
// In case of any error, add the transactions back to the pool,
// as a retry mechanism.
t.txMux.Lock()
t.pooledTxs[address] = append(t.pooledTxs[address], pooledTxs...)
t.txMux.Unlock()
}
}
}
Expand Down Expand Up @@ -235,6 +271,9 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress(
}

if err := t.client.SendTransaction(ctx, *flowTx); err != nil {
// If there was any error while sending the transaction,
// we record all transactions as dropped.
t.collector.TransactionsDropped(len(hexEncodedTxs))
return err
}

Expand All @@ -245,29 +284,72 @@ func (t *BatchTxPool) submitSingleTransaction(
ctx context.Context,
hexEncodedTx cadence.String,
) error {
coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex())
if err != nil {
return err
}
done := make(chan struct{})
var submitError error

script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
flowTx, err := t.buildTransaction(
ctx,
t.getReferenceBlock(),
script,
cadence.NewArray([]cadence.Value{hexEncodedTx}),
coinbaseAddress,
)
if err != nil {
// If there was any error during the transaction build
// process, we record it as a dropped transaction.
t.collector.TransactionsDropped(1)
return err
// This method is called while holding the `t.txMux` lock,
// don't let it run for a long time, to avoid lock-contention
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()

// build & submit transaction
go func() {
defer close(done)

coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex())
if err != nil {
submitError = err
return
}

script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
flowTx, err := t.buildTransaction(
ctx,
t.getReferenceBlock(),
script,
cadence.NewArray([]cadence.Value{hexEncodedTx}),
coinbaseAddress,
)
if err != nil {
// If there was any error during the transaction build
// process, we record it as a dropped transaction.
t.collector.TransactionsDropped(1)
submitError = err
return
}

if err := t.client.SendTransaction(ctx, *flowTx); err != nil {
// If there was any error while sending the transaction,
// we record it as a dropped transaction.
t.collector.TransactionsDropped(1)
submitError = err
return
}
}()

select {
case <-ctx.Done():
return ctx.Err()
case <-done:
}

if err := t.client.SendTransaction(ctx, *flowTx); err != nil {
return err
return submitError
}

func (t *BatchTxPool) updateEOAActivityMetadata(
from gethCommon.Address,
nonce uint64,
) {
// Update metadata for the last EOA activity only on successful add/submit.
eoaActivity, _ := t.eoaActivityCache.Get(from)
eoaActivity.lastSubmission = time.Now()
eoaActivity.txNonces = append(eoaActivity.txNonces, nonce)
// To avoid the slice of nonces from growing indefinitely,
// keep only the last `maxTrackedTxNoncesPerEOA` nonces.
if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA {
firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA
eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:]
}

return nil
t.eoaActivityCache.Add(from, eoaActivity)
}
Loading