Skip to content

Commit d45f63c

Browse files
committed
Merge branch 'v3' into v3-viam
2 parents f82fceb + 6ac4b71 commit d45f63c

File tree

2 files changed

+155
-35
lines changed

2 files changed

+155
-35
lines changed

peerconnection.go

+72-34
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ type PeerConnection struct {
5656
idpLoginURL *string
5757

5858
isClosed *atomicBool
59-
isGracefulClosed *atomicBool
60-
isGracefulClosedDone chan struct{}
59+
isGracefullyClosingOrClosed bool
60+
isCloseDone chan struct{}
61+
isGracefulCloseDone chan struct{}
6162
isNegotiationNeeded *atomicBool
6263
updateNegotiationNeededFlagOnEmptyChain *atomicBool
6364

@@ -130,8 +131,8 @@ func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection,
130131
ICECandidatePoolSize: 0,
131132
},
132133
isClosed: &atomicBool{},
133-
isGracefulClosed: &atomicBool{},
134-
isGracefulClosedDone: make(chan struct{}),
134+
isCloseDone: make(chan struct{}),
135+
isGracefulCloseDone: make(chan struct{}),
135136
isNegotiationNeeded: &atomicBool{},
136137
updateNegotiationNeededFlagOnEmptyChain: &atomicBool{},
137138
lastOffer: "",
@@ -2087,22 +2088,44 @@ func (pc *PeerConnection) GracefulClose() error {
20872088
func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
20882089
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1)
20892090
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2)
2090-
alreadyGracefullyClosed := shouldGracefullyClose && pc.isGracefulClosed.swap(true)
2091-
if pc.isClosed.swap(true) {
2092-
if alreadyGracefullyClosed {
2093-
// similar but distinct condition where we may be waiting for some
2094-
// other graceful close to finish. Incorrectly using isClosed may
2095-
// leak a goroutine.
2096-
<-pc.isGracefulClosedDone
2097-
}
2098-
return nil
2091+
2092+
pc.mu.Lock()
2093+
// A lock in this critical section is needed because pc.isClosed and
2094+
// pc.isGracefullyClosingOrClosed are related to each other in that we
2095+
// want to make graceful and normal closure one time operations in order
2096+
// to avoid any double closure errors from cropping up. However, there are
2097+
// some overlapping close cases when both normal and graceful close are used
2098+
// that should be idempotent, but be cautioned when writing new close behavior
2099+
// to preserve this property.
2100+
isAlreadyClosingOrClosed := pc.isClosed.swap(true)
2101+
isAlreadyGracefullyClosingOrClosed := pc.isGracefullyClosingOrClosed
2102+
if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
2103+
pc.isGracefullyClosingOrClosed = true
20992104
}
2100-
if shouldGracefullyClose && !alreadyGracefullyClosed {
2101-
defer close(pc.isGracefulClosedDone)
2105+
pc.mu.Unlock()
2106+
2107+
if isAlreadyClosingOrClosed {
2108+
if !shouldGracefullyClose {
2109+
return nil
2110+
}
2111+
// Even if we're already closing, it may not be graceful:
2112+
// If we are not the ones doing the closing, we just wait for the graceful close
2113+
// to happen and then return.
2114+
if isAlreadyGracefullyClosingOrClosed {
2115+
<-pc.isGracefulCloseDone
2116+
return nil
2117+
}
2118+
// Otherwise we need to go through the graceful closure flow once the
2119+
// normal closure is done since there are extra steps to take with a
2120+
// graceful close.
2121+
<-pc.isCloseDone
2122+
} else {
2123+
defer close(pc.isCloseDone)
21022124
}
21032125

2104-
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
2105-
pc.signalingState.Set(SignalingStateClosed)
2126+
if shouldGracefullyClose {
2127+
defer close(pc.isGracefulCloseDone)
2128+
}
21062129

21072130
// Try closing everything and collect the errors
21082131
// Shutdown strategy:
@@ -2112,6 +2135,34 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
21122135
// continue the chain the Mux has to be closed.
21132136
closeErrs := make([]error, 4)
21142137

2138+
doGracefulCloseOps := func() []error {
2139+
if !shouldGracefullyClose {
2140+
return nil
2141+
}
2142+
2143+
// these are all non-canon steps
2144+
var gracefulCloseErrors []error
2145+
if pc.iceTransport != nil {
2146+
gracefulCloseErrors = append(gracefulCloseErrors, pc.iceTransport.GracefulStop())
2147+
}
2148+
2149+
pc.ops.GracefulClose()
2150+
2151+
pc.sctpTransport.lock.Lock()
2152+
for _, d := range pc.sctpTransport.dataChannels {
2153+
gracefulCloseErrors = append(gracefulCloseErrors, d.GracefulClose())
2154+
}
2155+
pc.sctpTransport.lock.Unlock()
2156+
return gracefulCloseErrors
2157+
}
2158+
2159+
if isAlreadyClosingOrClosed {
2160+
return util.FlattenErrs(doGracefulCloseOps())
2161+
}
2162+
2163+
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3)
2164+
pc.signalingState.Set(SignalingStateClosed)
2165+
21152166
closeErrs = append(closeErrs, pc.api.interceptor.Close())
21162167

