@@ -312,18 +312,8 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
312
312
log .Tracef ("Creating ChannelArbitrator for ChannelPoint(%v)" ,
313
313
channel .FundingOutpoint )
314
314
315
- // We'll start by registering for a block epoch notifications so this
316
- // channel can keep track of the current state of the main chain.
317
- //
318
315
// TODO(roasbeef): fetch best height (or pass in) so can ensure block
319
316
// epoch delivers all the notifications to
320
- //
321
- // TODO(roasbeef): instead 1 block epoch that multi-plexes to the rest?
322
- // * reduces the number of goroutines
323
- blockEpoch , err := c .cfg .Notifier .RegisterBlockEpochNtfn (nil )
324
- if err != nil {
325
- return nil , err
326
- }
327
317
328
318
chanPoint := channel .FundingOutpoint
329
319
@@ -333,7 +323,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
333
323
ChanPoint : chanPoint ,
334
324
Channel : c .getArbChannel (channel ),
335
325
ShortChanID : channel .ShortChanID (),
336
- BlockEpochs : blockEpoch ,
337
326
338
327
MarkCommitmentBroadcasted : channel .MarkCommitmentBroadcasted ,
339
328
MarkChannelClosed : func (summary * channeldb.ChannelCloseSummary ,
@@ -369,7 +358,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
369
358
c .chanSource .Backend , arbCfg , c .cfg .ChainHash , chanPoint ,
370
359
)
371
360
if err != nil {
372
- blockEpoch .Cancel ()
373
361
return nil , err
374
362
}
375
363
@@ -385,7 +373,6 @@ func newActiveChannelArbitrator(channel *channeldb.OpenChannel,
385
373
386
374
pendingRemoteCommitment , err := channel .RemoteCommitChainTip ()
387
375
if err != nil && err != channeldb .ErrNoPendingCommit {
388
- blockEpoch .Cancel ()
389
376
return nil , err
390
377
}
391
378
if pendingRemoteCommitment != nil {
@@ -556,7 +543,6 @@ func (c *ChainArbitrator) Start() error {
556
543
arbCfg := ChannelArbitratorConfig {
557
544
ChanPoint : chanPoint ,
558
545
ShortChanID : closeChanInfo .ShortChanID ,
559
- BlockEpochs : blockEpoch ,
560
546
ChainArbitratorConfig : c .cfg ,
561
547
ChainEvents : & ChainEventSubscription {},
562
548
IsPendingClose : true ,
@@ -627,20 +613,130 @@ func (c *ChainArbitrator) Start() error {
627
613
}
628
614
}
629
615
630
- // Finally, we'll launch all the goroutines for each arbitrator so they
631
- // can carry out their duties.
616
+ // Launch all the goroutines for each arbitrator so they can carry out
617
+ // their duties.
632
618
for _ , arbitrator := range c .activeChannels {
633
619
if err := arbitrator .Start (); err != nil {
634
620
c .Stop ()
635
621
return err
636
622
}
637
623
}
638
624
625
+ // Subscribe to a single stream of block epoch notifications that we
626
+ // will dispatch to all active arbitrators.
627
+ blockEpoch , err := c .cfg .Notifier .RegisterBlockEpochNtfn (nil )
628
+ if err != nil {
629
+ return err
630
+ }
631
+
632
+ // Start our goroutine which will dispatch blocks to each arbitrator.
633
+ c .wg .Add (1 )
634
+ go func () {
635
+ defer c .wg .Done ()
636
+ c .dispatchBlocks (blockEpoch )
637
+ }()
638
+
639
639
// TODO(roasbeef): eventually move all breach watching here
640
640
641
641
return nil
642
642
}
643
643
644
+ // blockRecipient contains the information we need to dispatch a block to a
645
+ // channel arbitrator.
646
+ type blockRecipient struct {
647
+ // chanPoint is the funding outpoint of the channel.
648
+ chanPoint wire.OutPoint
649
+
650
+ // blocks is the channel that new block heights are sent into.
651
+ blocks chan <- int32
652
+
653
+ // quit is closed if the receiving entity is shutting down.
654
+ quit chan struct {}
655
+ }
656
+
657
+ // dispatchBlocks consumes a block epoch notification stream and dispatches
658
+ // blocks to each of the chain arb's active channel arbitrators. This function
659
+ // must be run in a goroutine.
660
+ func (c * ChainArbitrator ) dispatchBlocks (
661
+ blockEpoch * chainntnfs.BlockEpochEvent ) {
662
+
663
+ // getRecipients is a helper function which acquires the chain arb
664
+ // lock and returns a set of block recipients which can be used to
665
+ // dispatch blocks.
666
+ getRecipients := func () []blockRecipient {
667
+ c .Lock ()
668
+ blocks := make ([]blockRecipient , 0 , len (c .activeChannels ))
669
+ for _ , channel := range c .activeChannels {
670
+ blocks = append (blocks , blockRecipient {
671
+ chanPoint : channel .cfg .ChanPoint ,
672
+ blocks : channel .blocks ,
673
+ quit : channel .quit ,
674
+ })
675
+ }
676
+ c .Unlock ()
677
+
678
+ return blocks
679
+ }
680
+
681
+ // On exit, cancel our blocks subscription and close each block channel
682
+ // so that the arbitrators know they will no longer be receiving blocks.
683
+ defer func () {
684
+ blockEpoch .Cancel ()
685
+
686
+ recipients := getRecipients ()
687
+ for _ , recipient := range recipients {
688
+ close (recipient .blocks )
689
+ }
690
+ }()
691
+
692
+ // Consume block epochs until we receive the instruction to shutdown.
693
+ for {
694
+ select {
695
+ // Consume block epochs, exiting if our subscription is
696
+ // terminated.
697
+ case block , ok := <- blockEpoch .Epochs :
698
+ if ! ok {
699
+ log .Trace ("dispatchBlocks block epoch " +
700
+ "cancelled" )
701
+
702
+ return
703
+ }
704
+
705
+ // Get the set of currently active channels block
706
+ // subscription channels and dispatch the block to
707
+ // each.
708
+ for _ , recipient := range getRecipients () {
709
+ select {
710
+ // Deliver the block to the arbitrator.
711
+ case recipient .blocks <- block .Height :
712
+
713
+ // If the recipient is shutting down, exit
714
+ // without delivering the block. This may be
715
+ // the case when two blocks are mined in quick
716
+ // succession, and the arbitrator resolves
717
+ // after the first block, and does not need to
718
+ // consume the second block.
719
+ case <- recipient .quit :
720
+ log .Tracef ("channel: %v exit without " +
721
+ "receiving block: %v" ,
722
+ recipient .chanPoint ,
723
+ block .Height )
724
+
725
+ // If the chain arb is shutting down, we don't
726
+ // need to deliver any more blocks (everything
727
+ // will be shutting down).
728
+ case <- c .quit :
729
+ return
730
+ }
731
+ }
732
+
733
+ // Exit if the chain arbitrator is shutting down.
734
+ case <- c .quit :
735
+ return
736
+ }
737
+ }
738
+ }
739
+
644
740
// publishClosingTxs will load any stored cooperative or unilater closing
645
741
// transactions and republish them. This helps ensure propagation of the
646
742
// transactions in the event that prior publications failed.
0 commit comments