Skip to content

Commit d47d3ef

Browse files
committed
chore(pkg/database): implement database manager
Databases are now opened and closed on demand, up to the MaxActiveDatabases limit, to reduce memory consumption when managing a large number of databases. Signed-off-by: Stefano Scafiti <[email protected]>
1 parent 1e78388 commit d47d3ef

40 files changed

+2040
-374
lines changed

cmd/immudb/command/immudbcmdtest/immuServerMock_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestImmuServerMock(t *testing.T) {
5555
mock.WithStreamServiceFactory(ssf)
5656
require.Same(t, ssf, mock.Ssf)
5757

58-
list := database.NewDatabaseList()
58+
list := database.NewDatabaseList(nil)
5959
mock.WithDbList(list)
6060
require.Same(t, list, mock.DbList)
6161

cmd/immudb/command/init.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
4444
cmd.Flags().Bool("replication-allow-tx-discarding", options.ReplicationOptions.AllowTxDiscarding, "allow precommitted transactions to be discarded if the replica diverges from the primary")
4545
cmd.Flags().Bool("replication-skip-integrity-check", options.ReplicationOptions.SkipIntegrityCheck, "disable integrity check when reading data during replication")
4646
cmd.Flags().Bool("replication-wait-for-indexing", options.ReplicationOptions.WaitForIndexing, "wait for indexing to be up to date during replication")
47-
cmd.Flags().Int("shared-index-cache-size", options.SharedIndexCacheSize, "size (in bytes) of shared index cache")
47+
cmd.Flags().Int("max-active-databases", options.MaxActiveDatabases, "the maximum number of databases that can be active simultaneously")
4848

4949
cmd.PersistentFlags().StringVar(&cl.config.CfgFn, "config", "", "config file (default path are configs or $HOME. Default filename is immudb.toml)")
5050
cmd.Flags().String("pidfile", options.Pidfile, "pid path with filename e.g. /var/run/immudb.pid")
@@ -162,6 +162,7 @@ func setupDefaults(options *server.Options) {
162162
viper.SetDefault("max-sessions", 100)
163163
viper.SetDefault("max-session-inactivity-time", 3*time.Minute)
164164
viper.SetDefault("max-session-age-time", 0)
165+
viper.SetDefault("max-active-databases", options.MaxActiveDatabases)
165166
viper.SetDefault("session-timeout", 2*time.Minute)
166167
viper.SetDefault("sessions-guard-check-interval", 1*time.Minute)
167168
viper.SetDefault("logformat", logger.LogFormatText)

cmd/immudb/command/parse_options.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ func parseOptions() (options *server.Options, err error) {
9292
swaggerUIEnabled := viper.GetBool("swaggerui")
9393
logRequestMetadata := viper.GetBool("log-request-metadata")
9494

95+
maxActiveDatabases := viper.GetInt("max-active-databases")
96+
9597
s3Storage := viper.GetBool("s3-storage")
9698
s3RoleEnabled := viper.GetBool("s3-role-enabled")
9799
s3Role := viper.GetString("s3-role")
@@ -165,7 +167,8 @@ func parseOptions() (options *server.Options, err error) {
165167
WithLogFormat(logFormat).
166168
WithSwaggerUIEnabled(swaggerUIEnabled).
167169
WithGRPCReflectionServerEnabled(grpcReflectionServerEnabled).
168-
WithLogRequestMetadata(logRequestMetadata)
170+
WithLogRequestMetadata(logRequestMetadata).
171+
WithMaxActiveDatabases(maxActiveDatabases)
169172

170173
return options, nil
171174
}

embedded/cache/cache.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ var (
2828
ErrIllegalArguments = errors.New("illegal arguments")
2929
ErrKeyNotFound = errors.New("key not found")
3030
ErrIllegalState = errors.New("illegal state")
31+
ErrCannotEvictItem = errors.New("cannot find an item to evict")
3132
)
3233

34+
type EvictFilterFunc func(key interface{}, value interface{}) bool
35+
type EvictCallbackFunc func(key, value interface{})
36+
3337
// Cache implements the SIEVE cache replacement policy.
3438
type Cache struct {
3539
data map[interface{}]*entry
@@ -40,6 +44,9 @@ type Cache struct {
4044
maxWeight int
4145

4246
mutex sync.RWMutex
47+
48+
canEvict EvictFilterFunc
49+
onEvict EvictCallbackFunc
4350
}
4451

4552
type entry struct {
@@ -59,15 +66,34 @@ func NewCache(maxWeight int) (*Cache, error) {
5966
list: list.New(),
6067
weight: 0,
6168
maxWeight: maxWeight,
69+
onEvict: nil,
70+
canEvict: nil,
6271
}, nil
6372
}
6473

74+
func (c *Cache) SetCanEvict(canEvict EvictFilterFunc) {
75+
c.mutex.Lock()
76+
defer c.mutex.Unlock()
77+
78+
c.canEvict = canEvict
79+
}
80+
81+
func (c *Cache) SetOnEvict(onEvict EvictCallbackFunc) {
82+
c.mutex.Lock()
83+
defer c.mutex.Unlock()
84+
85+
c.onEvict = onEvict
86+
}
87+
6588
func (c *Cache) Resize(newWeight int) {
6689
c.mutex.Lock()
6790
defer c.mutex.Unlock()
6891

6992
for c.weight > newWeight {
70-
_, entry, _ := c.evict()
93+
key, entry, _ := c.evict()
94+
if c.onEvict != nil {
95+
c.onEvict(key, entry.value)
96+
}
7197
c.weight -= entry.weight
7298
}
7399

@@ -133,6 +159,9 @@ func (c *Cache) evictWhileFull(weight int) (interface{}, interface{}, error) {
133159
rkey = evictedKey
134160
rvalue = entry.value
135161

162+
if c.onEvict != nil {
163+
c.onEvict(rkey, rvalue)
164+
}
136165
c.weight -= entry.weight
137166
}
138167
return rkey, rvalue, nil
@@ -144,15 +173,15 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) {
144173
}
145174

146175
curr := c.hand
147-
for {
176+
for i := 0; i < 2*c.list.Len(); i++ {
148177
if curr == nil {
149178
curr = c.list.Back()
150179
}
151180

152181
key := curr.Value
153182

154183
e := c.data[key]
155-
if e.visited == 0 {
184+
if e.visited == 0 && c.shouldEvict(key, e.value) {
156185
c.hand = curr.Prev()
157186

158187
c.list.Remove(curr)
@@ -164,6 +193,11 @@ func (c *Cache) evict() (rkey interface{}, e *entry, err error) {
164193
e.visited = 0
165194
curr = curr.Prev()
166195
}
196+
return nil, nil, ErrCannotEvictItem
197+
}
198+
199+
func (c *Cache) shouldEvict(key, value interface{}) bool {
200+
return c.canEvict == nil || c.canEvict(key, value)
167201
}
168202

169203
func (c *Cache) Get(key interface{}) (interface{}, error) {

embedded/cache/cache_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,5 +394,61 @@ func TestPutWeighted(t *testing.T) {
394394
require.Equal(t, 4, cache.Weight())
395395
require.Equal(t, 1, cache.EntriesCount())
396396
})
397+
}
398+
399+
func TestOnEvict(t *testing.T) {
400+
cache, err := NewCache(5)
401+
require.NoError(t, err)
402+
403+
var onEvictCalled int
404+
cache.SetOnEvict(func(_, value interface{}) {
405+
onEvictCalled++
406+
})
407+
408+
for i := 0; i < 5; i++ {
409+
cache.Put(i, i)
410+
}
411+
require.Zero(t, onEvictCalled)
412+
413+
_, _, err = cache.PutWeighted(6, 6, 3)
414+
require.NoError(t, err)
415+
416+
require.Equal(t, onEvictCalled, 3)
417+
418+
_, _, err = cache.PutWeighted(7, 7, 2)
419+
require.NoError(t, err)
420+
require.Equal(t, onEvictCalled, 5)
421+
422+
cache.Resize(0)
423+
require.Equal(t, onEvictCalled, 7)
424+
}
397425

426+
func TestCanEvict(t *testing.T) {
427+
cache, err := NewCache(5)
428+
require.NoError(t, err)
429+
430+
for i := 0; i < 5; i++ {
431+
_, _, err := cache.Put(i, i)
432+
require.NoError(t, err)
433+
}
434+
435+
t.Run("cannot evict any item", func(t *testing.T) {
436+
cache.SetCanEvict(func(_, _ interface{}) bool {
437+
return false
438+
})
439+
440+
_, _, err := cache.Put(6, 6)
441+
require.ErrorIs(t, err, ErrCannotEvictItem)
442+
})
443+
444+
t.Run("cannot evict any item", func(t *testing.T) {
445+
keyToEvict := rand.Intn(5)
446+
cache.SetCanEvict(func(key, _ interface{}) bool {
447+
return key == keyToEvict
448+
})
449+
450+
evictedKey, _, err := cache.Put(6, 6)
451+
require.NoError(t, err)
452+
require.Equal(t, evictedKey, keyToEvict)
453+
})
398454
}

embedded/store/immustore.go

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ type ImmuStore struct {
216216
commitWHub *watchers.WatchersHub
217217

218218
indexers map[[sha256.Size]byte]*indexer
219-
nextIndexerID uint32
219+
nextIndexerID uint64
220220
indexCache *cache.Cache
221221

222222
memSemaphore *semaphore.Semaphore // used by indexers to control amount acquired of memory
@@ -720,47 +720,52 @@ func OpenWith(path string, vLogs []appendable.Appendable, txLog, cLog appendable
720720
}
721721

722722
if store.synced {
723-
go func() {
724-
for {
725-
committedTxID := store.LastCommittedTxID()
723+
go store.syncer()
724+
}
726725

727-
// passive wait for one new transaction at least
728-
store.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1)
726+
return store, nil
727+
}
729728

730-
// TODO: waiting on earlier stages of transaction processing may also be possible
731-
prevLatestPrecommitedTx := committedTxID + 1
729+
func (s *ImmuStore) syncer() {
730+
for {
731+
committedTxID := s.LastCommittedTxID()
732732

733-
// TODO: parametrize concurrency evaluation
734-
for i := 0; i < 4; i++ {
735-
// give some time for more transactions to be precommitted
736-
time.Sleep(store.syncFrequency / 4)
733+
// passive wait for one new transaction at least
734+
err := s.inmemPrecommitWHub.WaitFor(context.Background(), committedTxID+1)
735+
if errors.Is(err, watchers.ErrAlreadyClosed) {
736+
return
737+
}
737738

738-
latestPrecommitedTx := store.LastPrecommittedTxID()
739+
// TODO: waiting on earlier stages of transaction processing may also be possible
740+
prevLatestPrecommitedTx := committedTxID + 1
739741

740-
if prevLatestPrecommitedTx == latestPrecommitedTx {
741-
// avoid waiting if there are no new transactions
742-
break
743-
}
742+
// TODO: parametrize concurrency evaluation
743+
for i := 0; i < 4; i++ {
744+
// give some time for more transactions to be precommitted
745+
time.Sleep(s.syncFrequency / 4)
744746

745-
prevLatestPrecommitedTx = latestPrecommitedTx
746-
}
747+
latestPrecommitedTx := s.LastPrecommittedTxID()
747748

748-
// ensure durability
749-
err := store.sync()
750-
if errors.Is(err, ErrAlreadyClosed) ||
751-
errors.Is(err, multiapp.ErrAlreadyClosed) ||
752-
errors.Is(err, singleapp.ErrAlreadyClosed) ||
753-
errors.Is(err, watchers.ErrAlreadyClosed) {
754-
return
755-
}
756-
if err != nil {
757-
store.notify(Error, true, "%s: while syncing transactions", err)
758-
}
749+
if prevLatestPrecommitedTx == latestPrecommitedTx {
750+
// avoid waiting if there are no new transactions
751+
break
759752
}
760-
}()
761-
}
762753

763-
return store, nil
754+
prevLatestPrecommitedTx = latestPrecommitedTx
755+
}
756+
757+
// ensure durability
758+
err = s.sync()
759+
if errors.Is(err, ErrAlreadyClosed) ||
760+
errors.Is(err, multiapp.ErrAlreadyClosed) ||
761+
errors.Is(err, singleapp.ErrAlreadyClosed) ||
762+
errors.Is(err, watchers.ErrAlreadyClosed) {
763+
return
764+
}
765+
if err != nil {
766+
s.notify(Error, true, "%s: while syncing transactions", err)
767+
}
768+
}
764769
}
765770

766771
type NotificationType = int
@@ -855,15 +860,11 @@ func (s *ImmuStore) InitIndexing(spec *IndexSpec) error {
855860
}
856861

857862
if s.indexCache == nil {
858-
if indexFactoryFunc := s.opts.IndexOpts.CacheFactory; indexFactoryFunc != nil {
859-
s.indexCache = indexFactoryFunc()
860-
} else {
861-
c, err := cache.NewCache(s.opts.IndexOpts.CacheSize)
862-
if err != nil {
863-
return err
864-
}
865-
s.indexCache = c
863+
c, err := cache.NewCache(s.opts.IndexOpts.CacheSize)
864+
if err != nil {
865+
return err
866866
}
867+
s.indexCache = c
867868
}
868869

869870
indexer, err := newIndexer(indexPath, s, s.opts)
@@ -3292,7 +3293,6 @@ func (s *ImmuStore) Sync() error {
32923293
if s.closed {
32933294
return ErrAlreadyClosed
32943295
}
3295-
32963296
return s.sync()
32973297
}
32983298

embedded/store/indexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func newIndexer(path string, store *ImmuStore, opts *Options) (*indexer, error)
106106
return nil, fmt.Errorf("%w: nil store", ErrIllegalArguments)
107107
}
108108

109-
id := atomic.AddUint32(&store.nextIndexerID, 1)
109+
id := atomic.AddUint64(&store.nextIndexerID, 1)
110110
if id-1 > math.MaxUint16 {
111111
return nil, ErrMaxIndexersLimitExceeded
112112
}

embedded/store/options.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -206,9 +206,6 @@ type IndexOptions struct {
206206

207207
// Maximum time waiting for more transactions to be committed and included into the same bulk
208208
BulkPreparationTimeout time.Duration
209-
210-
// CacheFactory function
211-
CacheFactory IndexCacheFactoryFunc
212209
}
213210

214211
type AHTOptions struct {
@@ -710,11 +707,6 @@ func (opts *IndexOptions) WithMaxGlobalBufferedDataSize(size int) *IndexOptions
710707
return opts
711708
}
712709

713-
func (opts *IndexOptions) WithCacheFactoryFunc(indexCacheFactory IndexCacheFactoryFunc) *IndexOptions {
714-
opts.CacheFactory = indexCacheFactory
715-
return opts
716-
}
717-
718710
// AHTOptions
719711

720712
func (opts *AHTOptions) WithWriteBufferSize(writeBufferSize int) *AHTOptions {

0 commit comments

Comments
 (0)