diff --git a/api/utils.go b/api/utils.go index dc9d2ddd..9723e637 100644 --- a/api/utils.go +++ b/api/utils.go @@ -29,9 +29,6 @@ func resolveBlockTag( if number, ok := blockNumberOrHash.Number(); ok { height, err := resolveBlockNumber(number, blocksDB) if err != nil { - logger.Error().Err(err). - Stringer("block_number", number). - Msg("failed to resolve block by number") return 0, err } return height, nil @@ -40,9 +37,6 @@ func resolveBlockTag( if hash, ok := blockNumberOrHash.Hash(); ok { height, err := blocksDB.GetHeightByID(hash) if err != nil { - logger.Error().Err(err). - Stringer("block_hash", hash). - Msg("failed to resolve block by hash") return 0, err } return height, nil diff --git a/cmd/run/cmd.go b/cmd/run/cmd.go index 871b3578..31530b71 100644 --- a/cmd/run/cmd.go +++ b/cmd/run/cmd.go @@ -293,6 +293,13 @@ func init() { Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*10, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.") Cmd.Flags().DurationVar(&cfg.RpcRequestTimeout, "rpc-request-timeout", time.Second*120, "Sets the maximum duration at which JSON-RPC requests should generate a response, before they timeout. The default is 120 seconds.") + // soft finality specific flags. include them as disabled to allow operators to run this image without any config changes + var experimentalSoftFinalityEnabled, experimentalSealingVerificationEnabled bool + Cmd.Flags().BoolVar(&experimentalSoftFinalityEnabled, "experimental-soft-finality-enabled", false, "feature disabled in this release") + Cmd.Flags().BoolVar(&experimentalSealingVerificationEnabled, "experimental-sealing-verification-enabled", false, "feature disabled in this release") + _ = Cmd.Flags().MarkHidden("experimental-soft-finality-enabled") + _ = Cmd.Flags().MarkHidden("experimental-sealing-verification-enabled") + err := Cmd.Flags().MarkDeprecated("init-cadence-height", "This flag is no longer necessary and will be removed in future version. The initial Cadence height is known for testnet/mainnet and this was only required for fresh deployments of EVM Gateway. Once the DB has been initialized, the latest index Cadence height will be used upon start-up.") if err != nil { panic(err) diff --git a/models/errors/errors.go b/models/errors/errors.go index 614c7403..fb0f38d6 100644 --- a/models/errors/errors.go +++ b/models/errors/errors.go @@ -28,9 +28,8 @@ var ( // Transaction errors - ErrFailedTransaction = errors.New("failed transaction") - ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) - ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool") + ErrFailedTransaction = errors.New("failed transaction") + ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction) // Storage errors diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index ed86cd27..3c115f0f 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -134,6 +134,18 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return eventsChan } + latestHeight, err := r.client.GetLatestHeightForSpork(ctx, height) + if err != nil { + err = fmt.Errorf("failed to get latest height for spork: %w", err) + eventsChan <- models.NewBlockEventsError(err) + close(eventsChan) + return eventsChan + } + + // if the resolved client for the height is the current spork client and the current spork client + // is for the mainnet 27 network, stop ingesting data from the stream at the hardcoded last height. + stopAtHardcodedLastHeight := latestHeight == requester.HardcodedMainnet27LastHeight + var blockEventsStream <-chan flow.BlockEvents var errChan <-chan error @@ -184,6 +196,10 @@ func (r *RPCEventSubscriber) subscribe(ctx context.Context, height uint64) <-cha return } + if stopAtHardcodedLastHeight && blockEvents.Height > requester.HardcodedMainnet27LastHeight { + continue // don't exit, otherwise the gateway will crash + } + evmEvents := models.NewSingleBlockEvents(blockEvents) // if events contain an error, or we are in a recovery mode if evmEvents.Err != nil || r.recovery { diff --git a/services/requester/batch_tx_pool.go b/services/requester/batch_tx_pool.go index d82ae258..8830b1ae 100644 --- a/services/requester/batch_tx_pool.go +++ b/services/requester/batch_tx_pool.go @@ -18,39 +18,45 @@ import ( "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/metrics" "github.com/onflow/flow-evm-gateway/models" - errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/requester/keystore" ) -const eoaActivityCacheSize = 10_000 +const ( + eoaActivityCacheSize = 10_000 + maxTrackedTxNoncesPerEOA = 30 +) type pooledEvmTx struct { txPayload cadence.String - txHash gethCommon.Hash nonce uint64 } -// BatchTxPool is a `TxPool` implementation that collects and groups -// transactions based on their EOA signer, and submits them for execution -// using a batch. +type eoaActivityMetadata struct { + lastSubmission time.Time + txNonces []uint64 +} + +// BatchTxPool is a `TxPool` implementation that groups incoming transactions +// based on their EOA signer, and submits them for execution using a batch. // // The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the // `EVM.run` used in `SingleTxPool`. // // The main advantage of this implementation over the `SingleTxPool`, is the -// guarantee that transactions originated from the same EOA address, which -// arrive in a short time interval (about the same as Flow's block production rate), -// will be executed in the same order their arrived. -// This helps to reduce the nonce mismatch errors which mainly occur from the -// re-ordering of Cadence transactions that happens from Collection nodes. +// guarantee that transactions originating from the same EOA address, which +// arrive in a short time interval (configurable by the node operator), +// will be executed in the same order they arrived. +// This helps to reduce the execution errors which may occur from the +// re-ordering of Cadence transactions that happens on Collection nodes. type BatchTxPool struct { *SingleTxPool - pooledTxs map[gethCommon.Address][]pooledEvmTx - txMux sync.Mutex - eoaActivity *expirable.LRU[gethCommon.Address, time.Time] + + pooledTxs map[gethCommon.Address][]pooledEvmTx + txMux sync.Mutex + eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata] } -var _ TxPool = &BatchTxPool{} +var _ TxPool = (*BatchTxPool)(nil) func NewBatchTxPool( ctx context.Context, @@ -77,16 +83,16 @@ func NewBatchTxPool( return nil, err } - eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time]( + eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata]( eoaActivityCacheSize, nil, config.EOAActivityCacheTTL, ) batchPool := &BatchTxPool{ - SingleTxPool: singleTxPool, - pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), - txMux: sync.Mutex{}, - eoaActivity: eoaActivity, + SingleTxPool: singleTxPool, + pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), + txMux: sync.Mutex{}, + eoaActivityCache: eoaActivityCache, } go batchPool.processPooledTransactions(ctx) @@ -104,11 +110,6 @@ func (t *BatchTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event - // tx adding should be blocking, so we don't have races when - // pooled transactions are being processed in the background. - t.txMux.Lock() - defer t.txMux.Unlock() - from, err := models.DeriveTxSender(tx) if err != nil { return err @@ -123,6 +124,23 @@ func (t *BatchTxPool) Add( return err } + t.txMux.Lock() + defer t.txMux.Unlock() + + eoaActivity, found := t.eoaActivityCache.Get(from) + nonce := tx.Nonce() + + // Skip transactions that have been already submitted, + // as they are *likely* to fail. + if found && slices.Contains(eoaActivity.txNonces, nonce) { + t.logger.Info(). + Str("evm_tx", tx.Hash().Hex()). + Str("from", from.Hex()). + Uint64("nonce", nonce). + Msg("tx with same nonce has been already submitted") + return nil + } + // Scenarios // 1. EOA activity not found: // => We send the transaction individually, without adding it @@ -140,27 +158,41 @@ func (t *BatchTxPool) Add( // For all 3 cases, we record the activity time for the next // transactions that might come from the same EOA. // [X] is equal to the configured `TxBatchInterval` duration. - lastActivityTime, found := t.eoaActivity.Get(from) - if !found { // Case 1. EOA activity not found: err = t.submitSingleTransaction(ctx, hexEncodedTx) - } else if time.Since(lastActivityTime) > t.config.TxBatchInterval { - // Case 2. EOA activity found AND it was more than [X] seconds ago: - err = t.submitSingleTransaction(ctx, hexEncodedTx) + } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { + // If the EOA has pooled transactions, which are not yet processed, + // due to congestion or anything, make sure to include the current + // tx on that batch. + hasBatch := len(t.pooledTxs[from]) > 0 + if hasBatch { + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} + t.pooledTxs[from] = append(t.pooledTxs[from], userTx) + } + + // If it wasn't batched, submit individually + if !hasBatch { + // Case 2. EOA activity found AND it was more than [X] seconds ago: + err = t.submitSingleTransaction(ctx, hexEncodedTx) + } } else { // Case 3. EOA activity found AND it was less than [X] seconds ago: - userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()} - // Prevent submission of duplicate transactions, based on their tx hash - if slices.Contains(t.pooledTxs[from], userTx) { - return errs.ErrDuplicateTransaction - } + userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} t.pooledTxs[from] = append(t.pooledTxs[from], userTx) } - t.eoaActivity.Add(from, time.Now()) + if err != nil { + t.logger.Error().Err(err).Msgf( + "failed to submit single Flow transaction for EOA: %s", + from.Hex(), + ) + return err + } - return err + t.updateEOAActivityMetadata(from, nonce) + + return nil } func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { @@ -188,10 +220,14 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { ) if err != nil { t.logger.Error().Err(err).Msgf( - "failed to submit Flow transaction from BatchTxPool for EOA: %s", + "failed to submit batch Flow transaction for EOA: %s", address.Hex(), ) - continue + // In case of any error, add the transactions back to the pool, + // as a retry mechanism. + t.txMux.Lock() + t.pooledTxs[address] = append(t.pooledTxs[address], pooledTxs...) + t.txMux.Unlock() } } } @@ -235,6 +271,9 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress( } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record all transactions as dropped. + t.collector.TransactionsDropped(len(hexEncodedTxs)) return err } @@ -245,29 +284,72 @@ func (t *BatchTxPool) submitSingleTransaction( ctx context.Context, hexEncodedTx cadence.String, ) error { - coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex()) - if err != nil { - return err - } + done := make(chan struct{}) + var submitError error - script := replaceAddresses(runTxScript, t.config.FlowNetworkID) - flowTx, err := t.buildTransaction( - ctx, - t.getReferenceBlock(), - script, - cadence.NewArray([]cadence.Value{hexEncodedTx}), - coinbaseAddress, - ) - if err != nil { - // If there was any error during the transaction build - // process, we record it as a dropped transaction. - t.collector.TransactionsDropped(1) - return err + // This method is called while holding the `t.txMux` lock, + // don't let it run for a long time, to avoid lock-contention + ctx, cancel := context.WithTimeout(ctx, time.Second*3) + defer cancel() + + // build & submit transaction + go func() { + defer close(done) + + coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex()) + if err != nil { + submitError = err + return + } + + script := replaceAddresses(runTxScript, t.config.FlowNetworkID) + flowTx, err := t.buildTransaction( + ctx, + t.getReferenceBlock(), + script, + cadence.NewArray([]cadence.Value{hexEncodedTx}), + coinbaseAddress, + ) + if err != nil { + // If there was any error during the transaction build + // process, we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + submitError = err + return + } + + if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + submitError = err + return + } + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-done: } - if err := t.client.SendTransaction(ctx, *flowTx); err != nil { - return err + return submitError +} + +func (t *BatchTxPool) updateEOAActivityMetadata( + from gethCommon.Address, + nonce uint64, +) { + // Update metadata for the last EOA activity only on successful add/submit. + eoaActivity, _ := t.eoaActivityCache.Get(from) + eoaActivity.lastSubmission = time.Now() + eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) + // To avoid the slice of nonces from growing indefinitely, + // keep only the last `maxTrackedTxNoncesPerEOA` nonces. + if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { + firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA + eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] } - return nil + t.eoaActivityCache.Add(from, eoaActivity) } diff --git a/services/requester/cross-spork_client.go b/services/requester/cross-spork_client.go index 60053663..479e45df 100644 --- a/services/requester/cross-spork_client.go +++ b/services/requester/cross-spork_client.go @@ -15,6 +15,17 @@ import ( "go.uber.org/ratelimit" ) +const ( + // The following are hardcoded heights used to ensure gateways ignore blocks after the hard fork + // height even if they are returned by the upstream access node. + + // HardcodedMainnet27SporkRootHeight is the spork root block height (cadence) for the Mainnet 27 network. + HardcodedMainnet27SporkRootHeight = 130290659 + + // HardcodedMainnet27LastHeight is the hardcoded last block height (cadence) for the Mainnet 27 network. + HardcodedMainnet27LastHeight = 137363395 // TODO: confirm final height when ready +) + type sporkClient struct { firstHeight uint64 lastHeight uint64 @@ -32,6 +43,11 @@ func (s *sporkClient) GetEventsForHeightRange( ) ([]flow.BlockEvents, error) { s.getEventsForHeightRangeLimiter.Take() + // simplified check since the API will reject requests for startHeight > endHeight. + if endHeight > s.lastHeight { + return nil, errs.NewHeightOutOfRangeError(endHeight) + } + return s.client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } @@ -53,14 +69,27 @@ func (s *sporkClients) add(logger zerolog.Logger, client access.Client) error { return fmt.Errorf("could not get node info using the spork client: %w", err) } + lastHeight := header.Height + + // ensure that the last height is the expected block. + if info.SporkRootBlockHeight == HardcodedMainnet27SporkRootHeight && lastHeight != HardcodedMainnet27LastHeight { + logger.Warn(). + Uint64("nodeRootHeight", info.NodeRootBlockHeight). + Uint64("sporkRootHeight", info.SporkRootBlockHeight). + Uint64("lastHeight", lastHeight). + Uint64("hardcodedLastHeight", HardcodedMainnet27LastHeight). + Msg("access node returned unexpected last height. using hardcoded value.") + lastHeight = HardcodedMainnet27LastHeight + } + logger.Info(). Uint64("firstHeight", info.NodeRootBlockHeight). - Uint64("lastHeight", header.Height). + Uint64("lastHeight", lastHeight). Msg("adding spork client") *s = append(*s, &sporkClient{ firstHeight: info.NodeRootBlockHeight, - lastHeight: header.Height, + lastHeight: lastHeight, client: client, // TODO (JanezP): Make this configurable getEventsForHeightRangeLimiter: ratelimit.New(100, ratelimit.WithoutSlack), @@ -156,6 +185,13 @@ func (c *CrossSporkClient) IsPastSpork(height uint64) bool { return height < c.currentSporkFirstHeight } +// IsMainnet27Client returns true if the current spork client is for the mainnet 27 network. +func (c *CrossSporkClient) IsMainnet27Client() bool { + // currentSporkFirstHeight is the node's root block, which is not the same as the spork root block if the node was + // bootstrapped after the spork. This check works for nodes bootstrapped during or after the mainnet 27 spork. + return c.currentSporkFirstHeight >= HardcodedMainnet27SporkRootHeight && c.currentSporkFirstHeight <= HardcodedMainnet27LastHeight +} + // getClientForHeight returns the client for the given height that contains the height range. // // If the height is not contained in any of the past spork clients we return an error. @@ -182,15 +218,29 @@ func (c *CrossSporkClient) getClientForHeight(height uint64) (access.Client, err // GetLatestHeightForSpork will determine the spork client in which the provided height is contained // and then find the latest height in that spork. func (c *CrossSporkClient) GetLatestHeightForSpork(ctx context.Context, height uint64) (uint64, error) { - client, err := c.getClientForHeight(height) - if err != nil { - return 0, err + if c.IsPastSpork(height) { + for _, spork := range c.sporkClients { + if spork.contains(height) { + // use the latest block height captured during bootstrapping. + return spork.lastHeight, nil + } + } + // otherwise, we don't have a spork client for the provided height. + return 0, errs.NewHeightOutOfRangeError(height) } - block, err := client.GetLatestBlockHeader(ctx, true) + // otherwise, return the latest height from the current spork. + block, err := c.GetLatestBlockHeader(ctx, true) if err != nil { return 0, err } + + // if the current spork client is the mainnet 27 network and node reported a higher height, fix it to the hardcoded last height. + // allow the case where the AN is behind the last height in case it is catching up. + if c.IsMainnet27Client() && block.Height > HardcodedMainnet27LastHeight { + return HardcodedMainnet27LastHeight, nil + } + return block.Height, nil } @@ -228,6 +278,7 @@ func (c *CrossSporkClient) SubscribeEventsByBlockHeight( if err != nil { return nil, nil, err } + return client.SubscribeEventsByBlockHeight(ctx, startHeight, filter, opts...) } @@ -246,6 +297,14 @@ func (c *CrossSporkClient) GetEventsForHeightRange( if endClient != client { return nil, fmt.Errorf("invalid height range, end height %d is not in the same spork as start height %d", endHeight, startHeight) } + + // if the current spork client is the mainnet 27 network and the requested end height is after the hardcoded last height, + // fix it to the hardcoded last height. If the requested start height is after the hardcoded last height, the API + // will return an error since the range will be invalid. + if c.IsMainnet27Client() && endHeight > HardcodedMainnet27LastHeight { + endHeight = HardcodedMainnet27LastHeight + } + return client.GetEventsForHeightRange(ctx, eventType, startHeight, endHeight) } diff --git a/services/requester/requester.go b/services/requester/requester.go index 55c3440c..3006fbbe 100644 --- a/services/requester/requester.go +++ b/services/requester/requester.go @@ -238,7 +238,7 @@ func (e *EVM) SendRawTransaction(ctx context.Context, data []byte) (common.Hash, } e.logger.Info(). - Str("evm-id", tx.Hash().Hex()). + Str("evm_tx", tx.Hash().Hex()). Str("to", to). Str("from", from.Hex()). Str("value", tx.Value().String()). diff --git a/services/requester/single_tx_pool.go b/services/requester/single_tx_pool.go index e8de3e89..01120e37 100644 --- a/services/requester/single_tx_pool.go +++ b/services/requester/single_tx_pool.go @@ -31,7 +31,6 @@ type SingleTxPool struct { pool *sync.Map txPublisher *models.Publisher[*gethTypes.Transaction] config config.Config - mux sync.Mutex keystore *keystore.KeyStore collector metrics.Collector // referenceBlockHeader is stored atomically to avoid races @@ -40,7 +39,7 @@ type SingleTxPool struct { // todo add methods to inspect transaction pool state } -var _ TxPool = &SingleTxPool{} +var _ TxPool = (*SingleTxPool)(nil) func NewSingleTxPool( ctx context.Context, @@ -96,6 +95,11 @@ func (t *SingleTxPool) Add( ) error { t.txPublisher.Publish(tx) // publish pending transaction event + from, err := models.DeriveTxSender(tx) + if err != nil { + return err + } + txData, err := tx.MarshalBinary() if err != nil { return err @@ -121,10 +125,21 @@ func (t *SingleTxPool) Add( // If there was any error during the transaction build // process, we record it as a dropped transaction. t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to build Flow transaction for EOA: %s", + from.Hex(), + ) return err } if err := t.client.SendTransaction(ctx, *flowTx); err != nil { + // If there was any error while sending the transaction, + // we record it as a dropped transaction. + t.collector.TransactionsDropped(1) + t.logger.Error().Err(err).Msgf( + "failed to submit Flow transaction for EOA: %s", + from.Hex(), + ) return err } @@ -150,8 +165,8 @@ func (t *SingleTxPool) Add( } t.logger.Error().Err(res.Error). - Str("flow-id", flowTx.ID().String()). - Str("evm-id", tx.Hash().Hex()). + Str("flow_tx", flowTx.ID().String()). + Str("evm_tx", tx.Hash().Hex()). Msg("flow transaction error") // hide specific cause since it's an implementation issue @@ -188,7 +203,11 @@ func (t *SingleTxPool) buildTransaction( } } - accKey, err := t.fetchSigningAccountKey() + // getting an account key from the `KeyStore` for signing transactions, + // *does not* need to be lock-protected, as the keys are read from a + // channel. No two go-routines will end up getting the same account + // key at the same time. + accKey, err := t.keystore.Take() if err != nil { return nil, err } @@ -211,16 +230,6 @@ func (t *SingleTxPool) buildTransaction( return flowTx, nil } -func (t *SingleTxPool) fetchSigningAccountKey() (*keystore.AccountKey, error) { - // getting an account key from the `KeyStore` for signing transactions, - // should be lock-protected, so that we don't sign any two Flow - // transactions with the same account key - t.mux.Lock() - defer t.mux.Unlock() - - return t.keystore.Take() -} - func (t *SingleTxPool) getReferenceBlock() *flow.BlockHeader { if v := t.referenceBlockHeader.Load(); v != nil { return v.(*flow.BlockHeader) diff --git a/tests/tx_batching_test.go b/tests/tx_batching_test.go index fd47c310..057015dc 100644 --- a/tests/tx_batching_test.go +++ b/tests/tx_batching_test.go @@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { // activity of the EOA was X seconds ago, where: // X = `cfg.TxBatchInterval`. // For the E2E tests the `cfg.TxBatchInterval` is equal - // to 2 seconds. + // to 2.5 seconds. for i := range uint64(2) { signed, _, err := evmSign( big.NewInt(500_000), @@ -514,8 +514,8 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) { ) } -func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { - _, cfg, stop := setupGatewayNode(t) +func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) { + emu, cfg, stop := setupGatewayNode(t) defer stop() rpcTester := &rpcTest{ @@ -525,46 +525,71 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) { eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey) require.NoError(t, err) - testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194") - nonce := uint64(0) - hashes := make([]common.Hash, 0) + testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8") + nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5} - signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) + g := errgroup.Group{} - txHash, err := rpcTester.sendRawTx(signed) + startBlock, err := emu.GetLatestBlock() require.NoError(t, err) - hashes = append(hashes, txHash) - // Increment nonce for the duplicate test transactions that follow - nonce += 1 - dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil) - require.NoError(t, err) + hashes := []common.Hash{} + // transfer some funds to the test address + transferAmount := int64(1_000_000_000) - // Submit 5 identical transactions to test duplicate detection: - // the first should succeed, the rest should be rejected as duplicates. - for i := range 5 { - if i == 0 { - txHash, err := rpcTester.sendRawTx(dupSigned) - require.NoError(t, err) - hashes = append(hashes, txHash) - } else { - _, err := rpcTester.sendRawTx(dupSigned) - require.Error(t, err) - require.ErrorContains(t, err, "invalid: transaction already in pool") - } + for range 3 { + g.Go(func() error { + for _, nonce := range nonces { + signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil) + require.NoError(t, err) + + txHash, err := rpcTester.sendRawTx(signed) + require.NoError(t, err) + hashes = append(hashes, txHash) + } + + return nil + }) } + expectedBalance := big.NewInt(6 * transferAmount) + assert.Eventually(t, func() bool { - for _, h := range hashes { - rcp, err := rpcTester.getReceipt(h.String()) - if err != nil || rcp == nil || rcp.Status != 1 { - return false - } + balance, err := rpcTester.getBalance(testAddr) + if err != nil { + return false } - return true + return balance.Cmp(expectedBalance) == 0 }, time.Second*15, time.Second*1, "all transactions were not executed") + + endBlock, err := emu.GetLatestBlock() + require.NoError(t, err) + + blockEvents, err := emu.GetEventsForHeightRange( + "A.f8d6e0586b0a20c7.EVM.TransactionExecuted", + startBlock.Height+1, + endBlock.Height, + ) + + totalEVMEvents := 0 + for _, blockEvent := range blockEvents { + totalEVMEvents += len(blockEvent.Events) + } + assert.Equal(t, 6, totalEVMEvents) + + for i := startBlock.Height; i <= endBlock.Height; i++ { + block, err := emu.GetBlockByHeight(i) + require.NoError(t, err) + + txResults, err := emu.GetTransactionResultsByBlockID(block.ID()) + require.NoError(t, err) + + for _, txResult := range txResults { + // Assert that we have no errors on the submitted transactions + require.Empty(t, txResult.ErrorMessage) + } + } } func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { @@ -604,9 +629,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { EnforceGasPrice: true, LogLevel: zerolog.DebugLevel, LogWriter: testLogWriter(), - TxStateValidation: config.TxSealValidation, + TxStateValidation: config.LocalIndexValidation, TxBatchMode: true, - TxBatchInterval: time.Second * 2, + TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet } bootstrapDone := make(chan struct{}) @@ -617,6 +642,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) { require.NoError(t, err) }() + // Allow the Gateway to catch up on indexing + time.Sleep(time.Second * 2) + <-bootstrapDone return emu, cfg, func() {