Skip to content

Commit 60ec8cc

Browse files
Fix cascade failure in PhoenixSignaling.Registry (#40)
* Fix cascade failure in PhoenixSignaling.Registry

  Signaling processes created by get_or_create were linked to the Registry via Signaling.new() (which calls GenServer.start_link). When any signaling process exited with a non-normal reason (e.g. a viewer's WebRTC connection failing), the link took down the Registry, which sent EXIT signals to every other linked signaling process, killing all active streams.

  Changes:
  - Unlink signaling processes from the Registry and monitor them instead
  - Clean up dead entries from the signaling map via :DOWN handler
  - Use get/1 instead of get!/1 in Channel.handle_in to gracefully close the channel when a signaling process has already exited
  - Change supervisor strategy from one_for_all to one_for_one since the WhipRegistry and PhoenixSignaling.Registry are independent

---------

Co-authored-by: George Alexander Day <georgealexanderday@proton.me>
1 parent f56fee1 commit 60ec8cc

4 files changed

Lines changed: 41 additions & 7 deletions

File tree

lib/membrane_webrtc/app.ex

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,6 @@ defmodule Membrane.WebRTC.App do
88
[{Registry, name: Membrane.WebRTC.WhipRegistry, keys: :unique}] ++
99
if Code.ensure_loaded?(Phoenix), do: [Membrane.WebRTC.PhoenixSignaling.Registry], else: []
1010

11-
Supervisor.start_link(children, strategy: :one_for_all, name: __MODULE__.Supervisor)
11+
Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__.Supervisor)
1212
end
1313
end

lib/membrane_webrtc/phoenix_signaling/channel.ex

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3,18 +3,21 @@ if Code.ensure_loaded?(Phoenix) do
33
@moduledoc false
44
use Phoenix.Channel
55
alias Membrane.WebRTC.PhoenixSignaling
6+
alias Membrane.WebRTC.Signaling
67

78
@impl true
89
def join(signaling_id, _payload, socket) do
9-
PhoenixSignaling.register_channel(signaling_id)
10-
socket = assign(socket, :signaling_id, signaling_id)
10+
signaling = PhoenixSignaling.Registry.get_or_create(signaling_id)
11+
Process.monitor(signaling.pid)
12+
Signaling.register_peer(signaling, message_format: :json_data, pid: self())
13+
socket = assign(socket, signaling_id: signaling_id, signaling: signaling)
1114
{:ok, socket}
1215
end
1316

1417
@impl true
15-
def handle_in(signaling_id, msg, socket) do
18+
def handle_in(_signaling_id, msg, socket) do
1619
msg = Jason.decode!(msg)
17-
PhoenixSignaling.signal(signaling_id, msg)
20+
Signaling.signal(socket.assigns.signaling, msg)
1821
{:noreply, socket}
1922
end
2023

@@ -23,5 +26,10 @@ if Code.ensure_loaded?(Phoenix) do
2326
push(socket, socket.assigns.signaling_id, msg)
2427
{:noreply, socket}
2528
end
29+
30+
@impl true
31+
def handle_info({:DOWN, _ref, :process, _pid, _reason}, socket) do
32+
{:stop, :normal, socket}
33+
end
2634
end
2735
end

lib/membrane_webrtc/phoenix_signaling/registry.ex

Lines changed: 16 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,15 +38,17 @@ if Code.ensure_loaded?(Phoenix) do
3838

3939
@impl true
4040
def init(_args) do
41-
{:ok, %{signaling_map: %{}}}
41+
{:ok, %{signaling_map: %{}, refs: %{}}}
4242
end
4343

4444
@impl true
4545
def handle_call({:get_or_create, signaling_id}, _from, state) do
4646
case Map.get(state.signaling_map, signaling_id) do
4747
nil ->
48-
signaling = Signaling.new()
48+
signaling = Signaling.start()
49+
ref = Process.monitor(signaling.pid)
4950
state = put_in(state, [:signaling_map, signaling_id], signaling)
51+
state = put_in(state, [:refs, ref], signaling_id)
5052
{:reply, signaling, state}
5153

5254
signaling ->
@@ -58,5 +60,17 @@ if Code.ensure_loaded?(Phoenix) do
5860
def handle_call({:get, signaling_id}, _from, state) do
5961
{:reply, Map.get(state.signaling_map, signaling_id), state}
6062
end
63+
64+
@impl true
65+
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
66+
case Map.pop(state.refs, ref) do
67+
{nil, _refs} ->
68+
{:noreply, state}
69+
70+
{signaling_id, refs} ->
71+
{:noreply,
72+
%{state | signaling_map: Map.delete(state.signaling_map, signaling_id), refs: refs}}
73+
end
74+
end
6175
end
6276
end

lib/membrane_webrtc/signaling.ex

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -82,6 +82,18 @@ defmodule Membrane.WebRTC.Signaling do
8282
%__MODULE__{pid: pid}
8383
end
8484

85+
@doc """
86+
Spawns Signaling GenServer and wraps it in a struct, without linking to the caller.
87+
88+
Be aware that the GenServer is not started under any supervision tree, so it needs to be manually
89+
stopped or monitored.
90+
"""
91+
@spec start() :: t()
92+
def start() do
93+
{:ok, pid} = GenServer.start(__MODULE__, [])
94+
%__MODULE__{pid: pid}
95+
end
96+
8597
@doc """
8698
Starts and links a Signaling GenServer.
8799

0 commit comments

Comments
 (0)