Skip to content

Commit a81d3d6

Browse files
kelvinlawsonaler9
andauthored
Support stream-specific multicast IP address and RTP/RTCP ports (#965)
Co-authored-by: aler9 <[email protected]>
1 parent f117738 commit a81d3d6

File tree

7 files changed

+191
-32
lines changed

7 files changed

+191
-32
lines changed

pkg/rtpreceiver/receiver.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ import (
1111
"github.com/pion/rtp"
1212
)
1313

14-
// Receiver is a utility to receive RTP packets. It is in charge of:
14+
// Receiver is a utility to receive RTP packets.
15+
// It is in charge of:
1516
// - removing duplicate packets (when transport is unreliable)
1617
// - reordering packets (when transport is unrealiable)
1718
// - counting received and lost packets

pkg/rtpsender/sender.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
// Sender is a utility to send RTP packets.
14-
// It is in charge of
14+
// It is in charge of:
1515
// - counting sent packets
1616
// - generating RTCP sender reports.
1717
type Sender struct {

server_multicast_writer.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,20 @@ package gortsplib
33
import (
44
"context"
55
"net"
6+
"time"
67

78
"github.com/bluenviron/gortsplib/v5/internal/asyncprocessor"
89
"github.com/bluenviron/gortsplib/v5/pkg/liberrors"
910
)
1011

1112
type serverMulticastWriter struct {
12-
s *Server
13+
udpReadBufferSize int
14+
listenPacket func(network string, address string) (net.PacketConn, error)
15+
writeQueueSize int
16+
writeTimeout time.Duration
17+
ip net.IP
18+
rtpPort int
19+
rtcpPort int
1320

1421
rtpl *serverUDPListener
1522
rtcpl *serverUDPListener
@@ -19,31 +26,26 @@ type serverMulticastWriter struct {
1926
}
2027

2128
func (h *serverMulticastWriter) initialize() error {
22-
ip, err := h.s.getMulticastIP()
23-
if err != nil {
24-
return err
25-
}
26-
2729
rtpl, rtcpl, err := createUDPListenerMulticastPair(
28-
h.s.UDPReadBufferSize,
29-
h.s.ListenPacket,
30-
h.s.WriteTimeout,
31-
h.s.MulticastRTPPort,
32-
h.s.MulticastRTCPPort,
33-
ip,
30+
h.udpReadBufferSize,
31+
h.listenPacket,
32+
h.writeTimeout,
33+
h.rtpPort,
34+
h.rtcpPort,
35+
h.ip,
3436
)
3537
if err != nil {
3638
return err
3739
}
3840

3941
rtpAddr := &net.UDPAddr{
40-
IP: rtpl.ip(),
41-
Port: rtpl.port(),
42+
IP: h.ip,
43+
Port: h.rtpPort,
4244
}
4345

4446
rtcpAddr := &net.UDPAddr{
45-
IP: rtcpl.ip(),
46-
Port: rtcpl.port(),
47+
IP: h.ip,
48+
Port: h.rtcpPort,
4749
}
4850

4951
h.rtpl = rtpl
@@ -52,7 +54,7 @@ func (h *serverMulticastWriter) initialize() error {
5254
h.rtcpAddr = rtcpAddr
5355

5456
h.writer = &asyncprocessor.Processor{
55-
BufferSize: h.s.WriteQueueSize,
57+
BufferSize: h.writeQueueSize,
5658
OnError: func(_ context.Context, _ error) {},
5759
}
5860
h.writer.Initialize()
@@ -67,10 +69,6 @@ func (h *serverMulticastWriter) close() {
6769
h.writer.Close()
6870
}
6971

70-
func (h *serverMulticastWriter) ip() net.IP {
71-
return h.rtpl.ip()
72-
}
73-
7472
func (h *serverMulticastWriter) writePacketRTP(byts []byte) error {
7573
ok := h.writer.Push(func() error {
7674
return h.rtpl.write(byts, h.rtpAddr)

server_play_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/bluenviron/gortsplib/v5/pkg/mikey"
2727
"github.com/bluenviron/gortsplib/v5/pkg/ntp"
2828
"github.com/bluenviron/gortsplib/v5/pkg/sdp"
29+
"github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio"
2930
)
3031

3132
func multicastCapableIP(t *testing.T) string {
@@ -2788,3 +2789,127 @@ func TestServerPlayBackChannel(t *testing.T) {
27882789
})
27892790
}
27902791
}
2792+
2793+
func TestServerPlayMulticastParams(t *testing.T) {
2794+
listenIP := multicastCapableIP(t)
2795+
2796+
var stream *ServerStream
2797+
2798+
s := &Server{
2799+
Handler: &testServerHandler{
2800+
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
2801+
return &base.Response{
2802+
StatusCode: base.StatusOK,
2803+
}, stream, nil
2804+
},
2805+
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
2806+
return &base.Response{
2807+
StatusCode: base.StatusOK,
2808+
}, stream, nil
2809+
},
2810+
onPlay: func(_ *ServerHandlerOnPlayCtx) (*base.Response, error) {
2811+
return &base.Response{
2812+
StatusCode: base.StatusOK,
2813+
}, nil
2814+
},
2815+
},
2816+
RTSPAddress: listenIP + ":8554",
2817+
MulticastIPRange: "224.1.0.0/16",
2818+
MulticastRTPPort: 8000,
2819+
MulticastRTCPPort: 8001,
2820+
}
2821+
2822+
err := s.Start()
2823+
require.NoError(t, err)
2824+
defer s.Close()
2825+
2826+
media1 := &description.Media{
2827+
Type: description.MediaTypeVideo,
2828+
Formats: []format.Format{&format.H264{
2829+
PayloadTyp: 96,
2830+
SPS: []byte{
2831+
0x67, 0x42, 0xc0, 0x28, 0xd9, 0x00, 0x78, 0x02,
2832+
0x27, 0xe5, 0x84, 0x00, 0x00, 0x03, 0x00, 0x04,
2833+
0x00, 0x00, 0x03, 0x00, 0xf0, 0x3c, 0x60, 0xc9,
2834+
0x20,
2835+
},
2836+
PPS: []byte{
2837+
0x44, 0x01, 0xc0, 0x25, 0x2f, 0x05, 0x32, 0x40,
2838+
},
2839+
PacketizationMode: 1,
2840+
}},
2841+
}
2842+
2843+
media2 := &description.Media{
2844+
Type: description.MediaTypeAudio,
2845+
Formats: []format.Format{&format.MPEG4Audio{
2846+
PayloadTyp: 97,
2847+
Config: &mpeg4audio.AudioSpecificConfig{
2848+
Type: mpeg4audio.ObjectTypeAACLC,
2849+
SampleRate: 48000,
2850+
ChannelCount: 2,
2851+
},
2852+
SizeLength: 13,
2853+
IndexLength: 3,
2854+
IndexDeltaLength: 3,
2855+
}},
2856+
}
2857+
2858+
multicastParams := map[*description.Media]StreamMediaMulticastParams{
2859+
media1: {
2860+
IP: net.ParseIP("224.2.0.50"),
2861+
RTPPort: 9000,
2862+
RTCPPort: 9001,
2863+
},
2864+
media2: {
2865+
IP: net.ParseIP("224.2.0.51"),
2866+
RTPPort: 9002,
2867+
RTCPPort: 9003,
2868+
},
2869+
}
2870+
2871+
stream = &ServerStream{
2872+
Server: s,
2873+
Desc: &description.Session{
2874+
Medias: []*description.Media{media1, media2},
2875+
},
2876+
MulticastParams: multicastParams,
2877+
}
2878+
err = stream.Initialize()
2879+
require.NoError(t, err)
2880+
defer stream.Close()
2881+
2882+
nconn, err := net.Dial("tcp", listenIP+":8554")
2883+
require.NoError(t, err)
2884+
defer nconn.Close()
2885+
conn := conn.NewConn(bufio.NewReader(nconn), nconn)
2886+
2887+
desc := doDescribe(t, conn, false)
2888+
2889+
inTH1 := &headers.Transport{
2890+
Delivery: ptrOf(headers.TransportDeliveryMulticast),
2891+
Protocol: headers.TransportProtocolUDP,
2892+
Mode: ptrOf(headers.TransportModePlay),
2893+
}
2894+
2895+
res1, th1 := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[0]).String(), inTH1, "")
2896+
session := readSession(t, res1)
2897+
2898+
require.Equal(t, headers.TransportProtocolUDP, th1.Protocol)
2899+
require.Equal(t, headers.TransportDeliveryMulticast, *th1.Delivery)
2900+
require.Equal(t, "224.2.0.50", *th1.Destination2)
2901+
require.Equal(t, [2]int{9000, 9001}, *th1.Ports)
2902+
2903+
inTH2 := &headers.Transport{
2904+
Delivery: ptrOf(headers.TransportDeliveryMulticast),
2905+
Protocol: headers.TransportProtocolUDP,
2906+
Mode: ptrOf(headers.TransportModePlay),
2907+
}
2908+
2909+
_, th2 := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[1]).String(), inTH2, session)
2910+
2911+
require.Equal(t, headers.TransportProtocolUDP, th2.Protocol)
2912+
require.Equal(t, headers.TransportDeliveryMulticast, *th2.Delivery)
2913+
require.Equal(t, "224.2.0.51", *th2.Destination2)
2914+
require.Equal(t, [2]int{9002, 9003}, *th2.Ports)
2915+
}

