Skip to content

fix: connect backoff mechanism #1354

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 5, 2025
Merged
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ This is the list of operational codes that can help you understand your deployme
| ClientJoinRateLimitReached | The rate of joins per second from your clients as reached the channel limits |
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
| TooManyConnectAttempts | Realtime restricted the amount of attempts when connecting to the tenants database |
| RealtimeNodeDisconnected | Realtime is a distributed application and this means that one the system is unable to communicate with one of the distributed nodes |
| MigrationsFailedToRun | Error when running the migrations against the Tenant database that are required by Realtime |
| StartListenAndReplicationFailed | Error when starting the replication and listening of errors for database broadcasting |
Expand Down
3 changes: 3 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ migration_partition_slots =
connect_partition_slots =
System.get_env("CONNECT_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer()

connect_throttle_limit = System.get_env("CONNECT_THROTTLE_LIMIT", "10") |> String.to_integer()

if !(db_version in [nil, "ipv6", "ipv4"]),
do: raise("Invalid IP version, please set either ipv6 or ipv4")

Expand All @@ -39,6 +41,7 @@ socket_options =
config :realtime,
migration_partition_slots: migration_partition_slots,
connect_partition_slots: connect_partition_slots,
connect_throttle_limit: connect_throttle_limit,
tenant_max_bytes_per_second: System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer(),
tenant_max_channels_per_client: System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer(),
tenant_max_concurrent_users: System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer(),
Expand Down
21 changes: 20 additions & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ defmodule Realtime.Tenants do
requests_per_second_key(tenant),
channels_per_client_key(tenant),
joins_per_second_key(tenant),
events_per_second_key(tenant)
events_per_second_key(tenant),
connection_attempts_per_second_key(tenant),
presence_events_per_second_key(tenant)
]
end

Expand Down Expand Up @@ -210,6 +212,23 @@ defmodule Realtime.Tenants do
{:channel, :presence_events, tenant.external_id}
end

@doc """
The GenCounter key to use when counting connection attempts against Realtime.Tenants.Connect
## Examples
iex> Realtime.Tenants.connection_attempts_per_second_key("tenant_id")
{:tenant, :connection_attempts, "tenant_id"}
iex> Realtime.Tenants.connection_attempts_per_second_key(%Realtime.Api.Tenant{external_id: "tenant_id"})
{:tenant, :connection_attempts, "tenant_id"}
"""
@spec connection_attempts_per_second_key(Tenant.t() | String.t()) :: {:tenant, :connection_attempts, String.t()}
def connection_attempts_per_second_key(tenant) when is_binary(tenant) do
{:tenant, :connection_attempts, tenant}
end

def connection_attempts_per_second_key(%Tenant{} = tenant) do
{:tenant, :connection_attempts, tenant.external_id}
end

@spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list
def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do
nodes = [Node.self() | Node.list()]
Expand Down
11 changes: 10 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ defmodule Realtime.Tenants.Connect do
alias Realtime.Api.Tenant
alias Realtime.Rpc
alias Realtime.Tenants
alias Realtime.Tenants.ReplicationConnection
alias Realtime.Tenants.Connect.Backoff
alias Realtime.Tenants.Connect.CheckConnection
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.Piper
alias Realtime.Tenants.Connect.RegisterProcess
alias Realtime.Tenants.Connect.StartCounters
alias Realtime.Tenants.Listen
alias Realtime.Tenants.Migrations
alias Realtime.Tenants.ReplicationConnection
alias Realtime.UsersCounter

@rpc_timeout_default 30_000
Expand Down Expand Up @@ -114,6 +115,10 @@ defmodule Realtime.Tenants.Connect do
{:error, {:shutdown, :tenant_not_found}} ->
{:error, :tenant_not_found}

{:error, {:shutdown, :tenant_create_backoff}} ->
log_warning("TooManyConnectAttempts", "Too many connect attempts to tenant database")
{:error, :tenant_create_backoff}

{:error, :shutdown} ->
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database")
{:error, :tenant_database_unavailable}
Expand Down Expand Up @@ -176,6 +181,7 @@ defmodule Realtime.Tenants.Connect do

