Skip to content

Commit 942b0db

Browse files
fix stream header sending for concurrent Close / CancelWrite calls
1 parent 80bd45f commit 942b0db

File tree

1 file changed

+31
-8
lines changed

1 file changed

+31
-8
lines changed

stream.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type SendStream struct {
4545
// Might be initialized to nil if this sendStream is part of an incoming bidirectional stream.
4646
streamHdr []byte
4747
streamHdrMu sync.Mutex
48+
// Set to true when a goroutine is spawned to send the header asynchronously.
49+
// This only happens if the stream is closed / reset immediately after creation.
50+
sendingHdrAsync bool
4851

4952
onClose func() // to remove the stream from the streamsMap
5053

@@ -147,15 +150,25 @@ func (s *SendStream) maybeSendStreamHeader() error {
147150
// Write will unblock immediately, and future calls to Write will fail.
148151
// When called multiple times it is a no-op.
149152
func (s *SendStream) CancelWrite(e StreamErrorCode) {
153+
// if a Goroutine is already sending the header, return immediately
150154
s.streamHdrMu.Lock()
155+
if s.sendingHdrAsync {
156+
s.streamHdrMu.Unlock()
157+
return
158+
}
159+
151160
if len(s.streamHdr) > 0 {
152161
// Sending the stream header might block if we are blocked by flow control.
153162
// Send a stream header async so that CancelWrite can return immediately.
154-
go func() {
155-
defer s.streamHdrMu.Unlock()
163+
s.sendingHdrAsync = true
164+
streamHdr := s.streamHdr
165+
s.streamHdr = nil
166+
s.streamHdrMu.Unlock()
156167

168+
go func() {
157169
s.SetWriteDeadline(time.Time{})
158-
_ = s.maybeSendStreamHeader()
170+
_, _ = s.str.Write(streamHdr)
171+
s.str.SetReliableBoundary()
159172
s.str.CancelWrite(webtransportCodeToHTTPCode(e))
160173
s.onClose()
161174
}()
@@ -178,16 +191,26 @@ func (s *SendStream) closeWithSession(err error) {
178191
// Close closes the write-direction of the stream.
179192
// Future calls to Write are not permitted after calling Close.
180193
func (s *SendStream) Close() error {
194+
// if a Goroutine is already sending the header, return immediately
181195
s.streamHdrMu.Lock()
196+
if s.sendingHdrAsync {
197+
s.streamHdrMu.Unlock()
198+
return nil
199+
}
200+
182201
if len(s.streamHdr) > 0 {
183202
// Sending the stream header might block if we are blocked by flow control.
184-
// Send a stream header async so that Close can return immediately.
185-
go func() {
186-
defer s.streamHdrMu.Unlock()
203+
// Send a stream header async so that CancelWrite can return immediately.
204+
s.sendingHdrAsync = true
205+
streamHdr := s.streamHdr
206+
s.streamHdr = nil
207+
s.streamHdrMu.Unlock()
187208

209+
go func() {
188210
s.SetWriteDeadline(time.Time{})
189-
_ = s.maybeSendStreamHeader()
190-
s.str.Close()
211+
_, _ = s.str.Write(streamHdr)
212+
s.str.SetReliableBoundary()
213+
_ = s.str.Close()
191214
s.onClose()
192215
}()
193216
return nil

0 commit comments

Comments
 (0)