-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathforwardstats.go
More file actions
110 lines (92 loc) · 2.88 KB
/
Copy pathforwardstats.go
File metadata and controls
110 lines (92 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package sfu
import (
"sync"
"time"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
"github.com/livekit/protocol/utils/mono"
)
const (
highForwardingLatency = 20 * time.Millisecond
skewFactor = 10
)
type ForwardStats struct {
lock sync.Mutex
latency *utils.LatencyAggregate
lowest int64
highest int64
lastUpdateAt int64
closeCh chan struct{}
}
func NewForwardStats(latencyUpdateInterval, reportInterval, latencyWindowLength time.Duration) *ForwardStats {
s := &ForwardStats{
latency: utils.NewLatencyAggregate(latencyUpdateInterval, latencyWindowLength),
lowest: time.Second.Nanoseconds(),
closeCh: make(chan struct{}),
}
go s.report(reportInterval)
return s
}
func (s *ForwardStats) Update(arrival, left int64) (int64, bool) {
transit := left - arrival
isHighForwardingLatency := false
if time.Duration(transit) > highForwardingLatency {
isHighForwardingLatency = true
}
s.lock.Lock()
s.latency.Update(time.Duration(arrival), float64(transit))
s.lowest = min(transit, s.lowest)
s.highest = max(transit, s.highest)
s.lastUpdateAt = arrival
s.lock.Unlock()
return transit, isHighForwardingLatency
}
func (s *ForwardStats) GetStats(shortDuration time.Duration) (time.Duration, time.Duration, time.Duration, time.Duration) {
s.lock.Lock()
// a dummy sample to flush the pipe to current time
now := mono.UnixNano()
if (now - s.lastUpdateAt) > shortDuration.Nanoseconds() {
s.latency.Update(time.Duration(now), 0)
}
wLong := s.latency.Summarize()
wShort := s.latency.SummarizeLast(shortDuration)
lowest := s.lowest
s.lowest = time.Second.Nanoseconds()
highest := s.highest
s.highest = 0
s.lock.Unlock()
latencyLong, jitterLong := time.Duration(wLong.Mean()), time.Duration(wLong.StdDev())
latencyShort, jitterShort := time.Duration(wShort.Mean()), time.Duration(wShort.StdDev())
if jitterLong > latencyLong*skewFactor {
logger.Infow(
"high jitter in forwarding path",
"lowest", time.Duration(lowest),
"highest", time.Duration(highest),
"countLong", wLong.Count(),
"latencyLong", latencyLong,
"jitterLong", jitterLong,
"countShort", wShort.Count(),
"latencyShort", latencyShort,
"jitterShort", jitterShort,
)
}
return latencyLong, jitterLong, latencyShort, jitterShort
}
func (s *ForwardStats) Stop() {
close(s.closeCh)
}
func (s *ForwardStats) report(reportInterval time.Duration) {
ticker := time.NewTicker(reportInterval)
defer ticker.Stop()
for {
select {
case <-s.closeCh:
return
case <-ticker.C:
latencyLong, jitterLong, latencyShort, jitterShort := s.GetStats(reportInterval)
prometheus.RecordForwardJitter(uint32(jitterShort.Microseconds()), uint32(jitterLong.Microseconds()))
prometheus.RecordForwardLatency(uint32(latencyShort.Microseconds()), uint32(latencyLong.Microseconds()))
}
}
}