Skip to content

Commit 146fc9e

Browse files
committed
channeldb: Update ChanUpdatesInHorizon
So that a start block and end block can also be passed in.
1 parent 97c6c70 commit 146fc9e

File tree

4 files changed

+90
-39
lines changed

4 files changed

+90
-39
lines changed

channeldb/graph.go

+76-35
Original file line numberDiff line numberDiff line change
@@ -2104,7 +2104,7 @@ type ChannelEdge struct {
21042104
// ChanUpdatesInHorizon returns all the known channel edges which have at least
21052105
// one edge that has an update timestamp within the specified horizon.
21062106
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
2107-
endTime time.Time) ([]ChannelEdge, error) {
2107+
endTime time.Time, startBlock, endBlock uint32) ([]ChannelEdge, error) {
21082108

21092109
// To ensure we don't return duplicate ChannelEdges, we'll use an
21102110
// additional map to keep track of the edges already seen to prevent
@@ -2113,50 +2113,30 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
21132113
var edgesToCache map[uint64]ChannelEdge
21142114
var edgesInHorizon []ChannelEdge
21152115

2116-
c.cacheMu.Lock()
2117-
defer c.cacheMu.Unlock()
2118-
21192116
var hits int
2120-
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2121-
edges := tx.ReadBucket(edgeBucket)
2122-
if edges == nil {
2123-
return ErrGraphNoEdgesFound
2124-
}
2125-
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
2126-
if edgeIndex == nil {
2127-
return ErrGraphNoEdgesFound
2128-
}
2129-
edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
2117+
fetchUpdates := func(tx kvdb.RTx, edges, edgeIndex, nodes kvdb.RBucket,
2118+
updateIndexBkt []byte, startBytes, endBytes []byte,
2119+
chanIDFromKey func([]byte) []byte) error {
2120+
2121+
edgeUpdateIndex := edges.NestedReadBucket(updateIndexBkt)
21302122
if edgeUpdateIndex == nil {
21312123
return ErrGraphNoEdgesFound
21322124
}
21332125

2134-
nodes := tx.ReadBucket(nodeBucket)
2135-
if nodes == nil {
2136-
return ErrGraphNodesNotFound
2137-
}
2138-
21392126
// We'll now obtain a cursor to perform a range query within
21402127
// the index to find all channels within the horizon.
21412128
updateCursor := edgeUpdateIndex.ReadCursor()
21422129

2143-
var startTimeBytes, endTimeBytes [8 + 8]byte
2144-
byteOrder.PutUint64(
2145-
startTimeBytes[:8], uint64(startTime.Unix()),
2146-
)
2147-
byteOrder.PutUint64(
2148-
endTimeBytes[:8], uint64(endTime.Unix()),
2149-
)
2150-
21512130
// With our start and end times constructed, we'll step through
21522131
// the index collecting the info and policy of each update of
21532132
// each channel that has a last update within the time range.
2154-
for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
2155-
bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {
2133+
//nolint:lll
2134+
for indexKey, _ := updateCursor.Seek(startBytes); indexKey != nil &&
2135+
bytes.Compare(indexKey, endBytes) <= 0; indexKey, _ = updateCursor.Next() { //nolint:whitespace
21562136

21572137
// We have a new eligible entry, so we'll slice of the
21582138
// chan ID so we can query it in the DB.
2159-
chanID := indexKey[8:]
2139+
chanID := chanIDFromKey(indexKey)
21602140

21612141
// If we've already retrieved the info and policies for
21622142
// this edge, then we can skip it as we don't need to do
@@ -2194,16 +2174,15 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
21942174
err)
21952175
}
21962176

2197-
var (
2198-
node1Bytes = edgeInfo.Node1Bytes()
2199-
node2Bytes = edgeInfo.Node2Bytes()
2200-
)
2177+
node1Bytes := edgeInfo.Node1Bytes()
22012178

22022179
node1, err := fetchLightningNode(nodes, node1Bytes[:])
22032180
if err != nil {
22042181
return err
22052182
}
22062183

2184+
node2Bytes := edgeInfo.Node2Bytes()
2185+
22072186
node2, err := fetchLightningNode(nodes, node2Bytes[:])
22082187
if err != nil {
22092188
return err
@@ -2223,6 +2202,66 @@ func (c *ChannelGraph) ChanUpdatesInHorizon(startTime,
22232202
edgesToCache[chanIDInt] = channel
22242203
}
22252204

2205+
return nil
2206+
}
2207+
2208+
c.cacheMu.Lock()
2209+
defer c.cacheMu.Unlock()
2210+
2211+
err := kvdb.View(c.db, func(tx kvdb.RTx) error {
2212+
edges := tx.ReadBucket(edgeBucket)
2213+
if edges == nil {
2214+
return ErrGraphNoEdgesFound
2215+
}
2216+
edgeIndex := edges.NestedReadBucket(edgeIndexBucket)
2217+
if edgeIndex == nil {
2218+
return ErrGraphNoEdgesFound
2219+
}
2220+
2221+
nodes := tx.ReadBucket(nodeBucket)
2222+
if nodes == nil {
2223+
return ErrGraphNodesNotFound
2224+
}
2225+
2226+
var startTimeBytes, endTimeBytes [8 + 8]byte
2227+
byteOrder.PutUint64(
2228+
startTimeBytes[:8], uint64(startTime.Unix()),
2229+
)
2230+
byteOrder.PutUint64(
2231+
endTimeBytes[:8], uint64(endTime.Unix()),
2232+
)
2233+
2234+
var noEdgesFound bool
2235+
err := fetchUpdates(
2236+
tx, edges, edgeIndex, nodes, edgeUpdateIndexBucket,
2237+
startTimeBytes[:], endTimeBytes[:],
2238+
func(key []byte) []byte {
2239+
return key[8:]
2240+
},
2241+
)
2242+
if errors.Is(err, ErrGraphNoEdgesFound) {
2243+
noEdgesFound = true
2244+
} else if err != nil {
2245+
return err
2246+
}
2247+
2248+
var startBlockBytes, endBlockBytes [4 + 8]byte
2249+
byteOrder.PutUint32(startTimeBytes[:4], startBlock)
2250+
byteOrder.PutUint32(endTimeBytes[:4], endBlock)
2251+
2252+
err = fetchUpdates(
2253+
tx, edges, edgeIndex, nodes, edgeUpdate2IndexBucket,
2254+
startBlockBytes[:], endBlockBytes[:],
2255+
func(key []byte) []byte {
2256+
return key[4:]
2257+
},
2258+
)
2259+
if errors.Is(err, ErrGraphNoEdgesFound) && noEdgesFound {
2260+
return err
2261+
} else if err != nil {
2262+
return err
2263+
}
2264+
22262265
return nil
22272266
}, func() {
22282267
edgesSeen = make(map[uint64]struct{})
@@ -3533,7 +3572,9 @@ func (c *ChannelGraph) FetchOtherNode(tx kvdb.RTx,
35333572
// otherwise we can use the existing db transaction.
35343573
var err error
35353574
if tx == nil {
3536-
err = kvdb.View(c.db, fetchNodeFunc, func() { targetNode = nil })
3575+
err = kvdb.View(c.db, fetchNodeFunc, func() {
3576+
targetNode = nil
3577+
})
35373578
} else {
35383579
err = fetchNodeFunc(tx)
35393580
}

channeldb/graph_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1672,7 +1672,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
16721672
// If we issue an arbitrary query before any channel updates are
16731673
// inserted in the database, we should get zero results.
16741674
chanUpdates, err := graph.ChanUpdatesInHorizon(
1675-
time.Unix(999, 0), time.Unix(9999, 0),
1675+
time.Unix(999, 0), time.Unix(9999, 0), 0, 0,
16761676
)
16771677
require.NoError(t, err, "unable to updates for updates")
16781678
if len(chanUpdates) != 0 {
@@ -1790,7 +1790,7 @@ func TestChanUpdatesInHorizon(t *testing.T) {
17901790
}
17911791
for _, queryCase := range queryCases {
17921792
resp, err := graph.ChanUpdatesInHorizon(
1793-
queryCase.start, queryCase.end,
1793+
queryCase.start, queryCase.end, 0, 0,
17941794
)
17951795
if err != nil {
17961796
t.Fatalf("unable to query for updates: %v", err)

discovery/chan_series.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (c *ChanSeries) UpdatesInHorizon(chain chainhash.Hash,
113113
// First, we'll query for all the set of channels that have an update
114114
// that falls within the specified horizon.
115115
chansInHorizon, err := c.graph.ChanUpdatesInHorizon(
116-
startTime, endTime,
116+
startTime, endTime, 0, 0,
117117
)
118118
if err != nil {
119119
return nil, err

routing/router.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -1022,7 +1022,17 @@ func (r *ChannelRouter) pruneZombieChans() error {
10221022

10231023
startTime := time.Unix(0, 0)
10241024
endTime := time.Now().Add(-1 * chanExpiry)
1025-
oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(startTime, endTime)
1025+
1026+
startBlock := 0
1027+
_, bestBlock, err := r.cfg.Chain.GetBestBlock()
1028+
if err != nil {
1029+
return err
1030+
}
1031+
endBlock := uint32(bestBlock) - uint32(chanExpiry.Hours()*6)
1032+
1033+
oldEdges, err := r.cfg.Graph.ChanUpdatesInHorizon(
1034+
startTime, endTime, uint32(startBlock), endBlock,
1035+
)
10261036
if err != nil {
10271037
return fmt.Errorf("unable to fetch expired channel updates "+
10281038
"chans: %v", err)

0 commit comments

Comments
 (0)