Skip to content

Commit 9f4b350

Browse files
committed
wip: AddLightningNode cache updates
1 parent ecb8d1b commit 9f4b350

File tree

3 files changed

+65
-29
lines changed

3 files changed

+65
-29
lines changed

channeldb/error.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ var (
4343
// created.
4444
ErrMetaNotFound = fmt.Errorf("unable to locate meta information")
4545

46+
ErrGraphNotReady = fmt.Errorf("graph cache is not ready")
47+
4648
// ErrGraphNotFound is returned when at least one of the components of
4749
// graph doesn't exist.
4850
ErrGraphNotFound = fmt.Errorf("graph bucket not initialized")

channeldb/graph.go

Lines changed: 58 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import (
2020
"github.com/btcsuite/btcd/chaincfg/chainhash"
2121
"github.com/btcsuite/btcd/txscript"
2222
"github.com/btcsuite/btcd/wire"
23+
"github.com/btcsuite/btcwallet/walletdb"
2324
"github.com/lightningnetwork/lnd/aliasmgr"
2425
"github.com/lightningnetwork/lnd/batch"
2526
"github.com/lightningnetwork/lnd/channeldb/models"
27+
"github.com/lightningnetwork/lnd/fn"
2628
"github.com/lightningnetwork/lnd/input"
2729
"github.com/lightningnetwork/lnd/kvdb"
2830
"github.com/lightningnetwork/lnd/lnwire"
@@ -185,9 +187,11 @@ type ChannelGraph struct {
185187
chanCache *channelCache
186188
graphCache *GraphCache
187189

188-
graphCacheOnce sync.Once
189190
graphCacheReady atomic.Bool
190191
graphCacheErr error
192+
updateQueue *fn.ConcurrentQueue[func(tx walletdb.ReadWriteTx) error]
193+
194+
cachePopulated sync.WaitGroup
191195

192196
chanScheduler batch.Scheduler
193197
nodeScheduler batch.Scheduler
@@ -216,11 +220,14 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
216220
g.nodeScheduler = batch.NewTimeScheduler(
217221
db, nil, batchCommitInterval,
218222
)
223+
g.updateQueue =
224+
fn.NewConcurrentQueue[func(tx walletdb.ReadWriteTx) error](100)
219225

220226
// The graph cache can be turned off (e.g. for mobile users) for a
221227
// speed/memory usage tradeoff.
222228
if useGraphCache {
223229
g.graphCache = NewGraphCache(preAllocCacheNumNodes)
230+
g.cachePopulated.Add(1)
224231
// Start populating the cache asynchronously
225232
go g.populateGraphCache()
226233
}
@@ -235,6 +242,8 @@ type channelMapKey struct {
235242
}
236243

237244
func (g *ChannelGraph) populateGraphCache() {
245+
defer g.cachePopulated.Done()
246+
238247
startTime := time.Now()
239248
log.Debugf("Populating in-memory channel graph, this might " +
240249
"take a while...")
@@ -263,27 +272,36 @@ func (g *ChannelGraph) populateGraphCache() {
263272
return
264273
}
265274

266-
log.Debugf("Finished populating in-memory channel graph (took "+
267-
"%v, %s)", time.Since(startTime), g.graphCache.Stats())
275+
if g.graphCache != nil {
276+
log.Debugf("Finished populating in-memory channel graph (took "+
277+
"%v, %s)", time.Since(startTime), g.graphCache.Stats())
278+
}
268279

280+
// Set graphCache to be ready and process pending updates.
269281
g.graphCacheReady.Store(true)
282+
g.updateQueue.Start()
283+
go func() {
284+
for update := range g.updateQueue.ChanOut() {
285+
err := g.nodeScheduler.Execute(&batch.Request{
286+
Update: func(tx kvdb.RwTx) error {
287+
return update(tx)
288+
},
289+
})
290+
if err != nil {
291+
fmt.Printf("Failed to execute cache update "+
292+
"function: %v\n", err)
293+
}
294+
}
295+
}()
270296
}
271297

272298
func (c *ChannelGraph) getGraphCache() (*GraphCache, error) {
273299
if c.graphCache == nil {
274300
return nil, nil
275301
}
276302

277-
// Wait for up to 5 seconds for the cache to be ready
278-
timeout := time.After(5 * time.Second)
279-
for !c.graphCacheReady.Load() {
280-
select {
281-
case <-timeout:
282-
return nil, fmt.Errorf("timeout waiting for graph " +
283-
"cache to be ready")
284-
case <-time.After(100 * time.Millisecond):
285-
// Check again
286-
}
303+
if !c.graphCacheReady.Load() {
304+
return nil, ErrGraphNotReady
287305
}
288306

289307
if c.graphCacheErr != nil {
@@ -506,6 +524,7 @@ func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo,
506524
func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
507525
node route.Vertex, cb func(channel *DirectedChannel) error) error {
508526

527+
// If graphCache is fully populated, use it.
509528
graphCache, err := c.getGraphCache()
510529
if err == nil && graphCache != nil {
511530
return graphCache.ForEachChannel(node, cb)
@@ -564,6 +583,7 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
564583
func (c *ChannelGraph) FetchNodeFeatures(
565584
node route.Vertex) (*lnwire.FeatureVector, error) {
566585

586+
// If graphCache is fully populated, use it.
567587
graphCache, err := c.getGraphCache()
568588
if err == nil && graphCache != nil {
569589
return graphCache.GetFeatures(node), nil
@@ -875,17 +895,6 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode,
875895

876896
r := &batch.Request{
877897
Update: func(tx kvdb.RwTx) error {
878-
graphCache, err := c.getGraphCache()
879-
if err == nil && graphCache != nil {
880-
cNode := newGraphCacheNode(
881-
node.PubKeyBytes, node.Features,
882-
)
883-
err := graphCache.AddNode(tx, cNode)
884-
if err != nil {
885-
return err
886-
}
887-
}
888-
889898
return addLightningNode(tx, node)
890899
},
891900
}
@@ -894,6 +903,31 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode,
894903
f(r)
895904
}
896905

906+
updateCache := func(tx kvdb.RwTx) error {
907+
graphCache, err := c.getGraphCache()
908+
if err == nil && graphCache != nil {
909+
cNode := newGraphCacheNode(node.PubKeyBytes, node.Features)
910+
err := graphCache.AddNode(tx, cNode)
911+
if err != nil {
912+
return err
913+
}
914+
}
915+
return nil
916+
}
917+
918+
// If the graph cache is not ready, queue the cache update
919+
if !c.graphCacheReady.Load() {
920+
c.updateQueue.ChanIn() <- updateCache
921+
} else {
922+
// If the cache is ready, execute the cache update immediately
923+
err := c.nodeScheduler.Execute(&batch.Request{
924+
Update: updateCache,
925+
})
926+
if err != nil {
927+
return err
928+
}
929+
}
930+
897931
return c.nodeScheduler.Execute(r)
898932
}
899933

channeldb/graph_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4007,14 +4007,14 @@ func TestGraphLoading(t *testing.T) {
40074007
)
40084008
require.NoError(t, err)
40094009

4010-
_, err = graph.getGraphCache()
4011-
require.NoError(t, err)
4012-
40134010
// Populate the graph with test data.
40144011
const numNodes = 100
40154012
const numChannels = 4
40164013
_, _ = fillTestGraph(t, graph, numNodes, numChannels)
40174014

4015+
// Wait for graph cache to be populated.
4016+
graph.cachePopulated.Wait()
4017+
40184018
// Recreate the graph. This should cause the graph cache to be
40194019
// populated.
40204020
graphReloaded, err := NewChannelGraph(
@@ -4024,8 +4024,8 @@ func TestGraphLoading(t *testing.T) {
40244024
)
40254025
require.NoError(t, err)
40264026

4027-
_, err = graphReloaded.getGraphCache()
4028-
require.NoError(t, err)
4027+
// Wait for graph cache to be populated.
4028+
graphReloaded.cachePopulated.Wait()
40294029

40304030
// Assert that the cache content is identical.
40314031
require.Equal(

0 commit comments

Comments
 (0)