Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pkg/txm/storage/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ const (
// pruneSubset controls the subset of confirmed transactions to prune when the structure reaches its max limit.
// i.e. if the value is 3 and the limit is 90, 30 transactions will be pruned.
pruneSubset = 3
// TODO: This was chosen arbitrarily, we should determine a better value based on expected usage.
pruneUnstartedTxDuration = 1 * time.Hour
)

type InMemoryStore struct {
Expand Down Expand Up @@ -269,6 +271,27 @@ func (m *InMemoryStore) UpdateTransactionBroadcast(txID uint64, txNonce uint64,
return nil
}

// Shouldn't call lock because it's being called by a method that already has the lock
func (m *InMemoryStore) pruneUnstartedTransactionsWithinDuration(threshold time.Duration) []uint64 {
var txIDsToPrune []uint64
idxTxToRetain := 0
for ; idxTxToRetain < len(m.UnstartedTransactions); idxTxToRetain++ {
tx := m.UnstartedTransactions[idxTxToRetain]
if time.Since(tx.CreatedAt) < threshold {
break
}
txIDsToPrune = append(txIDsToPrune, tx.ID)
delete(m.Transactions, tx.ID)
m.UnstartedTransactions[idxTxToRetain] = nil // prevent memory leak
}
if len(txIDsToPrune) == 0 {
return nil
}
m.UnstartedTransactions = m.UnstartedTransactions[idxTxToRetain:]
sort.Slice(txIDsToPrune, func(i, j int) bool { return txIDsToPrune[i] < txIDsToPrune[j] })
return txIDsToPrune
}

Comment on lines 273 to 287
Copy link
Contributor

@dimriou dimriou Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the work 👍 . Here's a more idiomatic way for future reference:

Suggested change
// Shouldn't call lock because it's being called by a method that already has the lock
func (m *InMemoryStore) pruneUnstartedTransactionsWithinDuration(threshold time.Duration) []uint64 {
var txIDsToPrune []uint64
idxTxToRetain := 0
for ; idxTxToRetain < len(m.UnstartedTransactions); idxTxToRetain++ {
tx := m.UnstartedTransactions[idxTxToRetain]
if time.Since(tx.CreatedAt) < threshold {
break
}
txIDsToPrune = append(txIDsToPrune, tx.ID)
delete(m.Transactions, tx.ID)
m.UnstartedTransactions[idxTxToRetain] = nil // prevent memory leak
}
if len(txIDsToPrune) == 0 {
return nil
}
m.UnstartedTransactions = m.UnstartedTransactions[idxTxToRetain:]
sort.Slice(txIDsToPrune, func(i, j int) bool { return txIDsToPrune[i] < txIDsToPrune[j] })
return txIDsToPrune
}
// Shouldn't call lock because it's being called by a method that already has the lock
func (m *InMemoryStore) pruneUnstartedTransactionsWithinDuration(threshold time.Duration) (txIDsToPrune []uint64) {
for i, tx := range m.UnstartedTransactions {
if time.Since(tx.CreatedAt) < threshold {
m.UnstartedTransactions = m.UnstartedTransactions[i:]
return txIDsToPrune // you can sort before this if you want to
}
txIDsToPrune = append(txIDsToPrune, tx.ID)
delete(m.Transactions, tx.ID)
m.UnstartedTransactions[i] = nil // prevent memory leak
}
m.UnstartedTransactions = m.UnstartedTransactions[:0]
return
}

func (m *InMemoryStore) UpdateUnstartedTransactionWithNonce(nonce uint64) (*types.Transaction, error) {
m.Lock()
defer m.Unlock()
Expand All @@ -277,6 +300,11 @@ func (m *InMemoryStore) UpdateUnstartedTransactionWithNonce(nonce uint64) (*type
m.lggr.Debugf("Unstarted transactions queue is empty for address: %v", m.address)
return nil, nil
}
prunedTxIDs := m.pruneUnstartedTransactionsWithinDuration(pruneUnstartedTxDuration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will panic. Needs to be before the check on line 299.

if prunedTxIDs != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's safer to check like this:

Suggested change
if prunedTxIDs != nil {
if len(prunedTxIDs) != 0 {

because depending on the implementation, a slice could be != nil, but still have 0 elements. len method is safe to use even for nil slices.

m.lggr.Debugf("Unstarted transactions map for address: %v exceeds cutoff time of: %s. Pruned %d oldest unstarted transactions. TxIDs: %v",
m.address, pruneUnstartedTxDuration, len(prunedTxIDs), prunedTxIDs)
}

if tx, exists := m.UnconfirmedTransactions[nonce]; exists {
return nil, fmt.Errorf("an unconfirmed tx with the same nonce already exists: %v", tx)
Expand Down
21 changes: 20 additions & 1 deletion pkg/txm/storage/inmemory_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,21 @@ func TestUpdateUnstartedTransactionWithNonce(t *testing.T) {
assert.Equal(t, txmgr.TxUnconfirmed, tx.State)
assert.Empty(t, m.UnstartedTransactions)
})

t.Run("prunes unstarted transaction beyond the cutoff duration", func(t *testing.T) {
var nonce uint64
m := NewInMemoryStore(logger.Test(t), fromAddress, testutils.FixtureChainID)
insertUnstartedTransactionCreatedAt(m, time.Now().Add(-3*pruneUnstartedTxDuration))
insertUnstartedTransactionCreatedAt(m, time.Now().Add(-2*pruneUnstartedTxDuration))
insertUnstartedTransactionCreatedAt(m, time.Now().Add(-1*pruneUnstartedTxDuration))
insertUnstartedTransaction(m)

tx, err := m.UpdateUnstartedTransactionWithNonce(nonce)
require.NoError(t, err)
assert.Equal(t, nonce, *tx.Nonce)
assert.Equal(t, txmgr.TxUnconfirmed, tx.State)
assert.Empty(t, m.UnstartedTransactions)
})
}

func TestDeleteAttemptForUnconfirmedTx(t *testing.T) {
Expand Down Expand Up @@ -461,6 +476,10 @@ func TestPruneConfirmedTransactions(t *testing.T) {
}

func insertUnstartedTransaction(m *InMemoryStore) *types.Transaction {
return insertUnstartedTransactionCreatedAt(m, time.Now())
}

func insertUnstartedTransactionCreatedAt(m *InMemoryStore, createdAt time.Time) *types.Transaction {
m.Lock()
defer m.Unlock()

Expand All @@ -474,7 +493,7 @@ func insertUnstartedTransaction(m *InMemoryStore) *types.Transaction {
ToAddress: testutils.NewAddress(),
Value: big.NewInt(0),
SpecifiedGasLimit: 0,
CreatedAt: time.Now(),
CreatedAt: createdAt,
State: txmgr.TxUnstarted,
}

Expand Down
Loading