Skip to content

Commit ae4bad2

Browse files
authored
Fix segment collection end timestamp calculation (#102)
* Fix segment collection end timestamp calculation * Fix base segment timestamp generation * Remove unused functions * Adjust to CR * Add waiting for all input pads before assembling first segment
1 parent 8b55570 commit ae4bad2

File tree

12 files changed

+104
-36
lines changed

12 files changed

+104
-36
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep
1212
```elixir
1313
defp deps do
1414
[
15-
{:membrane_mp4_plugin, "~> 0.32.0"}
15+
{:membrane_mp4_plugin, "~> 0.33.0"}
1616
]
1717
end
1818
```

lib/membrane_mp4/muxer/cmaf.ex

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,9 @@ defmodule Membrane.MP4.Muxer.CMAF do
187187
pads_registration_order: [],
188188
sample_queues: %{},
189189
finish_current_segment?: false,
190-
video_pad: nil
190+
video_pad: nil,
191+
all_input_pads_ready?: false,
192+
buffers_awaiting_init: []
191193
})
192194
|> set_chunk_duration_range()
193195

@@ -309,23 +311,35 @@ defmodule Membrane.MP4.Muxer.CMAF do
309311
if are_all_group_pads_ready?(pad, ctx, state) do
310312
stream_format = generate_output_stream_format(output_pad, state)
311313

314+
old_input_pads_ready? = state.all_input_pads_ready?
315+
316+
state = update_input_pads_ready(pad, ctx, state)
317+
318+
{actions, state} =
319+
if old_input_pads_ready? != state.all_input_pads_ready? do
320+
replay_init_buffers(ctx, state)
321+
else
322+
{[], state}
323+
end
324+
312325
cond do
313326
is_nil(ctx.pads[output_pad].stream_format) ->
314-
{[stream_format: {output_pad, stream_format}], state}
327+
{[{:stream_format, {output_pad, stream_format}} | actions], state}
315328

316329
stream_format != ctx.pads[output_pad].stream_format ->
317-
{[], SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)}
330+
{actions, SegmentHelper.put_awaiting_stream_format(pad, stream_format, state)}
318331

319332
true ->
320-
{[], state}
333+
{actions, state}
321334
end
322335
else
323336
{[], state}
324337
end
325338
end
326339

327340
@impl true
328-
def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state) do
341+
def handle_buffer(Pad.ref(:input, _id) = pad, sample, ctx, state)
342+
when state.all_input_pads_ready? do
329343
use Numbers, overload_operators: true, comparison: true
330344

331345
# In case DTS is not set, use PTS. This is the case for audio tracks or H264 originated
@@ -335,7 +349,7 @@ defmodule Membrane.MP4.Muxer.CMAF do
335349

336350
{sample, state} =
337351
state
338-
|> maybe_init_segment_base_timestamp(pad, sample)
352+
|> maybe_init_segment_timestamps(pad, sample)
339353
|> process_buffer_awaiting_duration(pad, sample)
340354

341355
state = SegmentHelper.update_awaiting_stream_format(state, pad)
@@ -359,6 +373,11 @@ defmodule Membrane.MP4.Muxer.CMAF do
359373
end
360374
end
361375

