From 62deb8bab0303cd9afdbbcc40246564366b71c54 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 24 Nov 2025 18:07:32 +0200 Subject: [PATCH 01/23] Reject transactions that have already been submitted to the tx pool --- models/errors/errors.go | 5 +- services/requester/batch_tx_pool.go | 103 +++++++++++++++++++--------- tests/tx_batching_test.go | 61 ++++++++-------- 3 files changed, 105 insertions(+), 64 deletions(-) diff --git a/models/errors/errors.go b/models/errors/errors.go index 614c7403..fb0f38d6 100644 --- a/models/errors/errors.go +++ b/models/errors/errors.go @@ -28,9 +28,8 @@ var ( // Transaction errors - ErrFailedTransaction = errors.New("failed transaction") - ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) - ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool") + ErrFailedTransaction = errors.New("failed transaction") + ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) // Storage errors diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index d82ae258..9f95e53c 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -3,6 +3,7 @@ package requester import ( "context" "encoding/hex" + "fmt" "slices" "sort" "sync" @@ -22,35 +23,43 @@ import ( "github.com/onflow/flow-evm-gateway/services/requester/keystore" ) -const eoaActivityCacheSize = 10_000 +const ( + eoaActivityCacheSize = 10_000 + eoaActivityCacheTTL = time.Second * 10 + maxTrackedTxHashesPerEOA = 15 +) type pooledEvmTx struct { txPayload cadence.String - txHash gethCommon.Hash nonce uint64 } -// BatchTxPool is a `TxPool` implementation that collects and groups -// transactions based on their EOA signer, and submits them for execution -// using a batch. +type eoaActivityMetadata struct { + lastSubmission time.Time + txHashes []gethCommon.Hash +} + +// BatchTxPool is a `TxPool` implementation that groups incoming transactions +// based on their EOA signer, and submits them for execution using a batch. // // The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the // `EVM.run` used in `SingleTxPool`. // // The main advantage of this implementation over the `SingleTxPool`, is the -// guarantee that transactions originated from the same EOA address, which -// arrive in a short time interval (about the same as Flow's block production rate), -// will be executed in the same order their arrived. -// This helps to reduce the nonce mismatch errors which mainly occur from the -// re-ordering of Cadence transactions that happens from Collection nodes. +// guarantee that transactions originating from the same EOA address, which +// arrive in a short time interval (configurable by the node operator), +// will be executed in the same order they arrived. +// This helps to reduce the execution errors which may occur from the +// re-ordering of Cadence transactions that happens on Collection nodes. type BatchTxPool struct { *SingleTxPool - pooledTxs map[gethCommon.Address][]pooledEvmTx - txMux sync.Mutex - eoaActivity *expirable.LRU[gethCommon.Address, time.Time] + + pooledTxs map[gethCommon.Address][]pooledEvmTx + txMux sync.Mutex + eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata] } -var _ TxPool = &BatchTxPool{} +var _ TxPool = (*BatchTxPool)(nil) func NewBatchTxPool( ctx context.Context, @@ -77,16 +86,16 @@ func NewBatchTxPool( return nil, err } - eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time]( + eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata]( eoaActivityCacheSize, nil, - config.EOAActivityCacheTTL, + eoaActivityCacheTTL, ) batchPool := &BatchTxPool{ - SingleTxPool: singleTxPool, - pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), - txMux: sync.Mutex{}, - eoaActivity: eoaActivity, + SingleTxPool: singleTxPool, + pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), + txMux: sync.Mutex{}, + eoaActivityCache: eoaActivityCache, } go batchPool.processPooledTransactions(ctx) @@ -123,6 +132,21 @@ func (t *BatchTxPool) Add( return err } + eoaActivity, found := t.eoaActivityCache.Get(from) + txHash := tx.Hash() + + // Reject transactions that have already been submitted, + // as they are *likely* to fail. Two transactions with + // identical hashes, are expected to have the exact same + // payload. + if found && slices.Contains(eoaActivity.txHashes, txHash) { + return fmt.Errorf( + "%w: a tx with hash %s has already been submitted", + errs.ErrInvalid, + txHash, + ) + } + // Scenarios // 1. EOA activity not found: // => We send the transaction individually, without adding it @@ -140,27 +164,42 @@ func (t *BatchTxPool) Add( // For all 3 cases, we record the activity time for the next // transactions that might come from the same EOA. // [X] is equal to the configured `TxBatchInterval` duration. - lastActivityTime, found := t.eoaActivity.Get(from) - if !found { // Case 1. EOA activity not found: err = t.submitSingleTransaction(ctx, hexEncodedTx) - } else if time.Since(lastActivityTime) > t.config.TxBatchInterval { - // Case 2. EOA activity found AND it was more than [X] seconds ago: - err = t.submitSingleTransaction(ctx, hexEncodedTx) + } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { + if len(t.pooledTxs[from]) > 0 { + // If the EOA has pooled transactions, which are not yet processed, + // due to congestion or anything, make sure to include the current + // tx on that batch. + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} + t.pooledTxs[from] = append(t.pooledTxs[from], userTx) + } else { + // Case 2. EOA activity found AND it was more than [X] seconds ago: + err = t.submitSingleTransaction(ctx, hexEncodedTx) + } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: - userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()} - // Prevent submission of duplicate transactions, based on their tx hash - if slices.Contains(t.pooledTxs[from], userTx) { - return errs.ErrDuplicateTransaction - } + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } - t.eoaActivity.Add(from, time.Now()) + if err != nil { + return err + } + + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity.lastSubmission = time.Now() + eoaActivity.txHashes = append(eoaActivity.txHashes, txHash) + // To avoid the slice of hashes from growing indefinitely, + // maintain only a handful of the last tx hashes. + if len(eoaActivity.txHashes) > maxTrackedTxHashesPerEOA { + eoaActivity.txHashes = eoaActivity.txHashes[1:] + } + + t.eoaActivityCache.Add(from, eoaActivity) - return err + return nil } func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index fd47c310..64fed93c 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -514,7 +514,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { ) } -func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { +func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) { _, cfg, stop := setupGatewayNode(t) defer stop() @@ -525,36 +525,36 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey) require.NoError(t, err) - testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194") - nonce := uint64(0) - hashes := make([]common.Hash, 0) - - signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) - - txHash, err := rpcTester.sendRawTx(signed) - require.NoError(t, err) - hashes = append(hashes, txHash) + testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") + nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} - // Increment nonce for the duplicate test transactions that follow - nonce += 1 - dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) + var errors []error + hashes := []common.Hash{} + // transfer some funds to the test address + for _, nonce := range nonces { + signed, _, err := evmSign(big.NewInt(1_000_000_000), 23_500, eoaKey, nonce, &testAddr, nil) + require.NoError(t, err) - // Submit 5 identical transactions to test duplicate detection: - // the first should succeed, the rest should be rejected as duplicates. - for i := range 5 { - if i == 0 { - txHash, err := rpcTester.sendRawTx(dupSigned) - require.NoError(t, err) - hashes = append(hashes, txHash) + txHash, err := rpcTester.sendRawTx(signed) + if err != nil { + errors = append(errors, err) } else { - _, err := rpcTester.sendRawTx(dupSigned) - require.Error(t, err) - require.ErrorContains(t, err, "invalid: transaction already in pool") + hashes = append(hashes, txHash) } } + require.Len(t, errors, 2) + assert.ErrorContains( + t, + errors[0], + "a tx with hash 0x2bdf4aa4c3e273a624dddfdbde6614786b6a5329e246c531d3e0e9f92e79e04d has already been submitted", + ) + assert.ErrorContains( + t, + errors[1], + "a tx with hash 0xb72e1f83861a63b5ad4b927295af07fa9546b01aac5dfce046a5fb20f9be9f2f has already been submitted", + ) + assert.Eventually(t, func() bool { for _, h := range hashes { rcp, err := rpcTester.getReceipt(h.String()) @@ -604,9 +604,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { EnforceGasPrice: true, LogLevel: zerolog.DebugLevel, LogWriter: testLogWriter(), - TxStateValidation: config.TxSealValidation, + TxStateValidation: config.LocalIndexValidation, TxBatchMode: true, - TxBatchInterval: time.Second * 2, + TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet } bootstrapDone := make(chan struct{}) @@ -617,6 +617,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { require.NoError(t, err) }() + // Allow the Gateway to catch up on indexing + time.Sleep(time.Second * 2) + <-bootstrapDone return emu, cfg, func() { From c6b18b1b178af9f9a5c01aeb3def5f989e8f0361 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 28 Nov 2025 13:28:23 +0200 Subject: [PATCH 02/23] Locking should only protected operations on pooledTxs field --- services/requester/batch_tx_pool.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 9f95e53c..949ddab9 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -25,7 +25,6 @@ import ( const ( eoaActivityCacheSize = 10_000 - eoaActivityCacheTTL = time.Second * 10 maxTrackedTxHashesPerEOA = 15 ) @@ -89,7 +88,7 @@ func NewBatchTxPool( eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata]( eoaActivityCacheSize, nil, - eoaActivityCacheTTL, + config.EOAActivityCacheTTL, ) batchPool := &BatchTxPool{ SingleTxPool: singleTxPool, @@ -113,11 +112,6 @@ func (t *BatchTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event - // tx adding should be blocking, so we don't have races when - // pooled transactions are being processed in the background. - t.txMux.Lock() - defer t.txMux.Unlock() - from, err := models.DeriveTxSender(tx) if err != nil { return err @@ -168,20 +162,28 @@ func (t *BatchTxPool) Add( // Case 1. EOA activity not found: err = t.submitSingleTransaction(ctx, hexEncodedTx) } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { - if len(t.pooledTxs[from]) > 0 { - // If the EOA has pooled transactions, which are not yet processed, - // due to congestion or anything, make sure to include the current - // tx on that batch. + // If the EOA has pooled transactions, which are not yet processed, + // due to congestion or anything, make sure to include the current + // tx on that batch. + t.txMux.Lock() + hasBatch := len(t.pooledTxs[from]) > 0 + if hasBatch { userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) - } else { + } + t.txMux.Unlock() + + // If it wasn't batched, submit individually + if !hasBatch { // Case 2. EOA activity found AND it was more than [X] seconds ago: err = t.submitSingleTransaction(ctx, hexEncodedTx) } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: + t.txMux.Lock() userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) + t.txMux.Unlock() } if err != nil { From 856da055dd420f9a642975be4ad3175bc98027ac Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 28 Nov 2025 13:56:03 +0200 Subject: [PATCH 03/23] Remove locking around account key fetching --- services/requester/single_tx_pool.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index e8de3e89..89b1bc34 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -31,7 +31,6 @@ type SingleTxPool struct { pool *sync.Map txPublisher *models.Publisher[*gethTypes.Transaction] config config.Config - mux sync.Mutex keystore *keystore.KeyStore collector metrics.Collector // referenceBlockHeader is stored atomically to avoid races @@ -40,7 +39,7 @@ type SingleTxPool struct { // todo add methods to inspect transaction pool state } -var _ TxPool = &SingleTxPool{} +var _ TxPool = (*SingleTxPool)(nil) func NewSingleTxPool( ctx context.Context, @@ -188,7 +187,11 @@ func (t *SingleTxPool) buildTransaction( } } - accKey, err := t.fetchSigningAccountKey() + // getting an account key from the `KeyStore` for signing transactions, + // *does not* need to be lock-protected, as the keys are read from a + // channel. No two go-routines will end up getting the same account + // key at the same time. + accKey, err := t.keystore.Take() if err != nil { return nil, err } @@ -211,16 +214,6 @@ func (t *SingleTxPool) buildTransaction( return flowTx, nil } -func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) { - // getting an account key from the `KeyStore` for signing transactions, - // should be lock-protected, so that we don't sign any two Flow - // transactions with the same account key - t.mux.Lock() - defer t.mux.Unlock() - - return t.keystore.Take() -} - func (t *SingleTxPool) getReferenceBlock() *flow.BlockHeader { if v := t.referenceBlockHeader.Load(); v != nil { return v.(*flow.BlockHeader) From 33d6624f16a1009c4555e71e810ee3dc49087ba8 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Fri, 28 Nov 2025 16:31:00 +0200 Subject: [PATCH 04/23] Track submitted tx nonces instead of tx hashes --- services/requester/batch_tx_pool.go | 30 ++++++++++++++--------------- tests/tx_batching_test.go | 4 ++-- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 949ddab9..a37c8dcc 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -25,7 +25,7 @@ import ( const ( eoaActivityCacheSize = 10_000 - maxTrackedTxHashesPerEOA = 15 + maxTrackedTxNoncesPerEOA = 15 ) type pooledEvmTx struct { @@ -35,7 +35,7 @@ type pooledEvmTx struct { type eoaActivityMetadata struct { lastSubmission time.Time - txHashes []gethCommon.Hash + txNonces []uint64 } // BatchTxPool is a `TxPool` implementation that groups incoming transactions @@ -127,17 +127,15 @@ func (t *BatchTxPool) Add( } eoaActivity, found := t.eoaActivityCache.Get(from) - txHash := tx.Hash() + nonce := tx.Nonce() // Reject transactions that have already been submitted, - // as they are *likely* to fail. Two transactions with - // identical hashes, are expected to have the exact same - // payload. - if found && slices.Contains(eoaActivity.txHashes, txHash) { + // as they are *likely* to fail. + if found && slices.Contains(eoaActivity.txNonces, nonce) { return fmt.Errorf( - "%w: a tx with hash %s has already been submitted", + "%w: a tx with nonce %d has already been submitted", errs.ErrInvalid, - txHash, + nonce, ) } @@ -168,7 +166,7 @@ func (t *BatchTxPool) Add( t.txMux.Lock() hasBatch := len(t.pooledTxs[from]) > 0 if hasBatch { - userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } t.txMux.Unlock() @@ -181,7 +179,7 @@ func (t *BatchTxPool) Add( } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: t.txMux.Lock() - userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()} + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) t.txMux.Unlock() } @@ -192,11 +190,11 @@ func (t *BatchTxPool) Add( // Update metadata for the last EOA activity only on successful add/submit. eoaActivity.lastSubmission = time.Now() - eoaActivity.txHashes = append(eoaActivity.txHashes, txHash) - // To avoid the slice of hashes from growing indefinitely, - // maintain only a handful of the last tx hashes. - if len(eoaActivity.txHashes) > maxTrackedTxHashesPerEOA { - eoaActivity.txHashes = eoaActivity.txHashes[1:] + eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) + // To avoid the slice of nonces from growing indefinitely, + // maintain only a handful of the last tx nonces. + if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { + eoaActivity.txNonces = eoaActivity.txNonces[1:] } t.eoaActivityCache.Add(from, eoaActivity) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index 64fed93c..a65b1ba7 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -547,12 +547,12 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) assert.ErrorContains( t, errors[0], - "a tx with hash 0x2bdf4aa4c3e273a624dddfdbde6614786b6a5329e246c531d3e0e9f92e79e04d has already been submitted", + "a tx with nonce 2 has already been submitted", ) assert.ErrorContains( t, errors[1], - "a tx with hash 0xb72e1f83861a63b5ad4b927295af07fa9546b01aac5dfce046a5fb20f9be9f2f has already been submitted", + "a tx with nonce 3 has already been submitted", ) assert.Eventually(t, func() bool { From 3833d6ea4fac3be224bbe586eb76b45281dc7566 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 11:53:21 +0200 Subject: [PATCH 05/23] Update trimming logic for tracked tx nonces Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com> --- services/requester/batch_tx_pool.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index a37c8dcc..92871eb0 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -192,9 +192,10 @@ func (t *BatchTxPool) Add( eoaActivity.lastSubmission = time.Now() eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) // To avoid the slice of nonces from growing indefinitely, - // maintain only a handful of the last tx nonces. + // keep only the last `maxTrackedTxNoncesPerEOA` nonces. if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { - eoaActivity.txNonces = eoaActivity.txNonces[1:] + firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA + eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] } t.eoaActivityCache.Add(from, eoaActivity) From 70b0bba0664dc34419e3fb1d4b26159d857b52b4 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 11:59:10 +0200 Subject: [PATCH 06/23] Increase default maxTrackedTxNoncesPerEOA to 30 --- services/requester/batch_tx_pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 92871eb0..3294f370 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -25,7 +25,7 @@ import ( const ( eoaActivityCacheSize = 10_000 - maxTrackedTxNoncesPerEOA = 15 + maxTrackedTxNoncesPerEOA = 30 ) type pooledEvmTx struct { From 85a2560032c616dfcaeaf20605651d0a25bc79f6 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 13:12:56 +0200 Subject: [PATCH 07/23] Silently skip already submitted transactions instead of returning an error --- services/requester/batch_tx_pool.go | 15 ++++++------- tests/tx_batching_test.go | 33 ++++++++--------------------- 2 files changed, 16 insertions(+), 32 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 3294f370..a6b20b56 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -3,7 +3,6 @@ package requester import ( "context" "encoding/hex" - "fmt" "slices" "sort" "sync" @@ -19,7 +18,6 @@ import ( "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/metrics" "github.com/onflow/flow-evm-gateway/models" - errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/requester/keystore" ) @@ -129,14 +127,15 @@ func (t *BatchTxPool) Add( eoaActivity, found := t.eoaActivityCache.Get(from) nonce := tx.Nonce() - // Reject transactions that have already been submitted, + // Skip transactions that have been already submitted, // as they are *likely* to fail. if found && slices.Contains(eoaActivity.txNonces, nonce) { - return fmt.Errorf( - "%w: a tx with nonce %d has already been submitted", - errs.ErrInvalid, - nonce, - ) + t.logger.Info(). + Str("evm_tx", tx.Hash().Hex()). + Str("from", from.Hex()). + Uint64("nonce", nonce). + Msg("tx with same nonce has been already submitted") + return nil } // Scenarios diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index a65b1ba7..1db7e5c3 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -528,42 +528,27 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} - var errors []error hashes := []common.Hash{} // transfer some funds to the test address + transferAmount := int64(1_000_000_000) for _, nonce := range nonces { - signed, _, err := evmSign(big.NewInt(1_000_000_000), 23_500, eoaKey, nonce, &testAddr, nil) + signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) require.NoError(t, err) txHash, err := rpcTester.sendRawTx(signed) - if err != nil { - errors = append(errors, err) - } else { - hashes = append(hashes, txHash) - } + require.NoError(t, err) + hashes = append(hashes, txHash) } - require.Len(t, errors, 2) - assert.ErrorContains( - t, - errors[0], - "a tx with nonce 2 has already been submitted", - ) - assert.ErrorContains( - t, - errors[1], - "a tx with nonce 3 has already been submitted", - ) + expectedBalance := big.NewInt(6 * transferAmount) assert.Eventually(t, func() bool { - for _, h := range hashes { - rcp, err := rpcTester.getReceipt(h.String()) - if err != nil || rcp == nil || rcp.Status != 1 { - return false - } + balance, err := rpcTester.getBalance(testAddr) + if err != nil { + return false } - return true + return balance.Cmp(expectedBalance) == 0 }, time.Second*15, time.Second*1, "all transactions were not executed") } From 7ac8f98eb360366c5c6288d8f9ca7dfa7bdddae7 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 13:13:31 +0200 Subject: [PATCH 08/23] Remove dashes from log fields to comply with Grafana --- services/requester/requester.go | 2 +- services/requester/single_tx_pool.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/services/requester/requester.go b/services/requester/requester.go index 55c3440c..3006fbbe 100644 --- a/services/requester/requester.go +++ b/services/requester/requester.go @@ -238,7 +238,7 @@ func (e *EVM) SendRawTransaction(ctx context.Context, data []byte) (common.Hash, } e.logger.Info(). - Str("evm-id", tx.Hash().Hex()). + Str("evm_tx", tx.Hash().Hex()). Str("to", to). Str("from", from.Hex()). Str("value", tx.Value().String()). diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index 89b1bc34..b4997319 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -149,8 +149,8 @@ func (t *SingleTxPool) Add( } t.logger.Error().Err(res.Error). - Str("flow-id", flowTx.ID().String()). - Str("evm-id", tx.Hash().Hex()). + Str("flow_tx", flowTx.ID().String()). + Str("evm_tx", tx.Hash().Hex()). Msg("flow transaction error") // hide specific cause since it's an implementation issue From cb159ec8704a472fd66089c91e6cf4d50a552bc1 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sat, 29 Nov 2025 19:10:21 +0200 Subject: [PATCH 09/23] Remove redundant error logging from resolveBlockTag function --- api/utils.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/api/utils.go b/api/utils.go index dc9d2ddd..9723e637 100644 --- a/api/utils.go +++ b/api/utils.go @@ -29,9 +29,6 @@ func resolveBlockTag( if number, ok := blockNumberOrHash.Number(); ok { height, err := resolveBlockNumber(number, blocksDB) if err != nil { - logger.Error().Err(err). - Stringer("block_number", number). - Msg("failed to resolve block by number") return 0, err } return height, nil @@ -40,9 +37,6 @@ func resolveBlockTag( if hash, ok := blockNumberOrHash.Hash(); ok { height, err := blocksDB.GetHeightByID(hash) if err != nil { - logger.Error().Err(err). - Stringer("block_hash", hash). - Msg("failed to resolve block by hash") return 0, err } return height, nil From 80ce2ad8e3bb578c56f3b9a93605ba256cd85aa1 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 13:33:39 +0200 Subject: [PATCH 10/23] Improve logging & tracking of dropped transactions --- services/requester/batch_tx_pool.go | 12 +++++++++++- services/requester/single_tx_pool.go | 16 ++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index a6b20b56..2c793e44 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -184,6 +184,10 @@ func (t *BatchTxPool) Add( } if err != nil { + t.logger.Error().Err(err).Msgf( + "failed to submit single Flow transaction for EOA: %s", + from.Hex(), + ) return err } @@ -227,7 +231,7 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { ) if err != nil { t.logger.Error().Err(err).Msgf( - "failed to submit Flow transaction from BatchTxPool for EOA: %s", + "failed to submit batch Flow transaction for EOA: %s", address.Hex(), ) continue @@ -274,6 +278,9 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress( } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record all transactions as dropped. + t.collector.TransactionsDropped(len(hexEncodedTxs)) return err } @@ -305,6 +312,9 @@ func (t *BatchTxPool) submitSingleTransaction( } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) return err } diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index b4997319..01120e37 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -95,6 +95,11 @@ func (t *SingleTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event + from, err := models.DeriveTxSender(tx) + if err != nil { + return err + } + txData, err := tx.MarshalBinary() if err != nil { return err @@ -120,10 +125,21 @@ func (t *SingleTxPool) Add( // If there was any error during the transaction build // process, we record it as a dropped transaction. t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to build Flow transaction for EOA: %s", + from.Hex(), + ) return err } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to submit Flow transaction for EOA: %s", + from.Hex(), + ) return err } From 0b94bc1814cb150ff0bdd3671217fa7965e26631 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 16:12:02 +0200 Subject: [PATCH 11/23] Add lock-protection on writes to eoaActivityMetadata --- services/requester/batch_tx_pool.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 2c793e44..49e33e7d 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -191,7 +191,11 @@ func (t *BatchTxPool) Add( return err } + t.txMux.Lock() + defer t.txMux.Unlock() + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity, _ = t.eoaActivityCache.Get(from) eoaActivity.lastSubmission = time.Now() eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) // To avoid the slice of nonces from growing indefinitely, From e103d5767af07c279efdfb417e71a2ccc8532b70 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 18:11:30 +0200 Subject: [PATCH 12/23] Add retry mechanism on pooled transactions --- services/requester/batch_tx_pool.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 49e33e7d..80998feb 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -238,7 +238,11 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { "failed to submit batch Flow transaction for EOA: %s", address.Hex(), ) - continue + // In case of any error, add the transactions back to the pool, + // as a retry mechanism. + t.txMux.Lock() + t.pooledTxs[address] = append(t.pooledTxs[address], pooledTxs...) + t.txMux.Unlock() } } } From cc3bad3fd7898bb7a4ccf5af83cf984d4418dba4 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Sun, 30 Nov 2025 18:40:34 +0200 Subject: [PATCH 13/23] Extract EOA activity metadata update to its own method --- services/requester/batch_tx_pool.go | 37 +++++++++++++++++------------ 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index 80998feb..ac81993b 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -191,21 +191,7 @@ func (t *BatchTxPool) Add( return err } - t.txMux.Lock() - defer t.txMux.Unlock() - - // Update metadata for the last EOA activity only on successful add/submit. - eoaActivity, _ = t.eoaActivityCache.Get(from) - eoaActivity.lastSubmission = time.Now() - eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) - // To avoid the slice of nonces from growing indefinitely, - // keep only the last `maxTrackedTxNoncesPerEOA` nonces. - if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { - firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA - eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] - } - - t.eoaActivityCache.Add(from, eoaActivity) + t.updateEOAActivityMetadata(from, nonce) return nil } @@ -328,3 +314,24 @@ func (t *BatchTxPool) submitSingleTransaction( return nil } + +func (t *BatchTxPool) updateEOAActivityMetadata( + from gethCommon.Address, + nonce uint64, +) { + t.txMux.Lock() + defer t.txMux.Unlock() + + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity, _ := t.eoaActivityCache.Get(from) + eoaActivity.lastSubmission = time.Now() + eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) + // To avoid the slice of nonces from growing indefinitely, + // keep only the last `maxTrackedTxNoncesPerEOA` nonces. + if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { + firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA + eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] + } + + t.eoaActivityCache.Add(from, eoaActivity) +} From 41c7839dc9080d64906b8deb1c31c037df1a6546 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 4 Dec 2025 13:05:02 +0200 Subject: [PATCH 14/23] Improve locking on batch transaction submission to better handler concurrent requests --- services/requester/batch_tx_pool.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index ac81993b..abda81fb 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -124,6 +124,9 @@ func (t *BatchTxPool) Add( return err } + t.txMux.Lock() + defer t.txMux.Unlock() + eoaActivity, found := t.eoaActivityCache.Get(from) nonce := tx.Nonce() @@ -162,13 +165,11 @@ func (t *BatchTxPool) Add( // If the EOA has pooled transactions, which are not yet processed, // due to congestion or anything, make sure to include the current // tx on that batch. - t.txMux.Lock() hasBatch := len(t.pooledTxs[from]) > 0 if hasBatch { userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } - t.txMux.Unlock() // If it wasn't batched, submit individually if !hasBatch { @@ -177,10 +178,8 @@ func (t *BatchTxPool) Add( } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: - t.txMux.Lock() userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) - t.txMux.Unlock() } if err != nil { @@ -319,9 +318,6 @@ func (t *BatchTxPool) updateEOAActivityMetadata( from gethCommon.Address, nonce uint64, ) { - t.txMux.Lock() - defer t.txMux.Unlock() - // Update metadata for the last EOA activity only on successful add/submit. eoaActivity, _ := t.eoaActivityCache.Get(from) eoaActivity.lastSubmission = time.Now() From 6ccf8db049d8221a9c24d259f8c760bfc8a57de0 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 4 Dec 2025 15:15:10 +0200 Subject: [PATCH 15/23] Improve tests for tx submission with nonce validation --- tests/tx_batching_test.go | 54 ++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index 1db7e5c3..057015dc 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -515,7 +515,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { } func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) { - _, cfg, stop := setupGatewayNode(t) + emu, cfg, stop := setupGatewayNode(t) defer stop() rpcTester := &rpcTest{ @@ -528,16 +528,28 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} + g := errgroup.Group{} + + startBlock, err := emu.GetLatestBlock() + require.NoError(t, err) + hashes := []common.Hash{} // transfer some funds to the test address transferAmount := int64(1_000_000_000) - for _, nonce := range nonces { - signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) - txHash, err := rpcTester.sendRawTx(signed) - require.NoError(t, err) - hashes = append(hashes, txHash) + for range 3 { + g.Go(func() error { + for _, nonce := range nonces { + signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) + require.NoError(t, err) + + txHash, err := rpcTester.sendRawTx(signed) + require.NoError(t, err) + hashes = append(hashes, txHash) + } + + return nil + }) } expectedBalance := big.NewInt(6 * transferAmount) @@ -550,6 +562,34 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) return balance.Cmp(expectedBalance) == 0 }, time.Second*15, time.Second*1, "all transactions were not executed") + + endBlock, err := emu.GetLatestBlock() + require.NoError(t, err) + + blockEvents, err := emu.GetEventsForHeightRange( + "A.f8d6e0586b0a20c7.EVM.TransactionExecuted", + startBlock.Height+1, + endBlock.Height, + ) + + totalEVMEvents := 0 + for _, blockEvent := range blockEvents { + totalEVMEvents += len(blockEvent.Events) + } + assert.Equal(t, 6, totalEVMEvents) + + for i := startBlock.Height; i <= endBlock.Height; i++ { + block, err := emu.GetBlockByHeight(i) + require.NoError(t, err) + + txResults, err := emu.GetTransactionResultsByBlockID(block.ID()) + require.NoError(t, err) + + for _, txResult := range txResults { + // Assert that we have no errors on the submitted transactions + require.Empty(t, txResult.ErrorMessage) + } + } } func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { From 35825546f97e70ba658031e11a2843cd634e132f Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 4 Dec 2025 18:33:10 +0200 Subject: [PATCH 16/23] Use context.WithTimeout in submitSingleTransaction method --- services/requester/batch_tx_pool.go | 70 +++++++++++++++++++---------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index abda81fb..8830b1ae 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -284,34 +284,56 @@ func (t *BatchTxPool) submitSingleTransaction( ctx context.Context, hexEncodedTx cadence.String, ) error { - coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex()) - if err != nil { - return err - } + done := make(chan struct{}) + var submitError error - script := replaceAddresses(runTxScript, t.config.FlowNetworkID) - flowTx, err := t.buildTransaction( - ctx, - t.getReferenceBlock(), - script, - cadence.NewArray([]cadence.Value{hexEncodedTx}), - coinbaseAddress, - ) - if err != nil { - // If there was any error during the transaction build - // process, we record it as a dropped transaction. - t.collector.TransactionsDropped(1) - return err - } + // This method is called while holding the `t.txMux` lock, + // don't let it run for a long time, to avoid lock-contention + ctx, cancel := context.WithTimeout(ctx, time.Second*3) + defer cancel() - if err := t.client.SendTransaction(ctx, *flowTx); err != nil { - // If there was any error while sending the transaction, - // we record it as a dropped transaction. - t.collector.TransactionsDropped(1) - return err + // build & submit transaction + go func() { + defer close(done) + + coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex()) + if err != nil { + submitError = err + return + } + + script := replaceAddresses(runTxScript, t.config.FlowNetworkID) + flowTx, err := t.buildTransaction( + ctx, + t.getReferenceBlock(), + script, + cadence.NewArray([]cadence.Value{hexEncodedTx}), + coinbaseAddress, + ) + if err != nil { + // If there was any error during the transaction build + // process, we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + submitError = err + return + } + + if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + submitError = err + return + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-done: } - return nil + return submitError } func (t *BatchTxPool) updateEOAActivityMetadata( From 0c7403432f3d2e44e8e8ba933ec7cc15b900bd70 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Sat, 27 Dec 2025 11:31:52 -0800 Subject: [PATCH 17/23] Set hardcoded end block for mainnet27 --- services/requester/cross-spork_client.go | 50 +++++++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 60053663..5c0e92f3 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -15,6 +15,17 @@ import ( "go.uber.org/ratelimit" ) +const ( + // The following are hardcoded heights used to ensure gateways ignore blocks after the hard fork + // height even if they are returned by the upstream access node. + + // HardcodedMainnet27SporkRootHeight is the spork root block height (cadence) for the Mainnet 27 network. + HardcodedMainnet27SporkRootHeight = 130290659 + + // HardcodedMainnet27LastHeight is the hardcoded last block height (cadence) for the Mainnet 27 network. + HardcodedMainnet27LastHeight = 137363395 // TODO: confirm final height when ready +) + type sporkClient struct { firstHeight uint64 lastHeight uint64 @@ -32,6 +43,11 @@ func (s *sporkClient) GetEventsForHeightRange( ) ([]flow.BlockEvents, error) { s.getEventsForHeightRangeLimiter.Take() + // simplified check since the API will reject requests for startHeight > endHeight. + if endHeight > s.lastHeight { + return nil, errs.NewHeightOutOfRangeError(endHeight) + } + return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } @@ -53,14 +69,27 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error { return fmt.Errorf("could not get node info using the spork client: %w", err) } + lastHeight := header.Height + + // ensure that the last height is the expected block. + if info.SporkRootBlockHeight == HardcodedMainnet27SporkRootHeight && lastHeight != HardcodedMainnet27LastHeight { + logger.Warn(). + Uint64("nodeRootHeight", info.NodeRootBlockHeight). + Uint64("sporkRootHeight", info.SporkRootBlockHeight). + Uint64("lastHeight", lastHeight). + Uint64("hardcodedLastHeight", HardcodedMainnet27LastHeight). + Msg("access node returned unexpected last height. using hardcoded value.") + lastHeight = HardcodedMainnet27LastHeight + } + logger.Info(). Uint64("firstHeight", info.NodeRootBlockHeight). - Uint64("lastHeight", header.Height). + Uint64("lastHeight", lastHeight). Msg("adding spork client") *s = append(*s, &sporkClient{ firstHeight: info.NodeRootBlockHeight, - lastHeight: header.Height, + lastHeight: lastHeight, client: client, // TODO (JanezP): Make this configurable getEventsForHeightRangeLimiter: ratelimit.New(100, ratelimit.WithoutSlack), @@ -182,12 +211,19 @@ func (c *CrossSporkClient) getClientForHeight(height uint64) (access.Client, err // GetLatestHeightForSpork will determine the spork client in which the provided height is contained // and then find the latest height in that spork. func (c *CrossSporkClient) GetLatestHeightForSpork(ctx context.Context, height uint64) (uint64, error) { - client, err := c.getClientForHeight(height) - if err != nil { - return 0, err + if c.IsPastSpork(height) { + for _, spork := range c.sporkClients { + if spork.contains(height) { + // use the latest block height captured during bootstrapping. + return spork.lastHeight, nil + } + } + // otherwise, we don't have a spork client for the provided height. + return 0, errs.NewHeightOutOfRangeError(height) } - block, err := client.GetLatestBlockHeader(ctx, true) + // otherwise, return the latest height from the current spork. + block, err := c.Client.GetLatestBlockHeader(ctx, true) if err != nil { return 0, err } @@ -228,6 +264,8 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight( if err != nil { return nil, nil, err } + + // TODO: we need to make sure that the AN ends the subscription at the last block return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...) } From be9f5a133d289d7ef2a1d372ee114359d31a4f20 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Sat, 27 Dec 2025 13:21:32 -0800 Subject: [PATCH 18/23] fix lint --- services/requester/cross-spork_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 5c0e92f3..38fa133d 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -223,7 +223,7 @@ func (c *CrossSporkClient) GetLatestHeightForSpork(ctx context.Context, height u } // otherwise, return the latest height from the current spork. - block, err := c.Client.GetLatestBlockHeader(ctx, true) + block, err := c.GetLatestBlockHeader(ctx, true) if err != nil { return 0, err } From 96ab645ddeb5dfb0c89dfb21c86031bd6e28d70c Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Sat, 27 Dec 2025 14:29:10 -0800 Subject: [PATCH 19/23] update to allow handle case where current client is mainnet27 --- services/ingestion/event_subscriber.go | 16 ++++++++++++++++ services/requester/cross-spork_client.go | 14 ++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index ed86cd27..3c115f0f 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -134,6 +134,18 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return eventsChan } + latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height) + if err != nil { + err = fmt.Errorf("failed to get latest height for spork: %w", err) + eventsChan <- models.NewBlockEventsError(err) + close(eventsChan) + return eventsChan + } + + // if the resolved client for the height is the current spork client and the current spork client + // is for the mainnet 27 network, stop ingesting data from the stream at the hardcoded last height. + stopAtHardcodedLastHeight := latestHeight == requester.HardcodedMainnet27LastHeight + var blockEventsStream <-chan flow.BlockEvents var errChan <-chan error @@ -184,6 +196,10 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return } + if stopAtHardcodedLastHeight && blockEvents.Height > requester.HardcodedMainnet27LastHeight { + continue // don't exit, otherwise the gateway will crash + } + evmEvents := models.NewSingleBlockEvents(blockEvents) // if events contain an error, or we are in a recovery mode if evmEvents.Err != nil || r.recovery { diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 38fa133d..7aa0adc5 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -227,6 +227,12 @@ func (c *CrossSporkClient) GetLatestHeightForSpork(ctx context.Context, height u if err != nil { return 0, err } + + // if the current spork client is the mainnet 27 network and node reported a higher height, fix it to the hardcoded last height. + if c.currentSporkFirstHeight == HardcodedMainnet27SporkRootHeight && block.Height > HardcodedMainnet27LastHeight { + return HardcodedMainnet27LastHeight, nil + } + return block.Height, nil } @@ -284,6 +290,14 @@ func (c *CrossSporkClient) GetEventsForHeightRange( if endClient != client { return nil, fmt.Errorf("invalid height range, end height %d is not in the same spork as start height %d", endHeight, startHeight) } + + // if the current spork client is the mainnet 27 network and the requested end height is after the hardcoded last height, + // fix it to the hardcoded last height. If the requested start height is after the hardcoded last height, the API + // will return an error since the range will be invalid. + if c.currentSporkFirstHeight == HardcodedMainnet27SporkRootHeight && endHeight > HardcodedMainnet27LastHeight { + endHeight = HardcodedMainnet27LastHeight + } + return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } From b0444fdfee38bddf34b9a1117f90fdb42187b235 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Sat, 27 Dec 2025 14:32:41 -0800 Subject: [PATCH 20/23] remove todo and improve godocs --- services/requester/cross-spork_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 7aa0adc5..15b42c25 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -229,6 +229,7 @@ func (c *CrossSporkClient) GetLatestHeightForSpork(ctx context.Context, height u } // if the current spork client is the mainnet 27 network and node reported a higher height, fix it to the hardcoded last height. + // allow the case where the AN is behind the last height in case it is catching up. if c.currentSporkFirstHeight == HardcodedMainnet27SporkRootHeight && block.Height > HardcodedMainnet27LastHeight { return HardcodedMainnet27LastHeight, nil } @@ -271,7 +272,6 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight( return nil, nil, err } - // TODO: we need to make sure that the AN ends the subscription at the last block return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...) } From a9243e22a154872c1a5ea69bcb1a077e80c59da6 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Sat, 27 Dec 2025 14:52:35 -0800 Subject: [PATCH 21/23] use consistent mainnet27 check --- services/requester/cross-spork_client.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 15b42c25..479e45df 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -185,6 +185,13 @@ func (c *CrossSporkClient) IsPastSpork(height uint64) bool { return height < c.currentSporkFirstHeight } +// IsMainnet27Client returns true if the current spork client is for the mainnet 27 network. +func (c *CrossSporkClient) IsMainnet27Client() bool { + // currentSporkFirstHeight is the node's root block, which is not the same as the spork root block if the node was + // bootstrapped after the spork. This check works for nodes bootstrapped during or after the mainnet 27 spork. + return c.currentSporkFirstHeight >= HardcodedMainnet27SporkRootHeight && c.currentSporkFirstHeight <= HardcodedMainnet27LastHeight +} + // getClientForHeight returns the client for the given height that contains the height range. // // If the height is not contained in any of the past spork clients we return an error. @@ -230,7 +237,7 @@ func (c *CrossSporkClient) GetLatestHeightForSpork(ctx context.Context, height u // if the current spork client is the mainnet 27 network and node reported a higher height, fix it to the hardcoded last height. // allow the case where the AN is behind the last height in case it is catching up. - if c.currentSporkFirstHeight == HardcodedMainnet27SporkRootHeight && block.Height > HardcodedMainnet27LastHeight { + if c.IsMainnet27Client() && block.Height > HardcodedMainnet27LastHeight { return HardcodedMainnet27LastHeight, nil } @@ -294,7 +301,7 @@ func (c *CrossSporkClient) GetEventsForHeightRange( // if the current spork client is the mainnet 27 network and the requested end height is after the hardcoded last height, // fix it to the hardcoded last height. If the requested start height is after the hardcoded last height, the API // will return an error since the range will be invalid. - if c.currentSporkFirstHeight == HardcodedMainnet27SporkRootHeight && endHeight > HardcodedMainnet27LastHeight { + if c.IsMainnet27Client() && endHeight > HardcodedMainnet27LastHeight { endHeight = HardcodedMainnet27LastHeight } From 05ce10652a089cc538b3d135501cb7170c66ce8c Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Sat, 27 Dec 2025 16:00:37 -0800 Subject: [PATCH 22/23] allow soft-finality flags --- cmd/run/cmd.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 871b3578..696c615f 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -293,6 +293,13 @@ func init() { Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*10, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.") Cmd.Flags().DurationVar(&cfg.RpcRequestTimeout, "rpc-request-timeout", time.Second*120, "Sets the maximum duration at which JSON-RPC requests should generate a response, before they timeout. The default is 120 seconds.") + // soft finality specific flags. include them as disabled to allow operators to run this image without any config changes + var experimentalSoftFinalityEnabled, experimentalSealingVerificationEnabled bool + Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "feature disabled in this release") + Cmd.Flags().BoolVar(&experimentalSealingVerificationEnabled, "experimental-sealing-verification-enabled", false, "feature disabled in this release") + Cmd.Flags().MarkHidden("experimental-soft-finality-enabled") + Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled") + err := Cmd.Flags().MarkDeprecated("init-cadence-height", "This flag is no longer necessary and will be removed in future version. The initial Cadence height is known for testnet/mainnet and this was only required for fresh deployments of EVM Gateway. Once the DB has been initialized, the latest index Cadence height will be used upon start-up.") if err != nil { panic(err) From 3586cced5829cbc44ff6df845786e6d7e574e895 Mon Sep 17 00:00:00 2001 From: Peter Argue <89119817+peterargue@users.noreply.github.com> Date: Sat, 27 Dec 2025 16:20:11 -0800 Subject: [PATCH 23/23] fix lint --- cmd/run/cmd.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 696c615f..31530b71 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -297,8 +297,8 @@ func init() { var experimentalSoftFinalityEnabled, experimentalSealingVerificationEnabled bool Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "feature disabled in this release") Cmd.Flags().BoolVar(&experimentalSealingVerificationEnabled, "experimental-sealing-verification-enabled", false, "feature disabled in this release") - Cmd.Flags().MarkHidden("experimental-soft-finality-enabled") - Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled") + _ = Cmd.Flags().MarkHidden("experimental-soft-finality-enabled") + _ = Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled") err := Cmd.Flags().MarkDeprecated("init-cadence-height", "This flag is no longer necessary and will be removed in future version. The initial Cadence height is known for testnet/mainnet and this was only required for fresh deployments of EVM Gateway. Once the DB has been initialized, the latest index Cadence height will be used upon start-up.") if err != nil {