Skip to content

Commit 158c9b5

Browse files
committed
contractcourt: use single block subscription for block epochs
1 parent d12f76f commit 158c9b5

File tree

3 files changed

+94
-44
lines changed

3 files changed

+94
-44
lines changed

contractcourt/chain_arbitrator.go

+82-16
Original file line numberDiff line numberDiff line change
@@ -312,18 +312,8 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
312312
log.Tracef("Creating ChannelArbitrator for ChannelPoint(%v)",
313313
channel.FundingOutpoint)
314314

315-
// We'll start by registering for a block epoch notifications so this
316-
// channel can keep track of the current state of the main chain.
317-
//
318315
// TODO(roasbeef): fetch best height (or pass in) so can ensure block
319316
// epoch delivers all the notifications to
320-
//
321-
// TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest?
322-
// * reduces the number of goroutines
323-
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
324-
if err != nil {
325-
return nil, err
326-
}
327317

328318
chanPoint := channel.FundingOutpoint
329319

@@ -333,7 +323,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
333323
ChanPoint: chanPoint,
334324
Channel: c.getArbChannel(channel),
335325
ShortChanID: channel.ShortChanID(),
336-
BlockEpochs: blockEpoch,
337326

338327
MarkCommitmentBroadcasted: channel.MarkCommitmentBroadcasted,
339328
MarkChannelClosed: func(summary *channeldb.ChannelCloseSummary,
@@ -369,7 +358,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
369358
c.chanSource.Backend, arbCfg, c.cfg.ChainHash, chanPoint,
370359
)
371360
if err != nil {
372-
blockEpoch.Cancel()
373361
return nil, err
374362
}
375363

@@ -385,7 +373,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
385373

