Skip to content

Commit 8fd9c68

Browse files
authored
Merge pull request #195 from danxuliu/send-updated-offers-to-subscribers-after-publisher-renegotiations
Send updated offers to subscribers after publisher renegotiations
2 parents d141775 + cd93db6 commit 8fd9c68

7 files changed

Lines changed: 76 additions & 1 deletion

File tree

api_signaling.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ const (
335335
// Features for all clients.
336336
ServerFeatureMcu = "mcu"
337337
ServerFeatureSimulcast = "simulcast"
338+
ServerFeatureUpdateSdp = "update-sdp"
338339
ServerFeatureAudioVideoPermissions = "audio-video-permissions"
339340
ServerFeatureTransientData = "transient-data"
340341

@@ -648,6 +649,7 @@ type AnswerOfferMessage struct {
648649
Type string `json:"type"`
649650
RoomType string `json:"roomType"`
650651
Payload map[string]interface{} `json:"payload"`
652+
Update bool `json:"update,omitempty"`
651653
}
652654

653655
// Type "transient"

clientsession.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,34 @@ func (s *ClientSession) SetClient(client *Client) *Client {
564564
return prev
565565
}
566566

567+
func (s *ClientSession) sendOffer(client McuClient, sender string, streamType string, offer map[string]interface{}) {
568+
offer_message := &AnswerOfferMessage{
569+
To: s.PublicId(),
570+
From: sender,
571+
Type: "offer",
572+
RoomType: streamType,
573+
Payload: offer,
574+
Update: true,
575+
}
576+
offer_data, err := json.Marshal(offer_message)
577+
if err != nil {
578+
log.Println("Could not serialize offer", offer_message, err)
579+
return
580+
}
581+
response_message := &ServerMessage{
582+
Type: "message",
583+
Message: &MessageServerMessage{
584+
Sender: &MessageServerMessageSender{
585+
Type: "session",
586+
SessionId: sender,
587+
},
588+
Data: (*json.RawMessage)(&offer_data),
589+
},
590+
}
591+
592+
s.sendMessageUnlocked(response_message)
593+
}
594+
567595
func (s *ClientSession) sendCandidate(client McuClient, sender string, streamType string, candidate interface{}) {
568596
candidate_message := &AnswerOfferMessage{
569597
To: s.PublicId(),
@@ -629,6 +657,18 @@ func (s *ClientSession) SendMessages(messages []*ServerMessage) bool {
629657
return true
630658
}
631659

660+
func (s *ClientSession) OnUpdateOffer(client McuClient, offer map[string]interface{}) {
661+
s.mu.Lock()
662+
defer s.mu.Unlock()
663+
664+
for _, sub := range s.subscribers {
665+
if sub.Id() == client.Id() {
666+
s.sendOffer(client, sub.Publisher(), client.StreamType(), offer)
667+
return
668+
}
669+
}
670+
}
671+
632672
func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) {
633673
s.mu.Lock()
634674
defer s.mu.Unlock()

hub.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,14 +371,18 @@ func (h *Hub) SetMcu(mcu Mcu) {
371371
if mcu == nil {
372372
removeFeature(h.info, ServerFeatureMcu)
373373
removeFeature(h.info, ServerFeatureSimulcast)
374+
removeFeature(h.info, ServerFeatureUpdateSdp)
374375
removeFeature(h.infoInternal, ServerFeatureMcu)
375376
removeFeature(h.infoInternal, ServerFeatureSimulcast)
377+
removeFeature(h.infoInternal, ServerFeatureUpdateSdp)
376378
} else {
377379
log.Printf("Using a timeout of %s for MCU requests", h.mcuTimeout)
378380
addFeature(h.info, ServerFeatureMcu)
379381
addFeature(h.info, ServerFeatureSimulcast)
382+
addFeature(h.info, ServerFeatureUpdateSdp)
380383
addFeature(h.infoInternal, ServerFeatureMcu)
381384
addFeature(h.infoInternal, ServerFeatureSimulcast)
385+
addFeature(h.infoInternal, ServerFeatureUpdateSdp)
382386
}
383387
}
384388

mcu_common.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ const (
5050
type McuListener interface {
5151
PublicId() string
5252

53+
OnUpdateOffer(client McuClient, offer map[string]interface{})
54+
5355
OnIceCandidate(client McuClient, candidate interface{})
5456
OnIceCompleted(client McuClient)
5557

mcu_janus.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1062,7 +1062,12 @@ func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) {
10621062
log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId)
10631063
go p.Close(ctx)
10641064
case "event":
1065-
// Ignore events like selected substream / temporal layer.
1065+
// Handle renegotiations, but ignore other events like selected
1066+
// substream / temporal layer.
1067+
if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" &&
1068+
event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil {
1069+
p.listener.OnUpdateOffer(p, event.Jsep)
1070+
}
10661071
case "slow_link":
10671072
// Ignore, processed through "handleSlowLink" in the general events.
10681073
default:

mcu_proxy.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie
107107

108108
func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) {
109109
switch msg.Type {
110+
case "offer":
111+
c.listener.OnUpdateOffer(client, msg.Payload["offer"].(map[string]interface{}))
110112
case "candidate":
111113
c.listener.OnIceCandidate(client, msg.Payload["candidate"])
112114
default:

proxy/proxy_session.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,26 @@ func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
122122
return prev
123123
}
124124

125+
func (s *ProxySession) OnUpdateOffer(client signaling.McuClient, offer map[string]interface{}) {
126+
id := s.proxy.GetClientId(client)
127+
if id == "" {
128+
log.Printf("Received offer %+v from unknown %s client %s (%+v)", offer, client.StreamType(), client.Id(), client)
129+
return
130+
}
131+
132+
msg := &signaling.ProxyServerMessage{
133+
Type: "payload",
134+
Payload: &signaling.PayloadProxyServerMessage{
135+
Type: "offer",
136+
ClientId: id,
137+
Payload: map[string]interface{}{
138+
"offer": offer,
139+
},
140+
},
141+
}
142+
s.sendMessage(msg)
143+
}
144+
125145
func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) {
126146
id := s.proxy.GetClientId(client)
127147
if id == "" {

0 commit comments

Comments
 (0)