Skip to content

graph/db: init SQLStore caches and batch schedulers #9853

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 26, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config_builder.go
Original file line number Diff line number Diff line change
@@ -1026,7 +1026,7 @@ func (d *DefaultDatabaseBuilder) BuildDatabase(
"instances")
}

graphDBOptions := []graphdb.KVStoreOptionModifier{
graphDBOptions := []graphdb.StoreOptionModifier{
graphdb.WithRejectCacheSize(cfg.Caches.RejectCacheSize),
graphdb.WithChannelCacheSize(cfg.Caches.ChannelCacheSize),
graphdb.WithBatchCommitInterval(cfg.DB.BatchCommitInterval),
9 changes: 9 additions & 0 deletions graph/db/graph_test.go
Original file line number Diff line number Diff line change
@@ -4232,6 +4232,15 @@ func TestGraphLoading(t *testing.T) {
// Next, create the graph for the first time.
graphStore := NewTestDB(t)

// Temporarily add a manual skip for this test, until all the methods
// it uses have been implemented on the SQLStore struct. We have to
// manually add this skip because it is the only test that doesn't use
// the MakeTestGraph helper to create the graph store.
_, ok := graphStore.(*KVStore)
if !ok {
t.Skipf("Skipping TestGraphLoading for non-bbolt graph store")
}

graph, err := NewChannelGraph(graphStore)
require.NoError(t, err)
require.NoError(t, graph.Start())
2 changes: 1 addition & 1 deletion graph/db/kv_store.go
Original file line number Diff line number Diff line change
@@ -203,7 +203,7 @@ var _ V1Store = (*KVStore)(nil)

// NewKVStore allocates a new KVStore backed by a DB instance. The
// returned instance has its own unique reject cache and channel cache.
func NewKVStore(db kvdb.Backend, options ...KVStoreOptionModifier) (*KVStore,
func NewKVStore(db kvdb.Backend, options ...StoreOptionModifier) (*KVStore,
error) {

opts := DefaultOptions()
28 changes: 14 additions & 14 deletions graph/db/options.go
Original file line number Diff line number Diff line change
@@ -61,8 +61,8 @@ func WithPreAllocCacheNumNodes(n int) ChanGraphOption {
}
}

// KVStoreOptions holds parameters for tuning and customizing a graph.DB.
type KVStoreOptions struct {
// StoreOptions holds parameters for tuning and customizing a graph DB.
type StoreOptions struct {
// RejectCacheSize is the maximum number of rejectCacheEntries to hold
// in the rejection cache.
RejectCacheSize int
@@ -81,37 +81,37 @@ type KVStoreOptions struct {
NoMigration bool
}

// DefaultOptions returns a KVStoreOptions populated with default values.
func DefaultOptions() *KVStoreOptions {
return &KVStoreOptions{
// DefaultOptions returns a StoreOptions populated with default values.
func DefaultOptions() *StoreOptions {
return &StoreOptions{
RejectCacheSize: DefaultRejectCacheSize,
ChannelCacheSize: DefaultChannelCacheSize,
NoMigration: false,
}
}

// KVStoreOptionModifier is a function signature for modifying the default
// KVStoreOptions.
type KVStoreOptionModifier func(*KVStoreOptions)
// StoreOptionModifier is a function signature for modifying the default
// StoreOptions.
type StoreOptionModifier func(*StoreOptions)

// WithRejectCacheSize sets the RejectCacheSize to n.
func WithRejectCacheSize(n int) KVStoreOptionModifier {
return func(o *KVStoreOptions) {
func WithRejectCacheSize(n int) StoreOptionModifier {
return func(o *StoreOptions) {
o.RejectCacheSize = n
}
}

// WithChannelCacheSize sets the ChannelCacheSize to n.
func WithChannelCacheSize(n int) KVStoreOptionModifier {
return func(o *KVStoreOptions) {
func WithChannelCacheSize(n int) StoreOptionModifier {
return func(o *StoreOptions) {
o.ChannelCacheSize = n
}
}

// WithBatchCommitInterval sets the batch commit interval for the interval batch
// schedulers.
func WithBatchCommitInterval(interval time.Duration) KVStoreOptionModifier {
return func(o *KVStoreOptions) {
func WithBatchCommitInterval(interval time.Duration) StoreOptionModifier {
return func(o *StoreOptions) {
o.BatchCommitInterval = interval
}
}
45 changes: 41 additions & 4 deletions graph/db/sql_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package graphdb

import (
"fmt"
"sync"

"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/sqldb"
)

@@ -26,6 +30,16 @@ type BatchedSQLQueries interface {
type SQLStore struct {
db BatchedSQLQueries

// cacheMu guards all caches (rejectCache and chanCache). If
// this mutex will be acquired at the same time as the DB mutex then
// the cacheMu MUST be acquired first to prevent deadlock.
cacheMu sync.RWMutex
rejectCache *rejectCache
chanCache *channelCache

chanScheduler batch.Scheduler[SQLQueries]
nodeScheduler batch.Scheduler[SQLQueries]

// Temporary fall-back to the KVStore so that we can implement the
// interface incrementally.
*KVStore
@@ -37,9 +51,32 @@ var _ V1Store = (*SQLStore)(nil)

// NewSQLStore creates a new SQLStore instance given an open BatchedSQLQueries
// storage backend.
func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore) *SQLStore {
return &SQLStore{
db: db,
KVStore: kvStore,
func NewSQLStore(db BatchedSQLQueries, kvStore *KVStore,
options ...StoreOptionModifier) (*SQLStore, error) {

opts := DefaultOptions()
for _, o := range options {
o(opts)
}

if opts.NoMigration {
return nil, fmt.Errorf("the NoMigration option is not yet " +
"supported for SQL stores")
}

s := &SQLStore{
db: db,
KVStore: kvStore,
rejectCache: newRejectCache(opts.RejectCacheSize),
chanCache: newChannelCache(opts.ChannelCacheSize),
}

s.chanScheduler = batch.NewTimeScheduler(
db, &s.cacheMu, opts.BatchCommitInterval,
)
s.nodeScheduler = batch.NewTimeScheduler(
db, nil, opts.BatchCommitInterval,
)

return s, nil
}
2 changes: 1 addition & 1 deletion graph/db/test_kvdb.go
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ import (
)

// NewTestDB is a helper function that creates an BBolt database for testing.
func NewTestDB(t testing.TB) *KVStore {
func NewTestDB(t testing.TB) V1Store {
backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
require.NoError(t, err)

7 changes: 5 additions & 2 deletions graph/db/test_postgres.go
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ import (
// NewTestDB is a helper function that creates a SQLStore backed by a postgres
// database for testing. At the moment, it embeds a KVStore but once the
// SQLStore fully implements the V1Store interface, the KVStore will be removed.
func NewTestDB(t testing.TB) *SQLStore {
func NewTestDB(t testing.TB) V1Store {
backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
require.NoError(t, err)

@@ -38,5 +38,8 @@ func NewTestDB(t testing.TB) *SQLStore {
},
)

return NewSQLStore(executor, graphStore)
store, err := NewSQLStore(executor, graphStore)
require.NoError(t, err)

return store
}
7 changes: 5 additions & 2 deletions graph/db/test_sqlite.go
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ import (
// NewTestDB is a helper function that creates a SQLStore backed by a sqlite
// database for testing. At the moment, it embeds a KVStore but once the
// SQLStore fully implements the V1Store interface, the KVStore will be removed.
func NewTestDB(t testing.TB) *SQLStore {
func NewTestDB(t testing.TB) V1Store {
backend, backendCleanup, err := kvdb.GetTestBackend(t.TempDir(), "cgr")
require.NoError(t, err)

@@ -31,5 +31,8 @@ func NewTestDB(t testing.TB) *SQLStore {
},
)

return NewSQLStore(executor, graphStore)
store, err := NewSQLStore(executor, graphStore)
require.NoError(t, err)

return store
}