Skip to content
4 changes: 4 additions & 0 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"errors"

"github.com/multiversx/mx-chain-core-go/core"
)

Expand Down Expand Up @@ -85,3 +86,6 @@ var ErrInvalidCacheExpiry = errors.New("invalid cache expiry")

// ErrDBIsClosed is raised when the DB is closed
var ErrDBIsClosed = core.ErrDBIsClosed

// ErrNilEvictionHandler signals that a nil eviction handler has been provided
var ErrNilEvictionHandler = errors.New("nil eviction handler")
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/multiversx/mx-chain-storage-go
go 1.20

require (
github.com/gammazero/workerpool v1.1.3
github.com/hashicorp/golang-lru v0.6.0
github.com/multiversx/concurrent-map v0.1.4
github.com/multiversx/mx-chain-core-go v1.2.16
Expand All @@ -14,6 +15,7 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denisbrodbeck/machineid v1.0.1 // indirect
github.com/gammazero/deque v0.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA=
github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q=
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -71,6 +75,7 @@ github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDd
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
18 changes: 18 additions & 0 deletions testscommon/evictionNotifierStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package testscommon

// EvictionNotifierStub -
type EvictionNotifierStub struct {
NotifyEvictionCalled func(txHash []byte)
}

// NotifyEviction -
func (stub *EvictionNotifierStub) NotifyEviction(txHash []byte) {
if stub.NotifyEvictionCalled != nil {
stub.NotifyEvictionCalled(txHash)
}
}

// IsInterfaceNil -
func (stub *EvictionNotifierStub) IsInterfaceNil() bool {
return stub == nil
}
51 changes: 51 additions & 0 deletions txcache/baseTxCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package txcache

import (
"sync"

"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/types"
)

const maxNumOfEvictionWorkers = 5

type evictionWorkerPool interface {
Stop()
Submit(task func())
}

type baseTxCache struct {
mutEvictionHandlers sync.RWMutex
evictionHandlers []types.EvictionNotifier
evictionWorkerPool evictionWorkerPool
}

// RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache
func (cache *baseTxCache) RegisterEvictionHandler(handler types.EvictionNotifier) error {
if check.IfNil(handler) {
return common.ErrNilEvictionHandler
}

cache.mutEvictionHandlers.Lock()
cache.evictionHandlers = append(cache.evictionHandlers, handler)
cache.mutEvictionHandlers.Unlock()

return nil
}

// enqueueEvictedHashesForNotification will enqueue the provided hashes on the workers pool
func (cache *baseTxCache) enqueueEvictedHashesForNotification(txHashes [][]byte) {
cache.mutEvictionHandlers.RLock()
handlers := make([]types.EvictionNotifier, len(cache.evictionHandlers))
copy(handlers, cache.evictionHandlers)
cache.mutEvictionHandlers.RUnlock()

for _, handler := range handlers {
for _, txHash := range txHashes {
cache.evictionWorkerPool.Submit(func() {
handler.NotifyEviction(txHash)
})
}
}
}
20 changes: 18 additions & 2 deletions txcache/crossTxCache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package txcache

import (
"github.com/gammazero/workerpool"
"github.com/multiversx/mx-chain-storage-go/immunitycache"
"github.com/multiversx/mx-chain-storage-go/types"
)
Expand All @@ -10,6 +11,7 @@ var _ types.Cacher = (*CrossTxCache)(nil)
// CrossTxCache holds cross-shard transactions (where destination == me)
type CrossTxCache struct {
*immunitycache.ImmunityCache
*baseTxCache
config ConfigDestinationMe
}

Expand Down Expand Up @@ -37,7 +39,11 @@ func NewCrossTxCache(config ConfigDestinationMe) (*CrossTxCache, error) {

cache := CrossTxCache{
ImmunityCache: immunityCache,
config: config,
baseTxCache: &baseTxCache{
evictionHandlers: make([]types.EvictionNotifier, 0),
evictionWorkerPool: workerpool.New(maxNumOfEvictionWorkers),
},
config: config,
}

return &cache, nil
Expand Down Expand Up @@ -93,7 +99,11 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) {

// RemoveTxByHash removes tx by hash
func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool {
return cache.RemoveWithResult(txHash)
ok := cache.RemoveWithResult(txHash)
if ok {
cache.enqueueEvictedHashesForNotification([][]byte{txHash})
}
return ok
}

// ForEachTransaction iterates over the transactions in the cache
Expand All @@ -115,6 +125,12 @@ func (cache *CrossTxCache) GetTransactionsPoolForSender(_ string) []*WrappedTran
return make([]*WrappedTransaction, 0)
}

// Close closes the eviction worker pool
func (cache *CrossTxCache) Close() error {
cache.evictionWorkerPool.Stop()
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (cache *CrossTxCache) IsInterfaceNil() bool {
return cache == nil
Expand Down
39 changes: 39 additions & 0 deletions txcache/crossTxCache_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package txcache

import (
"bytes"
"fmt"
"math"
"testing"
"time"

"github.com/multiversx/mx-chain-storage-go/common"
"github.com/multiversx/mx-chain-storage-go/testscommon"
"github.com/stretchr/testify/require"
)

func TestCrossTxCache_DoImmunizeTxsAgainstEviction(t *testing.T) {
cache := newCrossTxCacheToTest(1, 8, math.MaxUint16)
defer func() { require.Nil(t, cache.Close()) }()

cache.addTestTxs("a", "b", "c", "d")
numNow, numFuture := cache.ImmunizeKeys(hashesAsBytes([]string{"a", "b", "e", "f"}))
Expand All @@ -26,6 +31,7 @@ func TestCrossTxCache_DoImmunizeTxsAgainstEviction(t *testing.T) {

func TestCrossTxCache_Get(t *testing.T) {
cache := newCrossTxCacheToTest(1, 8, math.MaxUint16)
defer func() { require.Nil(t, cache.Close()) }()

cache.addTestTxs("a", "b", "c", "d")
a, ok := cache.GetByTxHash([]byte("a"))
Expand Down Expand Up @@ -57,6 +63,39 @@ func TestCrossTxCache_Get(t *testing.T) {
require.Equal(t, make([]*WrappedTransaction, 0), cache.GetTransactionsPoolForSender(""))
}

func TestCrossTxCache_RegisterEvictionHandler(t *testing.T) {
t.Parallel()

cache := newCrossTxCacheToTest(1, 8, math.MaxUint16)
defer func() { require.Nil(t, cache.Close()) }()

cache.addTestTx("hash-1")

err := cache.RegisterEvictionHandler(nil)
require.Equal(t, common.ErrNilEvictionHandler, err)

ch := make(chan struct{})
err = cache.RegisterEvictionHandler(&testscommon.EvictionNotifierStub{
NotifyEvictionCalled: func(hash []byte) {
require.True(t, bytes.Equal([]byte("hash-1"), hash))
ch <- struct{}{}
},
})
require.NoError(t, err)

removed := cache.RemoveTxByHash([]byte("hash-1"))
require.True(t, removed)
select {
case <-ch:
case <-time.After(time.Second):
require.Fail(t, "timeout")
}

foundTx, ok := cache.GetByTxHash([]byte("hash-1"))
require.False(t, ok)
require.Nil(t, foundTx)
}

func newCrossTxCacheToTest(numChunks uint32, maxNumItems uint32, numMaxBytes uint32) *CrossTxCache {
cache, err := NewCrossTxCache(ConfigDestinationMe{
Name: "test",
Expand Down
2 changes: 2 additions & 0 deletions txcache/eviction.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (cache *TxCache) areThereTooManyTxs() bool {

// This is called concurrently by two goroutines: the eviction one and the sweeping one
func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) {
cache.enqueueEvictedHashesForNotification(txsToEvict)

countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict)
countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict)
return
Expand Down
9 changes: 9 additions & 0 deletions txcache/eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestEviction_EvictSendersWhileTooManyTxs(t *testing.T) {
require.Equal(t, uint32(100), nSenders)
require.Equal(t, int64(100), cache.txListBySender.counter.Get())
require.Equal(t, int64(100), cache.txByHash.counter.Get())
require.Nil(t, cache.Close())
}

func TestEviction_EvictSendersWhileTooManyBytes(t *testing.T) {
Expand Down Expand Up @@ -79,6 +80,7 @@ func TestEviction_EvictSendersWhileTooManyBytes(t *testing.T) {
require.Equal(t, uint32(100), nSenders)
require.Equal(t, int64(100), cache.txListBySender.counter.Get())
require.Equal(t, int64(100), cache.txByHash.counter.Get())
require.Nil(t, cache.Close())
}

func TestEviction_DoEvictionDoneInPassTwo_BecauseOfCount(t *testing.T) {
Expand Down Expand Up @@ -110,6 +112,7 @@ func TestEviction_DoEvictionDoneInPassTwo_BecauseOfCount(t *testing.T) {
require.True(t, ok)
require.Equal(t, uint64(1), cache.CountSenders())
require.Equal(t, uint64(1), cache.CountTx())
require.Nil(t, cache.Close())
}

func TestEviction_DoEvictionDoneInPassTwo_BecauseOfSize(t *testing.T) {
Expand Down Expand Up @@ -164,6 +167,7 @@ func TestEviction_DoEvictionDoneInPassTwo_BecauseOfSize(t *testing.T) {
require.True(t, ok)
require.Equal(t, uint64(5), cache.CountSenders())
require.Equal(t, uint64(5), cache.CountTx())
require.Nil(t, cache.Close())
}

func TestEviction_doEvictionDoesNothingWhenAlreadyInProgress(t *testing.T) {
Expand All @@ -187,6 +191,7 @@ func TestEviction_doEvictionDoesNothingWhenAlreadyInProgress(t *testing.T) {
cache.doEviction()

require.False(t, cache.evictionJournal.evictionPerformed)
require.Nil(t, cache.Close())
}

func TestEviction_evictSendersInLoop_CoverLoopBreak_WhenSmallBatch(t *testing.T) {
Expand All @@ -212,6 +217,7 @@ func TestEviction_evictSendersInLoop_CoverLoopBreak_WhenSmallBatch(t *testing.T)
require.Equal(t, uint32(0), steps)
require.Equal(t, uint32(1), nTxs)
require.Equal(t, uint32(1), nSenders)
require.Nil(t, cache.Close())
}

func TestEviction_evictSendersWhile_ShouldContinueBreak(t *testing.T) {
Expand Down Expand Up @@ -241,6 +247,7 @@ func TestEviction_evictSendersWhile_ShouldContinueBreak(t *testing.T) {
require.Equal(t, uint32(0), steps)
require.Equal(t, uint32(0), nTxs)
require.Equal(t, uint32(0), nSenders)
require.Nil(t, cache.Close())
}

// This seems to be the most reasonable "bad-enough" (not worst) scenario to benchmark:
Expand Down Expand Up @@ -271,6 +278,7 @@ func Test_AddWithEviction_UniformDistribution_25000x10(t *testing.T) {
// Sometimes (due to map iteration non-determinism), more eviction happens - one more step of 100 senders.
require.LessOrEqual(t, uint32(cache.CountTx()), config.CountThreshold)
require.GreaterOrEqual(t, uint32(cache.CountTx()), config.CountThreshold-config.NumSendersToPreemptivelyEvict*uint32(numTxsPerSender))
require.Nil(t, cache.Close())
}

func Test_EvictSendersAndTheirTxs_Concurrently(t *testing.T) {
Expand Down Expand Up @@ -304,4 +312,5 @@ func Test_EvictSendersAndTheirTxs_Concurrently(t *testing.T) {
}

wg.Wait()
require.Nil(t, cache.Close())
}
3 changes: 3 additions & 0 deletions txcache/sweeping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestSweeping_CollectSweepable(t *testing.T) {
require.Equal(t, 3, cache.getNumFailedSelectionsOfSender("alice"))
require.Equal(t, 3, cache.getNumFailedSelectionsOfSender("bob"))
require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("carol"))
require.Nil(t, cache.Close())
}

func TestSweeping_WhenSendersEscapeCollection(t *testing.T) {
Expand Down Expand Up @@ -93,6 +94,7 @@ func TestSweeping_WhenSendersEscapeCollection(t *testing.T) {
require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("alice"))
require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("bob"))
require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("carol"))
require.Nil(t, cache.Close())
}

func TestSweeping_SweepSweepable(t *testing.T) {
Expand All @@ -115,4 +117,5 @@ func TestSweeping_SweepSweepable(t *testing.T) {

require.Equal(t, uint64(1), cache.CountTx())
require.Equal(t, uint64(1), cache.CountSenders())
require.Nil(t, cache.Close())
}
Loading