diff --git a/autopilot/prefattach_test.go b/autopilot/prefattach_test.go
index 524116aa4c..735dcea537 100644
--- a/autopilot/prefattach_test.go
+++ b/autopilot/prefattach_test.go
@@ -2,6 +2,7 @@ package autopilot
 
 import (
 	"bytes"
+	"fmt"
 	prand "math/rand"
 	"testing"
 	"time"
@@ -33,6 +34,9 @@ func newDiskChanGraph(t *testing.T) (testGraph, error) {
 		require.NoError(t, cdb.Close())
 	})
 
+	// Wait for graph cache to be up and running.
+	_ = waitForGraphCache(cdb.ChannelGraph(), 5*time.Second)
+
 	return &databaseChannelGraph{
 		db: cdb.ChannelGraph(),
 	}, nil
@@ -60,6 +64,28 @@ var chanGraphs = []struct {
 	},
 }
 
+func waitForGraphCache(g *channeldb.ChannelGraph, timeout time.Duration) error {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	timeoutChan := time.After(timeout)
+	for {
+		select {
+		case <-timeoutChan:
+			return fmt.Errorf("timed out waiting for graphCache " +
+				"to be ready")
+		case <-ticker.C:
+			graphCache, err := g.GetGraphCache()
+			if err != nil {
+				return fmt.Errorf("error getting graphCache: "+
+					"%v", err)
+			} else if graphCache != nil {
+				return nil
+			}
+		}
+	}
+}
+
 // TestPrefAttachmentSelectEmptyGraph ensures that when passed an
 // empty graph, the NodeSores function always returns a score of 0.
 func TestPrefAttachmentSelectEmptyGraph(t *testing.T) {
diff --git a/channeldb/error.go b/channeldb/error.go
index 859af97464..ff33d224f4 100644
--- a/channeldb/error.go
+++ b/channeldb/error.go
@@ -43,6 +43,10 @@ var (
 	// created.
 	ErrMetaNotFound = fmt.Errorf("unable to locate meta information")
 
+	// ErrGraphCacheNotReady is returned when the in-memory graph cache is
+	// still being populated asynchronously and cannot be used yet.
+	ErrGraphCacheNotReady = fmt.Errorf("graph cache not ready")
+
 	// ErrGraphNotFound is returned when at least one of the components of
 	// graph doesn't exist.
 	ErrGraphNotFound = fmt.Errorf("graph bucket not initialized")
diff --git a/channeldb/graph.go b/channeldb/graph.go
index 4146721660..bb071d4577 100644
--- a/channeldb/graph.go
+++ b/channeldb/graph.go
@@ -12,6 +12,7 @@ import (
 	"net"
 	"sort"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/btcsuite/btcd/btcec/v2"
@@ -184,8 +185,13 @@ type ChannelGraph struct {
 	chanCache  *channelCache
 	graphCache *GraphCache
 
+	graphCacheReady atomic.Bool
+	graphCacheErr   error
+
 	chanScheduler batch.Scheduler
 	nodeScheduler batch.Scheduler
+
+	writeOps chan func() error
 }
 
 // NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
@@ -211,48 +217,101 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
 	g.nodeScheduler = batch.NewTimeScheduler(
 		db, nil, batchCommitInterval,
 	)
+	g.writeOps = make(chan func() error, 100)
 
 	// The graph cache can be turned off (e.g. for mobile users) for a
 	// speed/memory usage tradeoff.
 	if useGraphCache {
 		g.graphCache = NewGraphCache(preAllocCacheNumNodes)
-		startTime := time.Now()
-		log.Debugf("Populating in-memory channel graph, this might " +
-			"take a while...")
+		// Start populating the cache asynchronously.
+		go g.populateGraphCache()
+	}
+	g.startWorker()
 
-		err := g.ForEachNodeCacheable(
-			func(tx kvdb.RTx, node GraphCacheNode) error {
-				g.graphCache.AddNodeFeatures(node)
+	return g, nil
+}
 
-				return nil
-			},
-		)
-		if err != nil {
-			return nil, err
+// channelMapKey is the key structure used for storing channel edge policies.
+type channelMapKey struct {
+	nodeKey route.Vertex
+	chanID  [8]byte
+}
+
+func (c *ChannelGraph) startWorker() {
+	go func() {
+		for op := range c.writeOps {
+			if err := op(); err != nil {
+				log.Warnf("Error executing operation: %v", err)
+			}
 		}
+	}()
+}
+
+func (c *ChannelGraph) enqueueWriteOperation(op func() error) {
+	// Send the operation to the channel.
+	// This is non-blocking as long as the buffer isn't full.
+	select {
+	case c.writeOps <- op:
+		// Operation enqueued successfully.
+	default:
+		log.Warn("writeOps queue is full, operation not enqueued")
+	}
+}
 
-		err = g.ForEachChannel(func(info *models.ChannelEdgeInfo,
-			policy1, policy2 *models.ChannelEdgePolicy) error {
+func (c *ChannelGraph) populateGraphCache() {
+	startTime := time.Now()
+	log.Debugf("Populating in-memory channel graph, this might " +
+		"take a while...")
 
-			g.graphCache.AddChannel(info, policy1, policy2)
+	err := c.ForEachNodeCacheable(
+		func(tx kvdb.RTx, node GraphCacheNode) error {
+			c.graphCache.AddNodeFeatures(node)
 
 			return nil
-		})
-		if err != nil {
-			return nil, err
-		}
+		},
+	)
+	if err != nil {
+		c.graphCacheErr = err
+		return
+	}
+
+	err = c.ForEachChannel(func(info *models.ChannelEdgeInfo,
+		policy1, policy2 *models.ChannelEdgePolicy) error {
+
+		c.graphCache.AddChannel(info, policy1, policy2)
+
+		return nil
+	})
+	if err != nil {
+		c.graphCacheErr = err
+		return
+	}
 
+	if c.graphCache != nil {
 		log.Debugf("Finished populating in-memory channel graph (took "+
-			"%v, %s)", time.Since(startTime), g.graphCache.Stats())
+			"%v, %s)", time.Since(startTime), c.graphCache.Stats())
 	}
 
-	return g, nil
+	c.graphCacheReady.Store(true)
 }
 
-// channelMapKey is the key structure used for storing channel edge policies.
-type channelMapKey struct {
-	nodeKey route.Vertex
-	chanID  [8]byte
+func (c *ChannelGraph) GetGraphCache() (*GraphCache, error) {
+	if c.graphCache == nil {
+		return nil, nil
+	}
+
+	// Check if the graph cache is ready without waiting.
+	if !c.graphCacheReady.Load() {
+		// Return an error to show that the cache is not ready.
+		return nil, ErrGraphCacheNotReady
+	}
+
+	if c.graphCacheErr != nil {
+		return nil, fmt.Errorf("graph cache population failed: %w",
+			c.graphCacheErr)
+	}
+
+	return c.graphCache, nil
 }
 
 // getChannelMap loads all channel edge policies from the database and stores
@@ -467,8 +526,9 @@ func (c *ChannelGraph) ForEachChannel(cb func(*models.ChannelEdgeInfo,
 func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
 	node route.Vertex, cb func(channel *DirectedChannel) error) error {
 
-	if c.graphCache != nil {
-		return c.graphCache.ForEachChannel(node, cb)
+	graphCache, err := c.GetGraphCache()
+	if err == nil && graphCache != nil {
+		return graphCache.ForEachChannel(node, cb)
 	}
 
 	// Fallback that uses the database.
@@ -524,8 +584,9 @@ func (c *ChannelGraph) ForEachNodeDirectedChannel(tx kvdb.RTx,
 func (c *ChannelGraph) FetchNodeFeatures(
 	node route.Vertex) (*lnwire.FeatureVector, error) {
 
-	if c.graphCache != nil {
-		return c.graphCache.GetFeatures(node), nil
+	graphCache, err := c.GetGraphCache()
+	if err == nil && graphCache != nil {
+		return graphCache.GetFeatures(node), nil
 	}
 
 	// Fallback that uses the database.
@@ -554,8 +615,9 @@ func (c *ChannelGraph) FetchNodeFeatures(
 func (c *ChannelGraph) ForEachNodeCached(cb func(node route.Vertex,
 	chans map[uint64]*DirectedChannel) error) error {
 
-	if c.graphCache != nil {
-		return c.graphCache.ForEachNode(cb)
+	graphCache, err := c.GetGraphCache()
+	if err == nil && graphCache != nil {
+		return graphCache.ForEachNode(cb)
 	}
 
 	// Otherwise call back to a version that uses the database directly.
@@ -833,12 +895,30 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode,
 
 	r := &batch.Request{
 		Update: func(tx kvdb.RwTx) error {
-			if c.graphCache != nil {
+			graphCache, err := c.GetGraphCache()
+			if err != nil {
+				if errors.Is(err, ErrGraphCacheNotReady) {
+					// Queue this update function.
+					c.enqueueWriteOperation(func() error {
+						return c.AddLightningNode(
+							node,
+							op...,
+						)
+					})
+
+					return nil
+				}
+
+				return err
+			}
+
+			if graphCache != nil {
 				cNode := newGraphCacheNode(
 					node.PubKeyBytes, node.Features,
 				)
-				err := c.graphCache.AddNode(tx, cNode)
+				err := graphCache.AddNode(tx, cNode)
 				if err != nil {
+					return err
 				}
 			}
 
@@ -921,8 +1001,22 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
 			return ErrGraphNodeNotFound
 		}
 
-		if c.graphCache != nil {
-			c.graphCache.RemoveNode(nodePub)
+		graphCache, err := c.GetGraphCache()
+		if err != nil {
+			if errors.Is(err, ErrGraphCacheNotReady) {
+				// Queue this delete function.
+				c.enqueueWriteOperation(func() error {
+					return c.DeleteLightningNode(nodePub)
+				})
+
+				return nil
+			}
+
+			return err
+		}
+
+		if graphCache != nil {
+			graphCache.RemoveNode(nodePub)
 		}
 
 		return c.deleteLightningNode(nodes, nodePub[:])
@@ -1052,8 +1146,22 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
 		return ErrEdgeAlreadyExist
 	}
 
-	if c.graphCache != nil {
-		c.graphCache.AddChannel(edge, nil, nil)
+	graphCache, err := c.GetGraphCache()
+	if err != nil {
+		if errors.Is(err, ErrGraphCacheNotReady) {
+			// Queue this function.
+			c.enqueueWriteOperation(func() error {
+				return c.addChannelEdge(tx, edge)
+			})
+
+			return nil
+		}
+
+		return err
+	}
+
+	if graphCache != nil {
+		graphCache.AddChannel(edge, nil, nil)
 	}
 
 	// Before we insert the channel into the database, we'll ensure that
@@ -1255,8 +1363,22 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *models.ChannelEdgeInfo) error {
 		return ErrEdgeNotFound
 	}
 
-	if c.graphCache != nil {
-		c.graphCache.UpdateChannel(edge)
+	graphCache, err := c.GetGraphCache()
+	if err != nil {
+		if errors.Is(err, ErrGraphCacheNotReady) {
+			// Queue this update function.
+			c.enqueueWriteOperation(func() error {
+				return c.UpdateChannelEdge(edge)
+			})
+
+			return nil
+		}
+
+		return err
+	}
+
+	if graphCache != nil {
+		graphCache.UpdateChannel(edge)
 	}
 
 	return putChanEdgeInfo(edgeIndex, edge, chanKey)
@@ -1490,6 +1612,20 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
 		return err
 	}
 
+	graphCache, err := c.GetGraphCache()
+	if err != nil {
+		if errors.Is(err, ErrGraphCacheNotReady) {
+			// Queue this prune operation.
+			c.enqueueWriteOperation(func() error {
+				return c.pruneGraphNodes(nodes, edgeIndex)
+			})
+
+			return nil
+		}
+
+		return err
+	}
+
 	// Finally, we'll make a second pass over the set of nodes, and delete
 	// any nodes that have a ref count of zero.
 	var numNodesPruned int
@@ -1501,8 +1637,8 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
 			continue
 		}
 
-		if c.graphCache != nil {
-			c.graphCache.RemoveNode(nodePubKey)
+		if graphCache != nil {
+			graphCache.RemoveNode(nodePubKey)
 		}
 
 		// If we reach this point, then there are no longer any edges
@@ -2532,8 +2668,30 @@ func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
 		return err
 	}
 
-	if c.graphCache != nil {
-		c.graphCache.RemoveChannel(
+	graphCache, err := c.GetGraphCache()
+	if err != nil {
+		if errors.Is(err, ErrGraphCacheNotReady) {
+			// Queue this delete function.
+			c.enqueueWriteOperation(func() error {
+				return c.delChannelEdgeUnsafe(
+					edges,
+					edgeIndex,
+					chanIndex,
+					zombieIndex,
+					chanID,
+					isZombie,
+					strictZombie,
+				)
+			})
+
+			return nil
+		}
+
+		return err
+	}
+
+	if graphCache != nil {
+		graphCache.RemoveChannel(
 			edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
 			edgeInfo.ChannelID,
 		)
@@ -2671,7 +2829,7 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 		Update: func(tx kvdb.RwTx) error {
 			var err error
 			isUpdate1, err = updateEdgePolicy(
-				tx, edge, c.graphCache,
+				tx, edge, c,
 			)
 
 			// Silence ErrEdgeNotFound so that the batch can
@@ -2738,7 +2896,7 @@ func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy,
 // true if the updated policy belongs to node1, and false if the policy belonged
 // to node2.
 func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
-	graphCache *GraphCache) (bool, error) {
+	c *ChannelGraph) (bool, error) {
 
 	edges := tx.ReadWriteBucket(edgeBucket)
 	if edges == nil {
@@ -2789,7 +2947,27 @@ func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
 	copy(fromNodePubKey[:], fromNode)
 	copy(toNodePubKey[:], toNode)
 
+	graphCache, err := c.GetGraphCache()
+	if err != nil {
+		if errors.Is(err, ErrGraphCacheNotReady) {
+			// Queue this update function.
+			c.enqueueWriteOperation(func() error {
+				_, err := updateEdgePolicy(tx, edge, c)
+				return err
+			})
+
+			return false, nil
+		}
+
+		return false, err
+	}
+
 	if graphCache != nil {
+		var fromNodePubKey route.Vertex
+		var toNodePubKey route.Vertex
+		copy(fromNodePubKey[:], fromNode)
+		copy(toNodePubKey[:], toNode)
+
 		graphCache.UpdatePolicy(
 			edge, fromNodePubKey, toNodePubKey, isUpdate1,
 		)
@@ -3664,8 +3842,26 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
 			"bucket: %w", err)
 	}
 
-	if c.graphCache != nil {
-		c.graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
+	graphCache, err := c.GetGraphCache()
+	if err != nil {
+		if errors.Is(err, ErrGraphCacheNotReady) {
+			// Queue this function.
+			c.enqueueWriteOperation(func() error {
+				return c.MarkEdgeZombie(
+					chanID,
+					pubKey1,
+					pubKey2,
+				)
+			})
+
+			return nil
+		}
+
+		return err
+	}
+
+	if graphCache != nil {
+		graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
 	}
 
 	return markEdgeZombie(zombieIndex, chanID, pubKey1, pubKey2)
@@ -3748,14 +3944,28 @@ func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
 
 	// We need to add the channel back into our graph cache, otherwise we
 	// won't use it for path finding.
-	if c.graphCache != nil {
+	graphCache, err := c.GetGraphCache()
+	if err != nil {
+		if errors.Is(err, ErrGraphCacheNotReady) {
+			// Queue the operation to add the channel back.
+			c.enqueueWriteOperation(func() error {
+				return c.markEdgeLiveUnsafe(tx, chanID)
+			})
+
+			return nil
+		}
+		return err
+	}
+
+	if graphCache != nil {
+		// Fetch the channel info to add back into the graph cache.
 		edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
 		if err != nil {
 			return err
 		}
 
 		for _, edgeInfo := range edgeInfos {
-			c.graphCache.AddChannel(
+			graphCache.AddChannel(
 				edgeInfo.Info, edgeInfo.Policy1,
 				edgeInfo.Policy2,
 			)
diff --git a/channeldb/graph_test.go b/channeldb/graph_test.go
index 2568512385..a6e85aee77 100644
--- a/channeldb/graph_test.go
+++ b/channeldb/graph_test.go
@@ -82,6 +82,9 @@ func MakeTestGraph(t testing.TB, modifiers ...OptionModifier) (*ChannelGraph, er
 		backendCleanup()
 	})
 
+	// Wait for graph cache to be up and running.
+	_ = waitForGraphCache(graph, 5*time.Second)
+
 	return graph, nil
 }
 
@@ -3986,6 +3989,28 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) {
 	require.Nil(t, getSingleChannel())
 }
 
+func waitForGraphCache(graph *ChannelGraph, timeout time.Duration) error {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	timeoutChan := time.After(timeout)
+	for {
+		select {
+		case <-timeoutChan:
+			return fmt.Errorf("timed out waiting for graphCache " +
+				"to be ready")
+		case <-ticker.C:
+			graphCache, err := graph.GetGraphCache()
+			if err != nil {
+				return fmt.Errorf("error getting graphCache: "+
+					"%v", err)
+			} else if graphCache != nil {
+				return nil
+			}
+		}
+	}
+}
+
 // TestGraphLoading asserts that the cache is properly reconstructed after a
 // restart.
 func TestGraphLoading(t *testing.T) {
@@ -4021,6 +4046,10 @@ func TestGraphLoading(t *testing.T) {
 	)
 	require.NoError(t, err)
 
+	_ = waitForGraphCache(graphReloaded, 5*time.Second)
+	_, err = graphReloaded.GetGraphCache()
+	require.NoError(t, err)
+
 	// Assert that the cache content is identical.
 	require.Equal(
 		t, graph.graphCache.nodeChannels,
diff --git a/graph/notifications_test.go b/graph/notifications_test.go
index 09ebf1211b..44e17b705a 100644
--- a/graph/notifications_test.go
+++ b/graph/notifications_test.go
@@ -1086,6 +1086,28 @@ func (c *testCtx) RestartBuilder(t *testing.T) {
 	c.builder = builder
 }
 
+func waitForGraphCache(g *channeldb.ChannelGraph, timeout time.Duration) error {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	timeoutChan := time.After(timeout)
+	for {
+		select {
+		case <-timeoutChan:
+			return fmt.Errorf("timed out waiting for graphCache " +
+				"to be ready")
+		case <-ticker.C:
+			graphCache, err := g.GetGraphCache()
+			if err != nil {
+				return fmt.Errorf("error getting graphCache: "+
+					"%v", err)
+			} else if graphCache != nil {
+				return nil
+			}
+		}
+	}
+}
+
 // makeTestGraph creates a new instance of a channeldb.ChannelGraph for testing
 // purposes.
 func makeTestGraph(t *testing.T, useCache bool) (*channeldb.ChannelGraph,
@@ -1109,6 +1131,9 @@ func makeTestGraph(t *testing.T, useCache bool) (*channeldb.ChannelGraph,
 		return nil, nil, err
 	}
 
+	// Wait for graph cache to be up and running.
+	_ = waitForGraphCache(graph, 5*time.Second)
+
 	return graph, backend, nil
 }
 
diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go
index 9c69cba5c9..584fc7a9f3 100644
--- a/routing/pathfind_test.go
+++ b/routing/pathfind_test.go
@@ -175,9 +175,34 @@ func makeTestGraph(t *testing.T, useCache bool) (*channeldb.ChannelGraph,
 		return nil, nil, err
 	}
 
+	// Wait for graph cache to be up and running.
+	_ = waitForGraphCache(graph, 5*time.Second)
+
 	return graph, backend, nil
 }
 
+func waitForGraphCache(g *channeldb.ChannelGraph, timeout time.Duration) error {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	timeoutChan := time.After(timeout)
+	for {
+		select {
+		case <-timeoutChan:
+			return fmt.Errorf("timed out waiting for graphCache " +
+				"to be ready")
+		case <-ticker.C:
+			graphCache, err := g.GetGraphCache()
+			if err != nil {
+				return fmt.Errorf("error getting graphCache: "+
+					"%v", err)
+			} else if graphCache != nil {
+				return nil
+			}
+		}
+	}
+}
+
 // parseTestGraph returns a fully populated ChannelGraph given a path to a JSON
 // file which encodes a test graph.
 func parseTestGraph(t *testing.T, useCache bool, path string) (