Skip to content

Commit ad7ca44

Browse files
committed
Fix(SFU): Prevent multiple receiver close calls on simulcast
1 parent cfd4eaf commit ad7ca44

3 files changed

Lines changed: 26 additions & 22 deletions

File tree

pkg/sfu/receiver.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ type Receiver interface {
3434
// WebRTCReceiver receives a video track
3535
type WebRTCReceiver struct {
3636
sync.Mutex
37-
rtcpMu sync.RWMutex
37+
rtcpMu sync.RWMutex
38+
closeOnce sync.Once
3839

3940
peerID string
4041
trackID string
@@ -215,7 +216,7 @@ func (w *WebRTCReceiver) writeRTP(layer int) {
215216
w.closeTracks(layer)
216217
w.nackWorker.Stop()
217218
if w.onCloseHandler != nil {
218-
w.onCloseHandler()
219+
w.closeOnce.Do(w.onCloseHandler)
219220
}
220221
}()
221222
for pkt := range w.buffers[layer].PacketChan() {

pkg/sfu/router.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -128,12 +128,19 @@ func (r *router) AddReceiver(receiver *webrtc.RTPReceiver, track *webrtc.TrackRe
128128
}
129129
})
130130

131-
recv := r.receivers[trackID]
132-
if recv == nil {
131+
recv, ok := r.receivers[trackID]
132+
if !ok {
133133
recv = NewWebRTCReceiver(receiver, track, r.id)
134134
r.receivers[trackID] = recv
135135
recv.SetRTCPCh(r.rtcpCh)
136136
recv.OnCloseHandler(func() {
137+
if r.config.WithStats {
138+
if track.Kind() == webrtc.RTPCodecTypeVideo {
139+
stats.VideoTracks.Dec()
140+
} else {
141+
stats.AudioTracks.Dec()
142+
}
143+
}
137144
r.deleteReceiver(trackID, uint32(track.SSRC()))
138145
})
139146
publish = true
@@ -217,11 +224,13 @@ func (r *router) addDownTrack(sub *Subscriber, recv Receiver) error {
217224

218225
// nolint:scopelint
219226
downTrack.OnCloseHandler(func() {
220-
if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
221-
log.Errorf("Error closing down track: %v", err)
222-
} else {
223-
sub.RemoveDownTrack(recv.StreamID(), downTrack)
224-
sub.negotiate()
227+
if sub.pc.ConnectionState() != webrtc.PeerConnectionStateClosed {
228+
if err := sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err != nil {
229+
log.Errorf("Error closing down track: %v", err)
230+
} else {
231+
sub.RemoveDownTrack(recv.StreamID(), downTrack)
232+
sub.negotiate()
233+
}
225234
}
226235
})
227236

@@ -236,14 +245,6 @@ func (r *router) addDownTrack(sub *Subscriber, recv Receiver) error {
236245

237246
func (r *router) deleteReceiver(track string, ssrc uint32) {
238247
r.Lock()
239-
if r.config.WithStats {
240-
if r.receivers[track].Kind() == webrtc.RTPCodecTypeVideo {
241-
stats.VideoTracks.Dec()
242-
} else {
243-
stats.AudioTracks.Dec()
244-
}
245-
}
246-
247248
delete(r.receivers, track)
248249
delete(r.stats, ssrc)
249250
r.Unlock()

pkg/sfu/subscriber.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,12 @@ func (s *Subscriber) RemoveDownTrack(streamID string, downTrack *DownTrack) {
135135
idx = i
136136
}
137137
}
138-
dts[idx] = dts[len(dts)-1]
139-
dts[len(dts)-1] = nil
140-
dts = dts[:len(dts)-1]
141-
s.tracks[streamID] = dts
138+
if idx >= 0 {
139+
dts[idx] = dts[len(dts)-1]
140+
dts[len(dts)-1] = nil
141+
dts = dts[:len(dts)-1]
142+
s.tracks[streamID] = dts
143+
}
142144
}
143145
}
144146

@@ -193,7 +195,7 @@ func (s *Subscriber) downTracksReports() {
193195
for {
194196
time.Sleep(5 * time.Second)
195197

196-
if s.pc.ConnectionState() == webrtc.ICETransportStateClosed {
198+
if s.pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
197199
return
198200
}
199201

0 commit comments

Comments
 (0)