376+
@impl true
377+
def handle_buffer(pad, sample, _ctx, state) do
378+
{[], %{state | buffers_awaiting_init: [{pad, sample} | state.buffers_awaiting_init]}}
379+
end
380+
362381
@impl true
363382
def handle_event(_pad, %__MODULE__.RequestMediaFinalization{}, _ctx, state) do
364383
{[], %{state | finish_current_segment?: true}}
@@ -438,9 +457,12 @@ defmodule Membrane.MP4.Muxer.CMAF do
438457
track_data = %{
439458
id: track_id,
440459
track: nil,
441-
# base timestamp of the current segment, initialized with DTS of the first buffer
460+
# decoding timestamp of the current segment, initialized with DTS of the first buffer
461+
# and then incremented by duration of every produced segment
462+
segment_decoding_timestamp: nil,
463+
# presentation timestamp of the current segment, initialized with PTS of the first buffer
442464
# and then incremented by duration of every produced segment
443-
segment_base_timestamp: nil,
465+
segment_presentation_timestamp: nil,
444466
end_timestamp: 0,
445467
buffer_awaiting_duration: nil,
446468
chunks_duration: Membrane.Time.seconds(0)
@@ -587,7 +609,7 @@ defmodule Membrane.MP4.Muxer.CMAF do
587609
sequence_number: state.seq_nums[output_pad],
588610
timescale: timescale,
589611
base_timestamp:
590-
track_data.segment_base_timestamp
612+
track_data.segment_presentation_timestamp
591613
|> Helper.timescalify(timescale)
592614
|> Ratio.trunc(),
593615
unscaled_duration: duration,
@@ -603,7 +625,12 @@ defmodule Membrane.MP4.Muxer.CMAF do
603625
state =
604626
tracks_data
605627
|> Enum.reduce(state, fn %{unscaled_duration: duration, pad: pad}, state ->
606-
update_in(state, [:pad_to_track_data, pad, :segment_base_timestamp], &(&1 + duration))
628+
state
629+
|> update_in([:pad_to_track_data, pad, :segment_decoding_timestamp], &(&1 + duration))
630+
|> update_in(
631+
[:pad_to_track_data, pad, :segment_presentation_timestamp],
632+
&(&1 + duration)
633+
)
607634
end)
608635
|> update_in([:seq_nums, output_pad], &(&1 + 1))
609636

@@ -712,16 +739,40 @@ defmodule Membrane.MP4.Muxer.CMAF do
712739
end
713740
end
714741

715-
defp maybe_init_segment_base_timestamp(state, pad, sample) do
742+
defp maybe_init_segment_timestamps(state, pad, sample) do
716743
case state do
717-
%{pad_to_track_data: %{^pad => %{segment_base_timestamp: nil}}} ->
718-
put_in(state, [:pad_to_track_data, pad, :segment_base_timestamp], sample.dts)
744+
%{pad_to_track_data: %{^pad => %{segment_decoding_timestamp: nil}}} ->
745+
update_in(state, [:pad_to_track_data, pad], fn data ->
746+
Map.merge(data, %{
747+
segment_decoding_timestamp: sample.dts,
748+
segment_presentation_timestamp: sample.pts
749+
})
750+
end)
719751

720752
_else ->
721753
state
722754
end
723755
end
724756

757+
defp update_input_pads_ready(pad, ctx, state) do
758+
all_input_pads_ready? =
759+
Enum.all?(ctx.pads, fn
760+
{^pad, _data} -> true
761+
{Pad.ref(:output, _id), _data} -> true
762+
{Pad.ref(:input, _id), data} -> data.stream_format != nil
763+
end)
764+
765+
%{state | all_input_pads_ready?: all_input_pads_ready?}
766+
end
767+
768+
defp replay_init_buffers(ctx, state) do
769+
{buffers, state} = Map.pop!(state, :buffers_awaiting_init)
770+
771+
Enum.flat_map_reduce(buffers, state, fn {pad, buffer}, state ->
772+
handle_buffer(pad, buffer, ctx, state)
773+
end)
774+
end
775+
725776
@min_chunk_duration Membrane.Time.milliseconds(50)
726777
defp set_chunk_duration_range(
727778
%{

lib/membrane_mp4/muxer/cmaf/segment_helper.ex

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
120120
end
121121

122122
defp push_video_segment(state, queue, pad, sample) do
123-
base_timestamp = max_segment_base_timestamp(state)
123+
base_timestamp = max_segment_decoding_timestamp(state)
124124

125125
queue =
126126
if state.finish_current_segment? do
@@ -144,7 +144,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
144144
end
145145

146146
defp push_audio_segment(state, queue, pad, sample) do
147-
base_timestamp = max_segment_base_timestamp(state)
147+
base_timestamp = max_segment_decoding_timestamp(state)
148148

149149
{video_pad, video_queue} =
150150
Enum.find(state.sample_queues, {nil, nil}, fn {_pad, queue} ->
@@ -190,7 +190,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
190190
total_collected_durations =
191191
Map.fetch!(state.pad_to_track_data, pad).chunks_duration + collected_duration
192192

193-
base_timestamp = state.pad_to_track_data[pad].segment_base_timestamp
193+
base_timestamp = state.pad_to_track_data[pad].segment_decoding_timestamp
194194

195195
queue =
196196
cond do
@@ -262,7 +262,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
262262
if video_queue do
263263
SamplesQueue.force_push(queue, sample)
264264
else
265-
base_timestamp = max_segment_base_timestamp(state)
265+
base_timestamp = max_segment_decoding_timestamp(state)
266266

267267
SamplesQueue.plain_push_until_target(queue, sample, base_timestamp)
268268
end
@@ -288,7 +288,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
288288
defp update_queue_for(pad, queue, state), do: put_in(state, [:sample_queues, pad], queue)
289289

290290
defp collect_samples_for_video_track(pad, queue, state) do
291-
end_timestamp = SamplesQueue.last_collected_dts(queue)
291+
end_timestamp = SamplesQueue.collectable_end_timestamp(queue)
292292
state = update_queue_for(pad, queue, state)
293293

294294
if tracks_ready_for_collection?(state, end_timestamp) do
@@ -311,7 +311,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
311311
end
312312

313313
defp collect_samples_for_audio_track(pad, queue, state) do
314-
end_timestamp = SamplesQueue.last_collected_dts(queue)
314+
end_timestamp = SamplesQueue.collectable_end_timestamp(queue)
315315
state = update_queue_for(pad, queue, state)
316316

317317
if tracks_ready_for_collection?(state, end_timestamp) do
@@ -338,7 +338,7 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
338338

339339
defp tracks_ready_for_collection?(state, end_timestamp) do
340340
Enum.all?(state.sample_queues, fn {_pad, queue} ->
341-
SamplesQueue.last_collected_dts(queue) >= end_timestamp
341+
SamplesQueue.collectable_end_timestamp(queue) >= end_timestamp
342342
end)
343343
end
344344

@@ -393,11 +393,11 @@ defmodule Membrane.MP4.Muxer.CMAF.SegmentHelper do
393393
maybe_return_segment(segment, reset_chunks_duration(state))
394394
end
395395

396-
defp max_segment_base_timestamp(state) do
396+
defp max_segment_decoding_timestamp(state) do
397397
state.pad_to_track_data
398-
|> Enum.reject(fn {_key, track_data} -> is_nil(track_data.segment_base_timestamp) end)
398+
|> Enum.reject(fn {_key, track_data} -> is_nil(track_data.segment_decoding_timestamp) end)
399399
|> Enum.map(fn {_key, track_data} ->
400-
Ratio.to_float(track_data.segment_base_timestamp)
400+
Ratio.to_float(track_data.segment_decoding_timestamp)
401401
end)
402402
|> Enum.max()
403403
end

lib/membrane_mp4/muxer/cmaf/track_samples_queue.ex

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -280,24 +280,31 @@ defmodule Membrane.MP4.Muxer.CMAF.TrackSamplesQueue do
280280
end
281281

282282
@doc """
283-
Returns dts of the latest sample that is eligible for collection.
283+
Returns the end timestamp for latest sample that is eligible for collection.
284284
285285
In case of collectable state it is the last sample that has been put to queue, otherwise
286286
it is the last sample that will be in return from 'collect/1'.
287287
"""
288-
@spec last_collected_dts(t()) :: integer()
289-
def last_collected_dts(%__MODULE__{
288+
@spec collectable_end_timestamp(t()) :: integer()
289+
def collectable_end_timestamp(%__MODULE__{
290290
collectable?: false,
291291
target_samples: target_samples,
292292
excess_samples: excess_samples
293-
}),
294-
do: latest_collected_dts(excess_samples) || latest_collected_dts(target_samples) || -1
293+
}) do
294+
sample = List.first(excess_samples) || List.first(target_samples)
295+
296+
if sample do
297+
sample.dts + sample.metadata.duration
298+
else
299+
-1
300+
end
301+
end
295302

296-
def last_collected_dts(%__MODULE__{collectable?: true, target_samples: target_samples}),
297-
do: latest_collected_dts(List.last(target_samples, []) |> List.wrap()) || -1
303+
def collectable_end_timestamp(%__MODULE__{collectable?: true, target_samples: target_samples}) do
304+
sample = List.last(target_samples)
298305

299-
defp latest_collected_dts([]), do: nil
300-
defp latest_collected_dts([sample | _rest]), do: Ratio.to_float(sample.dts)
306+
sample.dts + sample.metadata.duration
307+
end
301308

302309
@doc """
303310
Returns the most recenlty pushed sample.

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Membrane.MP4.Plugin.MixProject do
22
use Mix.Project
33

4-
@version "0.32.0"
4+
@version "0.33.0"
55
@github_url "https://github.com/membraneframework/membrane_mp4_plugin"
66

77
def project do
445 Bytes
Binary file not shown.
-445 Bytes
Binary file not shown.
0 Bytes
Binary file not shown.
0 Bytes
Binary file not shown.
0 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)