Skip to content

Commit fe70a4f

Browse files
fix: connect backoff mechanism (#1354)
Connect backoff mechanism and attempt to reduce test flakiness --------- Co-authored-by: Eduardo Gurgel <[email protected]> Co-authored-by: Eduardo Gurgel Pinho <[email protected]>
1 parent e2275cd commit fe70a4f

File tree

14 files changed

+245
-151
lines changed

14 files changed

+245
-151
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ This is the list of operational codes that can help you understand your deployme
208208
| ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
209209
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
210210
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
211+
| TooManyConnectAttempts | Realtime restricted the number of attempts when connecting to the tenant's database |
211212
| RealtimeNodeDisconnected | Realtime is a distributed application and this means that the system is unable to communicate with one of the distributed nodes |
212213
| MigrationsFailedToRun | Error when running the migrations against the Tenant database that are required by Realtime |
213214
| StartListenAndReplicationFailed | Error when starting the replication and listening of errors for database broadcasting |

config/runtime.exs

+4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ migration_partition_slots =
1818
connect_partition_slots =
1919
System.get_env("CONNECT_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer()
2020

21+
connect_throttle_limit_per_second =
22+
System.get_env("CONNECT_THROTTLE_LIMIT_per_second_PER_SECOND", "1") |> String.to_integer()
23+
2124
if !(db_version in [nil, "ipv6", "ipv4"]),
2225
do: raise("Invalid IP version, please set either ipv6 or ipv4")
2326

@@ -39,6 +42,7 @@ socket_options =
3942
config :realtime,
4043
migration_partition_slots: migration_partition_slots,
4144
connect_partition_slots: connect_partition_slots,
45+
connect_throttle_limit_per_second: connect_throttle_limit_per_second,
4246
tenant_max_bytes_per_second: System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer(),
4347
tenant_max_channels_per_client: System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer(),
4448
tenant_max_concurrent_users: System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer(),

lib/realtime/tenants.ex

+20-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,9 @@ defmodule Realtime.Tenants do
121121
requests_per_second_key(tenant),
122122
channels_per_client_key(tenant),
123123
joins_per_second_key(tenant),
124-
events_per_second_key(tenant)
124+
events_per_second_key(tenant),
125+
connection_attempts_per_second_key(tenant),
126+
presence_events_per_second_key(tenant)
125127
]
126128
end
127129

@@ -210,6 +212,23 @@ defmodule Realtime.Tenants do
210212
{:channel, :presence_events, tenant.external_id}
211213
end
212214

215+
@doc """
216+
The GenCounter key to use when counting connection attempts against Realtime.Tenants.Connect
217+
## Examples
218+
iex> Realtime.Tenants.connection_attempts_per_second_key("tenant_id")
219+
{:tenant, :connection_attempts, "tenant_id"}
220+
iex> Realtime.Tenants.connection_attempts_per_second_key(%Realtime.Api.Tenant{external_id: "tenant_id"})
221+
{:tenant, :connection_attempts, "tenant_id"}
222+
"""
223+
@spec connection_attempts_per_second_key(Tenant.t() | String.t()) :: {:tenant, :connection_attempts, String.t()}
224+
def connection_attempts_per_second_key(tenant) when is_binary(tenant) do
225+
{:tenant, :connection_attempts, tenant}
226+
end
227+
228+
def connection_attempts_per_second_key(%Tenant{} = tenant) do
229+
{:tenant, :connection_attempts, tenant.external_id}
230+
end
231+
213232
@spec get_tenant_limits(Realtime.Api.Tenant.t(), maybe_improper_list) :: list
214233
def get_tenant_limits(%Tenant{} = tenant, keys) when is_list(keys) do
215234
nodes = [Node.self() | Node.list()]

lib/realtime/tenants/connect.ex

+13-4
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ defmodule Realtime.Tenants.Connect do
1515
alias Realtime.Api.Tenant
1616
alias Realtime.Rpc
1717
alias Realtime.Tenants
18-
alias Realtime.Tenants.ReplicationConnection
18+
alias Realtime.Tenants.Connect.Backoff
1919
alias Realtime.Tenants.Connect.CheckConnection
2020
alias Realtime.Tenants.Connect.GetTenant
2121
alias Realtime.Tenants.Connect.Piper
2222
alias Realtime.Tenants.Connect.RegisterProcess
2323
alias Realtime.Tenants.Connect.StartCounters
2424
alias Realtime.Tenants.Listen
2525
alias Realtime.Tenants.Migrations
26+
alias Realtime.Tenants.ReplicationConnection
2627
alias Realtime.UsersCounter
2728

2829
@rpc_timeout_default 30_000
@@ -114,6 +115,10 @@ defmodule Realtime.Tenants.Connect do
114115
{:error, {:shutdown, :tenant_not_found}} ->
115116
{:error, :tenant_not_found}
116117

118+
{:error, {:shutdown, :tenant_create_backoff}} ->
119+
log_warning("TooManyConnectAttempts", "Too many connect attempts to tenant database")
120+
{:error, :tenant_create_backoff}
121+
117122
{:error, :shutdown} ->
118123
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database")
119124
{:error, :tenant_database_unavailable}
@@ -176,6 +181,7 @@ defmodule Realtime.Tenants.Connect do
176181

177182
pipes = [
178183
GetTenant,
184+
Backoff,
179185
CheckConnection,
180186
StartCounters,
181187
RegisterProcess
@@ -191,6 +197,9 @@ defmodule Realtime.Tenants.Connect do
191197
{:error, :tenant_db_too_many_connections} ->
192198
{:stop, {:shutdown, :tenant_db_too_many_connections}}
193199

200+
{:error, :tenant_create_backoff} ->
201+
{:stop, {:shutdown, :tenant_create_backoff}}
202+
194203
{:error, error} ->
195204
log_error("UnableToConnectToTenantDatabase", error)
196205
{:stop, :shutdown}
@@ -298,7 +307,7 @@ defmodule Realtime.Tenants.Connect do
298307
%{db_conn_reference: db_conn_reference} = state
299308
) do
300309
Logger.warning("Database connection has been terminated")
301-
{:stop, :normal, state}
310+
{:stop, :shutdown, state}
302311
end
303312

304313
# Handle replication connection termination
@@ -307,7 +316,7 @@ defmodule Realtime.Tenants.Connect do
307316
%{replication_connection_reference: replication_connection_reference} = state
308317
) do
309318
Logger.warning("Replication connection has died")
310-
{:stop, :normal, state}
319+
{:stop, :shutdown, state}
311320
end
312321

