Skip to content

Commit 8c45d92

Browse files
committed
fix: restart RateCounters when their limits update
1 parent bb1c2b6 commit 8c45d92

File tree

6 files changed

+159
-33
lines changed

6 files changed

+159
-33
lines changed

lib/realtime/api.ex

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ defmodule Realtime.Api do
155155
maybe_update_cache(tenant, changeset)
156156
maybe_trigger_disconnect(changeset)
157157
maybe_restart_db_connection(changeset)
158+
maybe_restart_rate_counters(changeset)
158159
Logger.debug("Tenant updated: #{inspect(tenant, pretty: true)}")
159160

160161
{:error, error} ->
@@ -254,6 +255,38 @@ defmodule Realtime.Api do
254255
|> Map.put(:events_per_second_now, current)
255256
end
256257

258+
@field_to_rate_counter_key %{
259+
max_events_per_second: [
260+
&Tenants.events_per_second_key/1,
261+
&Tenants.db_events_per_second_key/1
262+
],
263+
max_joins_per_second: [
264+
&Tenants.joins_per_second_key/1
265+
],
266+
max_presence_events_per_second: [
267+
&Tenants.presence_events_per_second_key/1
268+
],
269+
extensions: [
270+
&Tenants.connect_errors_per_second_key/1,
271+
&Tenants.subscription_errors_per_second_key/1,
272+
&Tenants.authorization_errors_per_second_key/1
273+
]
274+
}
275+
276+
defp maybe_restart_rate_counters(changeset) do
277+
tenant_id = Changeset.fetch_field!(changeset, :external_id)
278+
279+
Enum.each(@field_to_rate_counter_key, fn {field, key_fns} ->
280+
if Changeset.changed?(changeset, field) do
281+
Enum.each(key_fns, fn key_fn ->
282+
tenant_id
283+
|> key_fn.()
284+
|> RateCounter.publish_update()
285+
end)
286+
end
287+
end)
288+
end
289+
257290
defp maybe_update_cache(tenant, %Changeset{changes: changes, valid?: true}) when changes != %{} do
258291
Tenants.Cache.global_cache_update(tenant)
259292
end

lib/realtime/rate_counter/rate_counter.ex

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,6 @@ defmodule Realtime.RateCounter do
7777
)
7878
end
7979

80-
@spec stop(term()) :: :ok
81-
def stop(tenant_id) do
82-
keys =
83-
Registry.select(Realtime.Registry.Unique, [
84-
{{{:"$1", :_, {:_, :_, :"$2"}}, :"$3", :_}, [{:==, :"$1", __MODULE__}, {:==, :"$2", tenant_id}], [:"$_"]}
85-
])
86-
87-
Enum.each(keys, fn {{_, _, key}, {pid, _}} ->
88-
if Process.alive?(pid), do: GenServer.stop(pid)
89-
GenCounter.delete(key)
90-
Cachex.del!(@cache, key)
91-
end)
92-
93-
:ok
94-
end
95-
9680
@doc """
9781
Starts a new RateCounter under a DynamicSupervisor
9882
"""
@@ -108,6 +92,10 @@ defmodule Realtime.RateCounter do
10892
})
10993
end
11094