21172168
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4)
@@ -2142,28 +2193,15 @@ func (pc *PeerConnection) close(shouldGracefullyClose bool) error {
21422193
closeErrs = append(closeErrs, pc.dtlsTransport.Stop())
21432194

21442195
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10)
2145-
if pc.iceTransport != nil {
2146-
if shouldGracefullyClose {
2147-
// note that it isn't canon to stop gracefully
2148-
closeErrs = append(closeErrs, pc.iceTransport.GracefulStop())
2149-
} else {
2150-
closeErrs = append(closeErrs, pc.iceTransport.Stop())
2151-
}
2196+
if pc.iceTransport != nil && !shouldGracefullyClose {
2197+
// we will stop gracefully in doGracefulCloseOps
2198+
closeErrs = append(closeErrs, pc.iceTransport.Stop())
21522199
}
21532200

21542201
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11)
21552202
pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())
21562203

2157-
if shouldGracefullyClose {
2158-
pc.ops.GracefulClose()
2159-
2160-
// note that it isn't canon to stop gracefully
2161-
pc.sctpTransport.lock.Lock()
2162-
for _, d := range pc.sctpTransport.dataChannels {
2163-
closeErrs = append(closeErrs, d.GracefulClose())
2164-
}
2165-
pc.sctpTransport.lock.Unlock()
2166-
}
2204+
closeErrs = append(closeErrs, doGracefulCloseOps()...)
21672205

21682206
return util.FlattenErrs(closeErrs)
21692207
}

peerconnection_close_test.go

+83-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
package webrtc
88

99
import (
10+
"fmt"
11+
"sync"
1012
"testing"
1113
"time"
1214

@@ -180,7 +182,7 @@ func TestPeerConnection_Close_DuringICE(t *testing.T) {
180182
}
181183
}
182184

183-
func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
185+
func TestPeerConnection_GracefulCloseWithIncomingMessages(t *testing.T) {
184186
// Limit runtime in case of deadlocks
185187
lim := test.TimeOut(time.Second * 20)
186188
defer lim.Stop()
@@ -245,3 +247,83 @@ func TestPeerConnection_CloseWithIncomingMessages(t *testing.T) {
245247
t.Fatal(err)
246248
}
247249
}
250+
251+
func TestPeerConnection_GracefulCloseWhileOpening(t *testing.T) {
252+
// Limit runtime in case of deadlocks
253+
lim := test.TimeOut(time.Second * 5)
254+
defer lim.Stop()
255+
256+
report := test.CheckRoutinesStrict(t)
257+
defer report()
258+
259+
pcOffer, pcAnswer, err := newPair()
260+
if err != nil {
261+
t.Fatal(err)
262+
}
263+
264+
if _, err = pcOffer.CreateDataChannel("initial_data_channel", nil); err != nil {
265+
t.Fatal(err)
266+
}
267+
268+
offer, err := pcOffer.CreateOffer(nil)
269+
if err != nil {
270+
t.Fatal(err)
271+
}
272+
offerGatheringComplete := GatheringCompletePromise(pcOffer)
273+
if err = pcOffer.SetLocalDescription(offer); err != nil {
274+
t.Fatal(err)
275+
}
276+
<-offerGatheringComplete
277+
278+
err = pcOffer.GracefulClose()
279+
if err != nil {
280+
t.Fatal(err)
281+
}
282+
283+
if err = pcAnswer.SetRemoteDescription(offer); err != nil {
284+
t.Fatal(err)
285+
}
286+
287+
err = pcAnswer.GracefulClose()
288+
if err != nil {
289+
t.Fatal(err)
290+
}
291+
}
292+
293+
func TestPeerConnection_GracefulCloseConcurrent(t *testing.T) {
294+
// Limit runtime in case of deadlocks
295+
lim := test.TimeOut(time.Second * 10)
296+
defer lim.Stop()
297+
298+
for _, mixed := range []bool{false, true} {
299+
t.Run(fmt.Sprintf("mixed_graceful=%t", mixed), func(t *testing.T) {
300+
report := test.CheckRoutinesStrict(t)
301+
defer report()
302+
303+
pc, err := NewPeerConnection(Configuration{})
304+
if err != nil {
305+
t.Fatal(err)
306+
}
307+
308+
const gracefulCloseConcurrency = 50
309+
var wg sync.WaitGroup
310+
wg.Add(gracefulCloseConcurrency)
311+
for i := 0; i < gracefulCloseConcurrency; i++ {
312+
go func() {
313+
defer wg.Done()
314+
assert.NoError(t, pc.GracefulClose())
315+
}()
316+
}
317+
if !mixed {
318+
if err := pc.Close(); err != nil {
319+
t.Fatal(err)
320+
}
321+
} else {
322+
if err := pc.GracefulClose(); err != nil {
323+
t.Fatal(err)
324+
}
325+
}
326+
wg.Wait()
327+
})
328+
}
329+
}

0 commit comments

Comments
 (0)