pipes = [
GetTenant,
Backoff,
CheckConnection,
StartCounters,
RegisterProcess
Expand All @@ -191,6 +197,9 @@ defmodule Realtime.Tenants.Connect do
{:error, :tenant_db_too_many_connections} ->
{:stop, {:shutdown, :tenant_db_too_many_connections}}

{:error, :tenant_create_backoff} ->
{:stop, {:shutdown, :tenant_create_backoff}}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
{:stop, :shutdown}
Expand Down
37 changes: 37 additions & 0 deletions lib/realtime/tenants/connect/backoff.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Realtime.Tenants.Connect.Backoff do
@moduledoc """
Applies backoff on process initialization.
"""
alias Realtime.RateCounter
alias Realtime.GenCounter
alias Realtime.Tenants
@behaviour Realtime.Tenants.Connect.Piper

@impl Realtime.Tenants.Connect.Piper
def run(acc) do
%{tenant_id: tenant_id} = acc
connect_throttle_limit = Application.fetch_env!(:realtime, :connect_throttle_limit)

with {:ok, counter} <- start_connects_per_second_counter(tenant_id),
{:ok, %{avg: avg}} when avg < connect_throttle_limit <- RateCounter.get(counter) do
GenCounter.add(counter)
{:ok, acc}
else
_ -> {:error, :tenant_create_backoff}
end
end

defp start_connects_per_second_counter(tenant_id) do
id = Tenants.connection_attempts_per_second_key(tenant_id)
GenCounter.new(id)

res =
RateCounter.new(id, idle_shutdown: :infinity)

case res do
{:ok, _} -> {:ok, id}
{:error, {:already_started, _}} -> {:ok, id}
{:error, reason} -> {:error, reason}
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.53",
version: "2.34.54",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
42 changes: 42 additions & 0 deletions test/realtime/tenants/connect/backoff_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Realtime.Tenants.Connect.BackoffTest do
use Realtime.DataCase, async: true
alias Realtime.Tenants.Connect.Backoff

setup do
tenant_id = random_string()
acc = %{tenant_id: tenant_id}
{:ok, acc: acc}
end

test "does not apply backoff for a given tenant if never called", %{acc: acc} do
assert {:ok, acc} == Backoff.run(acc)
end

test "applies backoff if the user as called more than once during the configured space", %{acc: acc} do

Check failure on line 15 in test/realtime/tenants/connect/backoff_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test applies backoff if the user as called more than once during the configured space (Realtime.Tenants.Connect.BackoffTest)
# emulate calls
for _ <- 1..10 do
Backoff.run(acc)
end

Process.sleep(1000)
assert {:error, :tenant_create_backoff} = Backoff.run(acc)
end

test "resets backoff after the configured space", %{acc: acc} do
# emulate calls
for _ <- 1..10 do
Backoff.run(acc)
end

Process.sleep(1000)

# emulate block
assert {:error, :tenant_create_backoff} = Backoff.run(acc)

# wait for the timer to expire
Process.sleep(2000)

# check that the backoff has been reset
assert {:ok, acc} == Backoff.run(acc)
end
end
29 changes: 29 additions & 0 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,35 @@
end
end

describe "connect/1" do
test "respects backoff pipe", %{tenant: tenant} do

Check failure on line 377 in test/realtime/tenants/connect_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test connect/1 respects backoff pipe (Realtime.Tenants.ConnectTest)
log =
capture_log(fn ->
for _ <- 1..100 do
Connect.connect(tenant.external_id)
Connect.shutdown(tenant.external_id)
Process.sleep(5)
end

assert {:error, :tenant_create_backoff} = Connect.connect(tenant.external_id)
end)

assert log =~ "Too many connect attempts to tenant database"
end

test "after timer, is able to connect", %{tenant: tenant} do

Check failure on line 392 in test/realtime/tenants/connect_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test connect/1 after timer, is able to connect (Realtime.Tenants.ConnectTest)
for _ <- 1..100 do
Connect.connect(tenant.external_id)
Connect.shutdown(tenant.external_id)
Process.sleep(5)
end

assert {:error, :tenant_create_backoff} = Connect.connect(tenant.external_id)
Process.sleep(5000)
assert {:ok, _pid} = Connect.connect(tenant.external_id)
end
end

describe "shutdown/1" do
test "shutdowns all associated connections", %{tenant: tenant} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/replication_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
assert {:error, _} = ReplicationConnection.start(tenant, self())
end

test "starts a handler for the tenant and broadcasts", %{tenant: tenant} do

Check failure on line 49 in test/realtime/tenants/replication_connection_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test starts a handler for the tenant and broadcasts (Realtime.Tenants.ReplicationConnectionTest)
start_link_supervised!(
{ReplicationConnection, %ReplicationConnection{tenant_id: tenant.external_id, monitored_pid: self()}},
restart: :transient
Expand Down Expand Up @@ -115,9 +115,9 @@
end
end

test "fails on existing replication slot", %{tenant: tenant} do

Check failure on line 118 in test/realtime/tenants/replication_connection_test.exs

View workflow job for this annotation

GitHub Actions / Tests

test fails on existing replication slot (Realtime.Tenants.ReplicationConnectionTest)
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
name = "supabase_realtime_messages_replication_slot_test"
name = "supabase_realtime_messages_replication_slot_"
Postgrex.query!(db_conn, "SELECT pg_create_logical_replication_slot($1, 'test_decoding')", [name])

assert {:error, "Temporary Replication slot already exists and in use"} =
Expand Down
Loading