From efad19a7d16d8af425a51b81561a9dfc70240ec2 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 24 Nov 2025 18:07:32 +0200 Subject: [PATCH 01/22] Reject transactions that have already been submitted to the tx pool --- models/errors/errors.go | 5 +- services/requester/batch_tx_pool.go | 103 +++++++++++++++++++--------- tests/tx_batching_test.go | 61 ++++++++-------- 3 files changed, 105 insertions(+), 64 deletions(-) diff --git a/models/errors/errors.go b/models/errors/errors.go index 614c7403..fb0f38d6 100644 --- a/models/errors/errors.go +++ b/models/errors/errors.go @@ -28,9 +28,8 @@ var ( // Transaction errors - ErrFailedTransaction = errors.New("failed transaction") - ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) - ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool") + ErrFailedTransaction = errors.New("failed transaction") + ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) // Storage errors diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index d82ae258..9f95e53c 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -3,6 +3,7 @@ package requester import ( "context" "encoding/hex" + "fmt" "slices" "sort" "sync" @@ -22,35 +23,43 @@ import ( "github.com/onflow/flow-evm-gateway/services/requester/keystore" ) -const eoaActivityCacheSize = 10_000 +const ( + eoaActivityCacheSize = 10_000 + eoaActivityCacheTTL = time.Second * 10 + maxTrackedTxHashesPerEOA = 15 +) type pooledEvmTx struct { txPayload cadence.String - txHash gethCommon.Hash nonce uint64 } -// BatchTxPool is a `TxPool` implementation that collects and groups -// transactions based on their EOA signer, and submits them for execution -// using a batch. +type eoaActivityMetadata struct { + lastSubmission time.Time + txHashes []gethCommon.Hash +} + +// BatchTxPool is a `TxPool` implementation that groups incoming transactions +// based on their EOA signer, and submits them for execution using a batch. // // The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the // `EVM.run` used in `SingleTxPool`. // // The main advantage of this implementation over the `SingleTxPool`, is the -// guarantee that transactions originated from the same EOA address, which -// arrive in a short time interval (about the same as Flow's block production rate), -// will be executed in the same order their arrived. -// This helps to reduce the nonce mismatch errors which mainly occur from the -// re-ordering of Cadence transactions that happens from Collection nodes. +// guarantee that transactions originating from the same EOA address, which +// arrive in a short time interval (configurable by the node operator), +// will be executed in the same order they arrived. +// This helps to reduce the execution errors which may occur from the +// re-ordering of Cadence transactions that happens on Collection nodes. type BatchTxPool struct { *SingleTxPool - pooledTxs map[gethCommon.Address][]pooledEvmTx - txMux sync.Mutex - eoaActivity *expirable.LRU[gethCommon.Address, time.Time] + + pooledTxs map[gethCommon.Address][]pooledEvmTx + txMux sync.Mutex + eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata] } -var _ TxPool = &BatchTxPool{} +var _ TxPool = (*BatchTxPool)(nil) func NewBatchTxPool( ctx context.Context, @@ -77,16 +86,16 @@ func NewBatchTxPool( return nil, err } - eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time]( + eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata]( eoaActivityCacheSize, nil, - config.EOAActivityCacheTTL, + eoaActivityCacheTTL, ) batchPool := &BatchTxPool{ - SingleTxPool: singleTxPool, - pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), - txMux: sync.Mutex{}, - eoaActivity: eoaActivity, + SingleTxPool: singleTxPool, + pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), + txMux: sync.Mutex{}, + eoaActivityCache: eoaActivityCache, } go batchPool.processPooledTransactions(ctx) @@ -123,6 +132,21 @@ func (t *BatchTxPool) Add( return err } + eoaActivity, found := t.eoaActivityCache.Get(from) + txHash := tx.Hash() + + // Reject transactions that have already been submitted, + // as they are *likely* to fail. Two transactions with + // identical hashes, are expected to have the exact same + // payload. + if found && slices.Contains(eoaActivity.txHashes, txHash) { + return fmt.Errorf( + "%w: a tx with hash %s has already been submitted", + errs.ErrInvalid, + txHash, + ) + } + // Scenarios // 1. EOA activity not found: // => We send the transaction individually, without adding it @@ -140,27 +164,42 @@ func (t *BatchTxPool) Add( // For all 3 cases, we record the activity time for the next // transactions that might come from the same EOA. // [X] is equal to the configured `TxBatchInterval` duration. - lastActivityTime, found := t.eoaActivity.Get(from) - if !found { // Case 1. EOA activity not found: err = t.submitSingleTransaction(ctx, hexEncodedTx) - } else if time.Since(lastActivityTime) > t.config.TxBatchInterval { - // Case 2. EOA activity found AND it was more than [X] seconds ago: - err = t.submitSingleTransaction(ctx, hexEncodedTx) + } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { + if len(t.pooledTxs[from]) > 0 { + // If the EOA has pooled transactions, which are not yet processed, + // due to congestion or anything, make sure to include the current + // tx on that batch. + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} + t.pooledTxs[from] = append(t.pooledTxs[from], userTx) + } else { + // Case 2. EOA activity found AND it was more than [X] seconds ago: + err = t.submitSingleTransaction(ctx, hexEncodedTx) + } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: - userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()} - // Prevent submission of duplicate transactions, based on their tx hash - if slices.Contains(t.pooledTxs[from], userTx) { - return errs.ErrDuplicateTransaction - } + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } - t.eoaActivity.Add(from, time.Now()) + if err != nil { + return err + } + + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity.lastSubmission = time.Now() + eoaActivity.txHashes = append(eoaActivity.txHashes, txHash) + // To avoid the slice of hashes from growing indefinitely, + // maintain only a handful of the last tx hashes. + if len(eoaActivity.txHashes) > maxTrackedTxHashesPerEOA { + eoaActivity.txHashes = eoaActivity.txHashes[1:] + } + + t.eoaActivityCache.Add(from, eoaActivity) - return err + return nil } func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index fd47c310..64fed93c 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -514,7 +514,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { ) } -func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { +func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) { _, cfg, stop := setupGatewayNode(t) defer stop() @@ -525,36 +525,36 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey) require.NoError(t, err) - testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194") - nonce := uint64(0) - hashes := make([]common.Hash, 0) - - signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) - - txHash, err := rpcTester.sendRawTx(signed) - require.NoError(t, err) - hashes = append(hashes, txHash) + testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") + nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} - // Increment nonce for the duplicate test transactions that follow - nonce += 1 - dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) + var errors []error + hashes := []common.Hash{} + // transfer some funds to the test address + for _, nonce := range nonces { + signed, _, err := evmSign(big.NewInt(1_000_000_000), 23_500, eoaKey, nonce, &testAddr, nil) + require.NoError(t, err) - // Submit 5 identical transactions to test duplicate detection: - // the first should succeed, the rest should be rejected as duplicates. - for i := range 5 { - if i == 0 { - txHash, err := rpcTester.sendRawTx(dupSigned) - require.NoError(t, err) - hashes = append(hashes, txHash) + txHash, err := rpcTester.sendRawTx(signed) + if err != nil { + errors = append(errors, err) } else { - _, err := rpcTester.sendRawTx(dupSigned) - require.Error(t, err) - require.ErrorContains(t, err, "invalid: transaction already in pool") + hashes = append(hashes, txHash) } } + require.Len(t, errors, 2) + assert.ErrorContains( + t, + errors[0], + "a tx with hash 0x2bdf4aa4c3e273a624dddfdbde6614786b6a5329e246c531d3e0e9f92e79e04d has already been submitted", + ) + assert.ErrorContains( + t, + errors[1], + "a tx with hash 0xb72e1f83861a63b5ad4b927295af07fa9546b01aac5dfce046a5fb20f9be9f2f has already been submitted", + ) + assert.Eventually(t, func() bool { for _, h := range hashes { rcp, err := rpcTester.getReceipt(h.String()) @@ -604,9 +604,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { EnforceGasPrice: true, LogLevel: zerolog.DebugLevel, LogWriter: testLogWriter(), - TxStateValidation: config.TxSealValidation, + TxStateValidation: config.LocalIndexValidation, TxBatchMode: true, - TxBatchInterval: time.Second * 2, + TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet } bootstrapDone := make(chan struct{}) @@ -617,6 +617,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { require.NoError(t, err) }() + // Allow the Gateway to catch up on indexing + time.Sleep(time.Second * 2) + <-bootstrapDone return emu, cfg, func() { From 687c657e5d84fd66194ddf4be1d308db1ab7f5c3 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 28 Nov 2025 13:28:23 +0200 Subject: [PATCH 02/22] Locking should only protected operations on pooledTxs field --- services/requester/batch_tx_pool.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 9f95e53c..949ddab9 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -25,7 +25,6 @@ import ( const ( eoaActivityCacheSize = 10_000 - eoaActivityCacheTTL = time.Second * 10 maxTrackedTxHashesPerEOA = 15 ) @@ -89,7 +88,7 @@ func NewBatchTxPool( eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata]( eoaActivityCacheSize, nil, - eoaActivityCacheTTL, + config.EOAActivityCacheTTL, ) batchPool := &BatchTxPool{ SingleTxPool: singleTxPool, @@ -113,11 +112,6 @@ func (t *BatchTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event - // tx adding should be blocking, so we don't have races when - // pooled transactions are being processed in the background. - t.txMux.Lock() - defer t.txMux.Unlock() - from, err := models.DeriveTxSender(tx) if err != nil { return err @@ -168,20 +162,28 @@ func (t *BatchTxPool) Add( // Case 1. EOA activity not found: err = t.submitSingleTransaction(ctx, hexEncodedTx) } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { - if len(t.pooledTxs[from]) > 0 { - // If the EOA has pooled transactions, which are not yet processed, - // due to congestion or anything, make sure to include the current - // tx on that batch. + // If the EOA has pooled transactions, which are not yet processed, + // due to congestion or anything, make sure to include the current + // tx on that batch. + t.txMux.Lock() + hasBatch := len(t.pooledTxs[from]) > 0 + if hasBatch { userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) - } else { + } + t.txMux.Unlock() + + // If it wasn't batched, submit individually + if !hasBatch { // Case 2. EOA activity found AND it was more than [X] seconds ago: err = t.submitSingleTransaction(ctx, hexEncodedTx) } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: + t.txMux.Lock() userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) + t.txMux.Unlock() } if err != nil { From cd83f628e5e2559b6c7f93fe600dfd18fcb7073f Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 28 Nov 2025 13:56:03 +0200 Subject: [PATCH 03/22] Remove locking around account key fetching --- services/requester/single_tx_pool.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index e8de3e89..89b1bc34 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -31,7 +31,6 @@ type SingleTxPool struct { pool *sync.Map txPublisher *models.Publisher[*gethTypes.Transaction] config config.Config - mux sync.Mutex keystore *keystore.KeyStore collector metrics.Collector // referenceBlockHeader is stored atomically to avoid races @@ -40,7 +39,7 @@ type SingleTxPool struct { // todo add methods to inspect transaction pool state } -var _ TxPool = &SingleTxPool{} +var _ TxPool = (*SingleTxPool)(nil) func NewSingleTxPool( ctx context.Context, @@ -188,7 +187,11 @@ func (t *SingleTxPool) buildTransaction( } } - accKey, err := t.fetchSigningAccountKey() + // getting an account key from the `KeyStore` for signing transactions, + // *does not* need to be lock-protected, as the keys are read from a + // channel. No two go-routines will end up getting the same account + // key at the same time. + accKey, err := t.keystore.Take() if err != nil { return nil, err } @@ -211,16 +214,6 @@ func (t *SingleTxPool) buildTransaction( return flowTx, nil } -func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) { - // getting an account key from the `KeyStore` for signing transactions, - // should be lock-protected, so that we don't sign any two Flow - // transactions with the same account key - t.mux.Lock() - defer t.mux.Unlock() - - return t.keystore.Take() -} - func (t *SingleTxPool) getReferenceBlock() *flow.BlockHeader { if v := t.referenceBlockHeader.Load(); v != nil { return v.(*flow.BlockHeader) From 0659bad90397f30657243d3d80ea1cf8178da691 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 28 Nov 2025 16:31:00 +0200 Subject: [PATCH 04/22] Track submitted tx nonces instead of tx hashes --- services/requester/batch_tx_pool.go | 30 ++++++++++++++--------------- tests/tx_batching_test.go | 4 ++-- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 949ddab9..a37c8dcc 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -25,7 +25,7 @@ import ( const ( eoaActivityCacheSize = 10_000 - maxTrackedTxHashesPerEOA = 15 + maxTrackedTxNoncesPerEOA = 15 ) type pooledEvmTx struct { @@ -35,7 +35,7 @@ type pooledEvmTx struct { type eoaActivityMetadata struct { lastSubmission time.Time - txHashes []gethCommon.Hash + txNonces []uint64 } // BatchTxPool is a `TxPool` implementation that groups incoming transactions @@ -127,17 +127,15 @@ func (t *BatchTxPool) Add( } eoaActivity, found := t.eoaActivityCache.Get(from) - txHash := tx.Hash() + nonce := tx.Nonce() // Reject transactions that have already been submitted, - // as they are *likely* to fail. Two transactions with - // identical hashes, are expected to have the exact same - // payload. - if found && slices.Contains(eoaActivity.txHashes, txHash) { + // as they are *likely* to fail. + if found && slices.Contains(eoaActivity.txNonces, nonce) { return fmt.Errorf( - "%w: a tx with hash %s has already been submitted", + "%w: a tx with nonce %d has already been submitted", errs.ErrInvalid, - txHash, + nonce, ) } @@ -168,7 +166,7 @@ func (t *BatchTxPool) Add( t.txMux.Lock() hasBatch := len(t.pooledTxs[from]) > 0 if hasBatch { - userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } t.txMux.Unlock() @@ -181,7 +179,7 @@ func (t *BatchTxPool) Add( } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: t.txMux.Lock() - userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) t.txMux.Unlock() } @@ -192,11 +190,11 @@ func (t *BatchTxPool) Add( // Update metadata for the last EOA activity only on successful add/submit. eoaActivity.lastSubmission = time.Now() - eoaActivity.txHashes = append(eoaActivity.txHashes, txHash) - // To avoid the slice of hashes from growing indefinitely, - // maintain only a handful of the last tx hashes. - if len(eoaActivity.txHashes) > maxTrackedTxHashesPerEOA { - eoaActivity.txHashes = eoaActivity.txHashes[1:] + eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) + // To avoid the slice of nonces from growing indefinitely, + // maintain only a handful of the last tx nonces. + if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { + eoaActivity.txNonces = eoaActivity.txNonces[1:] } t.eoaActivityCache.Add(from, eoaActivity) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index 64fed93c..a65b1ba7 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -547,12 +547,12 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) assert.ErrorContains( t, errors[0], - "a tx with hash 0x2bdf4aa4c3e273a624dddfdbde6614786b6a5329e246c531d3e0e9f92e79e04d has already been submitted", + "a tx with nonce 2 has already been submitted", ) assert.ErrorContains( t, errors[1], - "a tx with hash 0xb72e1f83861a63b5ad4b927295af07fa9546b01aac5dfce046a5fb20f9be9f2f has already been submitted", + "a tx with nonce 3 has already been submitted", ) assert.Eventually(t, func() bool { From 1d15208ac8cd3442570e5bab182b772190ea468e Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 11:53:21 +0200 Subject: [PATCH 05/22] Update trimming logic for tracked tx nonces Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com> --- services/requester/batch_tx_pool.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index a37c8dcc..92871eb0 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -192,9 +192,10 @@ func (t *BatchTxPool) Add( eoaActivity.lastSubmission = time.Now() eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) // To avoid the slice of nonces from growing indefinitely, - // maintain only a handful of the last tx nonces. + // keep only the last `maxTrackedTxNoncesPerEOA` nonces. if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { - eoaActivity.txNonces = eoaActivity.txNonces[1:] + firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA + eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] } t.eoaActivityCache.Add(from, eoaActivity) From 11375618afe76fc5ca829067a9b0fc003a2b2432 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 11:59:10 +0200 Subject: [PATCH 06/22] Increase default maxTrackedTxNoncesPerEOA to 30 --- services/requester/batch_tx_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 92871eb0..3294f370 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -25,7 +25,7 @@ import ( const ( eoaActivityCacheSize = 10_000 - maxTrackedTxNoncesPerEOA = 15 + maxTrackedTxNoncesPerEOA = 30 ) type pooledEvmTx struct { From 3933650e2fec09493a7562f69358719063362a9b Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 13:12:56 +0200 Subject: [PATCH 07/22] Silently skip already submitted transactions instead of returning an error --- services/requester/batch_tx_pool.go | 15 ++++++------- tests/tx_batching_test.go | 33 ++++++++--------------------- 2 files changed, 16 insertions(+), 32 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 3294f370..a6b20b56 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -3,7 +3,6 @@ package requester import ( "context" "encoding/hex" - "fmt" "slices" "sort" "sync" @@ -19,7 +18,6 @@ import ( "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/metrics" "github.com/onflow/flow-evm-gateway/models" - errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/requester/keystore" ) @@ -129,14 +127,15 @@ func (t *BatchTxPool) Add( eoaActivity, found := t.eoaActivityCache.Get(from) nonce := tx.Nonce() - // Reject transactions that have already been submitted, + // Skip transactions that have been already submitted, // as they are *likely* to fail. if found && slices.Contains(eoaActivity.txNonces, nonce) { - return fmt.Errorf( - "%w: a tx with nonce %d has already been submitted", - errs.ErrInvalid, - nonce, - ) + t.logger.Info(). + Str("evm_tx", tx.Hash().Hex()). + Str("from", from.Hex()). + Uint64("nonce", nonce). + Msg("tx with same nonce has been already submitted") + return nil } // Scenarios diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index a65b1ba7..1db7e5c3 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -528,42 +528,27 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} - var errors []error hashes := []common.Hash{} // transfer some funds to the test address + transferAmount := int64(1_000_000_000) for _, nonce := range nonces { - signed, _, err := evmSign(big.NewInt(1_000_000_000), 23_500, eoaKey, nonce, &testAddr, nil) + signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) require.NoError(t, err) txHash, err := rpcTester.sendRawTx(signed) - if err != nil { - errors = append(errors, err) - } else { - hashes = append(hashes, txHash) - } + require.NoError(t, err) + hashes = append(hashes, txHash) } - require.Len(t, errors, 2) - assert.ErrorContains( - t, - errors[0], - "a tx with nonce 2 has already been submitted", - ) - assert.ErrorContains( - t, - errors[1], - "a tx with nonce 3 has already been submitted", - ) + expectedBalance := big.NewInt(6 * transferAmount) assert.Eventually(t, func() bool { - for _, h := range hashes { - rcp, err := rpcTester.getReceipt(h.String()) - if err != nil || rcp == nil || rcp.Status != 1 { - return false - } + balance, err := rpcTester.getBalance(testAddr) + if err != nil { + return false } - return true + return balance.Cmp(expectedBalance) == 0 }, time.Second*15, time.Second*1, "all transactions were not executed") } From a7aed9dea5bd9d03a8b9c714b9beecefced399f5 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 13:13:31 +0200 Subject: [PATCH 08/22] Remove dashes from log fields to comply with Grafana --- services/requester/requester.go | 2 +- services/requester/single_tx_pool.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/requester/requester.go b/services/requester/requester.go index 55c3440c..3006fbbe 100644 --- a/services/requester/requester.go +++ b/services/requester/requester.go @@ -238,7 +238,7 @@ func (e *EVM) SendRawTransaction(ctx context.Context, data []byte) (common.Hash, } e.logger.Info(). - Str("evm-id", tx.Hash().Hex()). + Str("evm_tx", tx.Hash().Hex()). Str("to", to). Str("from", from.Hex()). Str("value", tx.Value().String()). diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index 89b1bc34..b4997319 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -149,8 +149,8 @@ func (t *SingleTxPool) Add( } t.logger.Error().Err(res.Error). - Str("flow-id", flowTx.ID().String()). - Str("evm-id", tx.Hash().Hex()). + Str("flow_tx", flowTx.ID().String()). + Str("evm_tx", tx.Hash().Hex()). Msg("flow transaction error") // hide specific cause since it's an implementation issue From e30f936d48a7366f8942ecfc631ba52c0eb48082 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 19:10:21 +0200 Subject: [PATCH 09/22] Remove redundant error logging from resolveBlockTag function --- api/utils.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/api/utils.go b/api/utils.go index dc9d2ddd..9723e637 100644 --- a/api/utils.go +++ b/api/utils.go @@ -29,9 +29,6 @@ func resolveBlockTag( if number, ok := blockNumberOrHash.Number(); ok { height, err := resolveBlockNumber(number, blocksDB) if err != nil { - logger.Error().Err(err). - Stringer("block_number", number). - Msg("failed to resolve block by number") return 0, err } return height, nil @@ -40,9 +37,6 @@ func resolveBlockTag( if hash, ok := blockNumberOrHash.Hash(); ok { height, err := blocksDB.GetHeightByID(hash) if err != nil { - logger.Error().Err(err). - Stringer("block_hash", hash). - Msg("failed to resolve block by hash") return 0, err } return height, nil From 39e9c3e694409f4a67d596d3335f69c4487a9e7f Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 13:33:39 +0200 Subject: [PATCH 10/22] Improve logging & tracking of dropped transactions --- services/requester/batch_tx_pool.go | 12 +++++++++++- services/requester/single_tx_pool.go | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index a6b20b56..2c793e44 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -184,6 +184,10 @@ func (t *BatchTxPool) Add( } if err != nil { + t.logger.Error().Err(err).Msgf( + "failed to submit single Flow transaction for EOA: %s", + from.Hex(), + ) return err } @@ -227,7 +231,7 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { ) if err != nil { t.logger.Error().Err(err).Msgf( - "failed to submit Flow transaction from BatchTxPool for EOA: %s", + "failed to submit batch Flow transaction for EOA: %s", address.Hex(), ) continue @@ -274,6 +278,9 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress( } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record all transactions as dropped. + t.collector.TransactionsDropped(len(hexEncodedTxs)) return err } @@ -305,6 +312,9 @@ func (t *BatchTxPool) submitSingleTransaction( } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) return err } diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index b4997319..01120e37 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -95,6 +95,11 @@ func (t *SingleTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event + from, err := models.DeriveTxSender(tx) + if err != nil { + return err + } + txData, err := tx.MarshalBinary() if err != nil { return err @@ -120,10 +125,21 @@ func (t *SingleTxPool) Add( // If there was any error during the transaction build // process, we record it as a dropped transaction. t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to build Flow transaction for EOA: %s", + from.Hex(), + ) return err } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to submit Flow transaction for EOA: %s", + from.Hex(), + ) return err } From 9ceaa425216fb1c1a704d9e0602775e2547374b0 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 16:12:02 +0200 Subject: [PATCH 11/22] Add lock-protection on writes to eoaActivityMetadata --- services/requester/batch_tx_pool.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 2c793e44..49e33e7d 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -191,7 +191,11 @@ func (t *BatchTxPool) Add( return err } + t.txMux.Lock() + defer t.txMux.Unlock() + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity, _ = t.eoaActivityCache.Get(from) eoaActivity.lastSubmission = time.Now() eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) // To avoid the slice of nonces from growing indefinitely, From 4cd0a1e10b33c8f6bf5720489a5b1095914b9322 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 18:11:30 +0200 Subject: [PATCH 12/22] Add retry mechanism on pooled transactions --- services/requester/batch_tx_pool.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 49e33e7d..80998feb 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -238,7 +238,11 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { "failed to submit batch Flow transaction for EOA: %s", address.Hex(), ) - continue + // In case of any error, add the transactions back to the pool, + // as a retry mechanism. + t.txMux.Lock() + t.pooledTxs[address] = append(t.pooledTxs[address], pooledTxs...) + t.txMux.Unlock() } } } From ab2a71c4614bee38156cc13db3ab8aad70989627 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 18:40:34 +0200 Subject: [PATCH 13/22] Extract EOA activity metadata update to its own method --- services/requester/batch_tx_pool.go | 37 +++++++++++++++++------------ 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 80998feb..ac81993b 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -191,21 +191,7 @@ func (t *BatchTxPool) Add( return err } - t.txMux.Lock() - defer t.txMux.Unlock() - - // Update metadata for the last EOA activity only on successful add/submit. - eoaActivity, _ = t.eoaActivityCache.Get(from) - eoaActivity.lastSubmission = time.Now() - eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) - // To avoid the slice of nonces from growing indefinitely, - // keep only the last `maxTrackedTxNoncesPerEOA` nonces. - if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { - firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA - eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] - } - - t.eoaActivityCache.Add(from, eoaActivity) + t.updateEOAActivityMetadata(from, nonce) return nil } @@ -328,3 +314,24 @@ func (t *BatchTxPool) submitSingleTransaction( return nil } + +func (t *BatchTxPool) updateEOAActivityMetadata( + from gethCommon.Address, + nonce uint64, +) { + t.txMux.Lock() + defer t.txMux.Unlock() + + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity, _ := t.eoaActivityCache.Get(from) + eoaActivity.lastSubmission = time.Now() + eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) + // To avoid the slice of nonces from growing indefinitely, + // keep only the last `maxTrackedTxNoncesPerEOA` nonces. + if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { + firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA + eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] + } + + t.eoaActivityCache.Add(from, eoaActivity) +} From a91b5be9cb206d47638d4e0e539a9e7ef075718c Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 4 Dec 2025 13:05:02 +0200 Subject: [PATCH 14/22] Improve locking on batch transaction submission to better handler concurrent requests --- services/requester/batch_tx_pool.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index ac81993b..abda81fb 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -124,6 +124,9 @@ func (t *BatchTxPool) Add( return err } + t.txMux.Lock() + defer t.txMux.Unlock() + eoaActivity, found := t.eoaActivityCache.Get(from) nonce := tx.Nonce() @@ -162,13 +165,11 @@ func (t *BatchTxPool) Add( // If the EOA has pooled transactions, which are not yet processed, // due to congestion or anything, make sure to include the current // tx on that batch. - t.txMux.Lock() hasBatch := len(t.pooledTxs[from]) > 0 if hasBatch { userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } - t.txMux.Unlock() // If it wasn't batched, submit individually if !hasBatch { @@ -177,10 +178,8 @@ func (t *BatchTxPool) Add( } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: - t.txMux.Lock() userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) - t.txMux.Unlock() } if err != nil { @@ -319,9 +318,6 @@ func (t *BatchTxPool) updateEOAActivityMetadata( from gethCommon.Address, nonce uint64, ) { - t.txMux.Lock() - defer t.txMux.Unlock() - // Update metadata for the last EOA activity only on successful add/submit. eoaActivity, _ := t.eoaActivityCache.Get(from) eoaActivity.lastSubmission = time.Now() From f8d85a69b7802a319d883eefb93c07b07547236b Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 4 Dec 2025 15:15:10 +0200 Subject: [PATCH 15/22] Improve tests for tx submission with nonce validation --- tests/tx_batching_test.go | 54 ++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index 1db7e5c3..057015dc 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -515,7 +515,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { } func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) { - _, cfg, stop := setupGatewayNode(t) + emu, cfg, stop := setupGatewayNode(t) defer stop() rpcTester := &rpcTest{ @@ -528,16 +528,28 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} + g := errgroup.Group{} + + startBlock, err := emu.GetLatestBlock() + require.NoError(t, err) + hashes := []common.Hash{} // transfer some funds to the test address transferAmount := int64(1_000_000_000) - for _, nonce := range nonces { - signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) - txHash, err := rpcTester.sendRawTx(signed) - require.NoError(t, err) - hashes = append(hashes, txHash) + for range 3 { + g.Go(func() error { + for _, nonce := range nonces { + signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) + require.NoError(t, err) + + txHash, err := rpcTester.sendRawTx(signed) + require.NoError(t, err) + hashes = append(hashes, txHash) + } + + return nil + }) } expectedBalance := big.NewInt(6 * transferAmount) @@ -550,6 +562,34 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) return balance.Cmp(expectedBalance) == 0 }, time.Second*15, time.Second*1, "all transactions were not executed") + + endBlock, err := emu.GetLatestBlock() + require.NoError(t, err) + + blockEvents, err := emu.GetEventsForHeightRange( + "A.f8d6e0586b0a20c7.EVM.TransactionExecuted", + startBlock.Height+1, + endBlock.Height, + ) + + totalEVMEvents := 0 + for _, blockEvent := range blockEvents { + totalEVMEvents += len(blockEvent.Events) + } + assert.Equal(t, 6, totalEVMEvents) + + for i := startBlock.Height; i <= endBlock.Height; i++ { + block, err := emu.GetBlockByHeight(i) + require.NoError(t, err) + + txResults, err := emu.GetTransactionResultsByBlockID(block.ID()) + require.NoError(t, err) + + for _, txResult := range txResults { + // Assert that we have no errors on the submitted transactions + require.Empty(t, txResult.ErrorMessage) + } + } } func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { From b15fc47106e46ce57a7aacda9d0dacc85fb288de Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 4 Dec 2025 18:33:10 +0200 Subject: [PATCH 16/22] Use context.WithTimeout in submitSingleTransaction method --- services/requester/batch_tx_pool.go | 70 +++++++++++++++++++---------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index abda81fb..8830b1ae 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -284,34 +284,56 @@ func (t *BatchTxPool) submitSingleTransaction( ctx context.Context, hexEncodedTx cadence.String, ) error { - coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex()) - if err != nil { - return err - } + done := make(chan struct{}) + var submitError error - script := replaceAddresses(runTxScript, t.config.FlowNetworkID) - flowTx, err := t.buildTransaction( - ctx, - t.getReferenceBlock(), - script, - cadence.NewArray([]cadence.Value{hexEncodedTx}), - coinbaseAddress, - ) - if err != nil { - // If there was any error during the transaction build - // process, we record it as a dropped transaction. - t.collector.TransactionsDropped(1) - return err - } + // This method is called while holding the `t.txMux` lock, + // don't let it run for a long time, to avoid lock-contention + ctx, cancel := context.WithTimeout(ctx, time.Second*3) + defer cancel() - if err := t.client.SendTransaction(ctx, *flowTx); err != nil { - // If there was any error while sending the transaction, - // we record it as a dropped transaction. - t.collector.TransactionsDropped(1) - return err + // build & submit transaction + go func() { + defer close(done) + + coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex()) + if err != nil { + submitError = err + return + } + + script := replaceAddresses(runTxScript, t.config.FlowNetworkID) + flowTx, err := t.buildTransaction( + ctx, + t.getReferenceBlock(), + script, + cadence.NewArray([]cadence.Value{hexEncodedTx}), + coinbaseAddress, + ) + if err != nil { + // If there was any error during the transaction build + // process, we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + submitError = err + return + } + + if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + submitError = err + return + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-done: } - return nil + return submitError } func (t *BatchTxPool) updateEOAActivityMetadata( From ee7c26ab9123c1acd3be1a5d2ce74e2e0d3e42a9 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 8 Dec 2025 16:12:01 +0200 Subject: [PATCH 17/22] Increase the context deadline for tx submission to 4 seconds --- services/requester/batch_tx_pool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 8830b1ae..e71d061f 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -287,9 +287,9 @@ func (t *BatchTxPool) submitSingleTransaction( done := make(chan struct{}) var submitError error - // This method is called while holding the `t.txMux` lock, - // don't let it run for a long time, to avoid lock-contention - ctx, cancel := context.WithTimeout(ctx, time.Second*3) + // This method is called while holding the `t.txMux` lock. + // Do not let it run for a long time, to avoid lock-contention. + ctx, cancel := context.WithTimeout(ctx, time.Second*4) defer cancel() // build & submit transaction From 36475a97b4987467c998bbb02ef34ac3594cfbed Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 8 Dec 2025 16:13:25 +0200 Subject: [PATCH 18/22] Add comment to describe the workaround of context.WithTimeout for submitSingleTransaction --- services/requester/batch_tx_pool.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index e71d061f..5508bce9 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -292,7 +292,13 @@ func (t *BatchTxPool) submitSingleTransaction( ctx, cancel := context.WithTimeout(ctx, time.Second*4) defer cancel() - // build & submit transaction + // Build & submit the transaction, in a separate goroutine. The AN calls + // do not respect the `context.WithTimeout` deadline, and can run for as + // long as is necessary for their completion. + // `context.WithTimeout` arranges for Done to be closed when the specified + // timeout elapses, and at that point we return an error to abort the + // transaction submission, and release the `t.txMux` lock for the next + // requests. go func() { defer close(done) From c5f0acadecdab270d917779c75bf989491a01800 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 8 Dec 2025 17:29:35 +0200 Subject: [PATCH 19/22] fixup! Increase the context deadline for tx submission to 4 seconds --- services/requester/batch_tx_pool.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 5508bce9..1293d5ee 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -289,6 +289,8 @@ func (t *BatchTxPool) submitSingleTransaction( // This method is called while holding the `t.txMux` lock. // Do not let it run for a long time, to avoid lock-contention. + // The 4-second timeout provides a 1-second buffer on top of ANs + // 3-second timeout for LN requests. ctx, cancel := context.WithTimeout(ctx, time.Second*4) defer cancel() From 967887e817a16f5d76ff9f9e25b820e71850f9f2 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 8 Dec 2025 19:32:03 +0200 Subject: [PATCH 20/22] Improve locking on batch transaction submission to better handler concurrent requests --- services/requester/batch_tx_pool.go | 45 +++++++++++++++++++---------- tests/tx_batching_test.go | 3 ++ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 1293d5ee..a9d05237 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -125,7 +125,6 @@ func (t *BatchTxPool) Add( } t.txMux.Lock() - defer t.txMux.Unlock() eoaActivity, found := t.eoaActivityCache.Get(from) nonce := tx.Nonce() @@ -133,14 +132,21 @@ func (t *BatchTxPool) Add( // Skip transactions that have been already submitted, // as they are *likely* to fail. if found && slices.Contains(eoaActivity.txNonces, nonce) { + t.txMux.Unlock() t.logger.Info(). Str("evm_tx", tx.Hash().Hex()). Str("from", from.Hex()). Uint64("nonce", nonce). Msg("tx with same nonce has been already submitted") + return nil } + t.updateEOAActivityMetadata(from, nonce) + + // Determine action while holding lock + var shouldSubmitSingle bool + // Scenarios // 1. EOA activity not found: // => We send the transaction individually, without adding it @@ -160,29 +166,40 @@ func (t *BatchTxPool) Add( // [X] is equal to the configured `TxBatchInterval` duration. if !found { // Case 1. EOA activity not found: - err = t.submitSingleTransaction(ctx, hexEncodedTx) + shouldSubmitSingle = true } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { + // Case 2. EOA activity found AND it was more than [X] seconds ago: + // If the EOA has pooled transactions, which are not yet processed, // due to congestion or anything, make sure to include the current // tx on that batch. - hasBatch := len(t.pooledTxs[from]) > 0 - if hasBatch { - userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} - t.pooledTxs[from] = append(t.pooledTxs[from], userTx) - } - - // If it wasn't batched, submit individually - if !hasBatch { - // Case 2. EOA activity found AND it was more than [X] seconds ago: - err = t.submitSingleTransaction(ctx, hexEncodedTx) - } + shouldSubmitSingle = (len(t.pooledTxs[from]) == 0) } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: + shouldSubmitSingle = false + } + + // Pool transaction in the batch + if !shouldSubmitSingle { userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } + // Release lock before network I/O operation + t.txMux.Unlock() + + // Submit single transaction without holding lock + if shouldSubmitSingle { + err = t.submitSingleTransaction(ctx, hexEncodedTx) + } + if err != nil { + t.txMux.Lock() + // If there was an error during tx submission, remove the entry + // from the cache, to not block future requests with same nonce. + t.eoaActivityCache.Remove(from) + t.txMux.Unlock() + t.logger.Error().Err(err).Msgf( "failed to submit single Flow transaction for EOA: %s", from.Hex(), @@ -190,8 +207,6 @@ func (t *BatchTxPool) Add( return err } - t.updateEOAActivityMetadata(from, nonce) - return nil } diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index 057015dc..48d96094 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -552,6 +552,9 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) }) } + err = g.Wait() + require.NoError(t, err) + expectedBalance := big.NewInt(6 * transferAmount) assert.Eventually(t, func() bool { From b88528a2b8dabf2f76ddd7bfef60e1e80177ad4b Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 8 Dec 2025 19:33:29 +0200 Subject: [PATCH 21/22] Update timeout for tx submission --- services/requester/batch_tx_pool.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index a9d05237..e1c60326 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -302,11 +302,9 @@ func (t *BatchTxPool) submitSingleTransaction( done := make(chan struct{}) var submitError error - // This method is called while holding the `t.txMux` lock. - // Do not let it run for a long time, to avoid lock-contention. - // The 4-second timeout provides a 1-second buffer on top of ANs + // The 5-second timeout provides a 2-second buffer on top of ANs // 3-second timeout for LN requests. - ctx, cancel := context.WithTimeout(ctx, time.Second*4) + ctx, cancel := context.WithTimeout(ctx, time.Second*5) defer cancel() // Build & submit the transaction, in a separate goroutine. The AN calls @@ -314,8 +312,7 @@ func (t *BatchTxPool) submitSingleTransaction( // long as is necessary for their completion. // `context.WithTimeout` arranges for Done to be closed when the specified // timeout elapses, and at that point we return an error to abort the - // transaction submission, and release the `t.txMux` lock for the next - // requests. + // transaction submission. go func() { defer close(done) From eacef7196270a6547dc5fe537b10cfd00985f75a Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 26 Jan 2026 12:04:03 +0200 Subject: [PATCH 22/22] Remove redundant lock acqusition when calling Remove() on expirable.LRU --- services/requester/batch_tx_pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index e1c60326..11b507c1 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -194,11 +194,11 @@ func (t *BatchTxPool) Add( } if err != nil { - t.txMux.Lock() // If there was an error during tx submission, remove the entry // from the cache, to not block future requests with same nonce. + // Note: No need to acquire the `t.txMux` lock, `Remove` already + // has an internal lock. t.eoaActivityCache.Remove(from) - t.txMux.Unlock() t.logger.Error().Err(err).Msgf( "failed to submit single Flow transaction for EOA: %s",