Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ defmodule Realtime.Api do
maybe_update_cache(tenant, changeset)
maybe_trigger_disconnect(changeset)
maybe_restart_db_connection(changeset)
maybe_restart_rate_counters(changeset)
Logger.debug("Tenant updated: #{inspect(tenant, pretty: true)}")

{:error, error} ->
Expand Down Expand Up @@ -254,6 +255,38 @@ defmodule Realtime.Api do
|> Map.put(:events_per_second_now, current)
end

@field_to_rate_counter_key %{
max_events_per_second: [
&Tenants.events_per_second_key/1,
&Tenants.db_events_per_second_key/1
],
max_joins_per_second: [
&Tenants.joins_per_second_key/1
],
max_presence_events_per_second: [
&Tenants.presence_events_per_second_key/1
],
extensions: [
&Tenants.connect_errors_per_second_key/1,
&Tenants.subscription_errors_per_second_key/1,
&Tenants.authorization_errors_per_second_key/1
]
}

defp maybe_restart_rate_counters(changeset) do
tenant_id = Changeset.fetch_field!(changeset, :external_id)

Enum.each(@field_to_rate_counter_key, fn {field, key_fns} ->
if Changeset.changed?(changeset, field) do
Enum.each(key_fns, fn key_fn ->
tenant_id
|> key_fn.()
|> RateCounter.publish_update()
end)
end
end)
end

defp maybe_update_cache(tenant, %Changeset{changes: changes, valid?: true}) when changes != %{} do
Tenants.Cache.global_cache_update(tenant)
end
Expand Down
61 changes: 32 additions & 29 deletions lib/realtime/rate_counter/rate_counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,6 @@ defmodule Realtime.RateCounter do
)
end

@spec stop(term()) :: :ok
def stop(tenant_id) do
keys =
Registry.select(Realtime.Registry.Unique, [
{{{:"$1", :_, {:_, :_, :"$2"}}, :"$3", :_}, [{:==, :"$1", __MODULE__}, {:==, :"$2", tenant_id}], [:"$_"]}
])

Enum.each(keys, fn {{_, _, key}, {pid, _}} ->
if Process.alive?(pid), do: GenServer.stop(pid)
GenCounter.delete(key)
Cachex.del!(@cache, key)
end)

:ok
end

@doc """
Starts a new RateCounter under a DynamicSupervisor
"""
Expand All @@ -108,6 +92,10 @@ defmodule Realtime.RateCounter do
})
end

@doc "Publish an update to the RateCounter with the given id"
@spec publish_update(term()) :: :ok
def publish_update(id), do: Phoenix.PubSub.broadcast(Realtime.PubSub, update_topic(id), :update)

