Skip to content

Commit 5866816

Browse files
authored
Add support for SRTP in the muxer and demuxer (#177)
* SRTP working
1 parent c2c4e34 commit 5866816

File tree

4 files changed

+163
-27
lines changed

4 files changed

+163
-27
lines changed

lib/membrane/rtp/demuxer.ex

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ defmodule Membrane.RTP.Demuxer do
105105
This option determines the action to be taken after a stream has been announced with a
106106
`t:new_rtp_stream_notification/0` notification but the corresponding pad has not been connected within the specified timeout period.
107107
"""
108+
],
109+
srtp: [
110+
spec: false | [ExLibSRTP.Policy.t()],
111+
default: false,
112+
description: """
113+
Specifies whether to use SRTP to decrypt the input streams. Requires adding [srtp](https://github.com/membraneframework/elixir_libsrtp)
114+
dependency to work. If true takes a list of SRTP policies to use for decrypting packets. See `t:ExLibSRTP.Policy.t/0` for details.
115+
"""
108116
]
109117

110118
defmodule State do
@@ -138,17 +146,38 @@ defmodule Membrane.RTP.Demuxer do
138146
@type t :: %__MODULE__{
139147
payload_type_mapping: RTP.PayloadFormat.payload_type_mapping(),
140148
not_linked_pad_handling: %{action: :raise | :ignore, timeout: Membrane.Time.t()},
149+
srtp: ExLibSRTP.t() | nil,
141150
stream_states: %{RTP.ssrc() => StreamState.t()},
142151
pads_waiting_for_stream: %{Pad.ref() => Membrane.RTP.Demuxer.stream_id()}
143152
}
144153

145-
@enforce_keys [:not_linked_pad_handling, :payload_type_mapping]
154+
@enforce_keys [:not_linked_pad_handling, :payload_type_mapping, :srtp]
146155
defstruct @enforce_keys ++ [stream_states: %{}, pads_waiting_for_stream: %{}]
147156
end
148157

149158
@impl true
150159
def handle_init(_ctx, opts) do
151-
{[], struct(State, Map.from_struct(opts))}
160+
srtp =
161+
case opts.srtp do
162+
false ->
163+
nil
164+
165+
policies ->
166+
if not Code.ensure_loaded?(ExLibSRTP) do
167+
raise "Optional dependency :ex_libsrtp is required for SRTP"
168+
end
169+
170+
srtp = apply(ExLibSRTP, :new, [])
171+
Enum.each(policies, &apply(ExLibSRTP, :add_stream, [srtp, &1]))
172+
srtp
173+
end
174+
175+
opts =
176+
opts
177+
|> Map.from_struct()
178+
|> Map.put(:srtp, srtp)
179+
180+
{[], struct(State, opts)}
152181
end
153182

154183
@impl true
@@ -290,7 +319,35 @@ defmodule Membrane.RTP.Demuxer do
290319

291320
@spec handle_rtp_packet(binary(), CallbackContext.t(), State.t()) ::
292321
{[Membrane.Element.Action.t()], State.t()}
293-
defp handle_rtp_packet(raw_rtp_packet, ctx, state) do
322+
defp handle_rtp_packet(packet, ctx, state) do
323+
case unprotect_packet(packet, state.srtp) do
324+
nil -> {[], state}
325+
raw_packet -> handle_raw_rtp_packet(raw_packet, ctx, state)
326+
end
327+
end
328+
329+
@spec unprotect_packet(binary(), ExLibSRTP.t() | nil) :: binary() | nil
330+
defp unprotect_packet(rtp_packet, nil) do
331+
rtp_packet
332+
end
333+
334+
defp unprotect_packet(protected_rtp_packet, srtp) do
335+
case apply(ExLibSRTP, :unprotect, [srtp, protected_rtp_packet]) do
336+
{:ok, raw_rtp_packet} ->
337+
raw_rtp_packet
338+
339+
{:error, reason} when reason in [:replay_fail, :replay_old] ->
340+
Membrane.Logger.warning("Ignoring packet due to `#{reason}`")
341+
nil
342+
343+
{:error, reason} ->
344+
raise "Failed to unprotect packet due to `#{reason}`"
345+
end
346+
end
347+
348+
@spec handle_raw_rtp_packet(binary(), CallbackContext.t(), State.t()) ::
349+
{[Membrane.Element.Action.t()], State.t()}
350+
defp handle_raw_rtp_packet(raw_rtp_packet, ctx, state) do
294351
{:ok, packet} = ExRTP.Packet.decode(raw_rtp_packet)
295352

296353
{new_stream_actions, state} =

lib/membrane/rtp/muxer.ex

Lines changed: 70 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ defmodule Membrane.RTP.Muxer do
6363

6464
def_output_pad :output, accepted_format: %RemoteStream{type: :packetized, content_format: RTP}
6565

66+
def_options srtp: [
67+
spec: false | [ExLibSRTP.Policy.t()],
68+
default: false,
69+
description: """
70+
Specifies whether to use SRTP to encrypt the output stream. Requires adding [srtp](https://github.com/membraneframework/elixir_libsrtp)
71+
dependency to work. If true takes a list of SRTP policies to use for encrypting packets. See `t:ExLibSRTP.Policy.t/0` for details.
72+
"""
73+
]
74+
6675
defmodule State do
6776
@moduledoc false
6877
defmodule StreamState do
@@ -84,16 +93,32 @@ defmodule Membrane.RTP.Muxer do
8493
end
8594

8695
@type t :: %__MODULE__{
87-
stream_states: %{Pad.ref() => StreamState.t()}
96+
stream_states: %{Pad.ref() => StreamState.t()},
97+
srtp: ExLibSRTP.t() | nil
8898
}
8999

90-
@enforce_keys []
100+
@enforce_keys [:srtp]
91101
defstruct @enforce_keys ++ [stream_states: %{}]
92102
end
93103

94104
@impl true
95-
def handle_init(_ctx, _opts) do
96-
{[], %State{}}
105+
def handle_init(_ctx, opts) do
106+
srtp =
107+
case opts.srtp do
108+
false ->
109+
nil
110+
111+
policies ->
112+
if not Code.ensure_loaded?(ExLibSRTP) do
113+
raise "Optional dependency :ex_libsrtp is required for SRTP"
114+
end
115+
116+
srtp = apply(ExLibSRTP, :new, [])
117+
Enum.each(policies, &apply(ExLibSRTP, :add_stream, [srtp, &1]))
118+
srtp
119+
end
120+
121+
{[], %State{srtp: srtp}}
97122
end
98123

99124
@impl true
@@ -105,7 +130,7 @@ defmodule Membrane.RTP.Muxer do
105130
def handle_stream_format(pad, stream_format, ctx, state) do
106131
pad_options = ctx.pads[pad].options
107132

108-
ssrc = get_stream_ssrc(pad_options, state)
133+
ssrc = get_stream_ssrc(pad_options.ssrc, state)
109134

110135
encoding_name =
111136
@payload_format_to_encoding_name[stream_format.payload_format] ||
@@ -141,7 +166,7 @@ defmodule Membrane.RTP.Muxer do
141166

142167
@impl true
143168
def handle_buffer(Pad.ref(:input, _ref) = pad_ref, buffer, _ctx, state) do
144-
{rtp_metadata, metadata} = Map.pop(buffer.metadata, :rtp, %{})
169+
rtp_metadata = Map.get(buffer.metadata, :rtp, %{})
145170
stream_state = state.stream_states[pad_ref]
146171

147172
rtp_offset =
@@ -165,15 +190,25 @@ defmodule Membrane.RTP.Muxer do
165190
marker: Map.get(rtp_metadata, :marker, false)
166191
)
167192

168-
raw_packet = ExRTP.Packet.encode(packet)
193+
buffer_action =
194+
packet
195+
|> ExRTP.Packet.encode()
196+
|> protect_packet(state.srtp)
197+
|> case do
198+
nil ->
199+
[]
200+
201+
raw_packet ->
202+
buffer = %Membrane.Buffer{
203+
buffer
204+
| payload: raw_packet,
205+
metadata: Map.put(buffer.metadata, :rtp, %{packet | payload: <<>>})
206+
}
169207

170-
buffer = %Membrane.Buffer{
171-
buffer
172-
| payload: raw_packet,
173-
metadata: Map.put(metadata, :rtp, %{packet | payload: <<>>})
174-
}
208+
[buffer: {:output, buffer}]
209+
end
175210

176-
{[buffer: {:output, buffer}], state}
211+
{buffer_action, state}
177212
end
178213

179214
@impl true
@@ -187,10 +222,30 @@ defmodule Membrane.RTP.Muxer do
187222
end
188223
end
189224

190-
defp get_stream_ssrc(pad_options, state) do
225+
@spec protect_packet(binary(), ExLibSRTP.t() | nil) :: binary() | nil
226+
defp protect_packet(rtp_packet, nil) do
227+
rtp_packet
228+
end
229+
230+
defp protect_packet(rtp_packet, srtp) do
231+
case apply(ExLibSRTP, :protect, [srtp, rtp_packet]) do
232+
{:ok, protected_rtp_packet} ->
233+
protected_rtp_packet
234+
235+
{:error, reason} when reason in [:replay_fail, :replay_old] ->
236+
Membrane.Logger.warning("Ignoring packet due to `#{reason}`")
237+
nil
238+
239+
{:error, reason} ->
240+
raise "Failed to unprotect packet due to `#{reason}`"
241+
end
242+
end
243+
244+
@spec get_stream_ssrc(RTP.ssrc() | :random, State.t()) :: RTP.ssrc()
245+
defp get_stream_ssrc(ssrc, state) do
191246
assigned_ssrcs = Enum.map(state.stream_states, fn {_pad_ref, %{ssrc: ssrc}} -> ssrc end)
192247

193-
case pad_options.ssrc do
248+
case ssrc do
194249
:random ->
195250
Stream.repeatedly(fn -> Enum.random(0..@max_ssrc) end)
196251
|> Enum.find(&(&1 not in assigned_ssrcs))

test/membrane/rtp/muxer_demuxer_integration_test.exs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ defmodule Membrane.RTP.MuxerDemuxerTest do
1919
})
2020
|> child(:realtimer, Membrane.Realtimer)
2121
|> child(:rtp_h264_payloader, Membrane.RTP.H264.Payloader)
22-
|> child(:rtp_muxer, Membrane.RTP.Muxer)
23-
|> child(:rtp_demuxer, Membrane.RTP.Demuxer)
22+
|> child(:rtp_muxer, %Membrane.RTP.Muxer{srtp: opts.srtp})
23+
|> child(:rtp_demuxer, %Membrane.RTP.Demuxer{srtp: opts.srtp})
2424
|> via_out(:output,
2525
options: [stream_id: {:encoding_name, :H264}]
2626
)
@@ -32,14 +32,26 @@ defmodule Membrane.RTP.MuxerDemuxerTest do
3232
end
3333
end
3434

35-
@tag :tmp_dir
36-
test "Muxed and demuxed stream is the same as unchanged one", %{tmp_dir: tmp_dir} do
35+
describe "Muxed and demuxed stream is the same as unchanged one" do
36+
@tag :tmp_dir
37+
test "when not using SRTP encryption", %{tmp_dir: tmp_dir} do
38+
perform_test(false, tmp_dir)
39+
end
40+
41+
@tag :tmp_dir
42+
test "when using SRTP encryption", %{tmp_dir: tmp_dir} do
43+
policy = %ExLibSRTP.Policy{ssrc: :any_inbound, key: String.duplicate("a", 30)}
44+
perform_test([policy], tmp_dir)
45+
end
46+
end
47+
48+
defp perform_test(srtp, tmp_dir) do
3749
output_path = Path.join(tmp_dir, "output.h264")
3850

3951
pipeline =
4052
Testing.Pipeline.start_supervised!(
4153
module: MuxerDemuxerPipeline,
42-
custom_args: %{input_path: @input_path, output_path: output_path}
54+
custom_args: %{input_path: @input_path, output_path: output_path, srtp: srtp}
4355
)
4456

4557
assert_start_of_stream(pipeline, :sink)

test/membrane/rtp/muxer_test.exs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ defmodule Membrane.RTP.MuxerTest do
1313
use Membrane.Pipeline
1414

1515
@impl true
16-
def handle_init(_ctx, _opts) do
16+
def handle_init(_ctx, opts) do
1717
spec = [
1818
child(:hackney_source, %Membrane.Hackney.Source{
1919
location:
@@ -27,7 +27,7 @@ defmodule Membrane.RTP.MuxerTest do
2727
output_alignment: :nalu
2828
})
2929
|> child(:h264_payloader, Membrane.RTP.H264.Payloader)
30-
|> child(:rtp_muxer, Membrane.RTP.Muxer)
30+
|> child(:rtp_muxer, %Membrane.RTP.Muxer{srtp: opts.srtp})
3131
|> child(:sink, Membrane.Testing.Sink),
3232
get_child(:mp4_demuxer)
3333
|> via_out(:output, options: [kind: :audio])
@@ -40,8 +40,20 @@ defmodule Membrane.RTP.MuxerTest do
4040
end
4141
end
4242

43-
test "Muxer muxes correct amount of packets" do
44-
pipeline = Testing.Pipeline.start_supervised!(module: Pipeline)
43+
describe "Muxer muxes correct amount of packets" do
44+
test "when encrypting the stream with SRTP" do
45+
policy = %ExLibSRTP.Policy{ssrc: :any_inbound, key: String.duplicate("b", 30)}
46+
perform_test([policy])
47+
end
48+
49+
test "when not encrypting the stream with SRTP" do
50+
perform_test(false)
51+
end
52+
end
53+
54+
defp perform_test(srtp) do
55+
pipeline =
56+
Testing.Pipeline.start_supervised!(module: Pipeline, custom_args: %{srtp: srtp})
4557

4658
%{audio: %{payload_type: audio_payload_type}, video: %{payload_type: video_payload_type}} =
4759
@rtp_output

0 commit comments

Comments
 (0)