@@ -40,6 +40,8 @@ type DataChannel struct {
40
40
readyState atomic.Value // DataChannelState
41
41
bufferedAmountLowThreshold uint64
42
42
detachCalled bool
43
+ readLoopActive chan struct {}
44
+ isGracefulClosed bool
43
45
44
46
// The binaryType represents attribute MUST, on getting, return the value to
45
47
// which it was last set. On setting, if the new value is either the string
@@ -225,6 +227,10 @@ func (d *DataChannel) OnOpen(f func()) {
225
227
func (d * DataChannel ) onOpen () {
226
228
d .mu .RLock ()
227
229
handler := d .onOpenHandler
230
+ if d .isGracefulClosed {
231
+ d .mu .RUnlock ()
232
+ return
233
+ }
228
234
d .mu .RUnlock ()
229
235
230
236
if handler != nil {
@@ -252,6 +258,10 @@ func (d *DataChannel) OnDial(f func()) {
252
258
func (d * DataChannel ) onDial () {
253
259
d .mu .RLock ()
254
260
handler := d .onDialHandler
261
+ if d .isGracefulClosed {
262
+ d .mu .RUnlock ()
263
+ return
264
+ }
255
265
d .mu .RUnlock ()
256
266
257
267
if handler != nil {
@@ -261,6 +271,10 @@ func (d *DataChannel) onDial() {
261
271
262
272
// OnClose sets an event handler which is invoked when
263
273
// the underlying data transport has been closed.
274
+ // Note: Due to backwards compatibility, there is a chance that
275
+ // OnClose can be called, even if the GracefulClose is used.
276
+ // If this is the case for you, you can deregister OnClose
277
+ // prior to GracefulClose.
264
278
func (d * DataChannel ) OnClose (f func ()) {
265
279
d .mu .Lock ()
266
280
defer d .mu .Unlock ()
@@ -292,6 +306,10 @@ func (d *DataChannel) OnMessage(f func(msg DataChannelMessage)) {
292
306
func (d * DataChannel ) onMessage (msg DataChannelMessage ) {
293
307
d .mu .RLock ()
294
308
handler := d .onMessageHandler
309
+ if d .isGracefulClosed {
310
+ d .mu .RUnlock ()
311
+ return
312
+ }
295
313
d .mu .RUnlock ()
296
314
297
315
if handler == nil {
@@ -302,6 +320,10 @@ func (d *DataChannel) onMessage(msg DataChannelMessage) {
302
320
303
321
func (d * DataChannel ) handleOpen (dc * datachannel.DataChannel , isRemote , isAlreadyNegotiated bool ) {
304
322
d .mu .Lock ()
323
+ if d .isGracefulClosed {
324
+ d .mu .Unlock ()
325
+ return
326
+ }
305
327
d .dataChannel = dc
306
328
bufferedAmountLowThreshold := d .bufferedAmountLowThreshold
307
329
onBufferedAmountLow := d .onBufferedAmountLow
@@ -326,7 +348,12 @@ func (d *DataChannel) handleOpen(dc *datachannel.DataChannel, isRemote, isAlread
326
348
d .mu .Lock ()
327
349
defer d .mu .Unlock ()
328
350
351
+ if d .isGracefulClosed {
352
+ return
353
+ }
354
+
329
355
if ! d .api .settingEngine .detach .DataChannels {
356
+ d .readLoopActive = make (chan struct {})
330
357
go d .readLoop ()
331
358
}
332
359
}
@@ -342,6 +369,10 @@ func (d *DataChannel) OnError(f func(err error)) {
342
369
func (d * DataChannel ) onError (err error ) {
343
370
d .mu .RLock ()
344
371
handler := d .onErrorHandler
372
+ if d .isGracefulClosed {
373
+ d .mu .RUnlock ()
374
+ return
375
+ }
345
376
d .mu .RUnlock ()
346
377
347
378
if handler != nil {
@@ -356,6 +387,12 @@ var rlBufPool = sync.Pool{New: func() interface{} {
356
387
}}
357
388
358
389
func (d * DataChannel ) readLoop () {
390
+ defer func () {
391
+ d .mu .Lock ()
392
+ readLoopActive := d .readLoopActive
393
+ d .mu .Unlock ()
394
+ defer close (readLoopActive )
395
+ }()
359
396
for {
360
397
buffer := rlBufPool .Get ().([]byte ) //nolint:forcetypeassert
361
398
n , isString , err := d .dataChannel .ReadDataChannel (buffer )
@@ -438,7 +475,32 @@ func (d *DataChannel) Detach() (datachannel.ReadWriteCloser, error) {
438
475
// Close Closes the DataChannel. It may be called regardless of whether
439
476
// the DataChannel object was created by this peer or the remote peer.
440
477
func (d * DataChannel ) Close () error {
478
+ return d .close (false )
479
+ }
480
+
481
+ // GracefulClose Closes the DataChannel. It may be called regardless of whether
482
+ // the DataChannel object was created by this peer or the remote peer. It also waits
483
+ // for any goroutines it started to complete. This is only safe to call outside of
484
+ // DataChannel callbacks or if in a callback, in its own goroutine.
485
+ func (d * DataChannel ) GracefulClose () error {
486
+ return d .close (true )
487
+ }
488
+
489
+ // Normally, close only stops writes from happening, so graceful=true
490
+ // will wait for reads to be finished based on underlying SCTP association
491
+ // closure or a SCTP reset stream from the other side. This is safe to call
492
+ // with graceful=true after tearing down a PeerConnection but not
493
+ // necessarily before. For example, if you used a vnet and dropped all packets
494
+ // right before closing the DataChannel, you'd need never see a reset stream.
495
+ func (d * DataChannel ) close (shouldGracefullyClose bool ) error {
441
496
d .mu .Lock ()
497
+ d .isGracefulClosed = true
498
+ readLoopActive := d .readLoopActive
499
+ if shouldGracefullyClose && readLoopActive != nil {
500
+ defer func () {
501
+ <- readLoopActive
502
+ }()
503
+ }
442
504
haveSctpTransport := d .dataChannel != nil
443
505
d .mu .Unlock ()
444
506
0 commit comments