Skip to content

Commit cd90970

Browse files
committed
fix udp frames routing in case of multiple publishers with same ip; fix #56
1 parent f302536 commit cd90970

File tree

3 files changed

+79
-58
lines changed

3 files changed

+79
-58
lines changed

client.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,33 @@ const (
2424
clientUdpWriteBufferSize = 128 * 1024
2525
)
2626

27+
type udpClient struct {
28+
client *client
29+
trackId int
30+
streamType gortsplib.StreamType
31+
}
32+
33+
type udpClientAddr struct {
34+
// use a fixed-size array for ip comparison
35+
ip [net.IPv6len]byte
36+
port int
37+
}
38+
39+
func makeUdpClientAddr(ip net.IP, port int) udpClientAddr {
40+
ret := udpClientAddr{
41+
port: port,
42+
}
43+
44+
if len(ip) == net.IPv4len {
45+
copy(ret.ip[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix
46+
copy(ret.ip[12:], ip)
47+
} else {
48+
copy(ret.ip[:], ip)
49+
}
50+
51+
return ret
52+
}
53+
2754
type describeRes struct {
2855
sdp []byte
2956
err error

main.go

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -163,19 +163,19 @@ type programEventTerminate struct{}
163163
func (programEventTerminate) isProgramEvent() {}
164164

165165
type program struct {
166-
conf *conf
167-
logFile *os.File
168-
metrics *metrics
169-
serverRtsp *serverTcp
170-
serverRtp *serverUdp
171-
serverRtcp *serverUdp
172-
sources []*source
173-
clients map[*client]struct{}
174-
udpClientPublishers map[ipKey]*client
175-
paths map[string]*path
176-
cmds []*exec.Cmd
177-
publisherCount int
178-
readerCount int
166+
conf *conf
167+
logFile *os.File
168+
metrics *metrics
169+
serverRtsp *serverTcp
170+
serverRtp *serverUdp
171+
serverRtcp *serverUdp
172+
sources []*source
173+
clients map[*client]struct{}
174+
udpClientsByAddr map[udpClientAddr]*udpClient
175+
paths map[string]*path
176+
cmds []*exec.Cmd
177+
publisherCount int
178+
readerCount int
179179

180180
events chan programEvent
181181
done chan struct{}
@@ -201,12 +201,12 @@ func newProgram(args []string, stdin io.Reader) (*program, error) {
201201
}
202202

203203
p := &program{
204-
conf: conf,
205-
clients: make(map[*client]struct{}),
206-
udpClientPublishers: make(map[ipKey]*client),
207-
paths: make(map[string]*path),
208-
events: make(chan programEvent),
209-
done: make(chan struct{}),
204+
conf: conf,
205+
clients: make(map[*client]struct{}),
206+
udpClientsByAddr: make(map[udpClientAddr]*udpClient),
207+
paths: make(map[string]*path),
208+
events: make(chan programEvent),
209+
done: make(chan struct{}),
210210
}
211211

212212
if _, ok := p.conf.logDestinationsParsed[logDestinationFile]; ok {
@@ -424,29 +424,56 @@ outer:
424424
case programEventClientRecord:
425425
p.publisherCount += 1
426426
evt.client.state = clientStateRecord
427+
427428
if evt.client.streamProtocol == gortsplib.StreamProtocolUdp {
428-
p.udpClientPublishers[makeIpKey(evt.client.ip())] = evt.client
429+
for trackId, track := range evt.client.streamTracks {
430+
key := makeUdpClientAddr(evt.client.ip(), track.rtpPort)
431+
p.udpClientsByAddr[key] = &udpClient{
432+
client: evt.client,
433+
trackId: trackId,
434+
streamType: gortsplib.StreamTypeRtp,
435+
}
436+
437+
key = makeUdpClientAddr(evt.client.ip(), track.rtcpPort)
438+
p.udpClientsByAddr[key] = &udpClient{
439+
client: evt.client,
440+
trackId: trackId,
441+
streamType: gortsplib.StreamTypeRtcp,
442+
}
443+
}
429444
}
445+
430446
p.paths[evt.client.pathName].publisherSetReady()
431447
close(evt.done)
432448

433449
case programEventClientRecordStop:
434450
p.publisherCount -= 1
435451
evt.client.state = clientStatePreRecord
436452
if evt.client.streamProtocol == gortsplib.StreamProtocolUdp {
437-
delete(p.udpClientPublishers, makeIpKey(evt.client.ip()))
453+
for _, track := range evt.client.streamTracks {
454+
key := makeUdpClientAddr(evt.client.ip(), track.rtpPort)
455+
delete(p.udpClientsByAddr, key)
456+
457+
key = makeUdpClientAddr(evt.client.ip(), track.rtcpPort)
458+
delete(p.udpClientsByAddr, key)
459+
}
438460
}
439461
p.paths[evt.client.pathName].publisherSetNotReady()
440462
close(evt.done)
441463

442464
case programEventClientFrameUdp:
443-
client, trackId := p.findUdpClientPublisher(evt.addr, evt.streamType)
444-
if client == nil {
465+
pub, ok := p.udpClientsByAddr[makeUdpClientAddr(evt.addr.IP, evt.addr.Port)]
466+
if !ok {
467+
continue
468+
}
469+
470+
// client sent RTP on RTCP port or vice-versa
471+
if pub.streamType != evt.streamType {
445472
continue
446473
}
447474

448-
client.rtcpReceivers[trackId].OnFrame(evt.streamType, evt.buf)
449-
p.forwardFrame(client.pathName, trackId, evt.streamType, evt.buf)
475+
pub.client.rtcpReceivers[pub.trackId].OnFrame(evt.streamType, evt.buf)
476+
p.forwardFrame(pub.client.pathName, pub.trackId, evt.streamType, evt.buf)
450477

451478
case programEventClientFrameTcp:
452479
p.forwardFrame(evt.path, evt.trackId, evt.streamType, evt.buf)
@@ -555,25 +582,6 @@ func (p *program) findConfForPath(name string) *confPath {
555582
return nil
556583
}
557584

558-
func (p *program) findUdpClientPublisher(addr *net.UDPAddr, streamType gortsplib.StreamType) (*client, int) {
559-
c, ok := p.udpClientPublishers[makeIpKey(addr.IP)]
560-
if ok {
561-
for i, t := range c.streamTracks {
562-
if streamType == gortsplib.StreamTypeRtp {
563-
if t.rtpPort == addr.Port {
564-
return c, i
565-
}
566-
} else {
567-
if t.rtcpPort == addr.Port {
568-
return c, i
569-
}
570-
}
571-
}
572-
}
573-
574-
return nil, -1
575-
}
576-
577585
func (p *program) forwardFrame(path string, trackId int, streamType gortsplib.StreamType, frame []byte) {
578586
for c := range p.clients {
579587
if c.pathName != path ||

utils.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -149,20 +149,6 @@ func splitPath(path string) (string, string, error) {
149149
return comps[0], comps[1], nil
150150
}
151151

152-
// use a fixed-size array for ip comparison
153-
type ipKey [net.IPv6len]byte
154-
155-
func makeIpKey(ip net.IP) ipKey {
156-
var ret ipKey
157-
if len(ip) == net.IPv4len {
158-
copy(ret[0:], []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}) // v4InV6Prefix
159-
copy(ret[12:], ip)
160-
} else {
161-
copy(ret[:], ip)
162-
}
163-
return ret
164-
}
165-
166152
var rePathName = regexp.MustCompile("^[0-9a-zA-Z_-]+$")
167153

168154
func checkPathName(name string) error {

0 commit comments

Comments
 (0)