Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api_signaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ const (
// Features for all clients.
ServerFeatureMcu = "mcu"
ServerFeatureSimulcast = "simulcast"
ServerFeatureUpdateSdp = "update-sdp"
ServerFeatureAudioVideoPermissions = "audio-video-permissions"

// Features for internal clients only.
Expand Down Expand Up @@ -647,4 +648,5 @@ type AnswerOfferMessage struct {
Type string `json:"type"`
RoomType string `json:"roomType"`
Payload map[string]interface{} `json:"payload"`
Update bool `json:"update,omitempty"`

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the Update really needed? A client knows if it already has a connection with another peer and can with that determine if it's an update or not. Or am I missing something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an RTCPeerConnection handles an offer that is not an update it will end failing, but depending on the implementation it can take up to 30 seconds to fail (and in some cases I think that I saw it getting stuck, but I am not 100% sure).

In general this should not happen, but as offers are periodically requested until a connection is established in some corner cases with delayed messages and flaky connections it could happen that an offer was requested and just before receiving it a new request was made. The original connection is established, but Janus closes it due to receiving the new offer request. However, in some cases the browser could keep the connection open, so when the new offer is received it would be handled by the existing connection, rather than immediately establishing a new one.

Due to this Talk's WebUI closes the previous connection and starts a new one when an offer for the same participant is received, and this is why it needs to differentiate between normal offers, which closes the connection and start a new one, and updated offers, which apply to the existing one.

}
40 changes: 40 additions & 0 deletions clientsession.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,34 @@ func (s *ClientSession) SetClient(client *Client) *Client {
return prev
}

func (s *ClientSession) sendOffer(client McuClient, sender string, streamType string, offer map[string]interface{}) {
Comment thread
fancycode marked this conversation as resolved.

@fancycode fancycode Feb 16, 2022

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: de-duplicate with

offer_message := &AnswerOfferMessage{
once this has landed.

offer_message := &AnswerOfferMessage{
To: s.PublicId(),
From: sender,
Type: "offer",
RoomType: streamType,
Payload: offer,
Comment thread
fancycode marked this conversation as resolved.
Update: true,
}
offer_data, err := json.Marshal(offer_message)
if err != nil {
log.Println("Could not serialize offer", offer_message, err)
return
}
response_message := &ServerMessage{
Type: "message",
Message: &MessageServerMessage{
Sender: &MessageServerMessageSender{
Type: "session",
SessionId: sender,
},
Data: (*json.RawMessage)(&offer_data),
},
}

s.sendMessageUnlocked(response_message)
}

func (s *ClientSession) sendCandidate(client McuClient, sender string, streamType string, candidate interface{}) {
candidate_message := &AnswerOfferMessage{
To: s.PublicId(),
Expand Down Expand Up @@ -628,6 +656,18 @@ func (s *ClientSession) SendMessages(messages []*ServerMessage) bool {
return true
}

func (s *ClientSession) OnUpdateOffer(client McuClient, offer map[string]interface{}) {
s.mu.Lock()
defer s.mu.Unlock()

for _, sub := range s.subscribers {
if sub.Id() == client.Id() {
s.sendOffer(client, sub.Publisher(), client.StreamType(), offer)
return
}
}
}

func (s *ClientSession) OnIceCandidate(client McuClient, candidate interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
4 changes: 4 additions & 0 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,18 @@ func (h *Hub) SetMcu(mcu Mcu) {
if mcu == nil {
removeFeature(h.info, ServerFeatureMcu)
removeFeature(h.info, ServerFeatureSimulcast)
removeFeature(h.info, ServerFeatureUpdateSdp)
removeFeature(h.infoInternal, ServerFeatureMcu)
removeFeature(h.infoInternal, ServerFeatureSimulcast)
removeFeature(h.infoInternal, ServerFeatureUpdateSdp)
} else {
log.Printf("Using a timeout of %s for MCU requests", h.mcuTimeout)
addFeature(h.info, ServerFeatureMcu)
addFeature(h.info, ServerFeatureSimulcast)
addFeature(h.info, ServerFeatureUpdateSdp)
addFeature(h.infoInternal, ServerFeatureMcu)
addFeature(h.infoInternal, ServerFeatureSimulcast)
addFeature(h.infoInternal, ServerFeatureUpdateSdp)
}
}

Expand Down
2 changes: 2 additions & 0 deletions mcu_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
type McuListener interface {
PublicId() string

OnUpdateOffer(client McuClient, offer map[string]interface{})

OnIceCandidate(client McuClient, candidate interface{})
OnIceCompleted(client McuClient)

Expand Down
7 changes: 6 additions & 1 deletion mcu_janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,12 @@ func (p *mcuJanusSubscriber) handleEvent(event *janus.EventMsg) {
log.Printf("Subscriber %d: associated room has been destroyed, closing", p.handleId)
go p.Close(ctx)
case "event":
// Ignore events like selected substream / temporal layer.
// Handle renegotiations, but ignore other events like selected
// substream / temporal layer.
if getPluginStringValue(event.Plugindata, pluginVideoRoom, "configured") == "ok" &&
event.Jsep != nil && event.Jsep["type"] == "offer" && event.Jsep["sdp"] != nil {
p.listener.OnUpdateOffer(p, event.Jsep)
}
case "slow_link":
// Ignore, processed through "handleSlowLink" in the general events.
default:
Expand Down
2 changes: 2 additions & 0 deletions mcu_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (c *mcuProxyPubSubCommon) doSendMessage(ctx context.Context, msg *ProxyClie

func (c *mcuProxyPubSubCommon) doProcessPayload(client McuClient, msg *PayloadProxyServerMessage) {
switch msg.Type {
case "offer":
c.listener.OnUpdateOffer(client, msg.Payload["offer"].(map[string]interface{}))
case "candidate":
c.listener.OnIceCandidate(client, msg.Payload["candidate"])
default:
Expand Down
20 changes: 20 additions & 0 deletions proxy/proxy_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,26 @@ func (s *ProxySession) SetClient(client *ProxyClient) *ProxyClient {
return prev
}

func (s *ProxySession) OnUpdateOffer(client signaling.McuClient, offer map[string]interface{}) {
id := s.proxy.GetClientId(client)
if id == "" {
log.Printf("Received offer %+v from unknown %s client %s (%+v)", offer, client.StreamType(), client.Id(), client)
return
}

msg := &signaling.ProxyServerMessage{
Type: "payload",
Payload: &signaling.PayloadProxyServerMessage{
Type: "offer",
ClientId: id,
Payload: map[string]interface{}{
"offer": offer,
},
},
}
s.sendMessage(msg)
}

func (s *ProxySession) OnIceCandidate(client signaling.McuClient, candidate interface{}) {
id := s.proxy.GetClientId(client)
if id == "" {
Expand Down