Skip to content

Commit 720f0a0

Browse files
committed
channeldb: setup simple pending queue
1 parent ecb8d1b commit 720f0a0

File tree

2 files changed

+155
-22
lines changed

2 files changed

+155
-22
lines changed

channeldb/error.go

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ var (
4343
// created.
4444
ErrMetaNotFound = fmt.Errorf("unable to locate meta information")
4545

46+
ErrGraphCacheNotReady = fmt.Errorf("graph cache not ready")
47+
4648
// ErrGraphNotFound is returned when at least one of the components of
4749
// graph doesn't exist.
4850
ErrGraphNotFound = fmt.Errorf("graph bucket not initialized")

channeldb/graph.go

+153-22
Original file line numberDiff line numberDiff line change
@@ -185,12 +185,13 @@ type ChannelGraph struct {
185185
chanCache *channelCache
186186
graphCache *GraphCache
187187

188-
graphCacheOnce sync.Once
189188
graphCacheReady atomic.Bool
190189
graphCacheErr error
191190

192191
chanScheduler batch.Scheduler
193192
nodeScheduler batch.Scheduler
193+
194+
writeOps chan func() error
194195
}
195196

196197
// NewChannelGraph allocates a new ChannelGraph backed by a DB instance. The
@@ -216,6 +217,7 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
216217
g.nodeScheduler = batch.NewTimeScheduler(
217218
db, nil, batchCommitInterval,
218219
)
220+
g.writeOps = make(chan func() error, 100)
219221

220222
// The graph cache can be turned off (e.g. for mobile users) for a
221223
// speed/memory usage tradeoff.
@@ -224,6 +226,7 @@ func NewChannelGraph(db kvdb.Backend, rejectCacheSize, chanCacheSize int,
224226
// Start populating the cache asynchronously
225227
go g.populateGraphCache()
226228
}
229+
g.startWorker()
227230

228231
return g, nil
229232
}
@@ -234,6 +237,29 @@ type channelMapKey struct {
234237
chanID [8]byte
235238
}
236239

