diff --git a/pkg/txm/storage/inmemory_store.go b/pkg/txm/storage/inmemory_store.go index 9cd640c2cc..ab5d2098c7 100644 --- a/pkg/txm/storage/inmemory_store.go +++ b/pkg/txm/storage/inmemory_store.go @@ -20,7 +20,8 @@ const ( maxQueuedTransactions = 250 // pruneSubset controls the subset of confirmed transactions to prune when the structure reaches its max limit. // i.e. if the value is 3 and the limit is 90, 30 transactions will be pruned. - pruneSubset = 3 + pruneSubset = 3 + pruneUnstartedTxDuration = 2 * time.Hour ) type InMemoryStore struct { @@ -269,15 +270,34 @@ func (m *InMemoryStore) UpdateTransactionBroadcast(txID uint64, txNonce uint64, return nil } +// Shouldn't call lock because it's being called by a method that already has the lock +func (m *InMemoryStore) pruneUnstartedTransactionsWithinDuration(threshold time.Duration) (txIDsToPrune []uint64) { + for i, tx := range m.UnstartedTransactions { + if time.Since(tx.CreatedAt) < threshold { + m.UnstartedTransactions = m.UnstartedTransactions[i:] + return txIDsToPrune + } + txIDsToPrune = append(txIDsToPrune, tx.ID) + delete(m.Transactions, tx.ID) + m.UnstartedTransactions[i] = nil // prevent memory leak + } + m.UnstartedTransactions = m.UnstartedTransactions[:0] + return +} + func (m *InMemoryStore) UpdateUnstartedTransactionWithNonce(nonce uint64) (*types.Transaction, error) { m.Lock() defer m.Unlock() + prunedTxIDs := m.pruneUnstartedTransactionsWithinDuration(pruneUnstartedTxDuration) + if len(prunedTxIDs) != 0 { + m.lggr.Debugf("Unstarted transactions map for address: %v exceeds cutoff time of: %s. Pruned %d oldest unstarted transactions. TxIDs: %v", + m.address, pruneUnstartedTxDuration, len(prunedTxIDs), prunedTxIDs) + } if len(m.UnstartedTransactions) == 0 { m.lggr.Debugf("Unstarted transactions queue is empty for address: %v", m.address) return nil, nil } - if tx, exists := m.UnconfirmedTransactions[nonce]; exists { return nil, fmt.Errorf("an unconfirmed tx with the same nonce already exists: %v", tx) } diff --git a/pkg/txm/storage/inmemory_store_test.go b/pkg/txm/storage/inmemory_store_test.go index 4e7a370f23..1b1968ef93 100644 --- a/pkg/txm/storage/inmemory_store_test.go +++ b/pkg/txm/storage/inmemory_store_test.go @@ -385,6 +385,22 @@ func TestUpdateUnstartedTransactionWithNonce(t *testing.T) { assert.Equal(t, txmgr.TxUnconfirmed, tx.State) assert.Empty(t, m.UnstartedTransactions) }) + + t.Run("prunes unstarted transaction beyond the cutoff duration", func(t *testing.T) { + var nonce uint64 + m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID) + insertUnstartedTransactionCreatedAt(m, time.Now().Add(-3*pruneUnstartedTxDuration)) + insertUnstartedTransactionCreatedAt(m, time.Now().Add(-2*pruneUnstartedTxDuration)) + insertUnstartedTransactionCreatedAt(m, time.Now().Add(-1*pruneUnstartedTxDuration)) + insertUnstartedTransactionCreatedAt(m, time.Now().Add(pruneUnstartedTxDuration/-2)) + insertUnstartedTransaction(m) + + tx, err := m.UpdateUnstartedTransactionWithNonce(nonce) + require.NoError(t, err) + assert.Equal(t, nonce, *tx.Nonce) + assert.Equal(t, txmgr.TxUnconfirmed, tx.State) + assert.Len(t, m.UnstartedTransactions, 1) // only the most recent one remains + }) } func TestDeleteAttemptForUnconfirmedTx(t *testing.T) { @@ -461,6 +477,10 @@ func TestPruneConfirmedTransactions(t *testing.T) { } func insertUnstartedTransaction(m *InMemoryStore) *types.Transaction { + return insertUnstartedTransactionCreatedAt(m, time.Now()) +} + +func insertUnstartedTransactionCreatedAt(m *InMemoryStore, createdAt time.Time) *types.Transaction { m.Lock() defer m.Unlock() @@ -474,7 +494,7 @@ func insertUnstartedTransaction(m *InMemoryStore) *types.Transaction { ToAddress: testutils.NewAddress(), Value: big.NewInt(0), SpecifiedGasLimit: 0, - CreatedAt: time.Now(), + CreatedAt: createdAt, State: txmgr.TxUnstarted, }