# Set the Logger level to :info.
Logger.configure(level: :info)
# In case of problems installing Nx/EXLA/Bumblebee,
# you can remove them and the Nx backend config below.
# Examples that don't mention them should still work.
# MIX_INSTALL_CONFIG_BEGIN
boombox = {:boombox, github: "membraneframework/boombox"}
# This livebook uses boombox from the master branch. If any examples happen to not work, the latest stable version of this livebook
# can be found on https://hexdocs.pm/boombox/stream_processing.html or in the latest github release.
# MIX_INSTALL_CONFIG_END
# Install the notebook's dependencies; `boombox` is the GitHub dependency bound above.
Mix.install([
boombox,
:kino,
:nx,
:exla,
:bumblebee,
:websockex,
:membrane_simple_rtsp_server,
{:coerce, ">= 1.0.2"}
])
# Nx backend config referred to in the note above: make EXLA the global default Nx backend.
Nx.global_default_backend(EXLA.Backend)
# HTTP server for assets: directories holding the example inputs and outputs
data_dir = "/tmp/boombox_examples_data"
input_dir = Path.join(data_dir, "input")
out_dir = Path.join(data_dir, "output")
Enum.each([input_dir, out_dir], &File.mkdir_p!/1)

# A dependency may have started :inets already, which is fine; any other result is unexpected
inets_result = :inets.start()

if inets_result not in [:ok, {:error, {:already_started, :inets}}] do
  raise "Unexpected value returned by :inets.start/0: #{inspect(inets_result)}"
end
require Logger

# Serve the contents of `data_dir` over HTTP on localhost:1234.
httpd_opts = [
  bind_address: ~c"localhost",
  port: 1234,
  document_root: ~c"#{data_dir}",
  server_name: ~c"assets_server",
  server_root: ~c"/tmp",
  erl_script_nocache: true
]

case :inets.start(:httpd, httpd_opts) do
  {:ok, _server} ->
    :ok

  {:error, reason} ->
    # Most likely the port is already in use because the server was started from another
    # livebook, so the failure stays non-fatal. The reason is logged so that a genuine
    # start-up problem is not silently hidden.
    Logger.warning("Assets HTTP server was not started: #{inspect(reason)}")
    :ok
end

👋 Here are some stream processing examples of using Boombox, showing how to generate custom packet streams, read and manipulate individual frames, and integrate Boombox within larger Membrane pipelines.
The cell below downloads assets to be used in the examples. The setup cell started an HTTP server on port 1234 that will serve static HTML files for sending/receiving the stream in the browser.
samples_url = "https://raw.githubusercontent.com/membraneframework/static/gh-pages/samples"

# {local file name, path of the file relative to `samples_url`}
sample_files = [
  {"bun.mp4", "big-buck-bunny/bun33s.mp4"},
  {"ffmpeg-testsrc.mp4", "ffmpeg-testsrc-480x270.mp4"}
]

# Download every sample that is not present in the input directory yet
for {local_name, remote_path} <- sample_files,
    target = Path.join(input_dir, local_name),
    not File.exists?(target) do
  %{status: 200, body: body} = Req.get!(samples_url <> "/" <> remote_path)
  File.write!(target, body)
end
assets_url =
  "https://raw.githubusercontent.com/membraneframework/boombox/master/examples/data"

# Download every HTML page that is not present in the data directory yet
for asset <- ["hls", "webrtc_to_browser"],
    page = asset <> ".html",
    target = Path.join(data_dir, page),
    not File.exists?(target) do
  %{status: 200, body: body} = Req.get!(assets_url <> "/" <> page)
  File.write!(target, body)
end

To receive the stream, visit http://localhost:1234/webrtc_to_browser.html
# Fetch the logo, trim its borders and scale it down with Image.thumbnail!/2
{:ok, logo} =
  "https://avatars.githubusercontent.com/u/25247695?s=200&v=4"
  |> Req.get!()
  |> Map.fetch!(:body)
  |> Vix.Vips.Image.new_from_buffer()

overlay = logo |> Image.trim!() |> Image.thumbnail!(100)

