@@ -402,10 +402,10 @@ func (h *HostInfo) HostnameAndPort() string {
402402}
403403
404404func (h * HostInfo ) ConnectAddressAndPort () string {
405- h .mu .Lock ()
406- defer h .mu .Unlock ()
407- addr , _ := h .connectAddressLocked ()
408- return net .JoinHostPort (addr .String (), strconv .Itoa (h .port ))
405+ h .mu .Lock ()
406+ defer h .mu .Unlock ()
407+ addr , _ := h .connectAddressLocked ()
408+ return net .JoinHostPort (addr .String (), strconv .Itoa (h .port ))
409409}
410410
411411func (h * HostInfo ) String () string {
@@ -747,27 +747,27 @@ const (
747747// debounces requests to call a refresh function (currently used for ring refresh). It also supports triggering a refresh immediately.
748748type refreshDebouncer struct {
749749 mu sync.Mutex
750+ stopped bool
750751 broadcaster * errorBroadcaster
751752 interval time.Duration
752753 timer * time.Timer
753754 refreshNowCh chan struct {}
755+ quit chan struct {}
756+ done chan struct {}
754757 refreshFn func () error
755- quitCtxFn context.CancelFunc
756- quitCtx context.Context
757- stoppedCtxFn context.CancelFunc
758- stoppedCtx context.Context
759758}
760759
761760func newRefreshDebouncer (interval time.Duration , refreshFn func () error ) * refreshDebouncer {
762761 d := & refreshDebouncer {
762+ stopped : false ,
763763 broadcaster : nil ,
764764 refreshNowCh : make (chan struct {}, 1 ),
765+ quit : make (chan struct {}),
766+ done : make (chan struct {}),
765767 interval : interval ,
766768 timer : time .NewTimer (interval ),
767769 refreshFn : refreshFn ,
768770 }
769- d .stoppedCtx , d .stoppedCtxFn = context .WithCancel (context .Background ())
770- d .quitCtx , d .quitCtxFn = context .WithCancel (context .Background ())
771771 d .timer .Stop ()
772772 go d .flusher ()
773773 return d
@@ -777,7 +777,7 @@ func newRefreshDebouncer(interval time.Duration, refreshFn func() error) *refres
777777func (d * refreshDebouncer ) debounce () {
778778 d .mu .Lock ()
779779 defer d .mu .Unlock ()
780- if d .isStopped () {
780+ if d .stopped {
781781 return
782782 }
783783 d .timer .Reset (d .interval )
@@ -788,11 +788,6 @@ func (d *refreshDebouncer) refreshNow() <-chan error {
788788 d .mu .Lock ()
789789 defer d .mu .Unlock ()
790790 if d .broadcaster == nil {
791- if d .isStopped () {
792- ch := make (chan error )
793- close (ch )
794- return ch
795- }
796791 d .broadcaster = newErrorBroadcaster ()
797792 select {
798793 case d .refreshNowCh <- struct {}{}:
@@ -804,15 +799,15 @@ func (d *refreshDebouncer) refreshNow() <-chan error {
804799}
805800
806801func (d * refreshDebouncer ) flusher () {
807- defer d . stoppedCtxFn ( )
802+ defer close ( d . done )
808803 for {
809804 select {
810805 case <- d .refreshNowCh :
811806 case <- d .timer .C :
812- case <- d .quitCtx . Done () :
807+ case <- d .quit :
813808 }
814809 d .mu .Lock ()
815- if d .isStopped () {
810+ if d .stopped {
816811 if d .broadcaster != nil {
817812 d .broadcaster .stop ()
818813 d .broadcaster = nil
@@ -846,12 +841,15 @@ func (d *refreshDebouncer) flusher() {
846841}
847842
848843func (d * refreshDebouncer ) stop () {
849- d .quitCtxFn () // wake up flusher
850- <- d .stoppedCtx .Done () // wait for flusher to exit
851- }
852-
853- func (d * refreshDebouncer ) isStopped () bool {
854- return d .quitCtx .Err () != nil
844+ d .mu .Lock ()
845+ if d .stopped {
846+ d .mu .Unlock ()
847+ return
848+ }
849+ d .stopped = true
850+ d .mu .Unlock ()
851+ close (d .quit )
852+ <- d .done
855853}
856854
857855// broadcasts an error to multiple channels (listeners)
0 commit comments