Skip to content

Commit 614ee3b

Browse files
Guard SendStream.CancelWrite with doneOnce to prevent race conditions
1 parent 80bd45f commit 614ee3b

File tree

1 file changed

+27
-25
lines changed

1 file changed

+27
-25
lines changed

stream.go

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ type SendStream struct {
4848

4949
onClose func() // to remove the stream from the streamsMap
5050

51-
closeOnce sync.Once
52-
closed chan struct{}
53-
closeErr error
51+
doneOnce sync.Once
52+
closed chan struct{}
53+
closeErr error
5454

5555
deadlineMu sync.Mutex
5656
writeDeadline time.Time
@@ -147,28 +147,30 @@ func (s *SendStream) maybeSendStreamHeader() error {
147147
// Write will unblock immediately, and future calls to Write will fail.
148148
// When called multiple times it is a no-op.
149149
func (s *SendStream) CancelWrite(e StreamErrorCode) {
150-
s.streamHdrMu.Lock()
151-
if len(s.streamHdr) > 0 {
152-
// Sending the stream header might block if we are blocked by flow control.
153-
// Send a stream header async so that CancelWrite can return immediately.
154-
go func() {
155-
defer s.streamHdrMu.Unlock()
156-
157-
s.SetWriteDeadline(time.Time{})
158-
_ = s.maybeSendStreamHeader()
159-
s.str.CancelWrite(webtransportCodeToHTTPCode(e))
160-
s.onClose()
161-
}()
162-
return
163-
}
164-
s.streamHdrMu.Unlock()
150+
s.doneOnce.Do(func() {
151+
s.streamHdrMu.Lock()
152+
if len(s.streamHdr) > 0 {
153+
// Sending the stream header might block if we are blocked by flow control.
154+
// Send a stream header async so that CancelWrite can return immediately.
155+
go func() {
156+
defer s.streamHdrMu.Unlock()
157+
158+
s.SetWriteDeadline(time.Time{})
159+
_ = s.maybeSendStreamHeader()
160+
s.str.CancelWrite(webtransportCodeToHTTPCode(e))
161+
s.onClose()
162+
}()
163+
return
164+
}
165+
s.streamHdrMu.Unlock()
165166

166-
s.str.CancelWrite(webtransportCodeToHTTPCode(e))
167-
s.onClose()
167+
s.str.CancelWrite(webtransportCodeToHTTPCode(e))
168+
s.onClose()
169+
})
168170
}
169171

170172
func (s *SendStream) closeWithSession(err error) {
171-
s.closeOnce.Do(func() {
173+
s.doneOnce.Do(func() {
172174
s.closeErr = err
173175
s.str.CancelWrite(WTSessionGoneErrorCode)
174176
close(s.closed)
@@ -232,9 +234,9 @@ type ReceiveStream struct {
232234

233235
onClose func() // to remove the stream from the streamsMap
234236

235-
closeOnce sync.Once
236-
closed chan struct{}
237-
closeErr error
237+
doneOnce sync.Once
238+
closed chan struct{}
239+
closeErr error
238240

239241
deadlineMu sync.Mutex
240242
readDeadline time.Time
@@ -309,7 +311,7 @@ func (s *ReceiveStream) CancelRead(e StreamErrorCode) {
309311
}
310312

311313
func (s *ReceiveStream) closeWithSession(err error) {
312-
s.closeOnce.Do(func() {
314+
s.doneOnce.Do(func() {
313315
s.closeErr = err
314316
s.str.CancelRead(WTSessionGoneErrorCode)
315317
close(s.closed)

0 commit comments

Comments
 (0)