Skip to content

Commit c05a7c5

Browse files
authored
Merge pull request #1988 from valentinewallace/subscribe-chans-rpc
rpc: Add SubscribeChannels RPC.
2 parents 8ac5f2b + b826101 commit c05a7c5

19 files changed

+1982
-685
lines changed

channelnotifier/channelnotifier.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package channelnotifier
2+
3+
import (
4+
"sync/atomic"
5+
6+
"github.com/btcsuite/btcd/wire"
7+
"github.com/lightningnetwork/lnd/channeldb"
8+
"github.com/lightningnetwork/lnd/subscribe"
9+
)
10+
11+
// ChannelNotifier is a subsystem which all active, inactive, and closed channel
12+
// events pipe through. It takes subscriptions for its events, and whenever
13+
// it receives a new event it notifies its subscribers over the proper channel.
14+
type ChannelNotifier struct {
15+
started uint32
16+
stopped uint32
17+
18+
ntfnServer *subscribe.Server
19+
20+
chanDB *channeldb.DB
21+
}
22+
23+
// OpenChannelEvent represents a new event where a channel goes from pending
24+
// open to open.
25+
type OpenChannelEvent struct {
26+
// Channel is the channel that has become open.
27+
Channel *channeldb.OpenChannel
28+
}
29+
30+
// ActiveChannelEvent represents a new event where a channel becomes active.
31+
type ActiveChannelEvent struct {
32+
// ChannelPoint is the channelpoint for the newly active channel.
33+
ChannelPoint *wire.OutPoint
34+
}
35+
36+
// InactiveChannelEvent represents a new event where a channel becomes inactive.
37+
type InactiveChannelEvent struct {
38+
// ChannelPoint is the channelpoint for the newly inactive channel.
39+
ChannelPoint *wire.OutPoint
40+
}
41+
42+
// ClosedChannelEvent represents a new event where a channel becomes closed.
43+
type ClosedChannelEvent struct {
44+
// CloseSummary is the summary of the channel close that has occurred.
45+
CloseSummary *channeldb.ChannelCloseSummary
46+
}
47+
48+
// New creates a new channel notifier. The ChannelNotifier gets channel
49+
// events from peers and from the chain arbitrator, and dispatches them to
50+
// its clients.
51+
func New(chanDB *channeldb.DB) *ChannelNotifier {
52+
return &ChannelNotifier{
53+
ntfnServer: subscribe.NewServer(),
54+
chanDB: chanDB,
55+
}
56+
}
57+
58+
// Start starts the ChannelNotifier and all goroutines it needs to carry out its task.
59+
func (c *ChannelNotifier) Start() error {
60+
if !atomic.CompareAndSwapUint32(&c.started, 0, 1) {
61+
return nil
62+
}
63+
64+
log.Tracef("ChannelNotifier %v starting", c)
65+
66+
if err := c.ntfnServer.Start(); err != nil {
67+
return err
68+
}
69+
70+
return nil
71+
}
72+
73+
// Stop signals the notifier for a graceful shutdown.
74+
func (c *ChannelNotifier) Stop() {
75+
if !atomic.CompareAndSwapUint32(&c.stopped, 0, 1) {
76+
return
77+
}
78+
79+
c.ntfnServer.Stop()
80+
}
81+
82+
// SubscribeChannelEvents returns a subscribe.Client that will receive updates
83+
// any time the Server is made aware of a new event.
84+
func (c *ChannelNotifier) SubscribeChannelEvents() (*subscribe.Client, error) {
85+
return c.ntfnServer.Subscribe()
86+
}
87+
88+
// NotifyOpenChannelEvent notifies the channelEventNotifier goroutine that a
89+
// channel has gone from pending open to open.
90+
func (c *ChannelNotifier) NotifyOpenChannelEvent(chanPoint wire.OutPoint) {
91+
92+
// Fetch the relevant channel from the database.
93+
channel, err := c.chanDB.FetchChannel(chanPoint)
94+
if err != nil {
95+
log.Warnf("Unable to fetch open channel from the db: %v", err)
96+
}
97+
98+
// Send the open event to all channel event subscribers.
99+
event := OpenChannelEvent{Channel: channel}
100+
if err := c.ntfnServer.SendUpdate(event); err != nil {
101+
log.Warnf("Unable to send open channel update: %v", err)
102+
}
103+
}
104+
105+
// NotifyClosedChannelEvent notifies the channelEventNotifier goroutine that a
106+
// channel has closed.
107+
func (c *ChannelNotifier) NotifyClosedChannelEvent(chanPoint wire.OutPoint) {
108+
// Fetch the relevant closed channel from the database.
109+
closeSummary, err := c.chanDB.FetchClosedChannel(&chanPoint)
110+
if err != nil {
111+
log.Warnf("Unable to fetch closed channel summary from the db: %v", err)
112+
}
113+
114+
// Send the closed event to all channel event subscribers.
115+
event := ClosedChannelEvent{CloseSummary: closeSummary}
116+
if err := c.ntfnServer.SendUpdate(event); err != nil {
117+
log.Warnf("Unable to send closed channel update: %v", err)
118+
}
119+
}
120+
121+
// NotifyActiveChannelEvent notifies the channelEventNotifier goroutine that a
122+
// channel is active.
123+
func (c *ChannelNotifier) NotifyActiveChannelEvent(chanPoint wire.OutPoint) {
124+
event := ActiveChannelEvent{ChannelPoint: &chanPoint}
125+
if err := c.ntfnServer.SendUpdate(event); err != nil {
126+
log.Warnf("Unable to send active channel update: %v", err)
127+
}
128+
}
129+
130+
// NotifyInactiveChannelEvent notifies the channelEventNotifier goroutine that a
131+
// channel is inactive.
132+
func (c *ChannelNotifier) NotifyInactiveChannelEvent(chanPoint wire.OutPoint) {
133+
event := InactiveChannelEvent{ChannelPoint: &chanPoint}
134+
if err := c.ntfnServer.SendUpdate(event); err != nil {
135+
log.Warnf("Unable to send inactive channel update: %v", err)
136+
}
137+
}

