Skip to content

Commit bef3c1d

Browse files
committed
fix deadlock in refresh debouncer stop
1 parent 34fdeeb commit bef3c1d

File tree

2 files changed

+47
-16
lines changed

2 files changed

+47
-16
lines changed

host_source.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -747,25 +747,27 @@ const (
747747
// refreshDebouncer debounces requests to call a refresh function (currently used for ring refresh). It also supports triggering a refresh immediately.
748748
type refreshDebouncer struct {
749749
mu sync.Mutex
750-
stopped bool
751750
broadcaster *errorBroadcaster
752751
interval time.Duration
753752
timer *time.Timer
754753
refreshNowCh chan struct{}
755-
quit chan struct{}
756754
refreshFn func() error
755+
quitCtxFn context.CancelFunc
756+
quitCtx context.Context
757+
stoppedCtxFn context.CancelFunc
758+
stoppedCtx context.Context
757759
}
758760

759761
func newRefreshDebouncer(interval time.Duration, refreshFn func() error) *refreshDebouncer {
760762
d := &refreshDebouncer{
761-
stopped: false,
762763
broadcaster: nil,
763764
refreshNowCh: make(chan struct{}, 1),
764-
quit: make(chan struct{}),
765765
interval: interval,
766766
timer: time.NewTimer(interval),
767767
refreshFn: refreshFn,
768768
}
769+
d.stoppedCtx, d.stoppedCtxFn = context.WithCancel(context.Background())
770+
d.quitCtx, d.quitCtxFn = context.WithCancel(context.Background())
769771
d.timer.Stop()
770772
go d.flusher()
771773
return d
@@ -775,7 +777,7 @@ func newRefreshDebouncer(interval time.Duration, refreshFn func() error) *refres
775777
func (d *refreshDebouncer) debounce() {
776778
d.mu.Lock()
777779
defer d.mu.Unlock()
778-
if d.stopped {
780+
if d.isStopped() {
779781
return
780782
}
781783
d.timer.Reset(d.interval)
@@ -786,6 +788,11 @@ func (d *refreshDebouncer) refreshNow() <-chan error {
786788
d.mu.Lock()
787789
defer d.mu.Unlock()
788790
if d.broadcaster == nil {
791+
if d.isStopped() {
792+
ch := make(chan error)
793+
close(ch)
794+
return ch
795+
}
789796
d.broadcaster = newErrorBroadcaster()
790797
select {
791798
case d.refreshNowCh <- struct{}{}:
@@ -797,14 +804,15 @@ func (d *refreshDebouncer) refreshNow() <-chan error {
797804
}
798805

799806
func (d *refreshDebouncer) flusher() {
807+
defer d.stoppedCtxFn()
800808
for {
801809
select {
802810
case <-d.refreshNowCh:
803811
case <-d.timer.C:
804-
case <-d.quit:
812+
case <-d.quitCtx.Done():
805813
}
806814
d.mu.Lock()
807-
if d.stopped {
815+
if d.isStopped() {
808816
if d.broadcaster != nil {
809817
d.broadcaster.stop()
810818
d.broadcaster = nil
@@ -838,15 +846,12 @@ func (d *refreshDebouncer) flusher() {
838846
}
839847

840848
func (d *refreshDebouncer) stop() {
841-
d.mu.Lock()
842-
if d.stopped {
843-
d.mu.Unlock()
844-
return
845-
}
846-
d.stopped = true
847-
d.mu.Unlock()
848-
d.quit <- struct{}{} // sync with flusher
849-
close(d.quit)
849+
d.quitCtxFn() // wake up flusher
850+
<-d.stoppedCtx.Done() // wait for flusher to exit
851+
}
852+
853+
func (d *refreshDebouncer) isStopped() bool {
854+
return d.quitCtx.Err() != nil
850855
}
851856

852857
// broadcasts an error to multiple channels (listeners)

host_source_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,32 @@ func TestRefreshDebouncer_EventsAfterRefreshNow(t *testing.T) {
274274
}
275275
}
276276

277+
// https://github.com/gocql/gocql/issues/1752
278+
func TestRefreshDebouncer_DeadlockOnStop(t *testing.T) {
279+
// there's no way to guarantee this bug manifests because it depends on which `case` is picked from the `select`
280+
// without the fix, 4 iterations of this test hit the deadlock pretty consistently
281+
const iterations = 4
282+
for i := 0; i < iterations; i++ {
283+
refreshCalledCh := make(chan int, 5)
284+
refreshDuration := 500 * time.Millisecond
285+
fn := func() error {
286+
refreshCalledCh <- 0
287+
time.Sleep(refreshDuration)
288+
return nil
289+
}
290+
d := newRefreshDebouncer(50*time.Millisecond, fn)
291+
timeBeforeRefresh := time.Now()
292+
_ = d.refreshNow()
293+
<-refreshCalledCh
294+
d.debounce()
295+
d.stop()
296+
timeAfterRefresh := time.Now()
297+
if timeAfterRefresh.Sub(timeBeforeRefresh) < refreshDuration {
298+
t.Errorf("refresh debouncer stop() didn't wait until flusher stopped")
299+
}
300+
}
301+
}
302+
277303
func TestErrorBroadcaster_MultipleListeners(t *testing.T) {
278304
b := newErrorBroadcaster()
279305
defer b.stop()

0 commit comments

Comments
 (0)