386374
pendingRemoteCommitment, err := channel.RemoteCommitChainTip()
387375
if err != nil && err != channeldb.ErrNoPendingCommit {
388-
blockEpoch.Cancel()
389376
return nil, err
390377
}
391378
if pendingRemoteCommitment != nil {
@@ -556,7 +543,6 @@ func (c *ChainArbitrator) Start() error {
556543
arbCfg := ChannelArbitratorConfig{
557544
ChanPoint: chanPoint,
558545
ShortChanID: closeChanInfo.ShortChanID,
559-
BlockEpochs: blockEpoch,
560546
ChainArbitratorConfig: c.cfg,
561547
ChainEvents: &ChainEventSubscription{},
562548
IsPendingClose: true,
@@ -627,20 +613,100 @@ func (c *ChainArbitrator) Start() error {
627613
}
628614
}
629615

630-
// Finally, we'll launch all the goroutines for each arbitrator so they
631-
// can carry out their duties.
616+
// Launch all the goroutines for each arbitrator so they can carry out
617+
// their duties.
632618
for _, arbitrator := range c.activeChannels {
633619
if err := arbitrator.Start(); err != nil {
634620
c.Stop()
635621
return err
636622
}
637623
}
638624

625+
// Subscribe to a single stream of block epoch notifications that we
626+
// will dispatch to all active arbitrators.
627+
blockEpoch, err := c.cfg.Notifier.RegisterBlockEpochNtfn(nil)
628+
if err != nil {
629+
return err
630+
}
631+
632+
// Start our goroutine which will dispatch blocks to each arbitrator.
633+
c.wg.Add(1)
634+
go func() {
635+
defer c.wg.Done()
636+
c.dispatchBlocks(blockEpoch)
637+
}()
638+
639639
// TODO(roasbeef): eventually move all breach watching here
640640

641641
return nil
642642
}
643643

644+
// dispatchBlocks consumes a block epoch notification stream and dispatches
645+
// blocks to each of the chain arb's active channel arbitrators. This function
646+
// must be run in a goroutine.
647+
func (c *ChainArbitrator) dispatchBlocks(
648+
blockEpoch *chainntnfs.BlockEpochEvent) {
649+
650+
// getBlockChannels is a helper function which acquires the chain arb
651+
// lock and returns a map of channel points to block channels.
652+
getBlockChannels := func() map[wire.OutPoint]chan<- int32 {
653+
c.Lock()
654+
blocks := make(map[wire.OutPoint]chan<- int32)
655+
for _, channel := range c.activeChannels {
656+
blocks[channel.cfg.ChanPoint] = channel.blocks
657+
}
658+
c.Unlock()
659+
660+
return blocks
661+
}
662+
663+
// On exit, cancel our blocks subscription and close each block channel
664+
// so that the arbitrators know they will no longer be receiving blocks.
665+
defer func() {
666+
blockEpoch.Cancel()
667+
668+
blocks := getBlockChannels()
669+
for _, block := range blocks {
670+
close(block)
671+
}
672+
}()
673+
674+
// Consume block epochs until we receive the instruction to shutdown.
675+
for {
676+
select {
677+
// Consume block epochs, exiting if our subscription is
678+
// terminated.
679+
case block, ok := <-blockEpoch.Epochs:
680+
if !ok {
681+
log.Trace("dispatchBlocks block epoch " +
682+
"cancelled")
683+
684+
return
685+
}
686+
687+
// Get the set of currently active channels block
688+
// subscription channels and dispatch the block to
689+
// each.
690+
for _, blockChan := range getBlockChannels() {
691+
select {
692+
// Deliver the block to the arbitrator.
693+
case blockChan <- block.Height:
694+
695+
// If the chain arb is shutting down, we don't
696+
// need to deliver any more blocks (everything
697+
// will be shutting down).
698+
case <-c.quit:
699+
return
700+
}
701+
}
702+
703+
// Exit if the chain arbitrator is shutting down.
704+
case <-c.quit:
705+
return
706+
}
707+
}
708+
}
709+
644710
// publishClosingTxs will load any stored cooperative or unilater closing
645711
// transactions and republish them. This helps ensure propagation of the
646712
// transactions in the event that prior publications failed.

contractcourt/channel_arbitrator.go

+8-14
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"github.com/btcsuite/btcd/wire"
1313
"github.com/btcsuite/btcutil"
1414
"github.com/davecgh/go-spew/spew"
15-
"github.com/lightningnetwork/lnd/chainntnfs"
1615
"github.com/lightningnetwork/lnd/channeldb"
1716
"github.com/lightningnetwork/lnd/channeldb/kvdb"
1817
"github.com/lightningnetwork/lnd/input"
@@ -108,12 +107,6 @@ type ChannelArbitratorConfig struct {
108107
// to the switch during contract resolution.
109108
ShortChanID lnwire.ShortChannelID
110109

111-
// BlockEpochs is an active block epoch event stream backed by an
112-
// active ChainNotifier instance. We will use new block notifications
113-
// sent over this channel to decide when we should go on chain to
114-
// reclaim/redeem the funds in an HTLC sent to/from us.
115-
BlockEpochs *chainntnfs.BlockEpochEvent
116-
117110
// ChainEvents is an active subscription to the chain watcher for this
118111
// channel to be notified of any on-chain activity related to this
119112
// channel.
@@ -325,6 +318,11 @@ type ChannelArbitrator struct {
325318
// to do its duty.
326319
cfg ChannelArbitratorConfig
327320

321+
// blocks is a channel that the arbitrator will receive new blocks on.
322+
// This channel should be buffered by 1 so that it does not block
323+
// the sender.
324+
blocks chan int32
325+
328326
// signalUpdates is a channel that any new live signals for the channel
329327
// we're watching over will be sent.
330328
signalUpdates chan *signalUpdateMsg
@@ -366,6 +364,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
366364

367365
return &ChannelArbitrator{
368366
log: log,
367+
blocks: make(chan int32, 1),
369368
signalUpdates: make(chan *signalUpdateMsg),
370369
htlcUpdates: make(<-chan *ContractUpdate),
371370
resolutionSignal: make(chan struct{}),
@@ -397,13 +396,11 @@ func (c *ChannelArbitrator) Start() error {
397396
// machine can act accordingly.
398397
c.state, err = c.log.CurrentState()
399398
if err != nil {
400-
c.cfg.BlockEpochs.Cancel()
401399
return err
402400
}
403401

404402
_, bestHeight, err := c.cfg.ChainIO.GetBestBlock()
405403
if err != nil {
406-
c.cfg.BlockEpochs.Cancel()
407404
return err
408405
}
409406

@@ -479,7 +476,6 @@ func (c *ChannelArbitrator) Start() error {
479476
c.cfg.ChanPoint)
480477

481478
default:
482-
c.cfg.BlockEpochs.Cancel()
483479
return err
484480
}
485481
}
@@ -501,7 +497,6 @@ func (c *ChannelArbitrator) Start() error {
501497
// commitment has been confirmed on chain, and before we
502498
// advance our state step, we call InsertConfirmedCommitSet.
503499
if err := c.relaunchResolvers(commitSet, triggerHeight); err != nil {
504-
c.cfg.BlockEpochs.Cancel()
505500
return err
506501
}
507502
}
@@ -2111,7 +2106,6 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
21112106

21122107
// TODO(roasbeef): tell top chain arb we're done
21132108
defer func() {
2114-
c.cfg.BlockEpochs.Cancel()
21152109
c.wg.Done()
21162110
}()
21172111

@@ -2121,11 +2115,11 @@ func (c *ChannelArbitrator) channelAttendant(bestHeight int32) {
21212115
// A new block has arrived, we'll examine all the active HTLC's
21222116
// to see if any of them have expired, and also update our
21232117
// track of the best current height.
2124-
case blockEpoch, ok := <-c.cfg.BlockEpochs.Epochs:
2118+
case blockHeight, ok := <-c.blocks:
21252119
if !ok {
21262120
return
21272121
}
2128-
bestHeight = blockEpoch.Height
2122+
bestHeight = blockHeight
21292123

21302124
// If we're not in the default state, then we can
21312125
// ignore this signal as we're waiting for contract

contractcourt/channel_arbitrator_test.go

+4-14
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,6 @@ type chanArbTestCtx struct {
197197

198198
resolvedChan chan struct{}
199199

200-
blockEpochs chan *chainntnfs.BlockEpoch
201-
202200
incubationRequests chan struct{}
203201

204202
resolutions chan []ResolutionMsg
@@ -304,12 +302,6 @@ func withMarkClosed(markClosed func(*channeldb.ChannelCloseSummary,
304302
func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
305303
opts ...testChanArbOption) (*chanArbTestCtx, error) {
306304

307-
blockEpochs := make(chan *chainntnfs.BlockEpoch)
308-
blockEpoch := &chainntnfs.BlockEpochEvent{
309-
Epochs: blockEpochs,
310-
Cancel: func() {},
311-
}
312-
313305
chanPoint := wire.OutPoint{}
314306
shortChanID := lnwire.ShortChannelID{}
315307
chanEvents := &ChainEventSubscription{
@@ -366,7 +358,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
366358
arbCfg := &ChannelArbitratorConfig{
367359
ChanPoint: chanPoint,
368360
ShortChanID: shortChanID,
369-
BlockEpochs: blockEpoch,
370361
MarkChannelResolved: func() error {
371362
resolvedChan <- struct{}{}
372363
return nil
@@ -433,7 +424,6 @@ func createTestChannelArbitrator(t *testing.T, log ArbitratorLog,
433424
cleanUp: cleanUp,
434425
resolvedChan: resolvedChan,
435426
resolutions: resolutionChan,
436-
blockEpochs: blockEpochs,
437427
log: log,
438428
incubationRequests: incubateChan,
439429
sweeper: mockSweeper,
@@ -1759,7 +1749,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
17591749
// now mine a block (height 5), which is 5 blocks away
17601750
// (our grace delta) from the expiry of that HTLC.
17611751
case testCase.htlcExpired:
1762-
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5}
1752+
chanArbCtx.chanArb.blocks <- 5
17631753

17641754
// Otherwise, we'll just trigger a regular force close
17651755
// request.
@@ -1863,7 +1853,7 @@ func TestChannelArbitratorDanglingCommitForceClose(t *testing.T) {
18631853
// so instead, we'll mine another block which'll cause
18641854
// it to re-examine its state and realize there're no
18651855
// more HTLCs.
1866-
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6}
1856+
chanArbCtx.chanArb.blocks <- 6
18671857
chanArbCtx.AssertStateTransitions(StateFullyResolved)
18681858
})
18691859
}
@@ -1940,13 +1930,13 @@ func TestChannelArbitratorPendingExpiredHTLC(t *testing.T) {
19401930
// We will advance the uptime to 10 seconds which should be still within
19411931
// the grace period and should not trigger going to chain.
19421932
testClock.SetTime(startTime.Add(time.Second * 10))
1943-
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 5}
1933+
chanArbCtx.chanArb.blocks <- 5
19441934
chanArbCtx.AssertState(StateDefault)
19451935

19461936
// We will advance the uptime to 16 seconds which should trigger going
19471937
// to chain.
19481938
testClock.SetTime(startTime.Add(time.Second * 16))
1949-
chanArbCtx.blockEpochs <- &chainntnfs.BlockEpoch{Height: 6}
1939+
chanArbCtx.chanArb.blocks <- 6
19501940
chanArbCtx.AssertStateTransitions(
19511941
StateBroadcastCommit,
19521942
StateCommitmentBroadcasted,

0 commit comments

Comments
 (0)