@@ -10,6 +10,8 @@ import (
1010 "context"
1111 "io"
1212 "math"
13+ "sync"
14+ "sync/atomic"
1315 "testing"
1416 "time"
1517
@@ -137,6 +139,74 @@ func TestRTPReceiver_ClosedReceiveForRIDAndRTX(t *testing.T) {
137139 }
138140}
139141
142+ func TestRTPReceiver_readRTX_ChannelAccessSafe (t * testing.T ) {
143+ receiver := & RTPReceiver {
144+ kind : RTPCodecTypeVideo ,
145+ received : make (chan any ),
146+ closedChan : make (chan any ),
147+ rtxPool : sync.Pool {New : func () any {
148+ return make ([]byte , 1200 )
149+ }},
150+ }
151+
152+ receiver .configureReceive (RTPReceiveParameters {
153+ Encodings : []RTPDecodingParameters {
154+ {
155+ RTPCodingParameters : RTPCodingParameters {
156+ RID : "rid" ,
157+ SSRC : 1111 ,
158+ RTX : RTPRtxParameters {
159+ SSRC : 2222 ,
160+ },
161+ },
162+ },
163+ },
164+ })
165+
166+ params := RTPParameters {
167+ Codecs : []RTPCodecParameters {
168+ {
169+ RTPCodecCapability : RTPCodecCapability {MimeType : MimeTypeVP8 },
170+ PayloadType : 96 ,
171+ },
172+ },
173+ }
174+ ridStreamInfo := & interceptor.StreamInfo {SSRC : 1111 }
175+ track , err := receiver .receiveForRid ("rid" , params , ridStreamInfo , nil , nil , nil , nil , nil )
176+ require .NoError (t , err )
177+
178+ close (receiver .received )
179+
180+ raw := []byte {0x80 , 0x60 , 0x12 , 0x34 , 0x00 , 0x00 , 0x00 , 0x01 , 0x00 , 0x00 , 0x00 , 0x02 , 0xAA , 0xBB }
181+ var returnedOnce atomic.Bool
182+ rtpInterceptor := interceptor .RTPReaderFunc (func (b []byte , a interceptor.Attributes ) (int , interceptor.Attributes , error ) {
183+ if returnedOnce .Load () {
184+ return 0 , a , io .EOF
185+ }
186+ returnedOnce .Store (true )
187+
188+ n := copy (b , raw )
189+ return n , a , nil
190+ })
191+
192+ repairStreamInfo := & interceptor.StreamInfo {SSRC : 2222 }
193+ require .NoError (t , receiver .receiveForRtxInternal (SSRC (2222 ), "" , repairStreamInfo , nil , rtpInterceptor , nil , nil ))
194+
195+ deadline := time .After (time .Second )
196+ for {
197+ if pkt := receiver .readRTX (track ); pkt != nil {
198+ return
199+ }
200+
201+ select {
202+ case <- deadline :
203+ t .Fatal ("expected RTX packet" )
204+ default :
205+ time .Sleep (5 * time .Millisecond )
206+ }
207+ }
208+ }
209+
140210// TestRTPReceiver_CollectStats_Mapping validates that collectStats maps
141211// interceptor/pkg/stats values into InboundRTPStreamStats.
142212func TestRTPReceiver_CollectStats_Mapping (t * testing.T ) {
0 commit comments