server_session.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,8 +1304,8 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
13041304
} else {
13051305
th.Delivery = ptrOf(headers.TransportDeliveryMulticast)
13061306
th.TTL = ptrOf(uint(127))
1307-
th.Destination2 = ptrOf(stream.medias[medi].multicastWriter.ip().String())
1308-
th.Ports = &[2]int{ss.s.MulticastRTPPort, ss.s.MulticastRTCPPort}
1307+
th.Destination2 = ptrOf(stream.medias[medi].multicastWriter.ip.String())
1308+
th.Ports = &[2]int{stream.medias[medi].multicastWriter.rtpPort, stream.medias[medi].multicastWriter.rtcpPort}
13091309
}
13101310

13111311
default: // TCP

server_stream.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package gortsplib
33
import (
44
"crypto/rand"
55
"fmt"
6+
"net"
67
"sync"
78
"sync/atomic"
89
"time"
@@ -40,14 +41,27 @@ func serverStreamExtractExistingSSRCs(medias map[*description.Media]*serverStrea
4041
return ret
4142
}
4243

43-
// ServerStream represents a data stream.
44+
// StreamMediaMulticastParams used to request specific Multicast configuration for each media in a stream
45+
type StreamMediaMulticastParams struct {
46+
IP net.IP
47+
RTPPort int
48+
RTCPPort int
49+
}
50+
51+
// ServerStream represents a media stream.
4452
// This is in charge of
4553
// - storing stream description and statistics
4654
// - distributing the stream to each reader
4755
// - allocating multicast listeners
4856
type ServerStream struct {
57+
// Parent server.
4958
Server *Server
50-
Desc *description.Session
59+
60+
// Stream description.
61+
Desc *description.Session
62+
63+
// (optional) Stream-specific Multicast settings.
64+
MulticastParams map[*description.Media]StreamMediaMulticastParams
5165

5266
mutex sync.RWMutex
5367
readers map[*ServerSession]struct{}
@@ -220,8 +234,33 @@ func (st *ServerStream) readerAdd(
220234
case ProtocolUDPMulticast:
221235
if st.multicastReaderCount == 0 {
222236
for _, media := range st.medias {
237+
var ip net.IP
238+
var rtpPort int
239+
var rtcpPort int
240+
241+
if params, ok := st.MulticastParams[media.media]; ok {
242+
ip = params.IP
243+
rtpPort = params.RTPPort
244+
rtcpPort = params.RTCPPort
245+
} else {
246+
var err error
247+
ip, err = st.Server.getMulticastIP()
248+
if err != nil {
249+
return err
250+
}
251+
252+
rtpPort = st.Server.MulticastRTPPort
253+
rtcpPort = st.Server.MulticastRTCPPort
254+
}
255+
223256
mw := &serverMulticastWriter{
224-
s: st.Server,
257+
udpReadBufferSize: st.Server.UDPReadBufferSize,
258+
listenPacket: st.Server.ListenPacket,
259+
writeQueueSize: st.Server.WriteQueueSize,
260+
writeTimeout: st.Server.WriteTimeout,
261+
ip: ip,
262+
rtpPort: rtpPort,
263+
rtcpPort: rtcpPort,
225264
}
226265
err := mw.initialize()
227266
if err != nil {

server_udp_listener.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,6 @@ func (u *serverUDPListener) close() {
120120
<-u.done
121121
}
122122

123-
func (u *serverUDPListener) ip() net.IP {
124-
return u.listenIP
125-
}
126-
127123
func (u *serverUDPListener) port() int {
128124
return u.pc.LocalAddr().(*net.UDPAddr).Port
129125
}

0 commit comments

Comments
 (0)