Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
9d1f22c
docs: add nonce-aware tx pool implementation plan
vishalchangrani Jun 11, 2026
ea8fa9d
feat(config): add nonce-aware tx pool flags and validation
vishalchangrani Jun 11, 2026
7745740
style(cmd): align nonce-aware validation error messages with house style
vishalchangrani Jun 11, 2026
7e1abcc
feat(requester): add NonceProvider for local index nonce lookups
vishalchangrani Jun 11, 2026
690170c
feat(requester): add nonce selection helpers for nonce-aware pool
vishalchangrani Jun 11, 2026
a74f178
feat(errors): add ErrInFlightNonce for in-flight nonce dedup
vishalchangrani Jun 11, 2026
c6320f6
feat(requester): add NonceAwareTxPool with fast path, gap handling an…
vishalchangrani Jun 11, 2026
45b8bb9
fix(requester): reconcile in-flight state on failed flush; single ind…
vishalchangrani Jun 11, 2026
5967088
feat(bootstrap): wire NonceAwareTxPool behind tx-nonce-aware-mode flag
vishalchangrani Jun 11, 2026
17bbdb8
test(requester): add e2e tests for nonce-aware tx pool
vishalchangrani Jun 11, 2026
2127a6f
chore: drop implementation plan doc and a vacuous test assertion befo…
vishalchangrani Jun 11, 2026
8e5e010
Update config/config.go
vishalchangrani Jun 12, 2026
7bf5129
fix(requester): address correctness review feedback in nonce-aware pool
vishalchangrani Jun 12, 2026
82aec02
fix(requester): treat local-index nonce read failures as exceptions
vishalchangrani Jun 15, 2026
b43115b
feat(metrics): add tx pool size gauges; stop dropped metric on prune
vishalchangrani Jun 15, 2026
9278c55
refactor(requester): rename NonceAwareTxPool to TxMemPool
vishalchangrani Jun 16, 2026
1060813
refactor(requester): reject early in Add; document TTL; add e2e tests
vishalchangrani Jun 16, 2026
57c79c4
refactor(requester): rename lastSentNonce to lastSubmittedNonce
vishalchangrani Jun 22, 2026
8f50460
refactor(requester): simplify in-flight handling per round-2 review
vishalchangrani Jun 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,23 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
// create transaction pool
var txPool requester.TxPool
var err error
if b.config.TxBatchMode {
if b.config.TxMemPoolMode {
nonceProvider := requester.NewLocalNonceProvider(
b.config.FlowNetworkID,
b.storages.Registers,
b.storages.Blocks,
)
txPool, err = requester.NewTxMemPool(
ctx,
b.client,
b.publishers.Transaction,
b.logger,
b.config,
b.collector,
b.keystore,
nonceProvider,
)
} else if b.config.TxBatchMode {
txPool, err = requester.NewBatchTxPool(
ctx,
b.client,
Expand Down
32 changes: 32 additions & 0 deletions cmd/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,33 @@ func parseConfigFromFlags() error {
return fmt.Errorf("tx-batch-mode should be enabled with tx-state-validation=local-index")
}

if cfg.TxMemPoolMode {
if cfg.TxBatchMode {
return fmt.Errorf("tx-mempool-mode and tx-batch-mode are mutually exclusive")
}
if cfg.TxStateValidation != config.LocalIndexValidation {
return fmt.Errorf("tx-mempool-mode requires tx-state-validation=local-index")
}
if cfg.TxCollectionWindow <= 0 {
return fmt.Errorf("tx-collection-window must be > 0 when tx-mempool-mode is enabled")
}
if cfg.TxSubmissionSpacing <= 0 {
return fmt.Errorf("tx-submission-spacing must be > 0 when tx-mempool-mode is enabled")
}
if cfg.TxCollectionWindow > cfg.TxSubmissionSpacing {
return fmt.Errorf(
"tx-collection-window (%s) must not exceed tx-submission-spacing (%s)",
cfg.TxCollectionWindow, cfg.TxSubmissionSpacing,
)
}
if cfg.TxPoolTTL <= 0 {
return fmt.Errorf("tx-pool-ttl must be > 0 when tx-mempool-mode is enabled")
}
if cfg.TxMaxBatchSize < 1 {
return fmt.Errorf("tx-max-batch-size must be >= 1 when tx-mempool-mode is enabled")
}
}

return nil
}

Expand Down Expand Up @@ -291,6 +318,11 @@ func init() {
Cmd.Flags().BoolVar(&cfg.TxBatchMode, "tx-batch-mode", false, "Enable batch transaction submission, to avoid nonce mismatch issues for high-volume EOAs.")
Cmd.Flags().DurationVar(&cfg.TxBatchInterval, "tx-batch-interval", time.Millisecond*1200, "Time interval upon which to submit the transaction batches to the Flow network.")
Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*10, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.")
Cmd.Flags().BoolVar(&cfg.TxMemPoolMode, "tx-mempool-mode", false, "Enable the transaction mempool: expected-nonce transactions are submitted immediately, out-of-order transactions are held until their nonce gap fills. Mutually exclusive with --tx-batch-mode and requires --tx-state-validation=local-index.")
Cmd.Flags().DurationVar(&cfg.TxCollectionWindow, "tx-collection-window", 300*time.Millisecond, "Per-EOA sliding collection window for the transaction mempool. Resets on each arrival from the same EOA.")
Cmd.Flags().DurationVar(&cfg.TxSubmissionSpacing, "tx-submission-spacing", 1200*time.Millisecond, "Minimum gap between consecutive Cadence submissions for the same EOA in the transaction mempool; also serves as the flush deadline for a continuously-fed collection window. Recommended ~1.5x the block production rate.")
Cmd.Flags().DurationVar(&cfg.TxPoolTTL, "tx-pool-ttl", 30*time.Second, "How long the transaction mempool holds an out-of-order transaction waiting for its nonce gap to fill, before submitting it anyway.")
Cmd.Flags().IntVar(&cfg.TxMaxBatchSize, "tx-max-batch-size", 5, "Maximum number of EVM transactions per EVM.batchRun Cadence transaction in the transaction mempool.")
Cmd.Flags().DurationVar(&cfg.RpcRequestTimeout, "rpc-request-timeout", time.Second*120, "Sets the maximum duration at which JSON-RPC requests should generate a response, before they timeout. The default is 120 seconds.")

err := Cmd.Flags().MarkDeprecated("init-cadence-height", "This flag is no longer necessary and will be removed in future version. The initial Cadence height is known for testnet/mainnet and this was only required for fresh deployments of EVM Gateway. Once the DB has been initialized, the latest index Cadence height will be used upon start-up.")
Expand Down
23 changes: 23 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ type Config struct {
// frequently than this interval will be batched.
// Useful only when batch transaction submission is enabled.
EOAActivityCacheTTL time.Duration
// TxMemPoolMode configures the gateway to use the transaction mempool:
// transactions carrying the expected next nonce (with nothing in flight)
// are submitted immediately, out-of-order transactions are held until their
// nonce gap fills, and consecutive Cadence submissions for the same EOA are
// spaced apart to avoid Collection Node re-ordering.
TxMemPoolMode bool
// TxCollectionWindow is the per-EOA sliding collection window used by the
// transaction mempool. The window resets on each new transaction arrival
// from the same EOA; when it elapses, the collected transactions are flushed.
TxCollectionWindow time.Duration
// TxSubmissionSpacing is the minimum gap between two consecutive Cadence
// transaction submissions for the same EOA (recommended ~1.5x the block
// production rate). It also serves as the flush deadline for a
// continuously-fed collection window, anchored at first enqueue.
TxSubmissionSpacing time.Duration
// TxPoolTTL is how long the transaction mempool holds an out-of-order
// transaction waiting for its nonce gap to fill. On expiry the transaction
// is submitted anyway, so the failure is observable instead of a silent drop.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although the failure is not visible to the end-user, only to the COA of the node operator. It would show up as a Cadence transaction that errored out. Still, this is a good metric which can help us measure how well the tx mempool behaves.

TxPoolTTL time.Duration
// TxMaxBatchSize is the maximum number of EVM transactions submitted in a
// single EVM.batchRun Cadence transaction by the transaction mempool,
// bounded by the Cadence transaction computation limit.
TxMaxBatchSize int
// RpcRequestTimeout is the maximum duration at which JSON-RPC requests should generate
// a response, before they timeout.
RpcRequestTimeout time.Duration
Expand Down
22 changes: 22 additions & 0 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ var flowTotalSupply = prometheus.NewGauge(prometheus.GaugeOpts{
Help: "Total supply of FLOW tokens in EVM at a given time (in smallest unit, wei)",
})

var txPoolQueues = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("txpool_queues"),
Help: "Number of per-EOA queues currently held by the transaction mempool",
})

var txPoolQueuedTransactions = prometheus.NewGauge(prometheus.GaugeOpts{
Name: prefixedName("txpool_queued_transactions"),
Help: "Total number of transactions currently held across all queues in the transaction mempool",
})

var metrics = []prometheus.Collector{
apiErrors,
serverPanicsCounters,
Expand All @@ -118,6 +128,8 @@ var metrics = []prometheus.Collector{
transactionsDroppedCounter,
rateLimitedTransactionsCounter,
flowTotalSupply,
txPoolQueues,
txPoolQueuedTransactions,
}

type Collector interface {
Expand All @@ -137,6 +149,7 @@ type Collector interface {
TransactionsDropped(count int)
TransactionRateLimited()
FlowTotalSupply(totalSupply *big.Int)
TxPoolSize(queues int, queuedTransactions int)
}

var _ Collector = &DefaultCollector{}
Expand All @@ -162,6 +175,8 @@ type DefaultCollector struct {
transactionsDroppedCounter prometheus.Counter
rateLimitedTransactionsCounter prometheus.Counter
flowTotalSupply prometheus.Gauge
txPoolQueues prometheus.Gauge
txPoolQueuedTransactions prometheus.Gauge
}

func NewCollector(logger zerolog.Logger) Collector {
Expand Down Expand Up @@ -189,6 +204,8 @@ func NewCollector(logger zerolog.Logger) Collector {
transactionsDroppedCounter: transactionsDroppedCounter,
rateLimitedTransactionsCounter: rateLimitedTransactionsCounter,
flowTotalSupply: flowTotalSupply,
txPoolQueues: txPoolQueues,
txPoolQueuedTransactions: txPoolQueuedTransactions,
}
}

Expand Down Expand Up @@ -288,6 +305,11 @@ func (c *DefaultCollector) FlowTotalSupply(totalSupply *big.Int) {
c.flowTotalSupply.Set(floatTotalSupply)
}

func (c *DefaultCollector) TxPoolSize(queues int, queuedTransactions int) {
c.txPoolQueues.Set(float64(queues))
c.txPoolQueuedTransactions.Set(float64(queuedTransactions))
}

func prefixedName(name string) string {
return fmt.Sprintf("evm_gateway_%s", name)
}
1 change: 1 addition & 0 deletions metrics/nop.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ func (c *nopCollector) RequestRateLimited(method string) {}
func (c *nopCollector) TransactionsDropped(count int) {}
func (c *nopCollector) TransactionRateLimited() {}
func (c *nopCollector) FlowTotalSupply(totalSupply *big.Int) {}
func (c *nopCollector) TxPoolSize(queues int, queued int) {}
4 changes: 4 additions & 0 deletions models/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ var (
ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool")
// ErrInFlightNonce is returned when a transaction carries a nonce that has
// already been submitted to the network and is awaiting execution. Letting
// it through would burn Flow fees on a guaranteed nonce-mismatch failure.
ErrInFlightNonce = fmt.Errorf("%w: %s", ErrInvalid, "transaction with the same nonce already submitted")

// Storage errors

Expand Down
68 changes: 68 additions & 0 deletions services/requester/nonce_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package requester

import (
gethCommon "github.com/ethereum/go-ethereum/common"
"github.com/onflow/flow-go/fvm/evm"
"github.com/onflow/flow-go/fvm/evm/offchain/query"
flowGo "github.com/onflow/flow-go/model/flow"

"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"
)

// NonceProvider returns the current nonce of the given EOA address.
// The transaction mempool uses it to determine the expected next nonce.
type NonceProvider interface {
// GetNonce returns the current nonce of the given EOA address.
//
// A non-nil error represents an EXCEPTION, not an expected condition:
// the underlying read is a local state-index lookup that should not
// fail under normal operation. Callers must therefore treat an error
// as a hard failure (reject the transaction / abort the operation)
// rather than a routine, recoverable condition to swallow.
GetNonce(address gethCommon.Address) (uint64, error)
}

// LocalNonceProvider reads the EOA nonce from the latest height of the
// local state index.
type LocalNonceProvider struct {
chainID flowGo.ChainID
registerStore *pebble.RegisterStorage
blocks storage.BlockIndexer
}

var _ NonceProvider = &LocalNonceProvider{}

func NewLocalNonceProvider(
chainID flowGo.ChainID,
registerStore *pebble.RegisterStorage,
blocks storage.BlockIndexer,
) *LocalNonceProvider {
return &LocalNonceProvider{
chainID: chainID,
registerStore: registerStore,
blocks: blocks,
}
}

func (p *LocalNonceProvider) GetNonce(address gethCommon.Address) (uint64, error) {
height, err := p.blocks.LatestEVMHeight()
if err != nil {
return 0, err
}

viewProvider := query.NewViewProvider(
p.chainID,
evm.StorageAccountAddress(p.chainID),
p.registerStore,
NewOverridableBlocksProvider(p.blocks, p.chainID, nil),
blockGasLimit,
)

view, err := viewProvider.GetBlockView(height)
if err != nil {
return 0, err
}

return view.GetNonce(address)
}
Loading
Loading