95+
@doc "Publish an update to the RateCounter with the given id"
96+
@spec publish_update(term()) :: :ok
97+
def publish_update(id), do: Phoenix.PubSub.broadcast(Realtime.PubSub, update_topic(id), :update)
98+
11199
@doc """
112100
Gets the state of the RateCounter.
113101
@@ -136,6 +124,8 @@ defmodule Realtime.RateCounter do
136124
end
137125
end
138126

127+
defp update_topic(id), do: "rate_counter:#{inspect(id)}"
128+
139129
@impl true
140130
def init(args) do
141131
id = Keyword.fetch!(args, :id)
@@ -151,6 +141,8 @@ defmodule Realtime.RateCounter do
151141
# a RateCounter running to calculate avg and buckets
152142
GenCounter.reset(id)
153143

144+
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, update_topic(id))
145+
154146
telemetry =
155147
if telem_opts do
156148
Logger.metadata(telem_opts.metadata)
@@ -228,30 +220,41 @@ defmodule Realtime.RateCounter do
228220
{:noreply, state}
229221
end
230222

231-
@impl true
232223
def handle_info(:idle_shutdown, state) do
233224
if Enum.all?(state.bucket, &(&1 == 0)) do
234225
# All the buckets are empty, so we can assume this RateCounter has not been useful recently
235226
Logger.warning("#{__MODULE__} idle_shutdown reached for: #{inspect(state.id)}")
236-
GenCounter.delete(state.id)
237-
# We are expiring in the near future instead of deleting so that
238-
# The process dies before the cache information disappears
239-
# If we were using Cachex.delete instead then the following rare scenario would be possible:
240-
# * RateCounter.get/2 is called;
241-
# * Cache was deleted but the process has not stopped yet;
242-
# * RateCounter.get/2 will then try to start a new RateCounter but the supervisor will return :already_started;
243-
# * Process finally stops;
244-
# * The cache is still empty because no new process was started causing an error
245-
246-
Cachex.expire(@cache, state.id, :timer.seconds(1))
247-
{:stop, :normal, state}
227+
shutdown(state)
248228
else
249229
Process.cancel_timer(state.idle_shutdown_ref)
250230
idle_shutdown_ref = shutdown_after(state.idle_shutdown)
251231
{:noreply, %{state | idle_shutdown_ref: idle_shutdown_ref}}
252232
end
253233
end
254234

235+
def handle_info(:update, state) do
236+
# When we get an update message we shutdown so that this RateCounter
237+
# can be restarted with new parameters
238+
shutdown(state)
239+
end
240+
241+
def handle_info(_, state), do: {:noreply, state}
242+
243+
defp shutdown(state) do
244+
GenCounter.delete(state.id)
245+
# We are expiring in the near future instead of deleting so that
246+
# The process dies before the cache information disappears
247+
# If we were using Cachex.delete instead then the following rare scenario would be possible:
248+
# * RateCounter.get/2 is called;
249+
# * Cache was deleted but the process has not stopped yet;
250+
# * RateCounter.get/2 will then try to start a new RateCounter but the supervisor will return :already_started;
251+
# * Process finally stops;
252+
# * The cache is still empty because no new process was started causing an error
253+
254+
Cachex.expire(@cache, state.id, :timer.seconds(1))
255+
{:stop, :normal, state}
256+
end
257+
255258
defp maybe_trigger_limit(%{limit: %{log: false}} = state), do: state
256259

257260
defp maybe_trigger_limit(%{limit: %{triggered: true, measurement: measurement}} = state) do

lib/realtime/tenants.ex

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,11 @@ defmodule Realtime.Tenants do
337337
]
338338
]
339339

340-
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
340+
%RateCounter.Args{id: authorization_errors_per_second_key(external_id), opts: opts}
341341
end
342342

343+
def authorization_errors_per_second_key(tenant_id), do: {:channel, :authorization_errors, tenant_id}
344+
343345
@spec subscription_errors_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
344346
def subscription_errors_per_second_rate(tenant_id, pool_size) do
345347
opts = [
@@ -356,9 +358,11 @@ defmodule Realtime.Tenants do
356358
]
357359
]
358360

359-
%RateCounter.Args{id: {:channel, :subscription_errors, tenant_id}, opts: opts}
361+
%RateCounter.Args{id: subscription_errors_per_second_key(tenant_id), opts: opts}
360362
end
361363

364+
def subscription_errors_per_second_key(tenant_id), do: {:channel, :subscription_errors, tenant_id}
365+
362366
@connect_errors_per_second_default 10
363367
@doc "RateCounter arguments for counting connect per second."
364368
@spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
@@ -382,9 +386,11 @@ defmodule Realtime.Tenants do
382386
]
383387
]
384388

385-
%RateCounter.Args{id: {:database, :connect, tenant_id}, opts: opts}
389+
%RateCounter.Args{id: connect_errors_per_second_key(tenant_id), opts: opts}
386390
end
387391

392+
def connect_errors_per_second_key(tenant_id), do: {:database, :connect, tenant_id}
393+
388394
defp authorization_pool_size(%{extensions: [%{settings: settings} | _]}) do
389395
Database.pool_size_by_application_name("realtime_connect", settings)
390396
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.69.2",
7+
version: "2.69.3",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/api_test.exs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,78 @@ defmodule Realtime.ApiTest do
261261
reject(&Realtime.Tenants.Cache.global_cache_update/1)
262262
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{name: tenant.name})
263263
end
264+
265+
test "change to max_events_per_second publishes update to respective rate counters", %{tenants: [tenant | _]} do
266+
expect(RateCounter, :publish_update, fn key ->
267+
assert key == Realtime.Tenants.events_per_second_key(tenant.external_id)
268+
end)
269+
270+
expect(RateCounter, :publish_update, fn key ->
271+
assert key == Realtime.Tenants.db_events_per_second_key(tenant.external_id)
272+
end)
273+
274+
reject(&RateCounter.publish_update/1)
275+
276+
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{max_events_per_second: 123})
277+
end
278+
279+
test "change to max_joins_per_second publishes update to rate counters", %{tenants: [tenant | _]} do
280+
expect(RateCounter, :publish_update, fn key ->
281+
assert key == Realtime.Tenants.joins_per_second_key(tenant.external_id)
282+
end)
283+
284+
reject(&RateCounter.publish_update/1)
285+
286+
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{max_joins_per_second: 123})
287+
end
288+
289+
test "change to max_presence_events_per_second publishes update to rate counters", %{tenants: [tenant | _]} do
290+
expect(RateCounter, :publish_update, fn key ->
291+
assert key == Realtime.Tenants.presence_events_per_second_key(tenant.external_id)
292+
end)
293+
294+
reject(&RateCounter.publish_update/1)
295+
296+
assert {:ok, %Tenant{}} =
297+
Api.update_tenant_by_external_id(tenant.external_id, %{max_presence_events_per_second: 123})
298+
end
299+
300+
test "change to extensions publishes update to rate counters", %{tenants: [tenant | _]} do
301+
extensions = [
302+
%{
303+
"type" => "postgres_cdc_rls",
304+
"settings" => %{
305+
"db_host" => "127.0.0.1",
306+
"db_name" => "postgres",
307+
"db_user" => "supabase_admin",
308+
"db_password" => "postgres",
309+
"db_port" => "1234",
310+
"poll_interval" => 100,
311+
"poll_max_changes" => 100,
312+
"poll_max_record_bytes" => 1_048_576,
313+
"region" => "us-east-1",
314+
"publication" => "supabase_realtime_test",
315+
"ssl_enforced" => false
316+
}
317+
}
318+
]
319+
320+
expect(RateCounter, :publish_update, fn key ->
321+
assert key == Realtime.Tenants.connect_errors_per_second_key(tenant.external_id)
322+
end)
323+
324+
expect(RateCounter, :publish_update, fn key ->
325+
assert key == Realtime.Tenants.subscription_errors_per_second_key(tenant.external_id)
326+
end)
327+
328+
expect(RateCounter, :publish_update, fn key ->
329+
assert key == Realtime.Tenants.authorization_errors_per_second_key(tenant.external_id)
330+
end)
331+
332+
reject(&RateCounter.publish_update/1)
333+
334+
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{extensions: extensions})
335+
end
264336
end
265337

266338
describe "delete_tenant_by_external_id/1" do

test/realtime/rate_counter/rate_counter_test.exs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,18 @@ defmodule Realtime.RateCounterTest do
301301
end
302302
end
303303

304+
describe "publish_update/1" do
305+
test "cause shutdown with update message from update topic" do
306+
args = %Args{id: {:domain, :metric, Ecto.UUID.generate()}}
307+
{:ok, pid} = RateCounter.new(args)
308+
309+
Process.monitor(pid)
310+
RateCounter.publish_update(args.id)
311+
312+
assert_receive {:DOWN, _ref, :process, ^pid, :normal}
313+
end
314+
end
315+
304316
describe "get/1" do
305317
test "gets the state of a rate counter" do
306318
args = %Args{id: {:domain, :metric, Ecto.UUID.generate()}}

0 commit comments

Comments
 (0)