Add nonce-aware transaction pool#971
Conversation
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…d spacing Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…ex read; add pool unit tests Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…re PR Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
📝 WalkthroughWalkthroughAdds a nonce-aware transaction mempool ( ChangesNonce-Aware Transaction Mempool
Sequence Diagram(s)sequenceDiagram
participant Client as EVM RPC Client
participant TxMemPool
participant LocalNonceProvider
participant BlockIndexer
participant eoaQueue as EOA Queue
participant processQueues as Background Loop
participant CrossSporkClient
Client->>TxMemPool: Add(tx)
TxMemPool->>LocalNonceProvider: GetNonce(sender)
LocalNonceProvider->>BlockIndexer: LatestEVMHeight()
BlockIndexer-->>LocalNonceProvider: height
LocalNonceProvider-->>TxMemPool: expectedNonce
alt fast path eligible
TxMemPool->>CrossSporkClient: SendTransaction (single Cadence tx)
TxMemPool->>eoaQueue: mark in-flight
else enqueue
TxMemPool->>eoaQueue: enqueue tx, update window/deadline
end
loop ticker tick
processQueues->>eoaQueue: collectDueBatches (prune, select prefix or TTL batch)
eoaQueue-->>processQueues: []flushWork
processQueues->>CrossSporkClient: SendTransaction (batched Cadence tx)
CrossSporkClient-->>processQueues: err
processQueues->>eoaQueue: reconcileSubmission (update or rollback state)
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@services/requester/nonce_aware_tx_pool.go`:
- Around line 183-197: The pending-tx event is being published too early by
t.txPublisher.Publish(tx); update Add so it only publishes after the transaction
is successfully accepted into the pool/queue (i.e., after the fast-path
acceptance or after enqueue returns success), moving the
t.txPublisher.Publish(tx) call from its current position to immediately after
the success branch that uses models.DeriveTxSender/tx.MarshalBinary and the
queue acceptance logic; apply the same fix for the duplicate publish site around
the later block (the one referenced at 243-251) so subscribers see only
transactions that were actually enqueued/submitted.
- Around line 230-235: The code sets q.lastSubmittedAt before calling
t.submitBatch (e.g., the q.lastSubmittedAt assignment around submitBatch and
related blocks), which causes spacingElapsed/TxSubmissionSpacing to rate-limit
EOAs even when submitBatch fails; move the q.lastSubmittedAt assignment to
immediately after a successful return from t.submitBatch (i.e., only set
q.lastSubmittedAt and q.lastSentNonce and q.hasInFlight = true after submitBatch
returns nil), and ensure failure paths do not touch lastSubmittedAt (also update
the other occurrences referenced in the 397–420 region to follow the same
pattern), keeping submitBatch error handling as-is and adding any necessary
tests to cover transient submit failures.
- Around line 412-427: selectExpired can return arbitrarily many expired txs but
the current code appends them as a single flushWork (flushWork{from: from, txs:
expired}), bypassing the configured batch size and risking an unbounded Flow
transaction; change the logic to split the expired slice into bounded chunks of
at most t.config.TxMaxBatchSize (or the equivalent TxMaxBatchSize config) and
append one flushWork per chunk (each flushWork{from: from, txs: chunk}) instead
of a single large flushWork; keep the existing deletions and q.lastSubmittedAt
update but ensure the work queue receives multiple bounded batches so build/send
failures only affect a single chunk rather than the entire expired set.
- Around line 353-358: Empty queues removed by pruneStaleTxs never age out
because the deletion only checks q.lastSubmittedAt and that stays zero for EOAs
that never submitted; to fix, track a last-activity timestamp that pruneStaleTxs
updates (e.g., q.lastPrunedAt or a generic q.lastActivityAt) whenever it removes
txs and then change the idle deletion condition in the empty-queue branch (the
block that checks q.txs, q.hasInFlight, q.lastSubmittedAt and
idleQueueRetention) to consider either lastSubmittedAt OR the new last-activity
timestamp when deciding to delete stale empty queues; make the same change in
the other identical block (around lines 437-453) so empty queues pruned by
pruneStaleTxs can be removed after idleQueueRetention.
- Around line 254-260: Detect whether the nonce already existed and whether the
queue was empty before overwriting q.txs: capture wasEmpty := len(q.txs) == 0
and existed := (q.txs[tx.Nonce()] present) before assigning q.txs[tx.Nonce()] =
userTx; then only set q.windowDeadline and q.flushDeadline when wasEmpty is true
(i.e., on the first enqueue), so replacements of an existing nonce do not reset
the original deadlines; keep using t.config.TxCollectionWindow and
t.config.TxSubmissionSpacing for the deadline values.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 136b3b65-6469-4c8a-8a81-6d6ab310954a
📒 Files selected for processing (8)
bootstrap/bootstrap.gocmd/run/cmd.goconfig/config.gomodels/errors/errors.goservices/requester/nonce_aware_tx_pool.goservices/requester/nonce_aware_tx_pool_test.goservices/requester/nonce_provider.gotests/nonce_aware_tx_pool_test.go
| b.storages.Registers, | ||
| b.storages.Blocks, | ||
| ) | ||
| txPool, err = requester.NewNonceAwareTxPool( |
There was a problem hiding this comment.
A more fitting name, that aligns with the terminology used in Ethereum, would be TxMemPool. Essentially what we have with this new strategy, is an in-memory tx pool, where transactions are being kept/submitted according to the proper nonce ordering.
There was a problem hiding this comment.
will rename to txMemPool
| TxSubmissionSpacing time.Duration | ||
| // TxPoolTTL is how long the nonce-aware tx pool holds an out-of-order | ||
| // transaction waiting for its nonce gap to fill. On expiry the transaction | ||
| // is submitted anyway, so the failure is observable instead of a silent drop. |
There was a problem hiding this comment.
Although the failure is not visible to the end-user, only to the COA of the node operator. It would show up as a Cadence transaction that errored out. Still, this is a good metric which can help us measure how well the tx mempool behaves.
| if len(q.txs) == 0 && !q.hasInFlight && | ||
| t.spacingElapsed(q, now) && tx.Nonce() == indexNonce { | ||
| q.lastSubmittedAt = now | ||
| if submitErr := t.submitBatch(ctx, []heldTx{userTx}); submitErr != nil { |
There was a problem hiding this comment.
Given that t.submitBatch is a network I/O operation, with variable execution time, the q.lastSubmittedAt should be set after its completion, and reflect the actual submission time, e.g.
q.lastSubmittedAt = time.Now()| enqueuedAt: now, | ||
| } | ||
|
|
||
| // Read the index nonce at most once per Add — each read builds a full |
There was a problem hiding this comment.
That's a good remark. Maybe it would be worthwhile to introduce a dedicated DB index nonce, just like we have for the other models, such as blocks/transactions/receipts etc. It would be a lot more lightweight, compared to using the entire EVM state.
| } | ||
|
|
||
| // Reject an exact duplicate of a transaction already in the queue. | ||
| if existing, ok := q.txs[tx.Nonce()]; ok && existing.txHash == tx.Hash() { |
There was a problem hiding this comment.
The previous condition of if q.hasInFlight || (len(q.txs) == 0 && t.spacingElapsed(q, now)) doesn't always guarantee that the q.txs is empty, so it would be more efficient to do the check for duplicates much earlier, and return early whenever possible.
|
|
||
| // Reject a nonce that has been submitted and is still in flight: it | ||
| // would burn Flow fees on a guaranteed nonce-mismatch failure. | ||
| if q.hasInFlight && tx.Nonce() <= q.lastSentNonce { |
There was a problem hiding this comment.
Regardless of whether q.hasInFlight is true or false, whenever tx.Nonce() <= q.lastSentNonce, doesn't this imply that tx.Nonce() is inherently inflight? In which case, we wouldn't want to re-submit a transaction with tx.Nonce(). Also, I think that this check could be done before the condition of if q.hasInFlight || (len(q.txs) == 0 && t.spacingElapsed(q, now)) . What do you think?
| } | ||
|
|
||
| // Enqueue. A same-nonce, different-payload resubmission replaces the | ||
| // queued transaction (last write wins), matching mempool semantics. |
There was a problem hiding this comment.
That's correct, this should be possible, as long as both transactions are still in the pool, and tx.Nonce() > q.lastSentNonce.
| } | ||
| } | ||
| if len(stale) > 0 { | ||
| t.collector.TransactionsDropped(len(stale)) |
There was a problem hiding this comment.
We shouldn't be emitting that metric here, as it is meant to track the error rate during the build/submission of the Cadence transaction to the Flow network. We use it to track service/network errors, it wouldn't make sense to use it for state EVM txs that were pruned.
| // Read the index nonce at most once per Add — each read builds a full | ||
| // block view — and only when it is actually needed: to refresh a stale | ||
| // in-flight marker, or to evaluate the fast path. | ||
| if q.hasInFlight || (len(q.txs) == 0 && t.spacingElapsed(q, now)) { |
There was a problem hiding this comment.
We seem to use quite frequently the len(q.txs) outcome, maybe we could add some helper methods on q, to view the txs state, whether it is Empty or has a single tx etc.
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-ticker.C: |
There was a problem hiding this comment.
The nonceAwarePoolTickInterval is quite frequent, at 50ms. Is it a sufficient time for both t.collectDueBatches() & the for-loop of t.submitWork(ctx, w) ? case <-ticker.C: will block until t.collectDueBatches() & t.submitWork(ctx, w) have finished. I wonder if we need to split the slice of flushWork into multiple goroutines 🤔
There was a problem hiding this comment.
In general, the more transaction submissions that fall under the fast-path in Add(), the less processing will accumulate in queues. The tx-batch-mode would process and submit the grouped transactions every 2.5 seconds, which was much longer time for processing.
| continue | ||
| } | ||
|
|
||
| indexNonce, err := t.nonceProvider.GetNonce(from) |
There was a problem hiding this comment.
Although we read the nonce from the local-state index on Add, it makes sense to do it again during submission, for extra safety. I wonder though how costly it is to create the BlockView for each EOA. maybe we can do it once, just for the batch collection?
| totalEVMEvents += len(blockEvent.Events) | ||
| } | ||
|
|
||
| // Exactly 10 EVM transactions executed: no drops, no duplicates. |
There was a problem hiding this comment.
It would be nice to add one more assertion, which makes sure that we had 2 Cadence transactions with EVM.batchRun of 5 EVM transactions. This will validate the TxMaxBatchSize config. I see that we set it to 10 in the config, but maybe we can change it to 5, for the sake of testing.
| // sub-millisecond gap between the two sends. | ||
| _, err = rpcTester.sendRawTx(signedSecond) | ||
| require.Error(t, err) | ||
| require.ErrorContains(t, err, "transaction with the same nonce already submitted") |
There was a problem hiding this comment.
We can also add the following test case, for the exact duplicate tx scenario:
func Test_NonceAwarePool_DuplicateTransactionRejection(t *testing.T) {
_, cfg, stop := setupNonceAwareGatewayNode(t)
defer stop()
rpcTester := &rpcTest{
url: fmt.Sprintf("%s:%d", cfg.RPCHost, cfg.RPCPort),
}
testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8")
privateKey, err := crypto.HexToECDSA("ddcb1e965557474fd13de3a66a40e4bc9b759a306e5db1046bac5ca47aafd584")
require.NoError(t, err)
fundEOA(t, rpcTester, testAddr)
testEoaReceiver := common.HexToAddress("0x6F416dcC9BEFe43b7dDF53f2662F76dD34A9fc11")
totalTxs := 10
transferAmount := int64(50_000)
// Sign 10 transfers with nonces 0..9.
signedTxs := make([][]byte, totalTxs)
for nonce := range totalTxs {
signed, _, err := evmSign(
big.NewInt(transferAmount),
23_500,
privateKey,
uint64(nonce),
&testEoaReceiver,
nil,
)
require.NoError(t, err)
signedTxs[nonce] = signed
}
// Send them concurrently in shuffled nonce order.
// Duplicate the transaction with nonce 2 at the end.
shuffledNonces := []int{6, 2, 8, 0, 1, 9, 3, 5, 4, 7, 2}
g := errgroup.Group{}
for _, nonce := range shuffledNonces {
signed := signedTxs[nonce]
g.Go(func() error {
_, err := rpcTester.sendRawTx(signed)
return err
})
}
err = g.Wait()
require.Error(t, err)
require.ErrorContains(t, err, "transaction already in pool")
expectedBalance := int64(totalTxs) * transferAmount
// The first transfer eventually executes.
assert.Eventually(t, func() bool {
balance, err := rpcTester.getBalance(testEoaReceiver)
require.NoError(t, err)
return balance.Cmp(big.NewInt(expectedBalance)) == 0
}, time.Second*15, time.Second*1, "first transaction was not executed")
}| Cmd.Flags().DurationVar(&cfg.TxCollectionWindow, "tx-collection-window", 300*time.Millisecond, "Per-EOA sliding collection window for the nonce-aware tx pool. Resets on each arrival from the same EOA.") | ||
| Cmd.Flags().DurationVar(&cfg.TxSubmissionSpacing, "tx-submission-spacing", 1200*time.Millisecond, "Minimum gap between consecutive Cadence submissions for the same EOA in the nonce-aware tx pool; also serves as the flush deadline for a continuously-fed collection window. Recommended ~1.5x the block production rate.") | ||
| Cmd.Flags().DurationVar(&cfg.TxPoolTTL, "tx-pool-ttl", 30*time.Second, "How long the nonce-aware tx pool holds an out-of-order transaction waiting for its nonce gap to fill, before submitting it anyway.") | ||
| Cmd.Flags().IntVar(&cfg.TxMaxBatchSize, "tx-max-batch-size", 5, "Maximum number of EVM transactions per EVM.batchRun Cadence transaction in the nonce-aware tx pool.") |
There was a problem hiding this comment.
One note on the default value of 5 for TxMaxBatchSize. This is a rough computation, assuming 5 EVM transactions with the max gas limit of 16.7M under the Fusaka hard-fork. In reality, we should be able to fit much more EVM transactions with EVM.batchRun, in a single Cadence transaction. It's good that this is configurable, so we can fine-tune it based on some testnet validation.
| return nil | ||
| } | ||
| } | ||
| // On a nonce lookup error or an unexpected nonce, fall through to |
There was a problem hiding this comment.
should at least print a warning
| // in-flight marker, or to evaluate the fast path. | ||
| if q.hasInFlight || (len(q.txs) == 0 && t.spacingElapsed(q, now)) { | ||
| indexNonce, nonceErr := t.nonceProvider.GetNonce(from) | ||
| if nonceErr == nil { |
There was a problem hiding this comment.
When nonceErr != nil, we could just return an error, it's ok to reject the tx, because the gateway is in a unknown state. Looking up a nonce is a local db operation, should never fail, otherwise it's an exception.
We should also add comments to the GetNonce method in the interface that the error returned is exception.
|
|
||
| indexNonce, err := t.nonceProvider.GetNonce(from) | ||
| if err != nil { | ||
| t.logger.Warn().Err(err).Str("eoa", from.Hex()). |
There was a problem hiding this comment.
same here, should just return error.
| for from, q := range t.queues { | ||
| if len(q.txs) == 0 { | ||
| // Bound memory: drop queues idle past the retention period. | ||
| if !q.hasInFlight && !q.lastSubmittedAt.IsZero() && |
There was a problem hiding this comment.
This check is too complex to me, I can't convince myself it does a good job of preventing memory leak.
Can we simplify it? Maybe we can use a maintain an extra state: q.lastActivity, so that if there is no tx, and no activity past the retention period, then delete it.
We can update lastActivity when we are doing something to an EOA:
- a transaction is received for the EOA (Add)
- a batch is flushed for the EOA (collectDueBatches)
|
|
||
| // No eligible prefix (gap at the head). Submit transactions held | ||
| // past their TTL anyway: they will fail on-chain with a real error, | ||
| // which is observable, instead of being silently dropped. |
There was a problem hiding this comment.
Can we write down why we decide to submit anyway, instead of dropping?
I'm afraid that if some EOA has nonce 10, and then a lot of txs had nonce 10000+ that far ahead would still get sent, and wasting fees, no? Should we reject some txs with nonce too far ahead in Add?
| } | ||
|
|
||
| // Reject a nonce that has been submitted and is still in flight: it | ||
| // would burn Flow fees on a guaranteed nonce-mismatch failure. |
There was a problem hiding this comment.
I'm afraid this might cause confusion because when the nonce might be even lower than the last indexed nonce. In that case, the tx should be rejected because the nonce has already taken and will ever be available. And returning this ErrInFlightNonce would seem as if the nonce is above the indexed nonce but lower than the in-flight nonce.
However, if we need to check whether it's above the indexed nonce would require an additional query, and it doesn't make sense to me to make additional db query just to return a better error message. what about update the message to say something that the mouse is lower than the in-flight mouse but without implying it's still a valid, pending nonce?
| // transaction with the same nonce (e.g. to change its payload). | ||
| txs map[uint64]heldTx | ||
| // windowDeadline is lastArrival + TxCollectionWindow. | ||
| windowDeadline time.Time |
There was a problem hiding this comment.
The name "Deadline" is a bit misleading. Deadline usually means you have to do something before this time, but here we don't want anything to be done before this time, like we don't want to send any transaction before this time, because we want to wait to see if there is a new transaction coming to be batched up together.
Maybe collectionWindowEndsAt
// collectionWindowEndsAt is lastArrival + TxCollectionWindow, meaning we hold
// the tx and wait until this time before flushing, so a burst can be batched
// together. It slides forward on each arrival, capped by flushDeadline.
BTW, the name flushDeadline is fine, because it's an actual deadline.
| type NonceAwareTxPool struct { | ||
| *SingleTxPool | ||
| nonceProvider NonceProvider | ||
| queues map[gethCommon.Address]*eoaQueue |
There was a problem hiding this comment.
Can we add a metrics monitoring the queue size? This could confirm there is no memory leak.
Co-authored-by: Ardit Marku <markoupetr@gmail.com>
Four correctness fixes from PR #971 review (m-Peter, zhangchiqing, CodeRabbit): 1. lastSubmittedAt was stamped before the Flow submission completed, so a transient build/send error rate-limited the EOA for TxSubmissionSpacing even though nothing was sent. The fast path now stamps it only after a successful submit; the queue path carries the prior value in flushWork and reconcileSubmission restores it on failure (covering the TTL path too, which previously never rolled back) and stamps the actual completion time on success. 2. A same-nonce replacement re-armed the first-enqueue flush deadline via len(q.txs)==1, letting a client defer a flush indefinitely by resubmitting one held tx before each deadline. Anchor the deadline at first enqueue only (wasEmpty guard). 3. TTL-expiry flushes submitted every expired tx as one Cadence batch, bypassing TxMaxBatchSize and risking an unbounded Flow transaction. Cap the expired batch at TxMaxBatchSize; the remainder drains on later ticks. 4. Empty queues could leak: pruning could empty a queue before any submit (lastSubmittedAt stayed zero), and a fast-path submit left hasInFlight set forever on an empty queue — both blocked the old retention guards. Track q.lastActivity (set on Add and on flush) and retain purely on idle time. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Thanks for the thorough review, @m-Peter @zhangchiqing 🙏 Working through the feedback in groups. Here's the plan and status so far. ✅ Group 1 — Correctness fixes (pushed in
|
PR #971 review (zhangchiqing): a local state-index nonce lookup failing is an exception, not a routine condition to swallow. - Add: on a GetNonce error, return the error and reject the tx instead of silently falling through to the queue path. The unexpected-nonce case (no error, nonce != index) still queues as before. - collectDueBatches: a GetNonce error now logs at Error level (was Warn) and is framed as an unexpected exception; the EOA is skipped for this tick. - Document on the NonceProvider.GetNonce interface that a non-nil error is an exception callers must treat as a hard failure. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PR #971 review: - m-Peter: TransactionsDropped tracks Cadence build/submission errors only, so do not increment it when stale txs (nonce already used on-chain) are pruned. Removed that call from pruneStaleTxs; the warning log is kept. - zhangchiqing: expose pool size to confirm there is no memory leak. Added two gauges, evm_gateway_txpool_queues and evm_gateway_txpool_queued_ transactions, set every tick from collectDueBatches under queueMux. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PR #971 review naming/clarity: - m-Peter: the pool is an in-memory, nonce-ordered tx mempool, so use Ethereum-aligned terminology. Renamed NonceAwareTxPool -> TxMemPool, NewNonceAwareTxPool -> NewTxMemPool, config TxNonceAwareMode -> TxMemPoolMode, flag --tx-nonce-aware-mode -> --tx-mempool-mode, and the source/test files to tx_mempool*.go. Updated help text, validation errors, log lines and prose comments. - zhangchiqing: windowDeadline is misleading (we WAIT until it, not act before it) -> renamed to collectionWindowEndsAt with clarified doc. - m-Peter: added eoaQueue.isEmpty()/size() helpers for the frequent len(q.txs) checks. NonceProvider/GetNonce and all nonce-value identifiers are unchanged. Pure rename + readability; no behavior change. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PR #971 review (m-Peter, zhangchiqing): - Move the exact-duplicate and in-flight-nonce rejection checks ahead of the index-nonce read / fast path in Add, so a known rejection never pays for the block-view-building nonce read (m-Peter). - Document why TTL-expired held txs are submitted anyway (observable on-chain failure vs silent drop), the known far-ahead-nonce fee tradeoff (left as an open design question, not implemented), and the ErrInFlightNonce vs already-used-nonce semantics (zhangchiqing). Comments only. - e2e: parametrize setupTxMemPoolGatewayNode by max batch size; add Test_TxMemPool_BatchSizeCap (cap honored, no drops, batching occurred) and Test_TxMemPool_DuplicateTransactionRejection (queued out-of-order dup). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
|
Pushed Groups 2–5 as separate commits (on top of Group 1). CI should pick them up.
All unit tests + the affected e2e tests pass locally. Left open intentionally (design questions — replying on each thread, no code yet)
Let me know your take on the far-ahead-nonce policy and the BlockView optimization and I'll fold them in. |
| // Read the index nonce at most once per Add — each read builds a full | ||
| // block view — and only when it is actually needed: to refresh a stale | ||
| // in-flight marker, or to evaluate the fast path. | ||
| if q.hasInFlight || (q.isEmpty() && t.spacingElapsed(q, now)) { |
There was a problem hiding this comment.
There are too many if branches in this condition. Can we simply it and explain with comments for each case? Note q.hasInFlight and q.isEmpty() and t.spacingElasped is checked again at L278-279.
Can we simply by handling each condition separately?
Maybe we can combine query nonce and refresh in flight together to be q.queryAndRefreshInFlight(t.nonceProvider, from).
Also the current refreshInFlight just set hasInFlight = false without checking the t.queue. So let's say tx with nonce 2,3,4 are queued, hasInFlight is true, which means the onchain nonce is 1, and we are waiting for block that contains nonce 2, 3, 4. But after we query the nonceProivder, which returns 4, in refreshInFlight we just set hasInFlight to false, and left the tx 2,3,4 in the t.queue, this would cause memory leak.
There was a problem hiding this comment.
Just noticed, we have the pruneStaleTxs that is called after q.refreshInFlight. But it's only called in collectDueBatches, not in Add. I can see the point, I guess it's to defer the pruning work, and do it only once. I'm ok with it, but it's better we add comments here that when IndexNonce is updated, we don't use it to prune staled txs, but wait until the collectDueBatches is triggered, which will traverse each tx anyway.
There was a problem hiding this comment.
Done in 8f50460. Folded the GetNonce read + refreshInFlight into q.queryAndRefreshInFlight(...) and commented each branch of the index-read condition.
On the stale-tx point (txs 2,3,4 left after the marker clears): that's intentional rather than a leak — we don't prune in Add; pruning is deferred to collectDueBatches, which walks every queued tx anyway, so those get pruned on the next tick. Added a comment saying so (your line-262 follow-up thread).
There was a problem hiding this comment.
Done in 8f50460 — added a comment in Add noting we deliberately don't prune stale txs there; pruning is deferred to collectDueBatches, which traverses every tx anyway.
| lastSentNonce uint64 | ||
| // hasInFlight reports whether a submission exists that the local index | ||
| // has not yet confirmed (index nonce <= lastSentNonce). | ||
| hasInFlight bool |
There was a problem hiding this comment.
Maybe it's better storing the local index instead, because then:
- hasInFlight can be derived from
lastSentNonce > lastIndexedNonce - and q.txs will only contain txs whose nonce is above lastIndexedNonce
- no need to refresh the hasInFlight, instead, we just need to update the lastIndexedNonce.
There was a problem hiding this comment.
I thought about this one carefully and decided to keep hasInFlight rather than store lastIndexedNonce. Reasoning:
- The in-flight nonce rejection in
Addruns before the index read (we moved it earlier last round so we skip the block-view-buildingGetNoncewhen we're going to reject anyway). So we need some stored marker there — and the bool is the leanest one that answers "is this nonce in flight?" without a DB read. lastIndexedNoncecaches an externally-mutating value: the index advances in the background as blocks execute, so the stored copy is stale the moment after we read it. That invites a future stale-read bug. The bool sidesteps it — every actual decision still reads the index fresh.- Because nonce
0is valid,lastIndexedNonce = 0is ambiguous (never-read vs index-is-0), so it needs an extrahasSentsentinel — net more state, not less. - The one real win, eager pruning, we already get from the fresh read in
collectDueBatcheswithout the redesign.
Happy to revisit if you feel strongly, but I lean toward the leaner, always-read-fresh shape.
| lastActivity time.Time | ||
| // lastSentNonce is the highest nonce included in the last submission. | ||
| // Only meaningful while hasInFlight is true. | ||
| lastSentNonce uint64 |
There was a problem hiding this comment.
we are using two words for the same activity: lastSubmittedAt and lastSentNonce, which makes me thinking is submit and send two different actions? I think they are the same. If so, maybe rename this to lastSubmittedNonce would be better, and move lastSubmittedAt and lastSubmittedNonce close to each other. If they are different, better explain the difference.
| lastSentNonce uint64 | |
| lastSubmittedNonce uint64 |
There was a problem hiding this comment.
Done in 57c79c4 — renamed lastSentNonce → lastSubmittedNonce and moved it next to lastSubmittedAt.
| // On failure it rolls back the state committed optimistically at collection | ||
| // time: it restores lastSubmittedAt (so the EOA is not rate-limited behind a | ||
| // submission that never happened) and clears the in-flight marker, but only | ||
| // when it still belongs to the failed batch — a newer submission may have | ||
| // replaced it while the failed one was on the wire. The dropped transactions | ||
| // stay dropped (already counted and logged by submitBatch); the rollback only | ||
| // re-opens the EOA so resubmissions are not rejected with ErrInFlightNonce | ||
| // forever. |
There was a problem hiding this comment.
I feel both error case and success case are handled unnecessarily.
For error case, do we know what would cause it to fail? If it's an exception, then we should log it at least, and then I'm not sure we need to update the last submitted at and hasInFlight.
so the EOA is not rate-limited behind a
// submission that never happened
I'm not concerned being rate limited in this case, because we don't know why it failed, I think it's ok even if it's rate limited if failure did happen. I would prefer we collect enough logs before taking actions on unknown cases. Otherwise, the behavior would be too complex to reason about.
For success case, I also feel unnecessary to update the last submitted at, yes, it won't be that accurate, since it's used for measuring submission spacing, so I guess the downside would be we might send tx sooner. But I think that can be mitigated by increasing submission spacing parameter. Note, the default submission spacing is already 1.2 second, much longer than the txMemPoolTickInterval which is 50ms, so I don't think the small latency here would matter much, no?
So IMO, we would do nothing (also get rid of the locking) if succeed. And if failed, simply log the submission error.
There was a problem hiding this comment.
Mostly done in 8f50460.
- Success case: agreed — dropped the completion-time stamp and its lock entirely. The optimistic
lastSubmittedAtset at collection is plenty given the 1.2s spacing vs the 50ms tick. - Failure case: split the difference. I stopped restoring
lastSubmittedAt(agree the small spacing delay after a rare failure is harmless), and the failure is logged by the caller (processQueueslogs it at Error). But I kept clearing the in-flight marker, because dropping that reintroduces a permanent wedge:collectDueBatchesadvanceslastSubmittedNonceand setshasInFlightbefore the network call, so on failure the txs are gone but the marker stays — every resubmission of those nonces is then rejected withErrInFlightNonceforever, and the index never advances to clear it. There's a regression test for exactly this (Test_TxMemPool_FailedFlushDoesNotWedgeEOA).
So reconcileSubmission is now rollbackFailedSubmission: on success it does nothing; on failure it only clears the marker.
| // inFlight reports whether the batch was marked in flight on its queue | ||
| // (the consecutive-prefix path). TTL-expiry batches are not marked in | ||
| // flight and must never clear the in-flight marker on rollback. | ||
| inFlight bool | ||
| // prevLastSubmittedAt is the queue's lastSubmittedAt before this batch was | ||
| // optimistically collected, restored on a failed submission so the EOA is | ||
| // not rate-limited by a submission that never happened. | ||
| prevLastSubmittedAt time.Time |
There was a problem hiding this comment.
I feel we can remove these two fields to simplify, if you agree my idea of simplifying the reconcileSubmission.
There was a problem hiding this comment.
Partially done in 8f50460 — removed prevLastSubmittedAt. I kept inFlight: it's needed to distinguish consecutive-prefix batches (which set the marker and must roll it back on failure) from TTL-expiry batches (which never set it and must never clear it). See the wedge explanation in the line-378 thread.
| // flushWork is a batch selected for submission, detached from the queue so | ||
| // the network call happens outside queueMux. |
There was a problem hiding this comment.
There might be two flushWork for the same EOA in a collectDueBatch call. The first flushWork (which is the case here), is for consecutive nonces, and the second flushWork is for the nonces that has at least 1 gap.
Question1: are the 2 flushWork for the same EOA sent in 2 cadence batch txs?
Question2: why not merge the 2 flushWork? Is it because a gap in the nonce would cause the entire batch to fail? So splitting into 2 could at least give the first flushWork, which has consecutive nonce, a higher likelihood to succeed. Whereas the second flushWork will most likely fail, which anyway is to notify the users that a wrong nonce was used?
There was a problem hiding this comment.
Good questions.
Q1: No — collectDueBatches produces at most one flushWork per EOA per tick. The consecutive-prefix path continues, so the TTL-expiry path is only reached when there's no eligible prefix; the two are mutually exclusive within a single call. The post-gap / over-cap remainder stays queued and is collected on a later tick, spaced by TxSubmissionSpacing.
Q2: Correct — they're deliberately not merged, for exactly the reason you guessed: a head gap would make the whole EVM.batchRun fail, so submitting the consecutive prefix first gives it a clean shot to land, while the gapped remainder either fills in later or surfaces an observable failure at TTL. Added a comment to that effect in 8f50460.
PR #971 round-2 review (zhangchiqing): "submit" and "send" are the same action, so the two-word split (lastSubmittedAt vs lastSentNonce) was confusing. Rename lastSentNonce -> lastSubmittedNonce and place it next to lastSubmittedAt. Pure rename, no behavior change. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
PR #971 round-2 review (zhangchiqing): - Fold the GetNonce read + refreshInFlight into a single q.queryAndRefreshInFlight helper, and comment each case of the Add index-read condition (#1). - Document that stale txs are intentionally not pruned in Add — pruning is deferred to collectDueBatches, which walks every tx anyway (#5). - Simplify reconcileSubmission -> rollbackFailedSubmission: do nothing on a successful submission (drop the completion-time stamp and its lock), and on failure only clear the in-flight marker. The marker clear is kept (NOT dropped) because removing it reintroduces the permanent-wedge bug: a never-landed nonce would be rejected with ErrInFlightNonce forever. Drop the now-unused flushWork.prevLastSubmittedAt field (#4, #6). - Note in collectDueBatches that at most one flushWork is produced per EOA per tick (prefix path continues); the remainder drains on later ticks and is never merged because a head gap would fail the whole batch (#7). lastSubmittedAt is no longer restored on failure: a brief self-correcting spacing delay after a rare failure is harmless. Tests updated accordingly. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
services/requester/tx_mempool.go (1)
264-272: 🩺 Stability & Availability | 🟡 Minor | ⚡ Quick winClean up newly-created empty queues on rejected
Addpaths.For a previously unseen EOA,
Addstoresqbefore these error returns. IfGetNonceor the fast-path submit fails, the tx is rejected but an empty queue remains until idle retention, which can cause avoidable memory spikes during local-index or Flow outages.Proposed fix
q, ok := t.queues[from] + createdQueue := false if !ok { q = &eoaQueue{txs: make(map[uint64]heldTx)} t.queues[from] = q + createdQueue = true } + + cleanupRejectedQueue := func() { + if createdQueue && q.isEmpty() && !q.hasInFlight { + delete(t.queues, from) + } + } // ... indexNonce, nonceErr := q.queryAndRefreshInFlight(t.nonceProvider, from) if nonceErr != nil { + cleanupRejectedQueue() // A nonce lookup failure is an exception, not an expected // condition: this is a local state-index read that should not // fail under normal operation. The gateway is in an unknown // state, so reject the transaction rather than silently routing // it through the queue path. return nonceErr } // ... if submitErr := t.submitBatch(ctx, []heldTx{userTx}); submitErr != nil { + cleanupRejectedQueue() // Submission failed: leave queue state untouched so the EOA // is neither marked in flight nor rate-limited behind a tx // that never landed. return submitErr }Also applies to: 286-290
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@services/requester/tx_mempool.go` around lines 264 - 272, When the Add function encounters errors during nonce lookup or fast-path submission that cause it to reject a transaction, the empty queue q that was previously stored for a new EOA should be cleaned up before returning the error. Currently, when queryAndRefreshInFlight returns nonceErr or other error conditions occur on the fast-path, the queue remains in memory until idle retention. Remove or delete the stored queue from the mempool's internal structures at the error return paths (the nonceErr return and the other error path mentioned around line 286-290) to prevent empty queues from accumulating during local-index or Flow outages.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@services/requester/tx_mempool.go`:
- Around line 380-389: The timing issue occurs because `lastSubmittedAt` is
stamped when work is collected in `collectDueBatches`, not when actual
submission happens in `submitWork` via `submitBatch`. This allows delays between
collection and submission to expire the spacing window before actual submission.
Fix this by: (1) Move the `lastSubmittedAt` stamping from collection time to
after the successful `submitBatch` call within the `submitWork` function, (2)
Set `q.submissionPending = true` when appending `flushWork` in
`collectDueBatches` and at the other locations mentioned (lines 492-493, 540),
and (3) Add checks for `q.submissionPending` in the fast-path and flush
eligibility checks to prevent admitting a second same-EOA submission while
detached work is still pending.
---
Outside diff comments:
In `@services/requester/tx_mempool.go`:
- Around line 264-272: When the Add function encounters errors during nonce
lookup or fast-path submission that cause it to reject a transaction, the empty
queue q that was previously stored for a new EOA should be cleaned up before
returning the error. Currently, when queryAndRefreshInFlight returns nonceErr or
other error conditions occur on the fast-path, the queue remains in memory until
idle retention. Remove or delete the stored queue from the mempool's internal
structures at the error return paths (the nonceErr return and the other error
path mentioned around line 286-290) to prevent empty queues from accumulating
during local-index or Flow outages.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c58d021c-f074-424f-ac05-a7037d430f6f
📒 Files selected for processing (2)
services/requester/tx_mempool.goservices/requester/tx_mempool_test.go
| // submitWork submits one detached batch. On failure it rolls back the state | ||
| // the queue committed optimistically when the batch was collected; on success | ||
| // there is nothing to do — that optimistic state already reflects the | ||
| // submission. | ||
| func (t *TxMemPool) submitWork(ctx context.Context, w flushWork) error { | ||
| err := t.submitBatch(ctx, w.txs) | ||
| if err != nil { | ||
| t.rollbackFailedSubmission(w) | ||
| } | ||
| return err |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | 🏗️ Heavy lift
Anchor spacing to the actual detached submission, not collection time.
collectDueBatches stamps lastSubmittedAt before the detached flushWork is sent. With many due EOAs or a slow Flow client, this EOA's work can sit in the work slice long enough that the spacing window expires before submitWork actually calls submitBatch, allowing the next same-EOA submission too soon and weakening the ordering guarantee.
Direction for a safer fix
type eoaQueue struct {
// ...
+ // submissionPending means a Cadence submission has been detached from the
+ // queue but submitWork has not yet reconciled its actual attempt time.
+ submissionPending bool
}
func (t *TxMemPool) submitWork(ctx context.Context, w flushWork) error {
+ submittedAt := time.Now()
err := t.submitBatch(ctx, w.txs)
if err != nil {
t.rollbackFailedSubmission(w)
+ t.finishSubmission(w, submittedAt)
+ return err
}
- return err
+ t.finishSubmission(w, submittedAt)
+ return nil
}
+func (t *TxMemPool) finishSubmission(w flushWork, submittedAt time.Time) {
+ t.queueMux.Lock()
+ defer t.queueMux.Unlock()
+
+ q, ok := t.queues[w.from]
+ if !ok {
+ return
+ }
+ q.submissionPending = false
+ if submittedAt.After(q.lastSubmittedAt) {
+ q.lastSubmittedAt = submittedAt
+ q.lastActivity = submittedAt
+ }
+}Also set q.submissionPending = true when appending both prefix and TTL flushWork, and include that marker in fast-path / flush eligibility checks so no second same-EOA Cadence submission is admitted while detached work is still pending.
Also applies to: 492-493, 540-540
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@services/requester/tx_mempool.go` around lines 380 - 389, The timing issue
occurs because `lastSubmittedAt` is stamped when work is collected in
`collectDueBatches`, not when actual submission happens in `submitWork` via
`submitBatch`. This allows delays between collection and submission to expire
the spacing window before actual submission. Fix this by: (1) Move the
`lastSubmittedAt` stamping from collection time to after the successful
`submitBatch` call within the `submitWork` function, (2) Set
`q.submissionPending = true` when appending `flushWork` in `collectDueBatches`
and at the other locations mentioned (lines 492-493, 540), and (3) Add checks
for `q.submissionPending` in the fast-path and flush eligibility checks to
prevent admitting a second same-EOA submission while detached work is still
pending.
| now := time.Now() | ||
| q.lastActivity = now | ||
|
|
||
| userTx := heldTx{ |
There was a problem hiding this comment.
we can postpone creating this heldTx until it's needed. For instance, if the tx is a duplication, we don't need to create it.
| if !ok { | ||
| return | ||
| } | ||
| if w.inFlight && q.hasInFlight && q.lastSubmittedNonce == w.txs[len(w.txs)-1].nonce { |
There was a problem hiding this comment.
I don't quite understand how rollback works.
I think the failed txs have been removed from the queue. So if they are failed to submit, I thought we will add them back to the queue for retry, but actually no?
I think we should not silently drop any tx, if a tx has been added, then it should either be successfully sent, in which case, we might see no log, which implies it has been sent successfully (BTW, maybe it's good to add a debug log saying we've sent txs for account X with for a certain nonce range). And if for whatever reason failed to submit, we need to add a log, so that if we can't find a certain tx on flowscan, I should expect to see this tx in log, which could include localIndexedNonce, error message for debugging purpose.
| return | ||
| } | ||
| if w.inFlight && q.hasInFlight && q.lastSubmittedNonce == w.txs[len(w.txs)-1].nonce { | ||
| q.hasInFlight = false |
There was a problem hiding this comment.
I feel q.lastSubmittedNonce == w.txs[len(w.txs)-1].nonce check is weak.
Let's exam this case:
- received a tx1 (with nonce 1) for account A. Enqueued, local indexed nonce is 0, so we are about to send tx1.
- collectDueBatch triggered, and we tried to submit tx1, but it failed. the
lastSubmittedNonceis set to1. - then we are calling into
rollbackFailedSubmission, which will hitt.queueMux.Lock(), however, it is blocked, because we just received a tx2 (with nonce 2) for account A. And we enqueued it while holding the lock. - now after
tx2is enqueued, the queueMux is released, and rollbackFailedSubmission is unblocked. And it noticed the lastSubmittedNonce1is different from the last tx (w.txs[len(w.txs)-1].nonce), which is 2. So we keepq.hasInFlight = trueand won't update it back tofalse. - however, tx1 has been dropped, tx2 was enqueued but not sent, the
hasInFlightis true, but none oftx1andtx2was in flight. And worse is that, we will never Addtx1, because thelastSubmittedNonceis 1, andhasInFlightis true, which means the following logic will always returnerrs.ErrInFlightNonce.
if q.hasInFlight && tx.Nonce() <= q.lastSubmittedNonce {
return errs.ErrInFlightNonce
}
Summary
Implements the Nonce-Aware Transaction Pool — the third
TxPoolstrategy designed in the #965 discussion (revised plan) and follow-up. It replaces the blanket batch-window delay with nonce-aware behavior so the fix can serve all production traffic, not just the dedicated DFNS gateway. See also the animated scenario walkthroughs.Behavior
--tx-collection-window, default 300ms, reset on each arrival). On flush, the longest consecutive nonce prefix is submitted as oneEVM.batchRun, capped at--tx-max-batch-size(default 5).--tx-submission-spacingapart (default 1200ms ≈ 1.5x block production rate, per @m-Peter's suggestion) so they land in different blocks and cannot be reordered by Collection Nodes. The spacing also serves as the flush deadline anchored at first enqueue — there is deliberately no separate hard-cap knob (see fix(batch): eliminate silent tx drops from first-tx nonce race in BatchTxPool #965 discussion).--tx-pool-ttl(default 30s) expires — on expiry they are submitted anyway so the failure is observable on-chain rather than a silent drop (@janezpodhostnik's concern).transaction with the same nonce already submitted), since letting it through burns Flow fees on a guaranteed nonce-mismatch failure (@m-Peter's cost concern). A same-nonce resubmission of a queued (not yet sent) tx replaces it, matching mempool semantics.Configuration
Validated at startup: window ≤ spacing; requires
--tx-state-validation=local-index; mutually exclusive with--tx-batch-mode.SingleTxPoolandBatchTxPoolare untouched.Known tradeoff
The fast path submits while holding the pool-wide mutex, so one EOA's fast-path submission briefly serializes other EOAs'
Addcalls behind one Flow round-trip. This is documented in the type comment; per-EOA locking is the known upgrade path if contention shows up under multi-EOA load.Testing
services/requester): 13 tests covering prefix/expiry selection, fast-path gating, in-flight rejection, and a regression test for a failed-flush reconcile path (a failed submission previously could have permanently wedged an EOA's queue;submitWork/rollbackInFlightfix that and the test exercises the real path).tests/, emulator): out-of-order 10-tx burst lands exactly 10 executions (the DFNS regression scenario), fast-path single tx submits via theEVM.runpath, gap hold-and-fill, TTL eviction (duplicate-check lifecycle proves it), and in-flight same-nonce rejection. Existing batch-mode e2e tests still pass.Deployment notes
🤖 Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Tests