channelnotifier/log.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package channelnotifier
2+
3+
import (
4+
"github.com/btcsuite/btclog"
5+
"github.com/lightningnetwork/lnd/build"
6+
)
7+
8+
// log is a logger that is initialized with no output filters. This means the
9+
// package will not perform any logging by default until the caller requests
10+
// it.
11+
var log btclog.Logger
12+
13+
// The default amount of logging is none.
14+
func init() {
15+
UseLogger(build.NewSubLogger("CHNF", nil))
16+
}
17+
18+
// DisableLog disables all library log output. Logging output is disabled by
19+
// default until UseLogger is called.
20+
func DisableLog() {
21+
UseLogger(btclog.Disabled)
22+
}
23+
24+
// UseLogger uses a specified Logger to output package logging info. This
25+
// should be used in preference to SetLogWriter if the caller is also using
26+
// btclog.
27+
func UseLogger(logger btclog.Logger) {
28+
log = logger
29+
}
30+
31+
// logClosure is used to provide a closure over expensive logging operations so
32+
// don't have to be performed when the logging level doesn't warrant it.
33+
type logClosure func() string
34+
35+
// String invokes the underlying function and returns the result.
36+
func (c logClosure) String() string {
37+
return c()
38+
}
39+
40+
// newLogClosure returns a new closure over a function that returns a string
41+
// which itself provides a Stringer interface so that it can be used with the
42+
// logging system.
43+
func newLogClosure(c func() string) logClosure {
44+
return logClosure(c)
45+
}

contractcourt/chain_arbitrator.go

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,10 @@ type ChainArbitratorConfig struct {
141141
// the given payment hash. ErrInvoiceNotFound is returned if an invoice
142142
// is not found.
143143
SettleInvoice func(lntypes.Hash, lnwire.MilliSatoshi) error
144+
145+
// NotifyClosedChannel is a function closure that the ChainArbitrator
146+
// will use to notify the ChannelNotifier about a newly closed channel.
147+
NotifyClosedChannel func(wire.OutPoint)
144148
}
145149