240+
func (c *ChannelGraph) startWorker() {
241+
go func() {
242+
for op := range c.writeOps {
243+
if err := op(); err != nil {
244+
log.Warnf("Error executing operation: %v", err)
245+
}
246+
}
247+
}()
248+
}
249+
250+
func (c *ChannelGraph) enqueueWriteOperation(op func() error) {
251+
// Send the operation to the channel; non-blocking if the buffer isn't full
252+
select {
253+
case c.writeOps <- op:
254+
// Operation enqueued successfully
255+
default:
256+
// Handle the case where the channel is full
257+
// Could log an error, block until space is available, or drop the operation
258+
// For example, to block until space is available, remove the select and default case
259+
log.Warn("writeOps queue is full, operation not enqueued")
260+
}
261+
}
262+
237263
func (g *ChannelGraph) populateGraphCache() {
238264
startTime := time.Now()
239265
log.Debugf("Populating in-memory channel graph, this might " +
@@ -274,16 +300,11 @@ func (c *ChannelGraph) getGraphCache() (*GraphCache, error) {
274300
return nil, nil
275301
}
276302

277-
// Wait for up to 5 seconds for the cache to be ready
278-
timeout := time.After(5 * time.Second)
279-
for !c.graphCacheReady.Load() {
280-
select {
281-
case <-timeout:
282-
return nil, fmt.Errorf("timeout waiting for graph " +
283-
"cache to be ready")
284-
case <-time.After(100 * time.Millisecond):
285-
// Check again
286-
}
303+
// Check if graph cache is ready without waiting
304+
if !c.graphCacheReady.Load() {
305+
// Return an error or a special indicator that cache is not ready
306+
// Caller should handle this case appropriately, maybe by queuing write operations
307+
return nil, ErrGraphCacheNotReady
287308
}
288309

289310
if c.graphCacheErr != nil {
@@ -876,7 +897,21 @@ func (c *ChannelGraph) AddLightningNode(node *LightningNode,
876897
r := &batch.Request{
877898
Update: func(tx kvdb.RwTx) error {
878899
graphCache, err := c.getGraphCache()
879-
if err == nil && graphCache != nil {
900+
if err != nil {
901+
if err == ErrGraphCacheNotReady {
902+
// Queue this update function
903+
c.enqueueWriteOperation(func() error {
904+
return c.AddLightningNode(
905+
node,
906+
op...,
907+
)
908+
})
909+
return nil
910+
}
911+
return err
912+
}
913+
914+
if graphCache != nil {
880915
cNode := newGraphCacheNode(
881916
node.PubKeyBytes, node.Features,
882917
)
@@ -965,7 +1000,18 @@ func (c *ChannelGraph) DeleteLightningNode(nodePub route.Vertex) error {
9651000
}
9661001

9671002
graphCache, err := c.getGraphCache()
968-
if err == nil && graphCache != nil {
1003+
if err != nil {
1004+
if err == ErrGraphCacheNotReady {
1005+
// Queue this delete function
1006+
c.enqueueWriteOperation(func() error {
1007+
return c.DeleteLightningNode(nodePub)
1008+
})
1009+
return nil
1010+
}
1011+
return err
1012+
}
1013+
1014+
if graphCache != nil {
9691015
graphCache.RemoveNode(nodePub)
9701016
}
9711017

@@ -1097,7 +1143,18 @@ func (c *ChannelGraph) addChannelEdge(tx kvdb.RwTx,
10971143
}
10981144

10991145
graphCache, err := c.getGraphCache()
1100-
if err == nil && graphCache != nil {
1146+
if err != nil {
1147+
if err == ErrGraphCacheNotReady {
1148+
// Queue this function
1149+
c.enqueueWriteOperation(func() error {
1150+
return c.addChannelEdge(tx, edge)
1151+
})
1152+
return nil
1153+
}
1154+
return err
1155+
}
1156+
1157+
if graphCache != nil {
11011158
graphCache.AddChannel(edge, nil, nil)
11021159
}
11031160

@@ -1301,7 +1358,18 @@ func (c *ChannelGraph) UpdateChannelEdge(edge *models.ChannelEdgeInfo) error {
13011358
}
13021359

13031360
graphCache, err := c.getGraphCache()
1304-
if err == nil && graphCache != nil {
1361+
if err != nil {
1362+
if err == ErrGraphCacheNotReady {
1363+
// Queue this update function
1364+
c.enqueueWriteOperation(func() error {
1365+
return c.UpdateChannelEdge(edge)
1366+
})
1367+
return nil
1368+
}
1369+
return err
1370+
}
1371+
1372+
if graphCache != nil {
13051373
graphCache.UpdateChannel(edge)
13061374
}
13071375

@@ -1536,6 +1604,18 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
15361604
return err
15371605
}
15381606

1607+
graphCache, err := c.getGraphCache()
1608+
if err != nil {
1609+
if err == ErrGraphCacheNotReady {
1610+
// Queue this prune operation
1611+
c.enqueueWriteOperation(func() error {
1612+
return c.pruneGraphNodes(nodes, edgeIndex)
1613+
})
1614+
return nil
1615+
}
1616+
return err
1617+
}
1618+
15391619
// Finally, we'll make a second pass over the set of nodes, and delete
15401620
// any nodes that have a ref count of zero.
15411621
var numNodesPruned int
@@ -1547,8 +1627,7 @@ func (c *ChannelGraph) pruneGraphNodes(nodes kvdb.RwBucket,
15471627
continue
15481628
}
15491629

1550-
graphCache, err := c.getGraphCache()
1551-
if err == nil && graphCache != nil {
1630+
if graphCache != nil {
15521631
graphCache.RemoveNode(nodePubKey)
15531632
}
15541633

@@ -2580,7 +2659,18 @@ func (c *ChannelGraph) delChannelEdgeUnsafe(edges, edgeIndex, chanIndex,
25802659
}
25812660

25822661
graphCache, err := c.getGraphCache()
2583-
if err == nil && graphCache != nil {
2662+
if err != nil {
2663+
if err == ErrGraphCacheNotReady {
2664+
// Queue this delete function
2665+
c.enqueueWriteOperation(func() error {
2666+
return c.delChannelEdgeUnsafe(edges, edgeIndex, chanIndex, zombieIndex, chanID, isZombie, strictZombie)
2667+
})
2668+
return nil
2669+
}
2670+
return err
2671+
}
2672+
2673+
if graphCache != nil {
25842674
graphCache.RemoveChannel(
25852675
edgeInfo.NodeKey1Bytes, edgeInfo.NodeKey2Bytes,
25862676
edgeInfo.ChannelID,
@@ -2719,7 +2809,7 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
27192809
Update: func(tx kvdb.RwTx) error {
27202810
var err error
27212811
isUpdate1, err = updateEdgePolicy(
2722-
tx, edge, c.graphCache,
2812+
tx, edge, c,
27232813
)
27242814

27252815
// Silence ErrEdgeNotFound so that the batch can
@@ -2786,7 +2876,7 @@ func (c *ChannelGraph) updateEdgeCache(e *models.ChannelEdgePolicy,
27862876
// true if the updated policy belongs to node1, and false if the policy belonged
27872877
// to node2.
27882878
func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
2789-
graphCache *GraphCache) (bool, error) {
2879+
c *ChannelGraph) (bool, error) {
27902880

27912881
edges := tx.ReadWriteBucket(edgeBucket)
27922882
if edges == nil {
@@ -2837,7 +2927,25 @@ func updateEdgePolicy(tx kvdb.RwTx, edge *models.ChannelEdgePolicy,
28372927
copy(fromNodePubKey[:], fromNode)
28382928
copy(toNodePubKey[:], toNode)
28392929

2930+
graphCache, err := c.getGraphCache()
2931+
if err != nil {
2932+
if err == ErrGraphCacheNotReady {
2933+
// Queue this update function
2934+
c.enqueueWriteOperation(func() error {
2935+
_, err := updateEdgePolicy(tx, edge, c)
2936+
return err
2937+
})
2938+
return false, nil
2939+
}
2940+
return false, err
2941+
}
2942+
28402943
if graphCache != nil {
2944+
var fromNodePubKey route.Vertex
2945+
var toNodePubKey route.Vertex
2946+
copy(fromNodePubKey[:], fromNode)
2947+
copy(toNodePubKey[:], toNode)
2948+
28412949
graphCache.UpdatePolicy(
28422950
edge, fromNodePubKey, toNodePubKey, isUpdate1,
28432951
)
@@ -3713,7 +3821,18 @@ func (c *ChannelGraph) MarkEdgeZombie(chanID uint64,
37133821
}
37143822

37153823
graphCache, err := c.getGraphCache()
3716-
if err == nil && graphCache != nil {
3824+
if err != nil {
3825+
if err == ErrGraphCacheNotReady {
3826+
// Queue this function
3827+
c.enqueueWriteOperation(func() error {
3828+
return c.MarkEdgeZombie(chanID, pubKey1, pubKey2)
3829+
})
3830+
return nil
3831+
}
3832+
return err
3833+
}
3834+
3835+
if graphCache != nil {
37173836
graphCache.RemoveChannel(pubKey1, pubKey2, chanID)
37183837
}
37193838

@@ -3798,7 +3917,19 @@ func (c *ChannelGraph) markEdgeLiveUnsafe(tx kvdb.RwTx, chanID uint64) error {
37983917
// We need to add the channel back into our graph cache, otherwise we
37993918
// won't use it for path finding.
38003919
graphCache, err := c.getGraphCache()
3801-
if err == nil && graphCache != nil {
3920+
if err != nil {
3921+
if err == ErrGraphCacheNotReady {
3922+
// Queue the operation to add the channel back.
3923+
c.enqueueWriteOperation(func() error {
3924+
return c.markEdgeLiveUnsafe(tx, chanID)
3925+
})
3926+
return nil
3927+
}
3928+
return err
3929+
}
3930+
3931+
if graphCache != nil {
3932+
// Fetch the channel info to add back into the graph cache.
38023933
edgeInfos, err := c.fetchChanInfos(tx, []uint64{chanID})
38033934
if err != nil {
38043935
return err

0 commit comments

Comments
 (0)