@doc """
Gets the state of the RateCounter.
Expand Down Expand Up @@ -136,6 +124,8 @@ defmodule Realtime.RateCounter do
end
end

defp update_topic(id), do: "rate_counter:#{inspect(id)}"

@impl true
def init(args) do
id = Keyword.fetch!(args, :id)
Expand All @@ -151,6 +141,8 @@ defmodule Realtime.RateCounter do
# a RateCounter running to calculate avg and buckets
GenCounter.reset(id)

:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, update_topic(id))

telemetry =
if telem_opts do
Logger.metadata(telem_opts.metadata)
Expand Down Expand Up @@ -228,30 +220,41 @@ defmodule Realtime.RateCounter do
{:noreply, state}
end

@impl true
def handle_info(:idle_shutdown, state) do
if Enum.all?(state.bucket, &(&1 == 0)) do
# All the buckets are empty, so we can assume this RateCounter has not been useful recently
Logger.warning("#{__MODULE__} idle_shutdown reached for: #{inspect(state.id)}")
GenCounter.delete(state.id)
# We are expiring in the near future instead of deleting so that
# The process dies before the cache information disappears
# If we were using Cachex.delete instead then the following rare scenario would be possible:
# * RateCounter.get/2 is called;
# * Cache was deleted but the process has not stopped yet;
# * RateCounter.get/2 will then try to start a new RateCounter but the supervisor will return :already_started;
# * Process finally stops;
# * The cache is still empty because no new process was started causing an error

Cachex.expire(@cache, state.id, :timer.seconds(1))
{:stop, :normal, state}
shutdown(state)
else
Process.cancel_timer(state.idle_shutdown_ref)
idle_shutdown_ref = shutdown_after(state.idle_shutdown)
{:noreply, %{state | idle_shutdown_ref: idle_shutdown_ref}}
end
end

def handle_info(:update, state) do
# When we get an update message we shutdown so that this RateCounter
# can be restarted with new parameters
shutdown(state)
end

def handle_info(_, state), do: {:noreply, state}

defp shutdown(state) do
GenCounter.delete(state.id)
# We are expiring in the near future instead of deleting so that
# The process dies before the cache information disappears
# If we were using Cachex.delete instead then the following rare scenario would be possible:
# * RateCounter.get/2 is called;
# * Cache was deleted but the process has not stopped yet;
# * RateCounter.get/2 will then try to start a new RateCounter but the supervisor will return :already_started;
# * Process finally stops;
# * The cache is still empty because no new process was started causing an error

Cachex.expire(@cache, state.id, :timer.seconds(1))
{:stop, :normal, state}
end

defp maybe_trigger_limit(%{limit: %{log: false}} = state), do: state

defp maybe_trigger_limit(%{limit: %{triggered: true, measurement: measurement}} = state) do
Expand Down
12 changes: 9 additions & 3 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,11 @@ defmodule Realtime.Tenants do
]
]

%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
%RateCounter.Args{id: authorization_errors_per_second_key(external_id), opts: opts}
end

def authorization_errors_per_second_key(tenant_id), do: {:channel, :authorization_errors, tenant_id}

@spec subscription_errors_per_second_rate(String.t(), non_neg_integer) :: RateCounter.Args.t()
def subscription_errors_per_second_rate(tenant_id, pool_size) do
opts = [
Expand All @@ -356,9 +358,11 @@ defmodule Realtime.Tenants do
]
]

%RateCounter.Args{id: {:channel, :subscription_errors, tenant_id}, opts: opts}
%RateCounter.Args{id: subscription_errors_per_second_key(tenant_id), opts: opts}
end

def subscription_errors_per_second_key(tenant_id), do: {:channel, :subscription_errors, tenant_id}

@connect_errors_per_second_default 10
@doc "RateCounter arguments for counting connect per second."
@spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
Expand All @@ -382,9 +386,11 @@ defmodule Realtime.Tenants do
]
]

%RateCounter.Args{id: {:database, :connect, tenant_id}, opts: opts}
%RateCounter.Args{id: connect_errors_per_second_key(tenant_id), opts: opts}
end

def connect_errors_per_second_key(tenant_id), do: {:database, :connect, tenant_id}

defp authorization_pool_size(%{extensions: [%{settings: settings} | _]}) do
Database.pool_size_by_application_name("realtime_connect", settings)
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.69.2",
version: "2.69.3",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
72 changes: 72 additions & 0 deletions test/realtime/api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,78 @@ defmodule Realtime.ApiTest do
reject(&Realtime.Tenants.Cache.global_cache_update/1)
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{name: tenant.name})
end

test "change to max_events_per_second publishes update to respective rate counters", %{tenants: [tenant | _]} do
expect(RateCounter, :publish_update, fn key ->
assert key == Realtime.Tenants.events_per_second_key(tenant.external_id)
end)

expect(RateCounter, :publish_update, fn key ->
assert key == Realtime.Tenants.db_events_per_second_key(tenant.external_id)
end)

reject(&RateCounter.publish_update/1)

assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{max_events_per_second: 123})
end

test "change to max_joins_per_second publishes update to rate counters", %{tenants: [tenant | _]} do
expect(RateCounter, :publish_update, fn key ->
assert key == Realtime.Tenants.joins_per_second_key(tenant.external_id)
end)

reject(&RateCounter.publish_update/1)

assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{max_joins_per_second: 123})
end

test "change to max_presence_events_per_second publishes update to rate counters", %{tenants: [tenant | _]} do
expect(RateCounter, :publish_update, fn key ->
assert key == Realtime.Tenants.presence_events_per_second_key(tenant.external_id)
end)

reject(&RateCounter.publish_update/1)

assert {:ok, %Tenant{}} =
Api.update_tenant_by_external_id(tenant.external_id, %{max_presence_events_per_second: 123})
end

test "change to extensions publishes update to rate counters", %{tenants: [tenant | _]} do
extensions = [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "127.0.0.1",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"db_port" => "1234",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"publication" => "supabase_realtime_test",
"ssl_enforced" => false
}
}
]

expect(RateCounter, :publish_update, fn key ->
assert key == Realtime.Tenants.connect_errors_per_second_key(tenant.external_id)
end)

expect(RateCounter, :publish_update, fn key ->
assert key == Realtime.Tenants.subscription_errors_per_second_key(tenant.external_id)
end)

expect(RateCounter, :publish_update, fn key ->
assert key == Realtime.Tenants.authorization_errors_per_second_key(tenant.external_id)
end)

reject(&RateCounter.publish_update/1)

assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{extensions: extensions})
end
end

describe "delete_tenant_by_external_id/1" do
Expand Down
12 changes: 12 additions & 0 deletions test/realtime/rate_counter/rate_counter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,18 @@ defmodule Realtime.RateCounterTest do
end
end

describe "publish_update/1" do
test "cause shutdown with update message from update topic" do
args = %Args{id: {:domain, :metric, Ecto.UUID.generate()}}
{:ok, pid} = RateCounter.new(args)

Process.monitor(pid)
RateCounter.publish_update(args.id)

assert_receive {:DOWN, _ref, :process, ^pid, :normal}
end
end

describe "get/1" do
test "gets the state of a rate counter" do
args = %Args{id: {:domain, :metric, Ecto.UUID.generate()}}
Expand Down