146150
// ChainArbitrator is a sub-system that oversees the on-chain resolution of all
@@ -245,10 +249,16 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
245249
return chanMachine.ForceClose()
246250
},
247251
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
248-
MarkChannelClosed: channel.CloseChannel,
249-
IsPendingClose: false,
250-
ChainArbitratorConfig: c.cfg,
251-
ChainEvents: chanEvents,
252+
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary) error {
253+
if err := channel.CloseChannel(summary); err != nil {
254+
return err
255+
}
256+
c.cfg.NotifyClosedChannel(summary.ChanPoint)
257+
return nil
258+
},
259+
IsPendingClose: false,
260+
ChainArbitratorConfig: c.cfg,
261+
ChainEvents: chanEvents,
252262
}
253263

254264
// The final component needed is an arbitrator log that the arbitrator
@@ -719,13 +729,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
719729
// SubscribeChannelEvents returns a new active subscription for the set of
720730
// possible on-chain events for a particular channel. The struct can be used by
721731
// callers to be notified whenever an event that changes the state of the
722-
// channel on-chain occurs. If syncDispatch is true, then the sender of the
723-
// notification will wait until an error is sent over the ProcessACK before
724-
// modifying any database state. This allows callers to request a reliable hand
725-
// off.
726-
//
727-
// TODO(roasbeef): can be used later to provide RPC hook for all channel
728-
// lifetimes
732+
// channel on-chain occurs.
729733
func (c *ChainArbitrator) SubscribeChannelEvents(
730734
chanPoint wire.OutPoint) (*ChainEventSubscription, error) {
731735

fundingmanager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ type fundingConfig struct {
333333
// flood us with very small channels that would never really be usable
334334
// due to fees.
335335
MinChanSize btcutil.Amount
336+
337+
// NotifyOpenChannelEvent informs the ChannelNotifier when channels
338+
// transition from pending open to open.
339+
NotifyOpenChannelEvent func(wire.OutPoint)
336340
}
337341

338342
// fundingManager acts as an orchestrator/bridge between the wallet's
@@ -1939,6 +1943,10 @@ func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.Open
19391943
return
19401944
}
19411945

1946+
// Inform the ChannelNotifier that the channel has transitioned from
1947+
// pending open to open.
1948+
f.cfg.NotifyOpenChannelEvent(completeChan.FundingOutpoint)
1949+
19421950
// TODO(roasbeef): ideally persistent state update for chan above
19431951
// should be abstracted
19441952

