Skip to content

Commit 5ed8559

Browse files
authored
Fix streaming Reader race condition (#55)
* Fix streaming Reader race condition - There was a race condition between creating a `streaming.Reader` with `NewReader` and calling its `Subscribe` where events could be consumed before there is any channel to send them to. - To fix this, the default behavior is now to only start the reader's `read` goroutine when `Subscribe` is called after it has inserted a channel that will receive events. - Alternatively, there is now an option `WithReaderStartExplicitly` which will only start the `read` goroutine when the new `Start` is called allowing multiple calls to `Subscribe` before receiving events. * Remove option to start explicitly
1 parent f5494e1 commit 5ed8559

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

streaming/reader.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type (
4242
bufferSize int
4343
// channels to send notifications
4444
chans []chan *Event
45+
// startOnce is used to ensure the reader is started only once.
46+
startOnce sync.Once
4547
// donechan is the reader donechan channel.
4648
donechan chan struct{}
4749
// streamschan notifies the reader when streams are added or
@@ -114,9 +116,6 @@ func newReader(stream *Stream, opts ...options.Reader) (*Reader, error) {
114116
rdb: stream.rdb,
115117
}
116118

117-
reader.wait.Add(1)
118-
pulse.Go(reader.logger, reader.read)
119-
120119
return reader, nil
121120
}
122121

@@ -127,6 +126,7 @@ func (r *Reader) Subscribe() <-chan *Event {
127126
r.lock.Lock()
128127
defer r.lock.Unlock()
129128
r.chans = append(r.chans, c)
129+
r.start()
130130
return c
131131
}
132132

@@ -209,6 +209,14 @@ func (r *Reader) IsClosed() bool {
209209
return r.closed
210210
}
211211

212+
// start starts the reader's read goroutine if it is not already running.
213+
func (r *Reader) start() {
214+
r.startOnce.Do(func() {
215+
r.wait.Add(1)
216+
pulse.Go(r.logger, r.read)
217+
})
218+
}
219+
212220
// read reads events from the streams and sends them to the reader channel.
213221
func (r *Reader) read() {
214222
ctx := context.Background()

0 commit comments

Comments
 (0)