Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions pkg/sfu/forwardstats.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package sfu

import (
"fmt"
"sync"
"time"

"go.uber.org/atomic"

"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)

const (
highForwardingLatency = 500 * time.Millisecond
skewFactor = 10
)

type ForwardStats struct {
lock sync.Mutex
lastLeftNano atomic.Int64
latency *utils.LatencyAggregate
closeCh chan struct{}
lock sync.Mutex
latency *utils.LatencyAggregate
closeCh chan struct{}
}

func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength time.Duration) *ForwardStats {
Expand All @@ -27,29 +31,35 @@ func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength
return s
}

func (s *ForwardStats) Update(arrival, left int64) {
func (s *ForwardStats) Update(arrival, left int64) (int64, bool) {
transit := left - arrival

// ignore if transit is too large or negative, this could happen if system time is adjusted
if transit < 0 || time.Duration(transit) > 5*time.Second {
return
}
lastLeftNano := s.lastLeftNano.Load()
if left < lastLeftNano || !s.lastLeftNano.CompareAndSwap(lastLeftNano, left) {
return
isHighForwardingLatency := false
if time.Duration(transit) > highForwardingLatency {
isHighForwardingLatency = true
}

s.lock.Lock()
defer s.lock.Unlock()
s.latency.Update(time.Duration(arrival), float64(transit))
s.lock.Unlock()

return transit, isHighForwardingLatency
}

func (s *ForwardStats) GetStats() (time.Duration, time.Duration) {
s.lock.Lock()
w := s.latency.Summarize()
s.lock.Unlock()

return time.Duration(w.Mean()), time.Duration(w.StdDev())
latency, jitter := time.Duration(w.Mean()), time.Duration(w.StdDev())
if jitter > latency*skewFactor {
logger.Infow(
"high jitter in forwarding path",
"latency", latency,
"jitter", jitter,
"stats", fmt.Sprintf("count %.2f, mean %.2f, stdDev %.2f", w.Count(), w.Mean(), w.StdDev()),
)
}
return latency, jitter
}

func (s *ForwardStats) GetLastStats(duration time.Duration) (time.Duration, time.Duration) {
Expand All @@ -67,10 +77,12 @@ func (s *ForwardStats) Stop() {
func (s *ForwardStats) report(reportInterval time.Duration) {
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()

for {
select {
case <-s.closeCh:
return

case <-ticker.C:
latency, jitter := s.GetLastStats(reportInterval)
latencySlow, jitterSlow := s.GetStats()
Expand Down
8 changes: 7 additions & 1 deletion pkg/sfu/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,13 @@ func (w *WebRTCReceiver) forwardRTP(layer int32, buff *buffer.Buffer) {

// track delay/jitter
if writeCount > 0 && w.forwardStats != nil {
w.forwardStats.Update(pkt.Arrival, mono.UnixNano())
if latency, isHigh := w.forwardStats.Update(pkt.Arrival, mono.UnixNano()); isHigh {
w.logger.Infow(
"high forwarding latency",
"latency", latency,
"writeCount", writeCount,
)
}
}

// track video layers
Expand Down
Loading