Skip to content

Commit 28edd01

Browse files
committed
Merge branch 'mpeter/submitted-tx-validations' into mpeter/backport-submitted-tx-validations
2 parents 9917917 + 3582554 commit 28edd01

2 files changed

Lines changed: 96 additions & 38 deletions

File tree

services/requester/batch_tx_pool.go

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ func (t *BatchTxPool) Add(
124124
return err
125125
}
126126

127+
t.txMux.Lock()
128+
defer t.txMux.Unlock()
129+
127130
eoaActivity, found := t.eoaActivityCache.Get(from)
128131
nonce := tx.Nonce()
129132

@@ -162,13 +165,11 @@ func (t *BatchTxPool) Add(
162165
// If the EOA has pooled transactions, which are not yet processed,
163166
// due to congestion or anything, make sure to include the current
164167
// tx on that batch.
165-
t.txMux.Lock()
166168
hasBatch := len(t.pooledTxs[from]) > 0
167169
if hasBatch {
168170
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
169171
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
170172
}
171-
t.txMux.Unlock()
172173

173174
// If it wasn't batched, submit individually
174175
if !hasBatch {
@@ -177,10 +178,8 @@ func (t *BatchTxPool) Add(
177178
}
178179
} else {
179180
// Case 3. EOA activity found AND it was less than [X] seconds ago:
180-
t.txMux.Lock()
181181
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce}
182182
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
183-
t.txMux.Unlock()
184183
}
185184

186185
if err != nil {
@@ -285,43 +284,62 @@ func (t *BatchTxPool) submitSingleTransaction(
285284
ctx context.Context,
286285
hexEncodedTx cadence.String,
287286
) error {
288-
coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex())
289-
if err != nil {
290-
return err
291-
}
287+
done := make(chan struct{})
288+
var submitError error
292289

293-
script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
294-
flowTx, err := t.buildTransaction(
295-
ctx,
296-
t.getReferenceBlock(),
297-
script,
298-
cadence.NewArray([]cadence.Value{hexEncodedTx}),
299-
coinbaseAddress,
300-
)
301-
if err != nil {
302-
// If there was any error during the transaction build
303-
// process, we record it as a dropped transaction.
304-
t.collector.TransactionsDropped(1)
305-
return err
306-
}
290+
// This method is called while holding the `t.txMux` lock,
291+
// don't let it run for a long time, to avoid lock-contention
292+
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
293+
defer cancel()
307294

308-
if err := t.client.SendTransaction(ctx, *flowTx); err != nil {
309-
// If there was any error while sending the transaction,
310-
// we record it as a dropped transaction.
311-
t.collector.TransactionsDropped(1)
312-
return err
295+
// build & submit transaction
296+
go func() {
297+
defer close(done)
298+
299+
coinbaseAddress, err := cadence.NewString(t.config.Coinbase.Hex())
300+
if err != nil {
301+
submitError = err
302+
return
303+
}
304+
305+
script := replaceAddresses(runTxScript, t.config.FlowNetworkID)
306+
flowTx, err := t.buildTransaction(
307+
ctx,
308+
t.getReferenceBlock(),
309+
script,
310+
cadence.NewArray([]cadence.Value{hexEncodedTx}),
311+
coinbaseAddress,
312+
)
313+
if err != nil {
314+
// If there was any error during the transaction build
315+
// process, we record it as a dropped transaction.
316+
t.collector.TransactionsDropped(1)
317+
submitError = err
318+
return
319+
}
320+
321+
if err := t.client.SendTransaction(ctx, *flowTx); err != nil {
322+
// If there was any error while sending the transaction,
323+
// we record it as a dropped transaction.
324+
t.collector.TransactionsDropped(1)
325+
submitError = err
326+
return
327+
}
328+
}()
329+
330+
select {
331+
case <-ctx.Done():
332+
return ctx.Err()
333+
case <-done:
313334
}
314335

315-
return nil
336+
return submitError
316337
}
317338

318339
func (t *BatchTxPool) updateEOAActivityMetadata(
319340
from gethCommon.Address,
320341
nonce uint64,
321342
) {
322-
t.txMux.Lock()
323-
defer t.txMux.Unlock()
324-
325343
// Update metadata for the last EOA activity only on successful add/submit.
326344
eoaActivity, _ := t.eoaActivityCache.Get(from)
327345
eoaActivity.lastSubmission = time.Now()

tests/tx_batching_test.go

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
515515
}
516516

517517
func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) {
518-
_, cfg, stop := setupGatewayNode(t)
518+
emu, cfg, stop := setupGatewayNode(t)
519519
defer stop()
520520

521521
rpcTester := &rpcTest{
@@ -528,16 +528,28 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T)
528528
testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8")
529529
nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5}
530530

531+
g := errgroup.Group{}
532+
533+
startBlock, err := emu.GetLatestBlock()
534+
require.NoError(t, err)
535+
531536
hashes := []common.Hash{}
532537
// transfer some funds to the test address
533538
transferAmount := int64(1_000_000_000)
534-
for _, nonce := range nonces {
535-
signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil)
536-
require.NoError(t, err)
537539

538-
txHash, err := rpcTester.sendRawTx(signed)
539-
require.NoError(t, err)
540-
hashes = append(hashes, txHash)
540+
for range 3 {
541+
g.Go(func() error {
542+
for _, nonce := range nonces {
543+
signed, _, err := evmSign(big.NewInt(transferAmount), 23_500, eoaKey, nonce, &testAddr, nil)
544+
require.NoError(t, err)
545+
546+
txHash, err := rpcTester.sendRawTx(signed)
547+
require.NoError(t, err)
548+
hashes = append(hashes, txHash)
549+
}
550+
551+
return nil
552+
})
541553
}
542554

543555
expectedBalance := big.NewInt(6 * transferAmount)
@@ -550,6 +562,34 @@ func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T)
550562

551563
return balance.Cmp(expectedBalance) == 0
552564
}, time.Second*15, time.Second*1, "all transactions were not executed")
565+
566+
endBlock, err := emu.GetLatestBlock()
567+
require.NoError(t, err)
568+
569+
blockEvents, err := emu.GetEventsForHeightRange(
570+
"A.f8d6e0586b0a20c7.EVM.TransactionExecuted",
571+
startBlock.Height+1,
572+
endBlock.Height,
573+
)
574+
575+
totalEVMEvents := 0
576+
for _, blockEvent := range blockEvents {
577+
totalEVMEvents += len(blockEvent.Events)
578+
}
579+
assert.Equal(t, 6, totalEVMEvents)
580+
581+
for i := startBlock.Height; i <= endBlock.Height; i++ {
582+
block, err := emu.GetBlockByHeight(i)
583+
require.NoError(t, err)
584+
585+
txResults, err := emu.GetTransactionResultsByBlockID(block.ID())
586+
require.NoError(t, err)
587+
588+
for _, txResult := range txResults {
589+
// Assert that we have no errors on the submitted transactions
590+
require.Empty(t, txResult.ErrorMessage)
591+
}
592+
}
553593
}
554594

555595
func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {

0 commit comments

Comments
 (0)