Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .github/workflows/slack-notifier.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: PR Slack Notifier

on:
pull_request:
types: [review_requested, reopened, closed]
pull_request_review:
types: [submitted]

permissions:
contents: read
pull-requests: write
issues: write

concurrency:
group: pr-slack-${{ github.event.pull_request.number }}-${{ github.workflow }}
cancel-in-progress: false

jobs:
notify-devs:
runs-on: ubuntu-latest
steps:
- uses: livekit/slack-notifier-action@main
with:
config_json: ${{ secrets.SLACK_NOTIFY_CONFIG_JSON }}
slack_token: ${{ secrets.SLACK_PR_NOTIFIER_TOKEN }}
63 changes: 43 additions & 20 deletions pkg/rtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -1951,6 +1951,18 @@ func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDesc
continue
}

if !t.params.IsOfferer {
// `sendrecv` or `sendonly` means this transceiver is used for sending

// Note that a transceiver previously used to send could be `inactive`.
// Let those transceivers be created when remote description is set.
_, ok1 := m.Attribute(webrtc.RTPTransceiverDirectionSendrecv.String())
_, ok2 := m.Attribute(webrtc.RTPTransceiverDirectionSendonly.String())
if !ok1 && !ok2 {
continue
}
}

tr, err := t.pc.AddTransceiverFromKind(
codecType,
webrtc.RTPTransceiverInit{
Expand All @@ -1976,35 +1988,46 @@ func (t *PCTransport) initPCWithPreviousAnswer(previousAnswer webrtc.SessionDesc
return senders, nil
}

func (t *PCTransport) SetPreviousSdp(offer, answer *webrtc.SessionDescription) {
func (t *PCTransport) SetPreviousSdp(localDescription, remoteDescription *webrtc.SessionDescription) {
// when there is no answer, cannot migrate, force a full reconnect
if answer == nil {
if (t.params.IsOfferer && remoteDescription == nil) || (!t.params.IsOfferer && localDescription == nil) {
t.onNegotiationFailed(true, "no previous answer")
return
}

t.lock.Lock()
if t.pc.RemoteDescription() == nil && t.previousAnswer == nil {
if t.params.IsOfferer {
t.previousAnswer = answer
var (
senders map[string]*webrtc.RTPSender
err error
parseMids bool
)
if t.params.IsOfferer {
if t.pc.RemoteDescription() == nil && t.previousAnswer == nil {
t.previousAnswer = remoteDescription
senders, err = t.initPCWithPreviousAnswer(*remoteDescription)
parseMids = true
}
senders, err := t.initPCWithPreviousAnswer(*answer)
if err != nil {
t.lock.Unlock()

t.onNegotiationFailed(true, fmt.Sprintf("initPCWithPreviousAnswer failed, error: %s", err))
return
} else {
if t.pc.LocalDescription() == nil {
senders, err = t.initPCWithPreviousAnswer(*localDescription)
parseMids = true
}
}
if err != nil {
t.lock.Unlock()
t.onNegotiationFailed(true, fmt.Sprintf("initPCWithPreviousAnswer failed, error: %s", err))
return
}

if offer != nil {
// in migration case, can't reuse transceiver before negotiated except track subscribed at previous node
t.canReuseTransceiver = false
if err := t.parseTrackMid(*offer, senders); err != nil {
t.params.Logger.Warnw(
"parse previous offer failed", err,
"offer", offer.SDP,
)
}
if localDescription != nil && parseMids {
// in migration case, can't reuse transceiver before negotiating excepted tracks
// that were subscribed at previous node
t.canReuseTransceiver = false
if err := t.parseTrackMid(*localDescription, senders); err != nil {
t.params.Logger.Warnw(
"parse previous local description failed", err,
"localDescription", localDescription.SDP,
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/rtc/transportmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ func (t *TransportManager) SetMigrateInfo(
}

if t.params.UseSinglePeerConnection {
t.publisher.SetPreviousSdp(previousOffer, previousAnswer)
t.publisher.SetPreviousSdp(previousAnswer, previousOffer)
} else {
t.subscriber.SetPreviousSdp(previousOffer, previousAnswer)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/service/roomallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,12 +213,11 @@ func (r *StandardRoomAllocator) applyNamedRoomConfiguration(req *livekit.CreateR

clone := utils.CloneProto(req)

// Request overwrites conf
if clone.EmptyTimeout == 0 {
clone.EmptyTimeout = conf.EmptyTimeout
}
if clone.DepartureTimeout == 0 {
clone.DepartureTimeout = req.DepartureTimeout
clone.DepartureTimeout = conf.DepartureTimeout
}
if clone.MaxParticipants == 0 {
clone.MaxParticipants = conf.MaxParticipants
Expand Down
2 changes: 1 addition & 1 deletion pkg/sfu/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (b *Buffer) createDDParserAndFrameRateCalculator() {
}
b.ddParser = NewDependencyDescriptorParser(b.ddExtID, b.logger, func(spatial, temporal int32) {
frc.SetMaxLayer(spatial, temporal)
})
}, false)
}
}

Expand Down
53 changes: 47 additions & 6 deletions pkg/sfu/buffer/dependencydescriptorparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package buffer
import (
"fmt"
"sort"
"time"

"github.com/pion/rtp"
"go.uber.org/atomic"
Expand All @@ -27,6 +28,14 @@ import (
"github.com/livekit/protocol/logger"
)

const (
ddRestartThreshold = 30 * time.Second

// frame integrity check 2 seconds for L3T3 30fps video
integrityCheckFrame = 180
integrityCheckPkt = 1024
)

var (
ErrFrameEarlierThanKeyFrame = fmt.Errorf("frame is earlier than current keyframe")
ErrDDStructureAttachedToNonFirstPacket = fmt.Errorf("dependency descriptor structure is attached to non-first packet of a frame")
Expand All @@ -48,16 +57,22 @@ type DependencyDescriptorParser struct {
frameChecker *FrameIntegrityChecker

ddNotFoundCount atomic.Uint32

// restart detection
restartGeneration int
enableRestart bool
lastPacketAt time.Time
}

func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32)) *DependencyDescriptorParser {
func NewDependencyDescriptorParser(ddExtID uint8, logger logger.Logger, onMaxLayerChanged func(int32, int32), enableRestart bool) *DependencyDescriptorParser {
return &DependencyDescriptorParser{
ddExtID: ddExtID,
logger: logger,
onMaxLayerChanged: onMaxLayerChanged,
seqWrapAround: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
frameWrapAround: utils.NewWrapAround[uint16, uint64](utils.WrapAroundParams{IsRestartAllowed: false}),
frameChecker: NewFrameIntegrityChecker(180, 1024), // 2seconds for L3T3 30fps video
frameChecker: NewFrameIntegrityChecker(integrityCheckFrame, integrityCheckPkt),
enableRestart: enableRestart,
}
}

Expand All @@ -71,6 +86,10 @@ type ExtDependencyDescriptor struct {
ExtFrameNum uint64
// the frame number of the keyframe which the current frame depends on
ExtKeyFrameNum uint64

// increase when the stream restarts, clear and reinitialize all dd state includes
// attached structure, frame chain, decode target.
RestartGeneration int
}

func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescriptor, VideoLayer, error) {
Expand All @@ -84,6 +103,16 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
return nil, videoLayer, ErrDDExtentionNotFound
}

var restart bool
if r.enableRestart {
if !r.lastPacketAt.IsZero() && time.Since(r.lastPacketAt) > ddRestartThreshold {
r.restart()
restart = true
r.logger.Debugw("dependency descriptor parser restart stream", "generation", r.restartGeneration)
}
r.lastPacketAt = time.Now()
}

var ddVal dd.DependencyDescriptor
ext := &dd.DependencyDescriptorExtension{
Descriptor: &ddVal,
Expand All @@ -103,7 +132,8 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
videoLayer.Spatial, videoLayer.Temporal = int32(ddVal.FrameDependencies.SpatialId), int32(ddVal.FrameDependencies.TemporalId)
}

unwrapped := r.frameWrapAround.Update(ddVal.FrameNumber)
// assume the packet is in-order when stream restarting
unwrapped := r.frameWrapAround.UpdateWithOrderKnown(ddVal.FrameNumber, restart)
extFN := unwrapped.ExtendedVal

if extFN < r.structureExtFrameNum {
Expand All @@ -114,9 +144,10 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
r.frameChecker.AddPacket(extSeq, extFN, &ddVal)

extDD := &ExtDependencyDescriptor{
Descriptor: &ddVal,
ExtFrameNum: extFN,
Integrity: r.frameChecker.FrameIntegrity(extFN),
Descriptor: &ddVal,
ExtFrameNum: extFN,
Integrity: r.frameChecker.FrameIntegrity(extFN),
RestartGeneration: r.restartGeneration,
}

if ddVal.AttachedStructure != nil {
Expand Down Expand Up @@ -167,6 +198,16 @@ func (r *DependencyDescriptorParser) Parse(pkt *rtp.Packet) (*ExtDependencyDescr
return extDD, videoLayer, nil
}

func (r *DependencyDescriptorParser) restart() {
r.frameChecker = NewFrameIntegrityChecker(integrityCheckFrame, integrityCheckPkt)
r.structure = nil
r.structureExtFrameNum = 0
r.activeDecodeTargetsExtSeq = 0
r.activeDecodeTargetsMask = 0
r.decodeTargets = r.decodeTargets[:0]
r.restartGeneration++
}

// ------------------------------------------------------------------------------

type DependencyDescriptorDecodeTarget struct {
Expand Down
16 changes: 11 additions & 5 deletions pkg/sfu/utils/wraparound.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (w *WrapAroundUpdateResult[ET]) MarshalLogObject(e zapcore.ObjectEncoder) e
return nil
}

func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
func (w *WrapAround[T, ET]) UpdateWithOrderKnown(val T, orderKnown bool) (result WrapAroundUpdateResult[ET]) {
if !w.initialized {
result.PreExtendedHighest = ET(val) - 1
result.ExtendedVal = ET(val)
Expand All @@ -92,10 +92,12 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
return
}

gap := val - w.highest
if gap > T(w.fullRange>>1) {
// out-of-order
return w.maybeAdjustStart(val)
if !orderKnown {
gap := val - w.highest
if gap > T(w.fullRange>>1) {
// out-of-order
return w.maybeAdjustStart(val)
}
}

// in-order
Expand All @@ -111,6 +113,10 @@ func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
return
}

func (w *WrapAround[T, ET]) Update(val T) (result WrapAroundUpdateResult[ET]) {
return w.UpdateWithOrderKnown(val, false)
}

func (w *WrapAround[T, ET]) UndoUpdate(result WrapAroundUpdateResult[ET]) {
if !w.initialized || result.PreExtendedHighest >= result.ExtendedVal {
return
Expand Down
29 changes: 28 additions & 1 deletion pkg/sfu/videolayerselector/dependencydescriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"github.com/livekit/protocol/logger"
)

const (
decisionCacheMaxElements = 256
decisionCacheNackEntries = 80
)

type DependencyDescriptor struct {
*Base

Expand All @@ -40,12 +45,14 @@ type DependencyDescriptor struct {
decodeTargetsLock sync.RWMutex
decodeTargets []*DecodeTarget
fnWrapper FrameNumberWrapper

restartGeneration int
}

func NewDependencyDescriptor(logger logger.Logger) *DependencyDescriptor {
return &DependencyDescriptor{
Base: NewBase(logger),
decisions: NewSelectorDecisionCache(256, 80),
decisions: NewSelectorDecisionCache(decisionCacheMaxElements, decisionCacheNackEntries),
fnWrapper: FrameNumberWrapper{logger: logger},
}
}
Expand Down Expand Up @@ -75,6 +82,20 @@ func (d *DependencyDescriptor) Select(extPkt *buffer.ExtPacket, _layer int32) (r
return
}

if ddwdt.RestartGeneration > d.restartGeneration {
d.logger.Debugw("stream restarted",
"packet", ddwdt.RestartGeneration,
"current", d.restartGeneration,
"structureKeyFrame", d.extKeyFrameNum,
"efn", ddwdt.ExtFrameNum,
"lastEfn", d.fnWrapper.LastOrigin(),
)
d.restart(ddwdt.RestartGeneration)
} else if ddwdt.RestartGeneration < d.restartGeneration {
// must not happen
d.logger.Warnw("packet from old generation", nil, "packet", ddwdt.RestartGeneration, "current", d.restartGeneration)
}

dd := ddwdt.Descriptor

extFrameNum := ddwdt.ExtFrameNum
Expand Down Expand Up @@ -434,3 +455,9 @@ func (d *DependencyDescriptor) CheckSync() (locked bool, layer int32) {

return false, layer
}

func (d *DependencyDescriptor) restart(generation int) {
d.restartGeneration = generation
d.invalidateKeyFrame()
d.decisions = NewSelectorDecisionCache(decisionCacheMaxElements, decisionCacheNackEntries)
}
4 changes: 4 additions & 0 deletions pkg/sfu/videolayerselector/framenumberwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@ func (f *FrameNumberWrapper) UpdateAndGet(new uint64, updateOffset bool) uint64
f.last = new
return new + f.offset
}

func (f *FrameNumberWrapper) LastOrigin() uint64 {
return f.last
}
Loading