Skip to content

Commit 5b484b8

Browse files
committed
channeldb: Update ChanUpdatesInHorizon
So that a start block and end block can also be passed in.
1 parent 1325380 commit 5b484b8

File tree

4 files changed

+87
-38
lines changed

4 files changed

+87
-38
lines changed

channeldb/graph.go

Lines changed: 73 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2106,7 +2106,7 @@ type ChannelEdge struct {
21062106
// ChanUpdatesInHorizon returns all the known channel edges which have at least
21072107
// one edge that has an update timestamp within the specified horizon.
21082108
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
2109-
endTime time.Time) ([]ChannelEdge, error) {
2109+
endTime time.Time, startBlock, endBlock uint32) ([]ChannelEdge, error) {
21102110

21112111
// To ensure we don't return duplicate ChannelEdges, we'll use an
21122112
// additional map to keep track of the edges already seen to prevent
@@ -2115,50 +2115,29 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
21152115
var edgesToCache map[uint64]ChannelEdge
21162116
var edgesInHorizon []ChannelEdge
21172117

2118-
c.cacheMu.Lock()
2119-
defer c.cacheMu.Unlock()
2120-
21212118
var hits int
2122-
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2123-
edges := tx.ReadBucket(edgeBucket)
2124-
if edges == nil {
2125-
return ErrGraphNoEdgesFound
2126-
}
2127-
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
2128-
if edgeIndex == nil {
2129-
return ErrGraphNoEdgesFound
2130-
}
2131-
edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
2119+
fetchUpdates := func(tx kvdb.RTx, edges, edgeIndex, nodes kvdb.RBucket,
2120+
updateIndexBkt []byte, startBytes, endBytes []byte,
2121+
chanIDFromKey func([]byte) []byte) error {
2122+
2123+
edgeUpdateIndex := edges.NestedReadBucket(updateIndexBkt)
21322124
if edgeUpdateIndex == nil {
21332125
return ErrGraphNoEdgesFound
21342126
}
21352127

2136-
nodes := tx.ReadBucket(nodeBucket)
2137-
if nodes == nil {
2138-
return ErrGraphNodesNotFound
2139-
}
2140-
21412128
// We'll now obtain a cursor to perform a range query within
21422129
// the index to find all channels within the horizon.
21432130
updateCursor := edgeUpdateIndex.ReadCursor()
21442131

2145-
var startTimeBytes, endTimeBytes [8 + 8]byte
2146-
byteOrder.PutUint64(
2147-
startTimeBytes[:8], uint64(startTime.Unix()),
2148-
)
2149-
byteOrder.PutUint64(
2150-
endTimeBytes[:8], uint64(endTime.Unix()),
2151-
)
2152-
21532132
// With our start and end times constructed, we'll step through
21542133
// the index collecting the info and policy of each update of
21552134
// each channel that has a last update within the time range.
2156-
for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
2157-
bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
2135+
for indexKey, _ := updateCursor.Seek(startBytes[:]); indexKey != nil &&
2136+
bytes.Compare(indexKey, endBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
21582137

21592138
// We have a new eligible entry, so we'll slice of the
21602139
// chan ID so we can query it in the DB.
2161-
chanID := indexKey[8:]
2140+
chanID := chanIDFromKey(indexKey)
21622141

21632142
// If we've already retrieved the info and policies for
21642143
// this edge, then we can skip it as we don't need to do
@@ -2196,16 +2175,15 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
21962175
err)
21972176
}
21982177

2199-
var (
2200-
node1Bytes = edgeInfo.Node1Bytes()
2201-
node2Bytes = edgeInfo.Node2Bytes()
2202-
)
2178+
node1Bytes := edgeInfo.Node1Bytes()
22032179

22042180
node1, err := fetchLightningNode(nodes, node1Bytes[:])
22052181
if err != nil {
22062182
return err
22072183
}
22082184

2185+
node2Bytes := edgeInfo.Node2Bytes()
2186+
22092187
node2, err := fetchLightningNode(nodes, node2Bytes[:])
22102188
if err != nil {
22112189
return err
@@ -2226,6 +2204,67 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
22262204
}
22272205

22282206
return nil
2207+
}
2208+
2209+
c.cacheMu.Lock()
2210+
defer c.cacheMu.Unlock()
2211+
2212+
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2213+
edges := tx.ReadBucket(edgeBucket)
2214+
if edges == nil {
2215+
return ErrGraphNoEdgesFound
2216+
}
2217+
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
2218+
if edgeIndex == nil {
2219+
return ErrGraphNoEdgesFound
2220+
}
2221+
2222+
nodes := tx.ReadBucket(nodeBucket)
2223+
if nodes == nil {
2224+
return ErrGraphNodesNotFound
2225+
}
2226+
2227+
var startTimeBytes, endTimeBytes [8 + 8]byte
2228+
byteOrder.PutUint64(
2229+
startTimeBytes[:8], uint64(startTime.Unix()),
2230+
)
2231+
byteOrder.PutUint64(
2232+
endTimeBytes[:8], uint64(endTime.Unix()),
2233+
)
2234+
2235+
var noEdgesFound bool
2236+
err := fetchUpdates(
2237+
tx, edges, edgeIndex, nodes, edgeUpdateIndexBucket,
2238+
startTimeBytes[:], endTimeBytes[:],
2239+
func(key []byte) []byte {
2240+
return key[8:]
2241+
},
2242+
)
2243+
if errors.Is(err, ErrGraphNoEdgesFound) {
2244+
noEdgesFound = true
2245+
} else if err != nil {
2246+
return err
2247+
}
2248+
2249+
var startBlockBytes, endBlockBytes [4 + 8]byte
2250+
byteOrder.PutUint32(startTimeBytes[:4], startBlock)
2251+
byteOrder.PutUint32(endTimeBytes[:4], endBlock)
2252+
2253+
err = fetchUpdates(
2254+
tx, edges, edgeIndex, nodes, edgeUpdate2IndexBucket,
2255+
startBlockBytes[:], endBlockBytes[:],
2256+
func(key []byte) []byte {
2257+
return key[4:]
2258+
},
2259+
)
2260+
if errors.Is(err, ErrGraphNoEdgesFound) && noEdgesFound {
2261+
return err
2262+
} else if err != nil {
2263+
return err
2264+
}
2265+
2266+
return nil
2267+
22292268
}, func() {
22302269
edgesSeen = make(map[uint64]struct{})
22312270
edgesToCache = make(map[uint64]ChannelEdge)

channeldb/graph_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,7 +1671,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
16711671
// If we issue an arbitrary query before any channel updates are
16721672
// inserted in the database, we should get zero results.
16731673
chanUpdates, err := graph.ChanUpdatesInHorizon(
1674-
time.Unix(999, 0), time.Unix(9999, 0),
1674+
time.Unix(999, 0), time.Unix(9999, 0), 0, 0,
16751675
)
16761676
require.NoError(t, err, "unable to updates for updates")
16771677
if len(chanUpdates) != 0 {
@@ -1789,7 +1789,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
17891789
}
17901790
for _, queryCase := range queryCases {
17911791
resp, err := graph.ChanUpdatesInHorizon(
1792-
queryCase.start, queryCase.end,
1792+
queryCase.start, queryCase.end, 0, 0,
17931793
)
17941794
if err != nil {
17951795
t.Fatalf("unable to query for updates: %v", err)

discovery/chan_series.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
111111
// First, we'll query for all the set of channels that have an update
112112
// that falls within the specified horizon.
113113
chansInHorizon, err := c.graph.ChanUpdatesInHorizon(
114-
startTime, endTime,
114+
startTime, endTime, 0, 0,
115115
)
116116
if err != nil {
117117
return nil, err

routing/router.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,17 @@ func (r *ChannelRouter) pruneZombieChans() error {
10101010

10111011
startTime := time.Unix(0, 0)
10121012
endTime := time.Now().Add(-1 * chanExpiry)
1013-
oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
1013+
1014+
startBlock := 0
1015+
_, bestBlock, err := r.cfg.Chain.GetBestBlock()
1016+
if err != nil {
1017+
return err
1018+
}
1019+
endBlock := uint32(bestBlock) - uint32(chanExpiry.Hours()*6)
1020+
1021+
oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(
1022+
startTime, endTime, uint32(startBlock), endBlock,
1023+
)
10141024
if err != nil {
10151025
return fmt.Errorf("unable to fetch expired channel updates "+
10161026
"chans: %v", err)

0 commit comments

Comments
 (0)