Skip to content

[3/7] multi: start using the new interfaces throughout the codebase #8252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: elle-g175ChanDBUpdates
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions autopilot/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (d *dbNode) Addrs() []net.Addr {
// NOTE: Part of the autopilot.Node interface.
func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
return d.db.ForEachNodeChannelTx(d.tx, d.node.PubKeyBytes,
func(tx kvdb.RTx, ei *models.ChannelEdgeInfo1, ep,
func(tx kvdb.RTx, ei models.ChannelEdgeInfo, ep,
_ *models.ChannelEdgePolicy1) error {

// Skip channels for which no outgoing edge policy is
Expand All @@ -116,7 +116,7 @@ func (d *dbNode) ForEachChannel(cb func(ChannelEdge) error) error {
ChanID: lnwire.NewShortChanIDFromInt(
ep.ChannelID,
),
Capacity: ei.Capacity,
Capacity: ei.GetCapacity(),
Peer: &dbNode{
tx: tx,
db: d.db,
Expand Down
160 changes: 97 additions & 63 deletions channeldb/edge_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,89 @@ const (
edgePolicy2EncodingType edgePolicyEncodingType = 0
)

type edgePolicyEncodingInfo struct {
updateBucketKey []byte
updateKey []byte
serialize func(w io.Writer, toNode []byte) error
typeByte func() (edgePolicyEncodingType, bool)
}

func encodingInfoFromEdgePolicy(policy models.ChannelEdgePolicy) (
*edgePolicyEncodingInfo, error) {

switch p := policy.(type) {
case *models.ChannelEdgePolicy1:
updateUnix := uint64(p.LastUpdate.Unix())
var indexKey [8 + 8]byte
byteOrder.PutUint64(indexKey[:8], updateUnix)
byteOrder.PutUint64(indexKey[8:], p.ChannelID)

return &edgePolicyEncodingInfo{
updateBucketKey: edgeUpdateIndexBucket,
updateKey: indexKey[:],
serialize: func(w io.Writer, toNode []byte) error {
copy(p.ToNode[:], toNode)

return serializeChanEdgePolicy1(w, p)
},
typeByte: func() (edgePolicyEncodingType, bool) {
return 0, false
},
}, nil

case *models.ChannelEdgePolicy2:
indexKey := make([]byte, 4+8)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the block height and the scid value? The scid encodes a block height (unrolled)

byteOrder.PutUint32(
indexKey[:4], p.BlockHeight.Val,
)
byteOrder.PutUint64(
indexKey[4:], p.ShortChannelID.Val.ToUint64(),
)

return &edgePolicyEncodingInfo{
updateBucketKey: edgeUpdate2IndexBucket,
updateKey: indexKey,
serialize: func(w io.Writer, toNode []byte) error {
copy(p.ToNode[:], toNode)

return serializeChanEdgePolicy2(w, p)
},
typeByte: func() (edgePolicyEncodingType, bool) {
return edgePolicy2EncodingType, true
},
}, nil

default:
return nil, fmt.Errorf("unhandled implementation of the "+
"models.ChannelEdgePolicy interface: %T", policy)
}
}

func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy1,
from, to []byte) error {

encodingInfo, err := encodingInfoFromEdgePolicy(edge)
if err != nil {
return err
}

chanID := edge.SCID().ToUint64()

var edgeKey [33 + 8]byte
copy(edgeKey[:], from)
byteOrder.PutUint64(edgeKey[33:], edge.ChannelID)
byteOrder.PutUint64(edgeKey[33:], chanID)

var b bytes.Buffer
if err := serializeChanEdgePolicy(&b, edge, to); err != nil {
if err := serializeChanEdgePolicy(&b, encodingInfo, to); err != nil {
return err
}

// Before we write out the new edge, we'll create a new entry in the
// update index in order to keep it fresh.
updateUnix := uint64(edge.LastUpdate.Unix())
var indexKey [8 + 8]byte
byteOrder.PutUint64(indexKey[:8], updateUnix)
byteOrder.PutUint64(indexKey[8:], edge.ChannelID)

updateIndex, err := edges.CreateBucketIfNotExists(edgeUpdateIndexBucket)
indexKey := encodingInfo.updateKey
updateIndex, err := edges.CreateBucketIfNotExists(
encodingInfo.updateBucketKey,
)
if err != nil {
return err
}
Expand Down Expand Up @@ -88,32 +151,36 @@ func putChanEdgePolicy(edges kvdb.RwBucket, edge *models.ChannelEdgePolicy1,
return err
}

oldPol, ok := oldEdgePolicy.(*models.ChannelEdgePolicy1)
if !ok {
return fmt.Errorf("expected "+
"*models.ChannelEdgePolicy1, got: %T",
oldEdgePolicy)
info, err := encodingInfoFromEdgePolicy(oldEdgePolicy)
if err != nil {
return err
}

oldUpdateTime := uint64(oldPol.LastUpdate.Unix())

var oldIndexKey [8 + 8]byte
byteOrder.PutUint64(oldIndexKey[:8], oldUpdateTime)
byteOrder.PutUint64(oldIndexKey[8:], edge.ChannelID)
// Sanity check that the old update is assigned to the same
// update bucket as the new one.
if !bytes.Equal(
info.updateBucketKey, encodingInfo.updateBucketKey,
) {

return fmt.Errorf("received a new update belonging "+
"to bucket %s where previous update for the "+
"same channel belonged to bucket %s",
string(encodingInfo.updateBucketKey),
string(info.updateKey))
}

if err := updateIndex.Delete(oldIndexKey[:]); err != nil {
oldIndexKey := info.updateKey
if err := updateIndex.Delete(oldIndexKey); err != nil {
return err
}
}

if err := updateIndex.Put(indexKey[:], nil); err != nil {
if err := updateIndex.Put(indexKey, nil); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider splitting this bucket structure.

We commonly use this index to implement an in-order to send out updates ordered by timestamp to a peer:

lnd/channeldb/graph.go

Lines 1922 to 1948 in 13a7bec

edgeUpdateIndex := edges.NestedReadBucket(edgeUpdateIndexBucket)
if edgeUpdateIndex == nil {
return ErrGraphNoEdgesFound
}
nodes := tx.ReadBucket(nodeBucket)
if nodes == nil {
return ErrGraphNodesNotFound
}
// We'll now obtain a cursor to perform a range query within
// the index to find all channels within the horizon.
updateCursor := edgeUpdateIndex.ReadCursor()
var startTimeBytes, endTimeBytes [8 + 8]byte
byteOrder.PutUint64(
startTimeBytes[:8], uint64(startTime.Unix()),
)
byteOrder.PutUint64(
endTimeBytes[:8], uint64(endTime.Unix()),
)
// With our start and end times constructed, we'll step through
// the index collecting the info and policy of each update of
// each channel that has a last update within the time range.
for indexKey, _ := updateCursor.Seek(startTimeBytes[:]); indexKey != nil &&
bytes.Compare(indexKey, endTimeBytes[:]) <= 0; indexKey, _ = updateCursor.Next() {

If we overload the key in this structure, then the ordering is inherently broken, and then requires extra logic to skip over those new entries for a peer that only understands the legacy update type.

Assuming that the desired behavior for an updated peer is to send both edge types, coalescing the block based and timestamp based, then we can implement a composite cursor/iterator (using slow/fast pointers) to create a consistently ordered response.

return err
}

err = updateEdgePolicyDisabledIndex(
edges, edge.ChannelID,
edge.ChannelFlags&lnwire.ChanUpdateDirection > 0,
edge.IsDisabled(),
edges, chanID, !edge.IsNode1(), edge.IsDisabled(),
)
if err != nil {
return err
Expand Down Expand Up @@ -172,7 +239,7 @@ func putChanEdgePolicyUnknown(edges kvdb.RwBucket, channelID uint64,
}

func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
nodePub []byte) (*models.ChannelEdgePolicy1, error) {
nodePub []byte) (models.ChannelEdgePolicy, error) {

var edgeKey [33 + 8]byte
copy(edgeKey[:], nodePub)
Expand Down Expand Up @@ -201,17 +268,11 @@ func fetchChanEdgePolicy(edges kvdb.RBucket, chanID []byte,
return nil, err
}

pol, ok := ep.(*models.ChannelEdgePolicy1)
if !ok {
return nil, fmt.Errorf("expected *models.ChannelEdgePolicy1, "+
"got: %T", ep)
}

return pol, nil
return ep, nil
}

func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
chanID []byte) (*models.ChannelEdgePolicy1, *models.ChannelEdgePolicy1,
chanID []byte) (models.ChannelEdgePolicy, models.ChannelEdgePolicy,
error) {

edgeInfoBytes := edgeIndex.Get(chanID)
Expand Down Expand Up @@ -239,37 +300,10 @@ func fetchChanEdgePolicies(edgeIndex kvdb.RBucket, edges kvdb.RBucket,
return edge1, edge2, nil
}

func serializeChanEdgePolicy(w io.Writer,
edgePolicy models.ChannelEdgePolicy, toNode []byte) error {

var (
withTypeByte bool
typeByte edgePolicyEncodingType
serialize func(w io.Writer) error
)

switch policy := edgePolicy.(type) {
case *models.ChannelEdgePolicy1:
serialize = func(w io.Writer) error {
copy(policy.ToNode[:], toNode)

return serializeChanEdgePolicy1(w, policy)
}
case *models.ChannelEdgePolicy2:
withTypeByte = true
typeByte = edgePolicy2EncodingType

serialize = func(w io.Writer) error {
copy(policy.ToNode[:], toNode)

return serializeChanEdgePolicy2(w, policy)
}
default:
return fmt.Errorf("unhandled implementation of "+
"ChannelEdgePolicy: %T", edgePolicy)
}
func serializeChanEdgePolicy(w io.Writer, info *edgePolicyEncodingInfo,
toNode []byte) error {

if withTypeByte {
if typeByte, ok := info.typeByte(); ok {
// First, write the identifying encoding byte to signal that
// this is not using the legacy encoding.
_, err := w.Write([]byte{chanEdgePolicyNewEncodingPrefix})
Expand All @@ -284,7 +318,7 @@ func serializeChanEdgePolicy(w io.Writer,
}
}

return serialize(w)
return info.serialize(w, toNode)
}

func serializeChanEdgePolicy1(w io.Writer,
Expand Down
5 changes: 4 additions & 1 deletion channeldb/edge_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ func TestEdgePolicySerialisation(t *testing.T) {
toNode = info.GetToNode()
)

err := serializeChanEdgePolicy(&b, info, toNode[:])
encodingInfo, err := encodingInfoFromEdgePolicy(info)
require.NoError(t, err)

err = serializeChanEdgePolicy(&b, encodingInfo, toNode[:])
require.NoError(t, err)

newInfo, err := deserializeChanEdgePolicy(&b)
Expand Down
Loading
Loading