bg = Image.new!(640, 480, color: :light_gray)
max_x = Image.width(bg) - Image.width(overlay)
max_y = Image.height(bg) - Image.height(overlay)

# Timestamp increment between consecutive frames at 60 fps
fps = 60
frame_duration = div(Membrane.Time.seconds(1), fps)

# Keeps the current step, or reverses it when the next position would leave 0..limit
bounce = fn position, step, limit ->
  if (position + step) in 0..limit, do: step, else: -step
end

%{x: 300, y: 0, dx: 1, dy: 2, pts: 0}
|> Stream.iterate(fn %{x: x, y: y, dx: dx, dy: dy, pts: pts} ->
  dx = bounce.(x, dx, max_x)
  dy = bounce.(y, dy, max_y)
  %{x: x + dx, y: y + dy, dx: dx, dy: dy, pts: pts + frame_duration}
end)
|> Stream.map(fn %{x: x, y: y, pts: pts} ->
  frame = Image.compose!(bg, overlay, x: x, y: y)
  %Boombox.Packet{kind: :video, payload: frame, pts: pts}
end)
|> Boombox.run(
  input: {:stream, video: :image, audio: false},
  output: {:webrtc, "ws://localhost:8830"}
)

To receive the stream, visit http://localhost:1234/hls.html after running the cells below
The first cell uses :reader and :writer endpoints to communicate with boombox. In this
configuration the process calling Boombox.read/1 controls when packets are being provided.
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

image_stream = [video: :image, audio: false]

reader1 = Boombox.run(input: input1, output: {:reader, image_stream})
reader2 = Boombox.run(input: input2, output: {:reader, image_stream})
writer = Boombox.run(input: {:writer, image_stream}, output: output)

# Reads one packet from each reader per iteration; this process sets the pace
read_pair = fn -> {Boombox.read(reader1), Boombox.read(reader2)} end

read_pair
|> Stream.repeatedly()
|> Enum.reduce_while(:ok, fn
  {{:ok, left}, {:ok, right}}, acc ->
    side_by_side =
      Vix.Vips.Operation.join!(left.payload, right.payload, :VIPS_DIRECTION_HORIZONTAL)

    Boombox.write(writer, %Boombox.Packet{
      kind: :video,
      payload: side_by_side,
      pts: max(left.pts, right.pts)
    })

    {:cont, acc}

  # Anything other than two packets ends the loop
  _finished, acc ->
    {:halt, acc}
end)

Boombox.close(writer)
Boombox.close(reader1)
Boombox.close(reader2)

