Skip to content

Commit 568194c

Browse files
[RTC-93] Add some RTP metrics (#142)
* Add new metrics * Emit inbound frames counter * Some CR suggestions + label RTCP by the track type (outbound/inbound) instead of RTCP direction + Add sender report sent counter metric * Rename: 'inbound-rtp.nack' => 'inbound-rtp.nack_sent' * Move all RTCP metrics to track scope * Rename `frames` metric to `markers` Marker doesn't necessarily indicate a frame boundary. The fact that the frame could be damaged but we would still count it would also cause problems. `markers` is therefore more explicit about what this metric represents. * Remove unused telemetry labels * Fix lint * Remove duplicated metrics * Add rtx-sent metric * Add outbound rtp metrics and inbound paddings metric * Fix top level metrics from RTX streams * Fix nil.options error * Even more CR fixes * Apply suggestions from code review Co-authored-by: Bartosz Błaszków <[email protected]> * Refactor * Fix incorrect function name, remove padding counter metric --------- Co-authored-by: Bartosz Błaszków <[email protected]>
1 parent f87bfdd commit 568194c

File tree

9 files changed

+306
-54
lines changed

9 files changed

+306
-54
lines changed

lib/membrane/rtcp/parser.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ defmodule Membrane.RTCP.Parser do
5858
@impl true
5959
def handle_event(:output, %RTCPEvent{} = event, _ctx, state) do
6060
buffer = %Buffer{payload: RTCP.Packet.serialize(event.rtcp)}
61+
6162
{[buffer: {:receiver_report_output, buffer}], state}
6263
end
6364

lib/membrane/rtcp/receiver.ex

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,22 @@ defmodule Membrane.RTCP.Receiver do
3333
)
3434
|> Membrane.Time.milliseconds()
3535

36-
@fir_telemetry_event [Membrane.RTP, :rtcp, :fir, :sent]
36+
@fir_sent_telemetry_event [Membrane.RTP, :rtcp, :fir, :sent]
37+
@nack_sent_telemetry_event [Membrane.RTP, :rtcp, :nack, :sent]
38+
@sender_report_received_telemetry_event [Membrane.RTP, :rtcp, :sender_report, :arrival]
39+
@receiver_report_sent_telemetry_event [Membrane.RTP, :rtcp, :receiver_report, :sent]
3740

3841
@impl true
3942
def handle_init(_ctx, opts) do
40-
Membrane.TelemetryMetrics.register(@fir_telemetry_event, opts.telemetry_label)
43+
[
44+
@fir_sent_telemetry_event,
45+
@nack_sent_telemetry_event,
46+
@sender_report_received_telemetry_event,
47+
@receiver_report_sent_telemetry_event
48+
]
49+
|> Enum.each(fn event ->
50+
Membrane.TelemetryMetrics.register(event, opts.telemetry_label)
51+
end)
4152

4253
state =
4354
opts
@@ -64,6 +75,8 @@ defmodule Membrane.RTCP.Receiver do
6475

6576
@impl true
6677
def handle_event(:input, %RTCPEvent{rtcp: %SenderReportPacket{} = rtcp} = event, _ctx, state) do
78+
emit_telemetry_event(@sender_report_received_telemetry_event, state)
79+
6780
<<_wallclock_ts_upper_16_bits::16, wallclock_ts_middle_32_bits::32,
6881
_wallclock_ts_lower_16_bits::16>> =
6982
Time.to_ntp_timestamp(rtcp.sender_info.wallclock_timestamp)
@@ -104,6 +117,9 @@ defmodule Membrane.RTCP.Receiver do
104117
}
105118

106119
packet = %RTCP.ReceiverReportPacket{ssrc: state.local_ssrc, reports: [report_block]}
120+
121+
emit_telemetry_event(@receiver_report_sent_telemetry_event, state)
122+
107123
{[event: {:input, %RTCPEvent{rtcp: packet}}], state}
108124
end
109125

@@ -122,6 +138,8 @@ defmodule Membrane.RTCP.Receiver do
122138
}
123139
}
124140

