Skip to content

Commit 5fe900d

Browse files
authored
Merge pull request #9534 from ellemouton/graph13
graph: refactor Builder network message handling
2 parents 1227eb1 + c89b616 commit 5fe900d

File tree

1 file changed

+51
-90
lines changed

1 file changed

+51
-90
lines changed

graph/builder.go

+51-90
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,10 @@ type Builder struct {
123123
// of our currently known best chain are sent over.
124124
staleBlocks <-chan *chainview.FilteredBlock
125125

126-
// networkUpdates is a channel that carries new topology updates
126+
// topologyUpdates is a channel that carries new topology updates
127127
// messages from outside the Builder to be processed by the
128128
// networkHandler.
129-
networkUpdates chan *routingMsg
129+
topologyUpdates chan any
130130

131131
// topologyClients maps a client's unique notification ID to a
132132
// topologyClient client that contains its notification dispatch
@@ -164,7 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
164164
func NewBuilder(cfg *Config) (*Builder, error) {
165165
return &Builder{
166166
cfg: cfg,
167-
networkUpdates: make(chan *routingMsg),
167+
topologyUpdates: make(chan any),
168168
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
169169
ntfnClientUpdates: make(chan *topologyClientUpdate),
170170
channelEdgeMtx: multimutex.NewMutex[uint64](),
@@ -656,59 +656,26 @@ func (b *Builder) pruneZombieChans() error {
656656
return nil
657657
}
658658

659-
// handleNetworkUpdate is responsible for processing the update message and
660-
// notifies topology changes, if any.
659+
// handleTopologyUpdate is responsible for sending any topology changes
660+
// notifications to registered clients.
661661
//
662662
// NOTE: must be run inside goroutine.
663-
func (b *Builder) handleNetworkUpdate(update *routingMsg) {
663+
func (b *Builder) handleTopologyUpdate(update any) {
664664
defer b.wg.Done()
665665

666-
// Process the routing update to determine if this is either a new
667-
// update from our PoV or an update to a prior vertex/edge we
668-
// previously accepted.
669-
var err error
670-
switch msg := update.msg.(type) {
671-
case *models.LightningNode:
672-
err = b.addNode(msg, update.op...)
673-
674-
case *models.ChannelEdgeInfo:
675-
err = b.addEdge(msg, update.op...)
676-
677-
case *models.ChannelEdgePolicy:
678-
err = b.updateEdge(msg, update.op...)
679-
680-
default:
681-
err = errors.Errorf("wrong routing update message type")
682-
}
683-
update.err <- err
684-
685-
// If the error is not nil here, there's no need to send topology
686-
// change.
687-
if err != nil {
688-
// Log as a debug message if this is not an error we need to be
689-
// concerned about.
690-
if IsError(err, ErrIgnored, ErrOutdated) {
691-
log.Debugf("process network updates got: %v", err)
692-
} else {
693-
log.Errorf("process network updates got: %v", err)
694-
}
695-
696-
return
697-
}
698-
699-
// Otherwise, we'll send off a new notification for the newly accepted
700-
// update, if any.
701666
topChange := &TopologyChange{}
702-
err = addToTopologyChange(b.cfg.Graph, topChange, update.msg)
667+
err := addToTopologyChange(b.cfg.Graph, topChange, update)
703668
if err != nil {
704669
log.Errorf("unable to update topology change notification: %v",
705670
err)
706671
return
707672
}
708673

709-
if !topChange.isEmpty() {
710-
b.notifyTopologyChange(topChange)
674+
if topChange.isEmpty() {
675+
return
711676
}
677+
678+
b.notifyTopologyChange(topChange)
712679
}
713680

714681
// networkHandler is the primary goroutine for the Builder. The roles of
@@ -734,12 +701,11 @@ func (b *Builder) networkHandler() {
734701
}
735702

736703
select {
737-
// A new fully validated network update has just arrived. As a
738-
// result we'll modify the channel graph accordingly depending
739-
// on the exact type of the message.
740-
case update := <-b.networkUpdates:
704+
// A new fully validated topology update has just arrived.
705+
// We'll notify any registered clients.
706+
case update := <-b.topologyUpdates:
741707
b.wg.Add(1)
742-
go b.handleNetworkUpdate(update)
708+
go b.handleTopologyUpdate(update)
743709

744710
// TODO(roasbeef): remove all unconnected vertexes
745711
// after N blocks pass with no corresponding
@@ -1033,14 +999,6 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error {
1033999
return nil
10341000
}
10351001

1036-
// routingMsg couples a routing related routing topology update to the
1037-
// error channel.
1038-
type routingMsg struct {
1039-
msg interface{}
1040-
op []batch.SchedulerOption
1041-
err chan error
1042-
}
1043-
10441002
// ApplyChannelUpdate validates a channel update and if valid, applies it to the
10451003
// database. It returns a bool indicating whether the updates were successful.
10461004
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
@@ -1102,23 +1060,20 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
11021060
func (b *Builder) AddNode(node *models.LightningNode,
11031061
op ...batch.SchedulerOption) error {
11041062

1105-
rMsg := &routingMsg{
1106-
msg: node,
1107-
op: op,
1108-
err: make(chan error, 1),
1063+
err := b.addNode(node, op...)
1064+
if err != nil {
1065+
logNetworkMsgProcessError(err)
1066+
1067+
return err
11091068
}
11101069

11111070
select {
1112-
case b.networkUpdates <- rMsg:
1113-
select {
1114-
case err := <-rMsg.err:
1115-
return err
1116-
case <-b.quit:
1117-
return ErrGraphBuilderShuttingDown
1118-
}
1071+
case b.topologyUpdates <- node:
11191072
case <-b.quit:
11201073
return ErrGraphBuilderShuttingDown
11211074
}
1075+
1076+
return nil
11221077
}
11231078

11241079
// addNode does some basic checks on the given LightningNode against what we
@@ -1155,23 +1110,20 @@ func (b *Builder) addNode(node *models.LightningNode,
11551110
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
11561111
op ...batch.SchedulerOption) error {
11571112

1158-
rMsg := &routingMsg{
1159-
msg: edge,
1160-
op: op,
1161-
err: make(chan error, 1),
1113+
err := b.addEdge(edge, op...)
1114+
if err != nil {
1115+
logNetworkMsgProcessError(err)
1116+
1117+
return err
11621118
}
11631119

11641120
select {
1165-
case b.networkUpdates <- rMsg:
1166-
select {
1167-
case err := <-rMsg.err:
1168-
return err
1169-
case <-b.quit:
1170-
return ErrGraphBuilderShuttingDown
1171-
}
1121+
case b.topologyUpdates <- edge:
11721122
case <-b.quit:
11731123
return ErrGraphBuilderShuttingDown
11741124
}
1125+
1126+
return nil
11751127
}
11761128

11771129
// addEdge does some validation on the new channel edge against what we
@@ -1265,23 +1217,20 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
12651217
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
12661218
op ...batch.SchedulerOption) error {
12671219

1268-
rMsg := &routingMsg{
1269-
msg: update,
1270-
op: op,
1271-
err: make(chan error, 1),
1220+
err := b.updateEdge(update, op...)
1221+
if err != nil {
1222+
logNetworkMsgProcessError(err)
1223+
1224+
return err
12721225
}
12731226

12741227
select {
1275-
case b.networkUpdates <- rMsg:
1276-
select {
1277-
case err := <-rMsg.err:
1278-
return err
1279-
case <-b.quit:
1280-
return ErrGraphBuilderShuttingDown
1281-
}
1228+
case b.topologyUpdates <- update:
12821229
case <-b.quit:
12831230
return ErrGraphBuilderShuttingDown
12841231
}
1232+
1233+
return nil
12851234
}
12861235

12871236
// updateEdge validates the new edge policy against what we currently have
@@ -1375,6 +1324,18 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
13751324
return nil
13761325
}
13771326

1327+
// logNetworkMsgProcessError logs the error received from processing a network
1328+
// message. It logs as a debug message if the error is not critical.
1329+
func logNetworkMsgProcessError(err error) {
1330+
if IsError(err, ErrIgnored, ErrOutdated) {
1331+
log.Debugf("process network updates got: %v", err)
1332+
1333+
return
1334+
}
1335+
1336+
log.Errorf("process network updates got: %v", err)
1337+
}
1338+
13781339
// CurrentBlockHeight returns the block height from POV of the router subsystem.
13791340
//
13801341
// NOTE: This method is part of the ChannelGraphSource interface.

0 commit comments

Comments
 (0)