Skip to content

Commit 320b4cf

Browse files
author
Robin Raymond
committed
added prometheus stats for rtp packets
1 parent 0997471 commit 320b4cf

4 files changed

Lines changed: 207 additions & 7 deletions

File tree

cmd/signal/json-rpc/main.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ package main
44
import (
55
"flag"
66
"fmt"
7+
"net"
78
"net/http"
89
"os"
910

1011
"github.com/gorilla/websocket"
12+
"github.com/prometheus/client_golang/prometheus/promhttp"
1113
"github.com/sourcegraph/jsonrpc2"
1214
websocketjsonrpc2 "github.com/sourcegraph/jsonrpc2/websocket"
1315
"github.com/spf13/viper"
@@ -18,11 +20,12 @@ import (
1820
)
1921

2022
var (
21-
conf = sfu.Config{}
22-
file string
23-
cert string
24-
key string
25-
addr string
23+
conf = sfu.Config{}
24+
file string
25+
cert string
26+
key string
27+
addr string
28+
metricsAddr string
2629
)
2730

2831
const (
@@ -77,6 +80,7 @@ func parse() bool {
7780
flag.StringVar(&cert, "cert", "", "cert file")
7881
flag.StringVar(&key, "key", "", "key file")
7982
flag.StringVar(&addr, "a", ":7000", "address to use")
83+
flag.StringVar(&metricsAddr, "m", ":8100", "merics to use")
8084
help := flag.Bool("h", false, "help info")
8185
flag.Parse()
8286
if !load() {
@@ -89,6 +93,26 @@ func parse() bool {
8993
return true
9094
}
9195

96+
func startMetrics(addr string) {
97+
// start metrics server
98+
m := http.NewServeMux()
99+
m.Handle("/metrics", promhttp.Handler())
100+
srv := &http.Server{
101+
Handler: m,
102+
}
103+
104+
metricsLis, err := net.Listen("tcp", addr)
105+
if err != nil {
106+
log.Panicf("cannot bind to metrics endpoint %s. err: %s", addr, err)
107+
}
108+
log.Infof("Metrics Listening at %s", addr)
109+
110+
err = srv.Serve(metricsLis)
111+
if err != nil {
112+
log.Errorf("debug server stopped. got err: %s", err)
113+
}
114+
}
115+
92116
func main() {
93117
if !parse() {
94118
showHelp()
@@ -123,6 +147,8 @@ func main() {
123147
<-jc.DisconnectNotify()
124148
}))
125149

150+
go startMetrics(metricsAddr)
151+
126152
var err error
127153
if key != "" && cert != "" {
128154
log.Infof("Listening at https://[%s]", addr)

pkg/buffer/buffer.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"strings"
55
"sync/atomic"
66
"time"
7+
"unsafe"
78

89
"github.com/pion/sdp/v3"
910

@@ -65,6 +66,15 @@ type Buffer struct {
6566
feedbackTWCC func(sn uint16, timeNS int64, marker bool)
6667
}
6768

69+
type BufferStats struct {
70+
LastExpected uint32
71+
LastReceived uint32
72+
LostRate float32
73+
PacketCount uint32 // Number of packets received from this source.
74+
Jitter float64 // An estimate of the statistical variance of the RTP data packet inter-arrival time.
75+
TotalByte uint64
76+
}
77+
6878
// BufferOptions provides configuration options for the buffer
6979
type Options struct {
7080
BufferTime int
@@ -272,14 +282,17 @@ func (b *Buffer) onNack(fn func(fb *rtcp.TransportLayerNack)) {
272282
b.pktQueue.onLost = fn
273283
}
274284

285+
// GetMediaSSRC returns the associated SSRC of the RTP stream
275286
func (b *Buffer) GetMediaSSRC() uint32 {
276287
return b.mediaSSRC
277288
}
278289

290+
// GetClockRate returns the RTP clock rate
279291
func (b *Buffer) GetClockRate() uint32 {
280292
return b.clockRate
281293
}
282294

295+
// GetSenderReportData returns the rtp, ntp and nanos of the last sender report
283296
func (b *Buffer) GetSenderReportData() (rtpTime uint32, ntpTime uint64, lastReceivedTimeInNanosSinceEpoch int64) {
284297
rtpTime = atomic.LoadUint32(&b.lastSRRTPTime)
285298
ntpTime = atomic.LoadUint64(&b.lastSRNTPTime)
@@ -288,17 +301,36 @@ func (b *Buffer) GetSenderReportData() (rtpTime uint32, ntpTime uint64, lastRece
288301
return rtpTime, ntpTime, lastReceivedTimeInNanosSinceEpoch
289302
}
290303

304+
// GetStats returns the raw statistics about a particular buffer state
305+
func (b *Buffer) GetStats() (stats BufferStats) {
306+
stats.LastExpected = atomic.LoadUint32(&b.lastExpected)
307+
stats.LastReceived = atomic.LoadUint32(&b.lastReceived)
308+
raw32Lost := atomic.LoadUint32((*uint32)(unsafe.Pointer(&b.lostRate)))
309+
stats.LostRate = *((*float32)(unsafe.Pointer(&raw32Lost)))
310+
stats.PacketCount = atomic.LoadUint32(&b.packetCount)
311+
312+
raw64Jitter := atomic.LoadUint64((*uint64)(unsafe.Pointer(&b.lostRate)))
313+
stats.Jitter = *((*float64)(unsafe.Pointer(&raw64Jitter)))
314+
315+
stats.TotalByte = atomic.LoadUint64(&b.totalByte)
316+
317+
return stats
318+
}
319+
320+
// GetLatestTimestamp returns the latest RTP timestamp factoring in potential RTP timestamp wrap-around
291321
func (b *Buffer) GetLatestTimestamp() (latestTimestamp uint32, latestTimestampTimeInNanosSinceEpoch int64) {
292322
latestTimestamp = atomic.LoadUint32(&b.latestTimestamp)
293323
latestTimestampTimeInNanosSinceEpoch = atomic.LoadInt64(&b.latestTimestampTime)
294324

295325
return latestTimestamp, latestTimestampTimeInNanosSinceEpoch
296326
}
297327

328+
// IsTimestampWrapAround returns true if wrap around happens from timestamp1 to timestamp2
298329
func IsTimestampWrapAround(timestamp1 uint32, timestamp2 uint32) bool {
299330
return (timestamp1&0xC000000 == 0) && (timestamp2&0xC000000 == 0xC000000)
300331
}
301332

333+
// IsLaterTimestamp returns true if timestamp1 is later in time than timestamp2 factoring in timestamp wrap-around
302334
func IsLaterTimestamp(timestamp1 uint32, timestamp2 uint32) bool {
303335
if timestamp1 > timestamp2 {
304336
if IsTimestampWrapAround(timestamp2, timestamp1) {

pkg/stats/interceptor.go

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,72 @@
11
package stats
22

33
import (
4+
"math"
45
"sync"
56

67
"github.com/pion/interceptor"
78
"github.com/pion/ion-sfu/pkg/buffer"
89
"github.com/pion/rtcp"
910
"github.com/pion/rtp"
11+
"github.com/prometheus/client_golang/prometheus"
1012
)
1113

14+
var (
15+
driftBuckets = []float64{5, 10, 20, 40, 80, 160, math.Inf(+1)}
16+
17+
drift = prometheus.NewHistogram(prometheus.HistogramOpts{
18+
Subsystem: "rtp",
19+
Name: "drift_millis",
20+
Buckets: driftBuckets,
21+
})
22+
23+
expectedCount = prometheus.NewCounter(prometheus.CounterOpts{
24+
Subsystem: "rtp",
25+
Name: "expected",
26+
})
27+
28+
receivedCount = prometheus.NewCounter(prometheus.CounterOpts{
29+
Subsystem: "rtp",
30+
Name: "received",
31+
})
32+
33+
packetCount = prometheus.NewCounter(prometheus.CounterOpts{
34+
Subsystem: "rtp",
35+
Name: "packets",
36+
})
37+
38+
totalBytes = prometheus.NewCounter(prometheus.CounterOpts{
39+
Subsystem: "rtp",
40+
Name: "bytes",
41+
})
42+
43+
expectedMinusReceived = prometheus.NewSummary(prometheus.SummaryOpts{
44+
Subsystem: "rtp",
45+
Name: "expected_minus_received",
46+
})
47+
48+
lostRate = prometheus.NewSummary(prometheus.SummaryOpts{
49+
Subsystem: "rtp",
50+
Name: "lostRate",
51+
})
52+
53+
jitter = prometheus.NewSummary(prometheus.SummaryOpts{
54+
Subsystem: "rtp",
55+
Name: "jitter",
56+
})
57+
)
58+
59+
func init() {
60+
prometheus.MustRegister(drift)
61+
prometheus.MustRegister(expectedCount)
62+
prometheus.MustRegister(receivedCount)
63+
prometheus.MustRegister(packetCount)
64+
prometheus.MustRegister(totalBytes)
65+
prometheus.MustRegister(expectedMinusReceived)
66+
prometheus.MustRegister(lostRate)
67+
prometheus.MustRegister(jitter)
68+
}
69+
1270
type Interceptor struct {
1371
sync.RWMutex
1472
bufferInterceptor *buffer.Interceptor
@@ -76,6 +134,33 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
76134
}
77135
}
78136
}
137+
case *rtcp.ReceiverReport:
138+
calculateStats := func(ssrc uint32) {
139+
i.RLock()
140+
defer i.RUnlock()
141+
142+
for _, s := range i.streams {
143+
if s.Buffer.GetMediaSSRC() != ssrc {
144+
continue
145+
}
146+
bufferStats := s.Buffer.GetStats()
147+
148+
hadStats, diffStats := s.updateStats(bufferStats)
149+
150+
if hadStats {
151+
expectedCount.Add(float64(diffStats.LastExpected))
152+
receivedCount.Add(float64(diffStats.LastReceived))
153+
packetCount.Add(float64(diffStats.PacketCount))
154+
totalBytes.Add(float64(diffStats.TotalByte))
155+
}
156+
157+
expectedMinusReceived.Observe(float64(bufferStats.LastExpected - bufferStats.LastReceived))
158+
lostRate.Observe(float64(bufferStats.LostRate))
159+
jitter.Observe(float64(bufferStats.Jitter))
160+
}
161+
}
162+
calculateStats(pkt.SSRC)
163+
79164
case *rtcp.SenderReport:
80165
findRelatedCName := func(ssrc uint32) string {
81166
i.RLock()
@@ -134,13 +219,42 @@ func (i *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.
134219
}
135220
}
136221

222+
calculateStats := func(ssrc uint32) {
223+
i.RLock()
224+
defer i.RUnlock()
225+
226+
for _, s := range i.streams {
227+
if s.Buffer.GetMediaSSRC() != ssrc {
228+
continue
229+
}
230+
231+
bufferStats := s.Buffer.GetStats()
232+
driftInMillis := s.getDriftInMillis()
233+
234+
hadStats, diffStats := s.updateStats(bufferStats)
235+
236+
drift.Observe(float64(driftInMillis))
237+
if hadStats {
238+
expectedCount.Add(float64(diffStats.LastExpected))
239+
receivedCount.Add(float64(diffStats.LastReceived))
240+
packetCount.Add(float64(diffStats.PacketCount))
241+
totalBytes.Add(float64(diffStats.TotalByte))
242+
}
243+
244+
expectedMinusReceived.Observe(float64(bufferStats.LastExpected - bufferStats.LastReceived))
245+
lostRate.Observe(float64(bufferStats.LostRate))
246+
jitter.Observe(float64(bufferStats.Jitter))
247+
}
248+
}
249+
137250
cname := findRelatedCName(pkt.SSRC)
138251

139252
minPacketNtpTimeInMillisSinceSenderEpoch, maxPacketNtpTimeInMillisSinceSenderEpoch := calculateLatestMinMaxSenderNtpTime(cname)
140253

141254
driftInMillis := maxPacketNtpTimeInMillisSinceSenderEpoch - minPacketNtpTimeInMillisSinceSenderEpoch
142255

143256
setDrift(cname, driftInMillis)
257+
calculateStats(pkt.SSRC)
144258
}
145259
}
146260

pkg/stats/stream.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,18 @@ import (
1111
log "github.com/pion/ion-log"
1212
)
1313

14-
// Stream contains buffer with statistics
14+
// Stream contains buffer statistics
1515
type Stream struct {
1616
sync.RWMutex
1717
Buffer *buffer.Buffer
1818
cname string
1919
driftInMillis uint64
20+
hasStats bool
21+
lastStats buffer.BufferStats
22+
diffStats buffer.BufferStats
2023
}
2124

22-
// NewBuffer constructs a new Buffer
25+
// NewStream constructs a new Stream
2326
func NewStream(buffer *buffer.Buffer, _ *interceptor.StreamInfo) *Stream {
2427
s := &Stream{
2528
Buffer: buffer,
@@ -29,6 +32,7 @@ func NewStream(buffer *buffer.Buffer, _ *interceptor.StreamInfo) *Stream {
2932
return s
3033
}
3134

35+
// GetCName returns the cname for a given stream
3236
func (s *Stream) GetCName() string {
3337
s.RLock()
3438
defer s.RUnlock()
@@ -46,3 +50,27 @@ func (s *Stream) setCName(cname string) {
4650
func (s *Stream) setDriftInMillis(driftInMillis uint64) {
4751
atomic.StoreUint64(&s.driftInMillis, driftInMillis)
4852
}
53+
54+
func (s *Stream) getDriftInMillis() uint64 {
55+
return atomic.LoadUint64(&s.driftInMillis)
56+
}
57+
58+
func (s *Stream) updateStats(stats buffer.BufferStats) (hasDiff bool, diffStats buffer.BufferStats) {
59+
s.Lock()
60+
defer s.Unlock()
61+
62+
hadStats := false
63+
64+
if s.hasStats {
65+
s.diffStats.LastExpected = stats.LastExpected - s.lastStats.LastExpected
66+
s.diffStats.LastReceived = stats.LastReceived - s.lastStats.LastReceived
67+
s.diffStats.PacketCount = stats.PacketCount - s.lastStats.PacketCount
68+
s.diffStats.TotalByte = stats.TotalByte - s.lastStats.TotalByte
69+
hadStats = true
70+
}
71+
72+
s.lastStats = stats
73+
s.hasStats = true
74+
75+
return hadStats, s.diffStats
76+
}

0 commit comments

Comments
 (0)