141+
emit_telemetry_event(@nack_sent_telemetry_event, state)
142+
125143
event = %RTCPEvent{rtcp: rtcp}
126144
Membrane.Logger.debug("Sending NACK to #{state.remote_ssrc} with ids #{inspect(ids)}")
127145
{[event: {:input, event}], state}
@@ -147,7 +165,7 @@ defmodule Membrane.RTCP.Receiver do
147165
}
148166
}
149167

150-
Membrane.TelemetryMetrics.execute(@fir_telemetry_event, %{}, %{}, state.telemetry_label)
168+
emit_telemetry_event(@fir_sent_telemetry_event, state)
151169

152170
event = %RTCPEvent{rtcp: rtcp}
153171
state = %{state | fir_seq_num: state.fir_seq_num + 1, last_fir_timestamp: now}
@@ -158,4 +176,7 @@ defmodule Membrane.RTCP.Receiver do
158176
{[], state}
159177
end
160178
end
179+
180+
defp emit_telemetry_event(event, state),
181+
do: Membrane.TelemetryMetrics.execute(event, %{}, %{}, state.telemetry_label)
161182
end

lib/membrane/rtp/metrics.ex

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,98 @@ defmodule Membrane.RTP.Metrics do
33
Defines list of metrics, that can be aggregated based on events from membrane_rtp_plugin.
44
"""
55

6+
alias Telemetry.Metrics
7+
68
@doc """
79
Returns list of metrics, that can be aggregated based on events from membrane_rtp_plugin.
810
"""
9-
@spec metrics() :: [Telemetry.Metrics.t()]
11+
@spec metrics() :: [Metrics.t()]
1012
def metrics() do
1113
[
12-
Telemetry.Metrics.counter(
13-
"inbound-rtp.keyframe_request_sent",
14-
event_name: [Membrane.RTP, :rtcp, :fir, :sent]
15-
),
16-
Telemetry.Metrics.counter(
14+
Metrics.counter(
1715
"inbound-rtp.packets",
1816
event_name: [Membrane.RTP, :packet, :arrival]
1917
),
20-
Telemetry.Metrics.sum(
18+
Metrics.sum(
2119
"inbound-rtp.bytes_received",
2220
event_name: [Membrane.RTP, :packet, :arrival],
2321
measurement: :bytes
2422
),
25-
Telemetry.Metrics.last_value(
23+
Metrics.last_value(
2624
"inbound-rtp.encoding",
2725
event_name: [Membrane.RTP, :inbound_track, :new],
2826
measurement: :encoding
2927
),
30-
Telemetry.Metrics.last_value(
28+
Metrics.last_value(
3129
"inbound-rtp.ssrc",
3230
event_name: [Membrane.RTP, :inbound_track, :new],
3331
measurement: :ssrc
32+
),
33+
Metrics.counter(
34+
"inbound-rtp.markers_received",
35+
event_name: [Membrane.RTP, :rtp, :marker_received]
36+
),
37+
Metrics.counter(
38+
"outbound-rtp.markers_sent",
39+
event_name: [Membrane.RTP, :rtp, :marker_sent]
40+
),
41+
Metrics.counter(
42+
"rtcp.total_packets_received",
43+
event_name: [Membrane.RTP, :rtcp, :arrival]
44+
),
45+
Metrics.counter(
46+
"rtcp.total_packets_sent",
47+
event_name: [Membrane.RTP, :rtcp, :sent]
48+
),
49+
Metrics.counter(
50+
"rtcp.nack_sent",
51+
event_name: [Membrane.RTP, :rtcp, :nack, :sent]
52+
),
53+
Metrics.counter(
54+
"rtcp.fir_sent",
55+
event_name: [Membrane.RTP, :rtcp, :fir, :sent]
56+
),
57+
Metrics.counter(
58+
"rtcp.sender_reports_sent",
59+
event_name: [Membrane.RTP, :rtcp, :sender_report, :sent]
60+
),
61+
Metrics.counter(
62+
"rtcp.receiver_reports_sent",
63+
event_name: [Membrane.RTP, :rtcp, :receiver_report, :sent]
64+
),
65+
Metrics.counter(
66+
"rtcp.nack_received",
67+
event_name: [Membrane.RTP, :rtcp, :nack, :arrival]
68+
),
69+
Metrics.counter(
70+
"rtcp.fir_received",
71+
event_name: [Membrane.RTP, :rtcp, :fir, :arrival]
72+
),
73+
Metrics.counter(
74+
"rtcp.pli_received",
75+
event_name: [Membrane.RTP, :rtcp, :pli, :arrival]
76+
),
77+
Metrics.counter(
78+
"rtcp.sender_reports_received",
79+
event_name: [Membrane.RTP, :rtcp, :sender_report, :arrival]
80+
),
81+
Metrics.counter(
82+
"rtcp.receiver_reports_received",
83+
event_name: [Membrane.RTP, :rtcp, :receiver_report, :arrival]
84+
),
85+
Metrics.sum(
86+
"outbound-rtp.rtx_sent",
87+
event_name: [Membrane.RTP, :rtx, :sent],
88+
measurement: :amount
89+
),
90+
Metrics.counter(
91+
"outbound-rtp.packets",
92+
event_name: [Membrane.RTP, :packet, :sent]
93+
),
94+
Metrics.counter(
95+
"outbound-rtp.bytes",
96+
event_name: [Membrane.RTP, :packet, :sent],
97+
measurement: :bytes
3498
)
3599
]
36100
end

lib/membrane/rtp/outbound_rtx_controller.ex

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,15 @@ defmodule Membrane.RTP.OutboundRtxController do
22
use Membrane.Filter
33

44
require Membrane.Logger
5+
require Membrane.TelemetryMetrics
56

67
alias Membrane.RTP.RetransmissionRequestEvent
78

9+
def_options telemetry_label: [
10+
spec: Membrane.TelemetryMetrics.label(),
11+
default: []
12+
]
13+
814
def_input_pad :input,
915
availability: :always,
1016
demand_mode: :auto,
@@ -18,12 +24,17 @@ defmodule Membrane.RTP.OutboundRtxController do
1824
@max_store_size 300
1925
@min_rtx_interval 10
2026

27+
@retransmission_telemetry_event [Membrane.RTP, :rtx, :sent]
28+
2129
@doc false
2230
@spec max_store_size() :: pos_integer()
2331
def max_store_size(), do: @max_store_size
2432

2533
@impl true
26-
def handle_init(_ctx, _opts), do: {[], %{store: %{}}}
34+
def handle_init(_ctx, opts) do
35+
Membrane.TelemetryMetrics.register(@retransmission_telemetry_event, opts.telemetry_label)
36+
{[], %{telemetry_label: opts.telemetry_label, store: %{}}}
37+
end
2738

2839
@impl true
2940
def handle_process(:input, buffer, _ctx, state) when byte_size(buffer.payload) > 0 do
@@ -56,10 +67,18 @@ defmodule Membrane.RTP.OutboundRtxController do
5667
end)
5768

5869
buffers_to_retransmit = Enum.reject(buffers, &is_nil/1)
70+
retransmissions_count = length(buffers_to_retransmit)
5971

60-
unless buffers_to_retransmit == [] do
72+
unless retransmissions_count == 0 do
6173
Membrane.Logger.debug(
62-
"Retransmitting #{length(buffers_to_retransmit)} buffer(s): #{inspect(Enum.map(buffers_to_retransmit, & &1.metadata.rtp.sequence_number))}"
74+
"Retransmitting #{retransmissions_count} buffer(s): #{inspect(Enum.map(buffers_to_retransmit, & &1.metadata.rtp.sequence_number))}"
75+
)
76+
77+
Membrane.TelemetryMetrics.execute(
78+
@retransmission_telemetry_event,
79+
%{amount: retransmissions_count},
80+
%{},
81+
state.telemetry_label
6382
)
6483
end
6584

0 commit comments

Comments
 (0)