313322
# Handle listen connection termination
@@ -316,7 +325,7 @@ defmodule Realtime.Tenants.Connect do
316325
%{listen_reference: listen_reference} = state
317326
) do
318327
Logger.warning("Listen has been terminated")
319-
{:stop, :normal, state}
328+
{:stop, :shutdown, state}
320329
end
321330

322331
# Ignore messages to avoid handle_info unmatched functions
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
defmodule Realtime.Tenants.Connect.Backoff do
2+
@moduledoc """
3+
Applies backoff on process initialization.
4+
"""
5+
alias Realtime.RateCounter
6+
alias Realtime.GenCounter
7+
alias Realtime.Tenants
8+
@behaviour Realtime.Tenants.Connect.Piper
9+
10+
@impl Realtime.Tenants.Connect.Piper
11+
def run(acc) do
12+
%{tenant_id: tenant_id} = acc
13+
connect_throttle_limit_per_second = Application.fetch_env!(:realtime, :connect_throttle_limit_per_second)
14+
15+
with {:ok, counter} <- start_connects_per_second_counter(tenant_id),
16+
{:ok, %{avg: avg}} when avg <= connect_throttle_limit_per_second <- RateCounter.get(counter) do
17+
GenCounter.add(counter)
18+
{:ok, acc}
19+
else
20+
_ -> {:error, :tenant_create_backoff}
21+
end
22+
end
23+
24+
defp start_connects_per_second_counter(tenant_id) do
25+
id = Tenants.connection_attempts_per_second_key(tenant_id)
26+
27+
case RateCounter.get(id) do
28+
{:ok, _} ->
29+
:ok
30+
31+
{:error, _} ->
32+
GenCounter.new(id)
33+
RateCounter.new(id, idle_shutdown: :infinity, tick: 100, idle_shutdown_ms: :timer.minutes(5))
34+
end
35+
36+
{:ok, id}
37+
end
38+
end

mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.34.53",
7+
version: "2.34.54",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ defmodule Realtime.Integration.RtChannelTest do
673673
capture_log([level: :warning], fn ->
674674
WebsocketClient.send_event(socket, realtime_topic, "access_token", %{"access_token" => token})
675675

676-
assert_receive %Message{event: "system"}, 500
676+
assert_receive %Message{event: "system"}, 1000
677677
end)
678678

679679
assert log =~ "ChannelShutdown"

test/realtime/monitoring/prom_ex/plugins/tenants_test.exs

+25-17
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do
22
use Realtime.DataCase, async: false
33

44
alias Realtime.PromEx.Plugins.Tenants
5-
alias Realtime.Rpc
65
alias Realtime.Tenants.Connect
76

87
defmodule MetricsTest do
@@ -15,25 +14,34 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do
1514

1615
describe "pooling metrics" do
1716
setup do
17+
local_tenant = Containers.checkout_tenant(true)
18+
on_exit(fn -> Containers.checkin_tenant(local_tenant) end)
1819
start_supervised!(MetricsTest)
19-
local_tenant = Containers.checkout_tenant()
20-
remote_tenant = Containers.checkout_tenant()
21-
22-
on_exit(fn ->
23-
Containers.checkin_tenant(local_tenant)
24-
Containers.checkin_tenant(remote_tenant)
25-
end)
26-
27-
{:ok, node} = Clustered.start()
28-
{:ok, _} = Rpc.enhanced_call(node, Connect, :lookup_or_start_connection, [remote_tenant.external_id])
29-
{:ok, _} = Connect.lookup_or_start_connection(local_tenant.external_id)
30-
31-
%{local_tenant: local_tenant, remote_tenant: remote_tenant}
20+
{:ok, %{tenant: local_tenant}}
3221
end
3322

34-
test "conneted based on Connect module information for local node only" do
35-
Process.sleep(100 * 2)
36-
assert PromEx.get_metrics(MetricsTest) =~ "realtime_tenants_connected 1"
23+
test "conneted based on Connect module information for local node only", %{tenant: tenant} do
24+
# Enough time for the poll rate to be triggered at least once
25+
Process.sleep(200)
26+
previous_value = metric_value()
27+
{:ok, _} = Connect.lookup_or_start_connection(tenant.external_id)
28+
Process.sleep(200)
29+
assert metric_value() == previous_value + 1
3730
end
3831
end
32+
33+
defp metric_value() do
34+
PromEx.get_metrics(MetricsTest)
35+
|> String.split("\n", trim: true)
36+
|> Enum.find_value(
37+
"0",
38+
fn item ->
39+
case Regex.run(~r/realtime_tenants_connected\s(?<number>\d+)/, item, capture: ["number"]) do
40+
[number] -> number
41+
_ -> false
42+
end
43+
end
44+
)
45+
|> String.to_integer()
46+
end
3947
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
defmodule Realtime.Tenants.Connect.BackoffTest do
2+
use Realtime.DataCase, async: true
3+
alias Realtime.Tenants.Connect.Backoff
4+
5+
setup do
6+
tenant_id = random_string()
7+
{:ok, acc: %{tenant_id: tenant_id}}
8+
end
9+
10+
test "does not apply backoff for a given tenant if never called", %{acc: acc} do
11+
assert {:ok, acc} == Backoff.run(acc)
12+
end
13+
14+
test "applies backoff if the user as called more than once during the configured space", %{acc: acc} do
15+
# emulate calls
16+
for _ <- 1..10 do
17+
Backoff.run(acc)
18+
Process.sleep(10)
19+
end
20+
21+
assert {:error, :tenant_create_backoff} = Backoff.run(acc)
22+
end
23+
24+
test "resets backoff after the configured space", %{acc: acc} do
25+
# emulate calls
26+
for _ <- 1..10 do
27+
Backoff.run(acc)
28+
Process.sleep(10)
29+
end
30+
31+
# emulate block
32+
assert {:error, :tenant_create_backoff} = Backoff.run(acc)
33+
34+
# wait for the timer to expire
35+
Process.sleep(2000)
36+
37+
# check that the backoff has been reset
38+
assert {:ok, acc} == Backoff.run(acc)
39+
end
40+
end

