Skip to content

Commit b464738

Browse files
authored
Rewrite ISOM muxer on TimestampQueue (#111)
* Rewrite ISOM Muxer on auto demands with TimestampQueue * WIP remove dependency to Track * Revert "WIP remove dependency to Track" This reverts commit ef837ec. * Remove leftovers * Delete manual ISOM * Format * Fix deps * Refactor due to CR
1 parent 7c58255 commit b464738

File tree

7 files changed

+66
-22
lines changed

7 files changed

+66
-22
lines changed

lib/membrane_mp4/muxer/isom.ex

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,15 @@ defmodule Membrane.MP4.Muxer.ISOM do
44
"""
55
use Membrane.Filter
66

7-
alias Membrane.{Buffer, File, MP4, RemoteStream, Time}
7+
alias Membrane.{Buffer, File, MP4, RemoteStream, Time, TimestampQueue}
88
alias Membrane.MP4.{Container, FileTypeBox, MediaDataBox, MovieBox, Track}
99

1010
@ftyp FileTypeBox.assemble("isom", ["isom", "iso2", "avc1", "mp41"])
1111
@ftyp_size @ftyp |> Container.serialize!() |> byte_size()
1212
@mdat_header_size 8
1313

1414
def_input_pad :input,
15-
flow_control: :manual,
16-
demand_unit: :buffers,
15+
flow_control: :auto,
1716
accepted_format:
1817
any_of(
1918
%Membrane.AAC{config: {:esds, _esds}},
@@ -31,7 +30,7 @@ defmodule Membrane.MP4.Muxer.ISOM do
3130

3231
def_output_pad :output,
3332
accepted_format: %RemoteStream{type: :bytestream, content_format: MP4},
34-
flow_control: :manual
33+
flow_control: :auto
3534

3635
def_options fast_start: [
3736
spec: boolean(),
@@ -61,27 +60,36 @@ defmodule Membrane.MP4.Muxer.ISOM do
6160

6261
@impl true
6362
def handle_init(_ctx, options) do
63+
queue =
64+
TimestampQueue.new(
65+
chunk_duration: options.chunk_duration,
66+
pause_demand_boundary: 3 * options.chunk_duration,
67+
pause_demand_boundary_unit: :time
68+
)
69+
6470
state =
6571
options
6672
|> Map.from_struct()
6773
|> Map.merge(%{
6874
mdat_size: 0,
6975
next_track_id: 1,
7076
pad_order: [],
71-
pad_to_track: %{}
77+
pad_to_track: %{},
78+
queue: queue
7279
})
7380

7481
{[], state}
7582
end
7683

7784
@impl true
78-
def handle_pad_added(Pad.ref(:input, pad_ref), _ctx, state) do
85+
def handle_pad_added(Pad.ref(:input, pad_ref) = pad, _ctx, state) do
7986
{track_id, state} = Map.get_and_update!(state, :next_track_id, &{&1, &1 + 1})
8087

8188
state =
8289
state
8390
|> Map.update!(:pad_order, &[pad_ref | &1])
8491
|> put_in([:pad_to_track, pad_ref], track_id)
92+
|> Map.update!(:queue, &TimestampQueue.register_pad(&1, pad))
8593

8694
{[], state}
8795
end
@@ -126,34 +134,59 @@ defmodule Membrane.MP4.Muxer.ISOM do
126134
end
127135

128136
@impl true
129-
def handle_demand(:output, _size, :buffers, _ctx, state) do
130-
next_ref = hd(state.pad_order)
137+
def handle_event(Pad.ref(:input, _pad_ref) = pad, event, _ctx, state) do
138+
state.queue
139+
|> TimestampQueue.push_event(pad, event)
140+
|> TimestampQueue.pop_available_items()
141+
|> handle_queue_output(state)
142+
end
143+
144+
@impl true
145+
def handle_event(:output, event, _ctx, state) do
146+
{[forward: event], state}
147+
end
131148

132-
{[demand: {Pad.ref(:input, next_ref), 1}], state}
149+
@impl true
150+
def handle_buffer(Pad.ref(:input, _pad_ref) = pad, buffer, _ctx, state) do
151+
state.queue
152+
|> TimestampQueue.push_buffer_and_pop_available_items(pad, buffer)
153+
|> handle_queue_output(state)
133154
end
134155

135156
@impl true
136-
def handle_buffer(Pad.ref(:input, pad_ref), buffer, _ctx, state) do
157+
def handle_end_of_stream(Pad.ref(:input, _pad_ref) = pad, _ctx, state) do
158+
state.queue
159+
|> TimestampQueue.push_end_of_stream(pad)
160+
|> TimestampQueue.pop_available_items()
161+
|> handle_queue_output(state)
162+
end
163+
164+
defp handle_queue_output({suggested_actions, batch, queue}, state) do
165+
{actions, state} = Enum.flat_map_reduce(batch, state, &handle_queue_item/2)
166+
{suggested_actions ++ actions, %{state | queue: queue}}
167+
end
168+
169+
defp handle_queue_item({_pad_ref, {:event, event}}, state) do
170+
{[event: {:output, event}], state}
171+
end
172+
173+
defp handle_queue_item({Pad.ref(:input, pad_ref), {:buffer, buffer}}, state) do
137174
# In case DTS is not set, use PTS. This is the case for audio tracks or H264 originated
138175
# from an RTP stream. ISO base media file format specification uses DTS for calculating
139176
# decoding deltas, and so is the implementation of sample table in this plugin.
140177
buffer = %Buffer{buffer | dts: Buffer.get_dts_or_pts(buffer)}
141178

142-
{maybe_buffer, state} =
143-
state
144-
|> update_in([:pad_to_track, pad_ref], &Track.store_sample(&1, buffer))
145-
|> maybe_flush_chunk(pad_ref)
146-
147-
{maybe_buffer ++ [redemand: :output], state}
179+
state
180+
|> update_in([:pad_to_track, pad_ref], &Track.store_sample(&1, buffer))
181+
|> maybe_flush_chunk(pad_ref)
148182
end
149183

150-
@impl true
151-
def handle_end_of_stream(Pad.ref(:input, pad_ref), _ctx, state) do
184+
defp handle_queue_item({Pad.ref(:input, pad_ref), :end_of_stream}, state) do
152185
{buffer, state} = do_flush_chunk(state, pad_ref)
153186
state = Map.update!(state, :pad_order, &List.delete(&1, pad_ref))
154187

155188
if state.pad_order != [] do
156-
{buffer ++ [redemand: :output], state}
189+
{buffer, state}
157190
else
158191
actions = finalize_mp4(state)
159192
{buffer ++ actions ++ [end_of_stream: :output], state}

mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ defmodule Membrane.MP4.Plugin.MixProject do
4545
{:membrane_h265_format, "~> 0.2.0"},
4646
{:membrane_opus_format, "~> 0.3.0"},
4747
{:membrane_file_plugin, "~> 0.17.0"},
48+
{:membrane_timestamp_queue, "~> 0.1.0"},
4849
{:bunch, "~> 1.5"},
4950
{:membrane_h26x_plugin, "~> 0.10.0", only: :test},
5051
{:membrane_aac_plugin, "~> 0.18.0", only: :test},

mix.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
1515
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
1616
"finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"},
17+
"heap": {:hex, :heap, "2.0.2", "d98cb178286cfeb5edbcf17785e2d20af73ca57b5a2cf4af584118afbcf917eb", [:mix], [], "hexpm", "ba9ea2fe99eb4bcbd9a8a28eaf71cbcac449ca1d8e71731596aace9028c9d429"},
1718
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
1819
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
1920
"logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"},
@@ -36,6 +37,7 @@
3637
"membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.1", "a0d5b7942f8be452c30744207f78284f6a0e0c84c968aba7d76e206fbf75bc5d", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "87ad44752e2cf0fa3b31c5aac15b863343c2f6e0f0fd201f5ec4c0bcda8c6fa3"},
3738
"membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"},
3839
"membrane_stream_plugin": {:hex, :membrane_stream_plugin, "0.4.0", "0c4ab72a4e13bf0faa0f1166fbaf68d2e34167dbec345aedb74ce1eb7497bdda", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "5a9a9c17783e18ad740e6ddfed364581bdb7ebdab8e61ba2c19a1830356f7eb8"},
40+
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.1.0", "98cfc057b845d690da9d57dd5f80e32a3aae2c43b52fac9aade07146113f3a89", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7aa2912839a85e93b5c92cd52a1e53b719e39f3231dc3e49b7cadf182e7345b8"},
3941
"mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"},
4042
"mint": {:hex, :mint, "1.5.2", "4805e059f96028948870d23d7783613b7e6b0e2fb4e98d720383852a760067fd", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "d77d9e9ce4eb35941907f1d3df38d8f750c357865353e21d335bdcdf6d892a02"},
4143
"nimble_options": {:hex, :nimble_options, "1.1.0", "3b31a57ede9cb1502071fade751ab0c7b8dbe75a9a4c2b5bbb0943a690b63172", [:mix], [], "hexpm", "8bbbb3941af3ca9acc7835f5655ea062111c9c27bcac53e004460dfd19008a99"},
0 Bytes
Binary file not shown.
0 Bytes
Binary file not shown.

test/membrane_mp4/muxer/cmaf/integration_test.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ defmodule Membrane.MP4.Muxer.CMAF.IntegrationTest do
345345
assert_in_delta buffer.metadata.duration, Membrane.Time.seconds(2), @video_sample_duration
346346

347347
assert_end_of_stream(pipeline, :sink)
348+
349+
Testing.Pipeline.terminate(pipeline)
348350
end
349351

350352
@tag segment_min_duration: Membrane.Time.seconds(6)
@@ -362,6 +364,7 @@ defmodule Membrane.MP4.Muxer.CMAF.IntegrationTest do
362364
assert_in_delta buffer.metadata.duration, ctx.segment_min_duration, @video_sample_duration
363365

364366
assert_end_of_stream(pipeline, :sink)
367+
Testing.Pipeline.terminate(pipeline)
365368
end
366369

367370
@tag segment_min_duration: Membrane.Time.seconds(6)
@@ -405,6 +408,7 @@ defmodule Membrane.MP4.Muxer.CMAF.IntegrationTest do
405408
assert buffer.metadata.last_chunk?
406409

407410
assert_end_of_stream(pipeline, :sink)
411+
Testing.Pipeline.terminate(pipeline)
408412
end
409413
end
410414

test/membrane_mp4/muxer/isom/integration_test.exs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,10 @@ defmodule Membrane.MP4.Muxer.ISOM.IntegrationTest do
196196
out_encapsulation: :none,
197197
output_config: :esds
198198
}),
199-
child(:muxer, %Membrane.MP4.Muxer.ISOM{chunk_duration: Time.seconds(1), fast_start: true})
199+
child(:muxer, %Membrane.MP4.Muxer.ISOM{
200+
chunk_duration: Time.seconds(1),
201+
fast_start: true
202+
})
200203
|> child(:sink, %Membrane.File.Sink{location: out_path_for("two_tracks_fast_start")}),
201204
get_child(:video_parser) |> get_child(:muxer),
202205
get_child(:audio_parser) |> get_child(:muxer)
@@ -228,8 +231,9 @@ defmodule Membrane.MP4.Muxer.ISOM.IntegrationTest do
228231

229232
assert_receive {:DOWN, ^monitor_ref, :process, ^pid,
230233
{:membrane_child_crash, :muxer,
231-
{%RuntimeError{message: "ISOM Muxer doesn't support variable parameters"},
232-
_stacktrace}}},
234+
{%RuntimeError{
235+
message: "ISOM Muxer doesn't support variable parameters"
236+
}, _stacktrace}}},
233237
1_000
234238
end
235239
end

0 commit comments

Comments
 (0)