fundingmanager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,9 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
353353
publTxChan <- txn
354354
return nil
355355
},
356-
ZombieSweeperInterval: 1 * time.Hour,
357-
ReservationTimeout: 1 * time.Nanosecond,
356+
ZombieSweeperInterval: 1 * time.Hour,
357+
ReservationTimeout: 1 * time.Nanosecond,
358+
NotifyOpenChannelEvent: func(wire.OutPoint) {},
358359
})
359360
if err != nil {
360361
t.Fatalf("failed creating fundingManager: %v", err)

htlcswitch/interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package htlcswitch
22

33
import (
4+
"github.com/btcsuite/btcd/wire"
45
"github.com/lightningnetwork/lnd/channeldb"
56
"github.com/lightningnetwork/lnd/lnpeer"
67
"github.com/lightningnetwork/lnd/lntypes"
@@ -58,6 +59,9 @@ type ChannelLink interface {
5859
// possible).
5960
HandleChannelUpdate(lnwire.Message)
6061

62+
// ChannelPoint returns the channel outpoint for the channel link.
63+
ChannelPoint() *wire.OutPoint
64+
6165
// ChanID returns the channel ID for the channel link. The channel ID
6266
// is a more compact representation of a channel's full outpoint.
6367
ChanID() lnwire.ChannelID

htlcswitch/link.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync/atomic"
1010
"time"
1111

12+
"github.com/btcsuite/btcd/wire"
1213
"github.com/davecgh/go-spew/spew"
1314
"github.com/go-errors/errors"
1415
"github.com/lightningnetwork/lnd/channeldb"
@@ -1743,6 +1744,12 @@ func (l *channelLink) Peer() lnpeer.Peer {
17431744
return l.cfg.Peer
17441745
}
17451746

1747+
// ChannelPoint returns the channel outpoint for the channel link.
1748+
// NOTE: Part of the ChannelLink interface.
1749+
func (l *channelLink) ChannelPoint() *wire.OutPoint {
1750+
return l.channel.ChannelPoint()
1751+
}
1752+
17461753
// ShortChanID returns the short channel ID for the channel link. The short
17471754
// channel ID encodes the exact location in the main chain that the original
17481755
// funding output can be found.

htlcswitch/mock.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,11 @@ func initSwitchWithDB(startingHeight uint32, db *channeldb.DB) (*Switch, error)
162162
FetchLastChannelUpdate: func(lnwire.ShortChannelID) (*lnwire.ChannelUpdate, error) {
163163
return nil, nil
164164
},
165-
Notifier: &mockNotifier{},
166-
FwdEventTicker: ticker.MockNew(DefaultFwdEventInterval),
167-
LogEventTicker: ticker.MockNew(DefaultLogInterval),
165+
Notifier: &mockNotifier{},
166+
FwdEventTicker: ticker.MockNew(DefaultFwdEventInterval),
167+
LogEventTicker: ticker.MockNew(DefaultLogInterval),
168+
NotifyActiveChannel: func(wire.OutPoint) {},
169+
NotifyInactiveChannel: func(wire.OutPoint) {},
168170
}
169171

170172
return New(cfg, startingHeight)
@@ -673,6 +675,7 @@ func (f *mockChannelLink) ChanID() lnwire.ChannelID { return
673675
func (f *mockChannelLink) ShortChanID() lnwire.ShortChannelID { return f.shortChanID }
674676
func (f *mockChannelLink) Bandwidth() lnwire.MilliSatoshi { return 99999999 }
675677
func (f *mockChannelLink) Peer() lnpeer.Peer { return f.peer }
678+
func (f *mockChannelLink) ChannelPoint() *wire.OutPoint { return &wire.OutPoint{} }
676679
func (f *mockChannelLink) Stop() {}
677680
func (f *mockChannelLink) EligibleToForward() bool { return f.eligible }
678681
func (f *mockChannelLink) setLiveShortChanID(sid lnwire.ShortChannelID) { f.shortChanID = sid }

htlcswitch/switch.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ type Config struct {
175175
// LogEventTicker is a signal instructing the htlcswitch to log
176176
// aggregate stats about it's forwarding during the last interval.
177177
LogEventTicker ticker.Ticker
178+
179+
// NotifyActiveChannel and NotifyInactiveChannel allow the link to tell
180+
// the ChannelNotifier when channels become active and inactive.
181+
NotifyActiveChannel func(wire.OutPoint)
182+
NotifyInactiveChannel func(wire.OutPoint)
178183
}
179184

180185
// Switch is the central messaging bus for all incoming/outgoing HTLCs.
@@ -1956,6 +1961,11 @@ func (s *Switch) addLiveLink(link ChannelLink) {
19561961
s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
19571962
}
19581963
s.interfaceIndex[peerPub][link.ChanID()] = link
1964+
1965+
// Inform the channel notifier if the link has become active.
1966+
if link.EligibleToForward() {
1967+
s.cfg.NotifyActiveChannel(*link.ChannelPoint())
1968+
}
19591969
}
19601970

19611971
// GetLink is used to initiate the handling of the get link command. The
@@ -2031,6 +2041,9 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) ChannelLink {
20312041
return nil
20322042
}
20332043

2044+
// Inform the Channel Notifier about the link becoming inactive.
2045+
s.cfg.NotifyInactiveChannel(*link.ChannelPoint())
2046+
20342047
// Remove the channel from live link indexes.
20352048
delete(s.pendingLinkIndex, link.ChanID())
20362049
delete(s.linkIndex, link.ChanID())

0 commit comments

Comments
 (0)