test/realtime/tenants/connect_test.exs

+30-1
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ defmodule Realtime.Tenants.ConnectTest do
189189
log =
190190
capture_log(fn ->
191191
Realtime.Tenants.suspend_tenant_by_external_id(tenant2.external_id)
192-
Process.sleep(100)
192+
Process.sleep(50)
193193
end)
194194

195195
refute log =~ "Tenant was suspended"
@@ -373,6 +373,35 @@ defmodule Realtime.Tenants.ConnectTest do
373373
end
374374
end
375375

376+
describe "connect/1" do
377+
test "respects backoff pipe", %{tenant: tenant} do
378+
log =
379+
capture_log(fn ->
380+
for _ <- 1..10 do
381+
Connect.connect(tenant.external_id)
382+
Process.sleep(10)
383+
Connect.shutdown(tenant.external_id)
384+
end
385+
386+
assert {:error, :tenant_create_backoff} = Connect.connect(tenant.external_id)
387+
end)
388+
389+
assert log =~ "Too many connect attempts to tenant database"
390+
end
391+
392+
test "after timer, is able to connect", %{tenant: tenant} do
393+
for _ <- 1..10 do
394+
Connect.connect(tenant.external_id)
395+
Process.sleep(10)
396+
Connect.shutdown(tenant.external_id)
397+
end
398+
399+
assert {:error, :tenant_create_backoff} = Connect.connect(tenant.external_id)
400+
Process.sleep(5000)
401+
assert {:ok, _pid} = Connect.connect(tenant.external_id)
402+
end
403+
end
404+
376405
describe "shutdown/1" do
377406
test "shutdowns all associated connections", %{tenant: tenant} do
378407
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)

test/realtime/tenants/replication_connection_test.exs

+8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
defmodule Realtime.Tenants.ReplicationConnectionTest do
2+
alias Realtime.RateCounter
23
use Realtime.DataCase, async: false
34

45
alias Realtime.Api.Message
@@ -13,6 +14,12 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
1314

1415
tenant = Containers.checkout_tenant(true)
1516

17+
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
18+
name = "supabase_realtime_messages_replication_slot_test"
19+
Postgrex.query(db_conn, "SELECT pg_drop_replication_slot($1)", [name])
20+
Process.exit(db_conn, :normal)
21+
tenant |> Tenants.limiter_keys() |> Enum.each(&RateCounter.new(&1))
22+
1623
on_exit(fn ->
1724
Application.put_env(:realtime, :slot_name_suffix, slot)
1825
Containers.checkin_tenant(tenant)
@@ -118,6 +125,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
118125
test "fails on existing replication slot", %{tenant: tenant} do
119126
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
120127
name = "supabase_realtime_messages_replication_slot_test"
128+
121129
Postgrex.query!(db_conn, "SELECT pg_create_logical_replication_slot($1, 'test_decoding')", [name])
122130

123131
assert {:error, "Temporary Replication slot already exists and in use"} =

test/realtime/user_counter_test.exs

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ defmodule Realtime.UsersCounterTest do
2626
test "returns count of connected clients for tenant on cluster node" do
2727
tenant_id = random_string()
2828
expected = generate_load(tenant_id)
29-
Process.sleep(500)
29+
Process.sleep(1000)
3030
assert UsersCounter.tenant_users(tenant_id) == expected
3131
end
3232
end
@@ -42,7 +42,7 @@ defmodule Realtime.UsersCounterTest do
4242
end
4343
end
4444

45-
defp generate_load(tenant_id, nodes \\ 3, processes \\ 3) do
45+
defp generate_load(tenant_id, nodes \\ 2, processes \\ 2) do
4646
for _ <- 1..nodes do
4747
{:ok, node} = Clustered.start(@aux_mod)
4848

0 commit comments

Comments
 (0)