The second cell uses :message endpoints, meaning that the server communicates with boomboxes by
exchanging messages. As a consequence, the inputting boomboxes control the
pace at which packets are provided to the server, which can be useful in some circumstances:
defmodule MyServer do
  @moduledoc """
  GenServer that joins the video of two Boombox inputs side by side.

  Two boomboxes are run with `:message` outputs, so their packets arrive in this process as
  `{:boombox_packet, boombox, packet}` messages. The most recent packet of each input is
  kept in the state. Once both inputs have delivered at least one packet, every incoming
  packet causes a horizontally joined image to be written to a third boombox, which is run
  with a `:writer` input. After both inputs have sent `{:boombox_finished, boombox}`, the
  writer is closed and the server stops with reason `:normal`.
  """

  use GenServer

  # Stream options shared by all boomboxes run by this server.
  @image_stream [video: :image, audio: false]

  @doc """
  Starts the server without linking it to the caller.

  `args` is a map with the `:input1`, `:input2` and `:output` keys. Their values are passed
  to `Boombox.run/1` as the two inputs and the output, respectively.
  """
  @spec start(map()) :: GenServer.on_start()
  def start(args) do
    GenServer.start(__MODULE__, args)
  end

  @impl true
  def init(args) do
    boombox1 = Boombox.run(input: args.input1, output: {:message, @image_stream})
    boombox2 = Boombox.run(input: args.input2, output: {:message, @image_stream})
    output_writer = Boombox.run(input: {:writer, @image_stream}, output: args.output)

    {:ok,
     %{
       boombox_states: %{
         boombox1: %{last_packet: nil, eos: false},
         boombox2: %{last_packet: nil, eos: false}
       },
       # Maps the value returned by Boombox.run/1 to the key used in `boombox_states`
       boomboxes: %{boombox1 => :boombox1, boombox2 => :boombox2},
       output_writer: output_writer
     }}
  end

  @impl true
  def handle_info({:boombox_packet, bb, %Boombox.Packet{} = packet}, state) do
    # Assertive lookup: a message from an unknown boombox fails here with a clear KeyError
    # instead of propagating `nil` into the nested state update below.
    boombox_id = Map.fetch!(state.boomboxes, bb)
    state = put_in(state.boombox_states[boombox_id].last_packet, packet)

    case state.boombox_states do
      %{
        boombox1: %{last_packet: %Boombox.Packet{} = left},
        boombox2: %{last_packet: %Boombox.Packet{} = right}
      } ->
        joined_image =
          Vix.Vips.Operation.join!(left.payload, right.payload, :VIPS_DIRECTION_HORIZONTAL)

        joined_packet = %Boombox.Packet{
          packet
          | payload: joined_image,
            pts: max(left.pts, right.pts)
        }

        Boombox.write(state.output_writer, joined_packet)

      # At least one input has not delivered a packet yet
      _waiting ->
        :ok
    end

    {:noreply, state}
  end

  @impl true
  def handle_info({:boombox_finished, bb}, state) do
    boombox_id = Map.fetch!(state.boomboxes, bb)
    state = put_in(state.boombox_states[boombox_id].eos, true)

    if Enum.all?(Map.values(state.boombox_states), & &1.eos) do
      Boombox.close(state.output_writer)
      {:stop, :normal, state}
    else
      {:noreply, state}
    end
  end
end
input1 = "#{input_dir}/bun.mp4"
input2 = "#{input_dir}/ffmpeg-testsrc.mp4"
output = "#{out_dir}/index.m3u8"

{:ok, server} = MyServer.start(%{input1: input1, input2: input2, output: output})

# Block until the server process terminates
monitor = Process.monitor(server)

receive do
  # The exit reason is not used; the underscore avoids an "unused variable" compiler warning
  {:DOWN, ^monitor, :process, ^server, _reason} ->
    :ok
end

To receive the stream, visit http://localhost:1234/webrtc_to_browser.html after running the cell below.
defmodule VideoDiscardingPipeline do
  @moduledoc """
  Membrane pipeline built from two `Boombox.Bin` children.

  The audio output of the bin reading `opts[:boombox_input]` is linked to the audio input of
  the bin writing to `opts[:boombox_output]`, while its video output is linked to
  `Membrane.Fake.Sink`.
  """

  use Membrane.Pipeline

  @impl true
  def handle_init(_ctx, opts) do
    source = %Boombox.Bin{input: opts[:boombox_input]}
    destination = %Boombox.Bin{output: opts[:boombox_output]}

    # Audio goes from the input boombox to the output boombox
    audio_link =
      child(:input_boombox, source)
      |> via_out(:output, options: [kind: :audio])
      |> via_in(:input, options: [kind: :audio])
      |> child(:output_boombox, destination)

    # Video of the same input boombox ends in a fake sink
    video_link =
      get_child(:input_boombox)
      |> via_out(:output, options: [kind: :video])
      |> child(Membrane.Fake.Sink)

    {[spec: [audio_link, video_link]], %{}}
  end
end
pipeline_opts = [
  boombox_input: Path.join(input_dir, "bun.mp4"),
  boombox_output: {:webrtc, "ws://localhost:8830"}
]

{:ok, supervisor, _pipeline} =
  Membrane.Pipeline.start_link(VideoDiscardingPipeline, pipeline_opts)

# Block until the pipeline's supervisor goes down
monitor_ref = Process.monitor(supervisor)

receive do
  {:DOWN, ^monitor_ref, :process, _pid, _reason} ->
    :ok
end