Skip to content

Commit c89b616

Browse files
committed
graph: refactor Builder network message handling
The exposed AddNode, AddEdge and UpdateEdge methods of the Builder are currently synchronous since even though they pass messages to the network handler which spins off the handling in a goroutine, the public methods still wait for a response from the handling before returning. The only part that is actually done asynchronously is the topology notifications. We previously tried to simplify things in [this commit](d757b3b) but we soon realised that there was a reason for sending the messages to the central/synchronous network handler first: it was to ensure consistency for topology clients: ie, the ordering between when there is a new topology client or if it is cancelled needs to be consistent and handled synchronously with new network updates. So for example, if a new update comes in right after a topology client cancels its subscription, then it should _not_ be notified. Similariy for new subscriptions. So this commit was reverted soon after. We can, however, still simplify things as is done in this commit by noting that _only topology subscriptions and notifications_ need to be handled separately. The actual network updates do not need to. So that is what is done here. This refactor will make moving the topology subscription logic to a new subsystem later on much easier.
1 parent 27440e8 commit c89b616

File tree

1 file changed

+51
-90
lines changed

1 file changed

+51
-90
lines changed

graph/builder.go

+51-90
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,10 @@ type Builder struct {
123123
// of our currently known best chain are sent over.
124124
staleBlocks <-chan *chainview.FilteredBlock
125125

126-
// networkUpdates is a channel that carries new topology updates
126+
// topologyUpdates is a channel that carries new topology updates
127127
// messages from outside the Builder to be processed by the
128128
// networkHandler.
129-
networkUpdates chan *routingMsg
129+
topologyUpdates chan any
130130

131131
// topologyClients maps a client's unique notification ID to a
132132
// topologyClient client that contains its notification dispatch
@@ -164,7 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
164164
func NewBuilder(cfg *Config) (*Builder, error) {
165165
return &Builder{
166166
cfg: cfg,
167-
networkUpdates: make(chan *routingMsg),
167+
topologyUpdates: make(chan any),
168168
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
169169
ntfnClientUpdates: make(chan *topologyClientUpdate),
170170
channelEdgeMtx: multimutex.NewMutex[uint64](),
@@ -656,59 +656,26 @@ func (b *Builder) pruneZombieChans() error {
656656
return nil
657657
}
658658

659-
// handleNetworkUpdate is responsible for processing the update message and
660-
// notifies topology changes, if any.
659+
// handleTopologyUpdate is responsible for sending any topology changes
660+
// notifications to registered clients.
661661
//
662662
// NOTE: must be run inside goroutine.
663-
func (b *Builder) handleNetworkUpdate(update *routingMsg) {
663+
func (b *Builder) handleTopologyUpdate(update any) {
664664
defer b.wg.Done()
665665

666-
// Process the routing update to determine if this is either a new
667-
// update from our PoV or an update to a prior vertex/edge we
668-
// previously accepted.
669-
var err error
670-
switch msg := update.msg.(type) {
671-
case *models.LightningNode:
672-
err = b.addNode(msg, update.op...)
673-
674-
case *models.ChannelEdgeInfo:
675-
err = b.addEdge(msg, update.op...)
676-
677-
case *models.ChannelEdgePolicy:
678-
err = b.updateEdge(msg, update.op...)
679-
680-
default:
681-
err = errors.Errorf("wrong routing update message type")
682-
}
683-
update.err <- err
684-
685-
// If the error is not nil here, there's no need to send topology
686-
// change.
687-
if err != nil {
688-
// Log as a debug message if this is not an error we need to be
689-
// concerned about.
690-
if IsError(err, ErrIgnored, ErrOutdated) {
691-
log.Debugf("process network updates got: %v", err)
692-
} else {
693-
log.Errorf("process network updates got: %v", err)
694-
}
695-
696-
return
697-
}
698-
699-
// Otherwise, we'll send off a new notification for the newly accepted
700-
// update, if any.
701666
topChange := &TopologyChange{}
702-
err = addToTopologyChange(b.cfg.Graph, topChange, update.msg)
667+
err := addToTopologyChange(b.cfg.Graph, topChange, update)
703668
if err != nil {
704669
log.Errorf("unable to update topology change notification: %v",
705670
err)
706671
return
707672
}
708673

709-
if !topChange.isEmpty() {
710-
b.notifyTopologyChange(topChange)
674+
if topChange.isEmpty() {
675+
return
711676
}
677+
678+
b.notifyTopologyChange(topChange)
712679
}
713680

714681
// networkHandler is the primary goroutine for the Builder. The roles of
@@ -734,12 +701,11 @@ func (b *Builder) networkHandler() {
734701
}
735702

736703
select {
737-
// A new fully validated network update has just arrived. As a
738-
// result we'll modify the channel graph accordingly depending
739-
// on the exact type of the message.
740-
case update := <-b.networkUpdates:
704+
// A new fully validated topology update has just arrived.
705+
// We'll notify any registered clients.
706+
case update := <-b.topologyUpdates:
741707
b.wg.Add(1)
742-
go b.handleNetworkUpdate(update)
708+
go b.handleTopologyUpdate(update)
743709

744710
// TODO(roasbeef): remove all unconnected vertexes
745711
// after N blocks pass with no corresponding
@@ -1033,14 +999,6 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error {
1033999
return nil
10341000
}
10351001

1036-
// routingMsg couples a routing related routing topology update to the
1037-
// error channel.
1038-
type routingMsg struct {
1039-
msg interface{}
1040-
op []batch.SchedulerOption
1041-
err chan error
1042-
}
1043-
10441002
// ApplyChannelUpdate validates a channel update and if valid, applies it to the
10451003
// database. It returns a bool indicating whether the updates were successful.
10461004
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
@@ -1102,23 +1060,20 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
11021060
func (b *Builder) AddNode(node *models.LightningNode,
11031061
op ...batch.SchedulerOption) error {
11041062

1105-
rMsg := &routingMsg{
1106-
msg: node,
1107-
op: op,
1108-
err: make(chan error, 1),
1063+
err := b.addNode(node, op...)
1064+
if err != nil {
1065+
logNetworkMsgProcessError(err)
1066+
1067+
return err
11091068
}
11101069

11111070
select {
1112-
case b.networkUpdates <- rMsg:
1113-
select {
1114-
case err := <-rMsg.err:
1115-
return err
1116-
case <-b.quit:
1117-
return ErrGraphBuilderShuttingDown
1118-
}
1071+
case b.topologyUpdates <- node:
11191072
case <-b.quit:
11201073
return ErrGraphBuilderShuttingDown
11211074
}
1075+
1076+
return nil
11221077
}
11231078

11241079
// addNode does some basic checks on the given LightningNode against what we
@@ -1155,23 +1110,20 @@ func (b *Builder) addNode(node *models.LightningNode,
11551110
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
11561111
op ...batch.SchedulerOption) error {
11571112

1158-
rMsg := &routingMsg{
1159-
msg: edge,
1160-
op: op,
1161-
err: make(chan error, 1),
1113+
err := b.addEdge(edge, op...)
1114+
if err != nil {
1115+
logNetworkMsgProcessError(err)
1116+
1117+
return err
11621118
}
11631119

11641120
select {
1165-
case b.networkUpdates <- rMsg:
1166-
select {
1167-
case err := <-rMsg.err:
1168-
return err
1169-
case <-b.quit:
1170-
return ErrGraphBuilderShuttingDown
1171-
}
1121+
case b.topologyUpdates <- edge:
11721122
case <-b.quit:
11731123
return ErrGraphBuilderShuttingDown
11741124
}
1125+
1126+
return nil
11751127
}
11761128

11771129
// addEdge does some validation on the new channel edge against what we
@@ -1265,23 +1217,20 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
12651217
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
12661218
op ...batch.SchedulerOption) error {
12671219

1268-
rMsg := &routingMsg{
1269-
msg: update,
1270-
op: op,
1271-
err: make(chan error, 1),
1220+
err := b.updateEdge(update, op...)
1221+
if err != nil {
1222+
logNetworkMsgProcessError(err)
1223+
1224+
return err
12721225
}
12731226

12741227
select {
1275-
case b.networkUpdates <- rMsg:
1276-
select {
1277-
case err := <-rMsg.err:
1278-
return err
1279-
case <-b.quit:
1280-
return ErrGraphBuilderShuttingDown
1281-
}
1228+
case b.topologyUpdates <- update:
12821229
case <-b.quit:
12831230
return ErrGraphBuilderShuttingDown
12841231
}
1232+
1233+
return nil
12851234
}
12861235

12871236
// updateEdge validates the new edge policy against what we currently have
@@ -1375,6 +1324,18 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
13751324
return nil
13761325
}
13771326

1327+
// logNetworkMsgProcessError logs the error received from processing a network
1328+
// message. It logs as a debug message if the error is not critical.
1329+
func logNetworkMsgProcessError(err error) {
1330+
if IsError(err, ErrIgnored, ErrOutdated) {
1331+
log.Debugf("process network updates got: %v", err)
1332+
1333+
return
1334+
}
1335+
1336+
log.Errorf("process network updates got: %v", err)
1337+
}
1338+
13781339
// CurrentBlockHeight returns the block height from POV of the router subsystem.
13791340
//
13801341
// NOTE: This method is part of the ChannelGraphSource interface.

0 commit comments

Comments
 (0)