perf: Pool allocations of cachekv stores #24608

Open
Wants to merge 5 commits into main.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -47,6 +47,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (types) [#24668](https://github.com/cosmos/cosmos-sdk/pull/24668) Scope the global config to a particular binary so that multiple SDK binaries can be properly run on the same machine.
* (baseapp) [#24655](https://github.com/cosmos/cosmos-sdk/pull/24655) Add mutex locks for `state` and make `lastCommitInfo` atomic to prevent race conditions between `Commit` and `CreateQueryContext`.
* (proto) [#24161](https://github.com/cosmos/cosmos-sdk/pull/24161) Remove unnecessary annotations from `x/staking` authz proto.
* (baseapp) [#24608](https://github.com/cosmos/cosmos-sdk/pull/24608) Use a `PooledCacheMultiStore` in `cacheTxContext` to prevent allocating a new cache for every transaction.
* (x/bank) [#24660](https://github.com/cosmos/cosmos-sdk/pull/24660) Improve performance of the `GetAllBalances` and `GetAccountsBalances` keeper methods.

### Bug Fixes
13 changes: 12 additions & 1 deletion baseapp/baseapp.go
@@ -624,7 +624,12 @@ func (app *BaseApp) getContextForTx(mode sdk.ExecMode, txBytes []byte) sdk.Conte
// a branched multi-store.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, storetypes.CacheMultiStore) {
ms := ctx.MultiStore()
msCache := ms.CacheMultiStore()
var msCache storetypes.CacheMultiStore
if msPooled, ok := ms.(storetypes.PoolingMultiStore); ok {
msCache = msPooled.CacheMultiStorePooled()
} else {
msCache = ms.CacheMultiStore()
Contributor comment on lines 624 to +631:

Change potentially affects state.

Call sequence:

(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).cacheTxContext (baseapp/baseapp.go:625)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).runTx (baseapp/baseapp.go:760)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).deliverTx (baseapp/baseapp.go:688)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).internalFinalizeBlock (baseapp/baseapp.go:718)
(*github.com/cosmos/cosmos-sdk/baseapp.BaseApp).FinalizeBlock (baseapp/baseapp.go:884)

}
if msCache.TracingEnabled() {
msCache = msCache.SetTracingContext(
map[string]any{
@@ -839,6 +844,9 @@ func (app *BaseApp) runTx(mode sdk.ExecMode, txBytes []byte, tx sdk.Tx) (gInfo s
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
if pooledMSCache, ok := msCache.(storetypes.PooledCacheMultiStore); ok {
defer pooledMSCache.Release()
}
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == execModeSimulate)

@@ -889,6 +897,9 @@ func (app *BaseApp) runTx(mode sdk.ExecMode, txBytes []byte, tx sdk.Tx) (gInfo s
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)
if pooledMSCache, ok := msCache.(storetypes.PooledCacheMultiStore); ok {
defer pooledMSCache.Release()
}

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
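Both runTx hunks above follow the same pattern: branch the multi-store, and if the branch happens to come from a pool, hand it back with a deferred Release once the transaction path is done. A minimal, self-contained sketch of that pattern follows; the interface and store types here are stand-ins, not the SDK's actual storetypes definitions.

```go
package main

import "fmt"

// Stand-in interfaces (hypothetical, not the SDK's storetypes).
type CacheStore interface{ Write() }

type PooledCacheStore interface {
	CacheStore
	Release()
}

type plainStore struct{}

func (plainStore) Write() { fmt.Println("plain: write") }

type pooledStore struct{}

func (pooledStore) Write()   { fmt.Println("pooled: write") }
func (pooledStore) Release() { fmt.Println("pooled: returned to pool") }

// useBranch mirrors the runTx change: release only when the store is pooled.
func useBranch(cs CacheStore) {
	if pooled, ok := cs.(PooledCacheStore); ok {
		defer pooled.Release()
	}
	cs.Write()
}

func main() {
	useBranch(plainStore{})
	useBranch(pooledStore{})
}
```

The type assertion keeps callers working unchanged against a plain cache store whenever the parent multi-store does not support pooling.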
4 changes: 4 additions & 0 deletions store/CHANGELOG.md
@@ -25,6 +25,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Improvements

* [#24608](https://github.com/cosmos/cosmos-sdk/pull/24608) Introduced pooled versions of cache stores to avoid allocating new caches.

### Bug Fixes

* [#20425](https://github.com/cosmos/cosmos-sdk/pull/20425) Fix nil pointer panic when querying historical state where a new store does not exist.
5 changes: 5 additions & 0 deletions store/cachekv/internal/btree.go
@@ -36,6 +36,11 @@ func NewBTree() BTree {
}
}

// Clear resets the tree by setting the root node to nil, dropping all items.
func (bt BTree) Clear() {
bt.tree.Clear()
}

func (bt BTree) Set(key, value []byte) {
bt.tree.Set(newItem(key, value))
}
46 changes: 43 additions & 3 deletions store/cachekv/store.go
@@ -32,7 +32,16 @@ type Store struct {
parent types.KVStore
}

var _ types.CacheKVStore = (*Store)(nil)
// PooledStore wraps a Store object and implements the types.PooledCacheKVStore interface,
// which allows it to be pooled and reused without the overhead of allocation.
type PooledStore struct {
Store
}

var (
_ types.CacheKVStore = (*Store)(nil)
_ types.PooledCacheKVStore = (*PooledStore)(nil)
)

// NewStore creates a new Store object
func NewStore(parent types.KVStore) *Store {
@@ -44,6 +53,37 @@ func NewStore(parent types.KVStore) *Store {
}
}

// storePool is a pool of PooledStore instances. It contains a set of objects
// that can be reused instead of allocating new ones. It's thread safe.
// Callers can use Get() to retrieve a store (or allocate a new one if none are available).
// Callers should use Put() when done with the store to return it to the pool.
var storePool = sync.Pool{
New: func() any {
return &PooledStore{
Store: Store{
cache: make(map[string]*cValue),
unsortedCache: make(map[string]struct{}),
sortedCache: internal.NewBTree(),
},
}
},
}

// Release releases the PooledStore object back to the pool.
func (store *PooledStore) Release() {
store.resetCaches()
store.parent = nil
store.mtx = sync.Mutex{}
storePool.Put(store)
}

// NewPooledStore gets a PooledStore object from the pool.
func NewPooledStore(parent types.KVStore) *PooledStore {
store := storePool.Get().(*PooledStore)
store.parent = parent
return store
}

// GetStoreType implements Store.
func (store *Store) GetStoreType() types.StoreType {
return store.parent.GetStoreType()
@@ -112,7 +152,7 @@ func (store *Store) resetCaches() {
delete(store.unsortedCache, key)
}
}
store.sortedCache = internal.NewBTree()
store.sortedCache.Clear()
}

// Write implements Cachetypes.KVStore.
@@ -121,7 +161,7 @@ func (store *Store) Write() {
defer store.mtx.Unlock()

if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
store.sortedCache = internal.NewBTree()
store.sortedCache.Clear()
return
}

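The cachekv changes above hinge on one invariant: before a PooledStore goes back into the pool, Release must wipe everything that could leak between uses (caches, parent reference, mutex). A small sketch of that acquire/reset/release lifecycle with sync.Pool, using stand-in names rather than the SDK's types:

```go
package main

import (
	"fmt"
	"sync"
)

// pooledCache is a stand-in for cachekv.PooledStore: a cache plus a parent.
type pooledCache struct {
	cache  map[string][]byte
	parent string // stand-in for the parent KVStore reference
}

var cachePool = sync.Pool{
	New: func() any {
		return &pooledCache{cache: make(map[string][]byte)}
	},
}

// acquire mirrors NewPooledStore: take an object from the pool, attach parent.
func acquire(parent string) *pooledCache {
	c := cachePool.Get().(*pooledCache)
	c.parent = parent
	return c
}

// release mirrors Release: reset every field before returning the object,
// so no state from one use can leak into the next.
func (c *pooledCache) release() {
	for k := range c.cache { // order is irrelevant: the map is emptied
		delete(c.cache, k)
	}
	c.parent = ""
	cachePool.Put(c)
}

func main() {
	c := acquire("bank")
	c.cache["balance"] = []byte{1}
	c.release()

	c2 := acquire("staking")
	fmt.Println(len(c2.cache)) // 0: a reused object starts empty
	c2.release()
}
```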
109 changes: 92 additions & 17 deletions store/cachemulti/store.go
@@ -4,6 +4,7 @@
"fmt"
"io"
"maps"
"sync"

dbm "github.com/cosmos/cosmos-db"

@@ -33,16 +34,25 @@
traceContext types.TraceContext
}

var _ types.CacheMultiStore = Store{}
// PooledStore is a wrapper around Store that implements the types.PooledCacheMultiStore interface.
// It's used to avoid allocating new Store instances.
type PooledStore struct {
Store
}

var (
_ types.CacheMultiStore = &Store{}
_ types.PooledCacheMultiStore = &PooledStore{}
)

// NewFromKVStore creates a new Store object from a mapping of store keys to
// CacheWrapper objects and a KVStore as the database. Each CacheWrapper store
// is a branched store.
func NewFromKVStore(
store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
) Store {
cms := Store{
) *Store {
Contributor:

why did we need to change this function sig?

Contributor Author:

The issue is ultimately that sync.Pool works with pointers--if it didn't you'd end up copying the struct that it's trying to allocate for you--so I needed to be able to return a pointer to the PooledStore in newFromKVStorePooled. Once I'd made that change, interface compliance broke everywhere because the methods had value receivers. It ended up being simplest to just change everything to pointers for consistency.
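
A small illustration of that point (not part of the PR): sync.Pool stores and returns interface values, so pooling a non-pointer struct copies it on every Put and Get, while pooling a pointer hands the same allocation back out.

```go
package main

import (
	"fmt"
	"sync"
)

type big struct{ buf [1024]byte }

// Pooling pointers: Get can hand back the exact allocation Put stored.
var ptrPool = sync.Pool{New: func() any { return new(big) }}

func main() {
	p1 := ptrPool.Get().(*big)
	ptrPool.Put(p1)
	p2 := ptrPool.Get().(*big)
	fmt.Println(p1 == p2) // usually true: the object is reused, not copied

	// Pooling big values instead of pointers would copy the whole struct on
	// every Put/Get (each value is boxed into an interface), which is the
	// allocation the pool was supposed to remove.
}
```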

cms := &Store{
db: cachekv.NewStore(store),
stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
keys: keys,
@@ -69,11 +79,71 @@
func NewStore(
db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey,
traceWriter io.Writer, traceContext types.TraceContext,
) Store {
) *Store {
return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext)
}

func newCacheMultiStoreFromCMS(cms Store) Store {
// storePool is a pool of PooledStore instances. It contains a set of objects
// that can be reused instead of allocating new ones. It's thread safe.
// Callers can use Get() to retrieve a store (or allocate a new one if none are available).
// Callers should use Put() when done with the store to return it to the pool.
var storePool = sync.Pool{
New: func() any {
return &PooledStore{
Store: Store{
stores: make(map[types.StoreKey]types.CacheWrap),
keys: make(map[string]types.StoreKey),
},
}
},
}

// newFromKVStorePooled returns a PooledStore object, populated with a mapping of store keys to
// CacheWrapper objects and a KVStore as the database.
func newFromKVStorePooled(
store types.KVStore, stores map[types.StoreKey]types.CacheWrap,
traceWriter io.Writer, traceContext types.TraceContext,
) *PooledStore {
cms := storePool.Get().(*PooledStore)
cms.traceWriter = traceWriter
cms.traceContext = traceContext
for key, store := range stores {
var cwStore types.CacheWrapper = store
if cms.TracingEnabled() {
tctx := cms.traceContext.Clone().Merge(types.TraceContext{
storeNameCtxKey: key.Name(),
})

cwStore = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx)
}
cms.stores[key] = cachekv.NewPooledStore(cwStore.(types.KVStore))
}
cms.db = cachekv.NewPooledStore(store)
return cms
}

// Release releases the PooledStore object back to the pool.
func (cms *PooledStore) Release() {
// clear the stores map
for k, v := range cms.stores {
if pStore, ok := v.(types.PooledCacheKVStore); ok {
pStore.Release()
}
delete(cms.stores, k)
}
CodeQL (code scanning) warning on lines +128 to +133: Iteration over map may be a possible source of non-determinism.
for k := range cms.keys {
delete(cms.keys, k)
}
CodeQL (code scanning) warning on lines +134 to +136: Iteration over map may be a possible source of non-determinism.
if pStoreDb, ok := cms.db.(types.PooledCacheKVStore); ok {
pStoreDb.Release()
}
cms.db = nil
cms.traceContext = nil
cms.traceWriter = nil
storePool.Put(cms)
}
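
The two CodeQL warnings above flag range-over-map iteration, but these loops only drain the maps, so visiting order has no observable effect. Releasing the child stores genuinely needs a loop; the keys map could also be emptied without iterating at all on Go 1.21+ via the clear builtin. A sketch with stand-in types (not the SDK's):

```go
package main

import "fmt"

// releasable stands in for types.PooledCacheKVStore.
type releasable interface{ Release() }

type child struct{ name string }

func (c *child) Release() { fmt.Println("released", c.name) }

func releaseAll(stores map[string]releasable, keys map[string]string) {
	// Visit order is irrelevant here: every entry is released and removed,
	// so the map iteration cannot introduce observable non-determinism.
	for k, v := range stores {
		v.Release()
		delete(stores, k)
	}
	// The keys map needs no per-entry work, so no loop at all (Go 1.21+).
	clear(keys)
}

func main() {
	stores := map[string]releasable{
		"bank":    &child{name: "bank"},
		"staking": &child{name: "staking"},
	}
	keys := map[string]string{"bank": "bank", "staking": "staking"}
	releaseAll(stores, keys)
	fmt.Println(len(stores), len(keys)) // 0 0
}
```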

func newCacheMultiStoreFromCMS(cms *Store) *Store {
stores := make(map[types.StoreKey]types.CacheWrapper)
for k, v := range cms.stores {
stores[k] = v
@@ -84,7 +154,7 @@

// SetTracer sets the tracer for the MultiStore that the underlying
// stores will utilize to trace operations. A MultiStore is returned.
func (cms Store) SetTracer(w io.Writer) types.MultiStore {
func (cms *Store) SetTracer(w io.Writer) types.MultiStore {
cms.traceWriter = w
return cms
}
@@ -93,7 +163,7 @@
// the given context with the existing context by key. Any existing keys will
// be overwritten. It is implied that the caller should update the context when
// necessary between tracing operations. It returns a modified MultiStore.
func (cms Store) SetTracingContext(tc types.TraceContext) types.MultiStore {
func (cms *Store) SetTracingContext(tc types.TraceContext) types.MultiStore {
if cms.traceContext != nil {
maps.Copy(cms.traceContext, tc)
} else {
@@ -104,55 +174,60 @@
}

// TracingEnabled returns if tracing is enabled for the MultiStore.
func (cms Store) TracingEnabled() bool {
func (cms *Store) TracingEnabled() bool {
return cms.traceWriter != nil
}

// LatestVersion returns the branch version of the store
func (cms Store) LatestVersion() int64 {
func (cms *Store) LatestVersion() int64 {
panic("cannot get latest version from branch cached multi-store")
}

// GetStoreType returns the type of the store.
func (cms Store) GetStoreType() types.StoreType {
func (cms *Store) GetStoreType() types.StoreType {
return types.StoreTypeMulti
}

// Write calls Write on each underlying store.
func (cms Store) Write() {
func (cms *Store) Write() {
cms.db.Write()
for _, store := range cms.stores {
store.Write()
}
}

// CacheWrap implements CacheWrapper, returns the cache multi-store as a CacheWrap.
func (cms Store) CacheWrap() types.CacheWrap {
func (cms *Store) CacheWrap() types.CacheWrap {
return cms.CacheMultiStore().(types.CacheWrap)
}

// CacheWrapWithTrace implements the CacheWrapper interface.
func (cms Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
func (cms *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.CacheWrap {
return cms.CacheWrap()
}

// CacheMultiStore implements MultiStore, returns a new CacheMultiStore from the
// underlying CacheMultiStore.
func (cms Store) CacheMultiStore() types.CacheMultiStore {
func (cms *Store) CacheMultiStore() types.CacheMultiStore {
return newCacheMultiStoreFromCMS(cms)
}

// CacheMultiStorePooled returns a PooledCacheMultiStore object from a pool.
func (cms *Store) CacheMultiStorePooled() types.PooledCacheMultiStore {
return newFromKVStorePooled(cms.db, cms.stores, cms.traceWriter, cms.traceContext)
}

// CacheMultiStoreWithVersion implements the MultiStore interface. It will panic
// as an already cached multi-store cannot load previous versions.
//
// TODO: The store implementation can possibly be modified to support this as it
// seems safe to load previous versions (heights).
func (cms Store) CacheMultiStoreWithVersion(_ int64) (types.CacheMultiStore, error) {
func (cms *Store) CacheMultiStoreWithVersion(_ int64) (types.CacheMultiStore, error) {
panic("cannot branch cached multi-store with a version")
}

// GetStore returns an underlying Store by key.
func (cms Store) GetStore(key types.StoreKey) types.Store {
func (cms *Store) GetStore(key types.StoreKey) types.Store {
s := cms.stores[key]
if key == nil || s == nil {
panic(fmt.Sprintf("kv store with key %v has not been registered in stores", key))
@@ -161,7 +236,7 @@
}

// GetKVStore returns an underlying KVStore by key.
func (cms Store) GetKVStore(key types.StoreKey) types.KVStore {
func (cms *Store) GetKVStore(key types.StoreKey) types.KVStore {
store := cms.stores[key]
if key == nil || store == nil {
panic(fmt.Sprintf("kv store with key %v has not been registered in stores", key))
6 changes: 3 additions & 3 deletions store/rootmulti/store_test.go
@@ -67,7 +67,7 @@ func TestCacheMultiStore(t *testing.T) {
ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))

cacheMulti := ms.CacheMultiStore()
require.IsType(t, cachemulti.Store{}, cacheMulti)
require.IsType(t, &cachemulti.Store{}, cacheMulti)
}

func TestCacheMultiStoreWithVersion(t *testing.T) {
@@ -839,10 +839,10 @@ func TestCacheWraps(t *testing.T) {
multi := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing))

cacheWrapper := multi.CacheWrap()
require.IsType(t, cachemulti.Store{}, cacheWrapper)
require.IsType(t, &cachemulti.Store{}, cacheWrapper)

cacheWrappedWithTrace := multi.CacheWrapWithTrace(nil, nil)
require.IsType(t, cachemulti.Store{}, cacheWrappedWithTrace)
require.IsType(t, &cachemulti.Store{}, cacheWrappedWithTrace)
}

func TestTraceConcurrency(t *testing.T) {
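The test updates above follow directly from the pointer-receiver change discussed earlier: require.IsType matches on dynamic type, so once CacheMultiStore and CacheWrap return *cachemulti.Store instead of a value, the expected argument in the assertion must be a pointer too. A tiny stand-alone illustration with a hypothetical type:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/require"
)

type store struct{}

// newStore stands in for a constructor that now returns a pointer.
func newStore() *store { return &store{} }

func TestIsTypeWantsPointer(t *testing.T) {
	require.IsType(t, &store{}, newStore()) // passes: both sides are *store
	// require.IsType(t, store{}, newStore()) // would fail: store vs *store
}
```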