Skip to content

Commit 991a4a4

Browse files
authored
Refactor subscribedTrack + mediaTrackSubscriptions. (livekit#3908)
- Move downTrack instantiation to SubscribedTrack as it should own that DownTrack. Still more to do here as `DownTrack` is fetched from `SubscribedTrack` in a few places and used. Would like to avoid that, but doing this initially. - Use an interface from sfu.Downtrack and replace a bunch of callbacks. SubscribedTrack is the implementation for DownTrackListener.
1 parent e16b3ba commit 991a4a4

6 files changed

Lines changed: 229 additions & 293 deletions

File tree

pkg/routing/localrouter.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package routing
1616

1717
import (
1818
"context"
19-
"sync"
2019
"time"
2120

2221
"go.uber.org/atomic"
@@ -35,7 +34,6 @@ type LocalRouter struct {
3534
roomManagerClient RoomManagerClient
3635
nodeStatsConfig config.NodeStatsConfig
3736

38-
lock sync.RWMutex
3937
// channels for each participant
4038
requestChannels map[string]*MessageChannel
4139
responseChannels map[string]*MessageChannel

pkg/rtc/mediatrack.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ var _ types.LocalMediaTrack = (*MediaTrack)(nil)
4545
// Implements MediaTrack and PublishedTrack interface
4646
type MediaTrack struct {
4747
params MediaTrackParams
48-
numUpTracks atomic.Uint32
4948
buffer *buffer.Buffer
5049
everSubscribed atomic.Bool
5150

pkg/rtc/mediatracksubscriptions.go

Lines changed: 20 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,11 @@ import (
1919
"slices"
2020
"sync"
2121

22-
"github.com/pion/rtcp"
23-
"github.com/pion/webrtc/v4"
24-
"go.uber.org/atomic"
25-
26-
"github.com/livekit/livekit-server/pkg/sfu/buffer"
2722
"github.com/livekit/livekit-server/pkg/sfu/mime"
28-
sutils "github.com/livekit/livekit-server/pkg/utils"
2923
"github.com/livekit/protocol/livekit"
3024
"github.com/livekit/protocol/logger"
31-
"github.com/livekit/protocol/observability/roomobs"
25+
"github.com/pion/webrtc/v4"
26+
"go.uber.org/atomic"
3227

3328
"github.com/livekit/livekit-server/pkg/rtc/types"
3429
"github.com/livekit/livekit-server/pkg/sfu"
@@ -106,91 +101,31 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
106101
}
107102
t.subscribedTracksMu.Unlock()
108103

109-
var rtcpFeedback []webrtc.RTCPFeedback
110-
var maxTrack int
111-
switch t.params.MediaTrack.Kind() {
112-
case livekit.TrackType_AUDIO:
113-
rtcpFeedback = t.params.SubscriberConfig.RTCPFeedback.Audio
114-
maxTrack = t.params.ReceiverConfig.PacketBufferSizeAudio
115-
case livekit.TrackType_VIDEO:
116-
rtcpFeedback = t.params.SubscriberConfig.RTCPFeedback.Video
117-
maxTrack = t.params.ReceiverConfig.PacketBufferSizeVideo
118-
default:
119-
t.params.Logger.Warnw("unexpected track type", nil, "kind", t.params.MediaTrack.Kind())
120-
}
121-
codecs := wr.Codecs()
122-
for _, c := range codecs {
123-
c.RTCPFeedback = rtcpFeedback
124-
}
125-
126-
streamID := wr.StreamID()
127-
if sub.SupportsSyncStreamID() && t.params.MediaTrack.Stream() != "" {
128-
streamID = PackSyncStreamID(t.params.MediaTrack.PublisherID(), t.params.MediaTrack.Stream())
129-
}
130-
131-
var trailer []byte
132-
if t.params.MediaTrack.IsEncrypted() {
133-
trailer = sub.GetTrailer()
134-
}
135-
136-
downTrack, err := sfu.NewDownTrack(sfu.DowntrackParams{
137-
Codecs: codecs,
138-
Source: t.params.MediaTrack.Source(),
139-
Receiver: wr,
140-
BufferFactory: sub.GetBufferFactory(),
141-
SubID: subscriberID,
142-
StreamID: streamID,
143-
MaxTrack: maxTrack,
144-
PlayoutDelayLimit: sub.GetPlayoutDelayConfig(),
145-
Pacer: sub.GetPacer(),
146-
Trailer: trailer,
147-
Logger: LoggerWithTrack(sub.GetLogger().WithComponent(sutils.ComponentSub), trackID, t.params.IsRelayed),
148-
RTCPWriter: sub.WriteSubscriberRTCP,
149-
DisableSenderReportPassThrough: sub.GetDisableSenderReportPassThrough(),
150-
SupportsCodecChange: sub.SupportsCodecChange(),
104+
subTrack, err := NewSubscribedTrack(SubscribedTrackParams{
105+
ReceiverConfig: t.params.ReceiverConfig,
106+
SubscriberConfig: t.params.SubscriberConfig,
107+
Subscriber: sub,
108+
MediaTrack: t.params.MediaTrack,
109+
AdaptiveStream: sub.GetAdaptiveStream(),
110+
Telemetry: t.params.Telemetry,
111+
WrappedReceiver: wr,
112+
IsRelayed: t.params.IsRelayed,
113+
OnDownTrackCreated: t.onDownTrackCreated,
114+
OnDownTrackClosed: func(subscriberID livekit.ParticipantID) {
115+
t.subscribedTracksMu.Lock()
116+
delete(t.subscribedTracks, subscriberID)
117+
t.subscribedTracksMu.Unlock()
118+
},
119+
OnSubscriberMaxQualityChange: t.onSubscriberMaxQualityChange,
151120
})
152121
if err != nil {
153122
return nil, err
154123
}
155124

156-
if t.onDownTrackCreated != nil {
157-
t.onDownTrackCreated(downTrack)
158-
}
159-
160-
subTrack := NewSubscribedTrack(SubscribedTrackParams{
161-
PublisherID: t.params.MediaTrack.PublisherID(),
162-
PublisherIdentity: t.params.MediaTrack.PublisherIdentity(),
163-
PublisherVersion: t.params.MediaTrack.PublisherVersion(),
164-
Subscriber: sub,
165-
MediaTrack: t.params.MediaTrack,
166-
DownTrack: downTrack,
167-
AdaptiveStream: sub.GetAdaptiveStream(),
168-
})
169-
170-
if !sub.Hidden() {
171-
downTrack.OnBindAndConnected(func() {
172-
t.params.MediaTrack.OnTrackSubscribed()
173-
})
174-
}
175-
176125
// Bind callback can happen from replaceTrack, so set it up early
177126
var reusingTransceiver atomic.Bool
178127
var dtState sfu.DownTrackState
179-
downTrack.OnCodecNegotiated(func(codec webrtc.RTPCodecCapability) {
180-
if !wr.DetermineReceiver(codec) {
181-
if t.onSubscriberMaxQualityChange != nil {
182-
go func() {
183-
mimeType := mime.NormalizeMimeType(codec.MimeType)
184-
spatial := buffer.GetSpatialLayerForVideoQuality(
185-
mimeType,
186-
livekit.VideoQuality_HIGH,
187-
t.params.MediaTrack.ToProto(),
188-
)
189-
t.onSubscriberMaxQualityChange(downTrack.SubscriberID(), mimeType, spatial)
190-
}()
191-
}
192-
}
193-
})
128+
downTrack := subTrack.DownTrack()
194129
downTrack.OnBinding(func(err error) {
195130
if err != nil {
196131
go subTrack.Bound(err)
@@ -214,51 +149,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
214149
subTrack.SetPublisherMuted(t.params.MediaTrack.IsMuted())
215150
})
216151

217-
statsKey := telemetry.StatsKeyForTrack(
218-
sub.GetCountry(),
219-
livekit.StreamType_DOWNSTREAM,
220-
subscriberID,
221-
trackID,
222-
t.params.MediaTrack.Source(),
223-
t.params.MediaTrack.Kind(),
224-
)
225-
reporter := sub.GetReporter().WithTrack(trackID.String())
226-
downTrack.OnStatsUpdate(func(_ *sfu.DownTrack, stat *livekit.AnalyticsStat) {
227-
t.params.Telemetry.TrackStats(statsKey, stat)
228-
229-
if cs, ok := telemetry.CondenseStat(stat); ok {
230-
ti := wr.TrackInfo()
231-
reporter.Tx(func(tx roomobs.TrackTx) {
232-
tx.ReportName(ti.Name)
233-
tx.ReportKind(roomobs.TrackKindSub)
234-
tx.ReportType(roomobs.TrackTypeFromProto(ti.Type))
235-
tx.ReportSource(roomobs.TrackSourceFromProto(ti.Source))
236-
tx.ReportMime(mime.NormalizeMimeType(ti.MimeType).ReporterType())
237-
tx.ReportLayer(roomobs.PackTrackLayer(ti.Height, ti.Width))
238-
tx.ReportDuration(uint16(cs.EndTime.Sub(cs.StartTime).Milliseconds()))
239-
tx.ReportFrames(uint16(cs.Frames))
240-
tx.ReportSendBytes(uint32(cs.Bytes))
241-
tx.ReportSendPackets(cs.Packets)
242-
tx.ReportPacketsLost(cs.PacketsLost)
243-
tx.ReportScore(stat.Score)
244-
})
245-
}
246-
})
247-
248-
downTrack.OnMaxLayerChanged(func(dt *sfu.DownTrack, layer int32) {
249-
if t.onSubscriberMaxQualityChange != nil {
250-
t.onSubscriberMaxQualityChange(dt.SubscriberID(), dt.Mime(), layer)
251-
}
252-
})
253-
254-
downTrack.OnRttUpdate(func(_ *sfu.DownTrack, rtt uint32) {
255-
go sub.UpdateMediaRTT(rtt)
256-
})
257-
258-
downTrack.AddReceiverReportListener(func(dt *sfu.DownTrack, report *rtcp.ReceiverReport) {
259-
sub.HandleReceiverReport(dt, report)
260-
})
261-
262152
var transceiver *webrtc.RTPTransceiver
263153
var sender *webrtc.RTPSender
264154

@@ -313,6 +203,7 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
313203
Stereo: slices.Contains(info.AudioFeatures, livekit.AudioTrackFeature_TF_STEREO),
314204
Red: !info.DisableRed,
315205
}
206+
codecs := wr.Codecs()
316207
if addTrackParams.Red && (len(codecs) == 1 && mime.IsMimeTypeStringOpus(codecs[0].MimeType)) {
317208
addTrackParams.Red = false
318209
}
@@ -363,10 +254,6 @@ func (t *MediaTrackSubscriptions) AddSubscriber(sub types.LocalParticipant, wr *
363254

364255
downTrack.SetTransceiver(transceiver)
365256

366-
downTrack.OnCloseHandler(func(isExpectedToResume bool) {
367-
t.downTrackClosed(subscriberID, sub, subTrack, isExpectedToResume)
368-
})
369-
370257
t.subscribedTracksMu.Lock()
371258
t.subscribedTracks[subscriberID] = subTrack
372259
t.subscribedTracksMu.Unlock()
@@ -472,27 +359,3 @@ func (t *MediaTrackSubscriptions) DebugInfo() []map[string]interface{} {
472359

473360
return subscribedTrackInfo
474361
}
475-
476-
func (t *MediaTrackSubscriptions) downTrackClosed(
477-
subscriberID livekit.ParticipantID,
478-
sub types.LocalParticipant,
479-
subTrack types.SubscribedTrack,
480-
isExpectedToResume bool,
481-
) {
482-
// Cache transceiver for potential re-use on resume.
483-
// To ensure subscription manager does not re-subscribe before caching,
484-
// delete the subscribed track only after caching.
485-
if isExpectedToResume {
486-
dt := subTrack.DownTrack()
487-
if tr := dt.GetTransceiver(); tr != nil {
488-
sub.CacheDownTrack(subTrack.ID(), tr, dt.GetState())
489-
}
490-
}
491-
492-
go func() {
493-
t.subscribedTracksMu.Lock()
494-
delete(t.subscribedTracks, subscriberID)
495-
t.subscribedTracksMu.Unlock()
496-
subTrack.Close(isExpectedToResume)
497-
}()
498-
}

0 commit comments

Comments
 (0)