Skip to content

Commit 565e79f

Browse files
committed
more test fixing
1 parent e324c0b commit 565e79f

File tree

9 files changed

+72
-97
lines changed

9 files changed

+72
-97
lines changed

config/runtime.exs

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ migration_partition_slots =
1818
connect_partition_slots =
1919
System.get_env("CONNECT_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer()
2020

21-
connect_throttle_limit = System.get_env("CONNECT_THROTTLE_LIMIT", "10") |> String.to_integer()
21+
connect_throttle_limit_per_second =
22+
System.get_env("CONNECT_THROTTLE_LIMIT_per_second_PER_SECOND", "1") |> String.to_integer()
2223

2324
if !(db_version in [nil, "ipv6", "ipv4"]),
2425
do: raise("Invalid IP version, please set either ipv6 or ipv4")
@@ -41,7 +42,7 @@ socket_options =
4142
config :realtime,
4243
migration_partition_slots: migration_partition_slots,
4344
connect_partition_slots: connect_partition_slots,
44-
connect_throttle_limit: connect_throttle_limit,
45+
connect_throttle_limit_per_second: connect_throttle_limit_per_second,
4546
tenant_max_bytes_per_second: System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer(),
4647
tenant_max_channels_per_client: System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer(),
4748
tenant_max_concurrent_users: System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer(),

lib/realtime/tenants/connect.ex

+3-3
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ defmodule Realtime.Tenants.Connect do
307307
%{db_conn_reference: db_conn_reference} = state
308308
) do
309309
Logger.warning("Database connection has been terminated")
310-
{:stop, :normal, state}
310+
{:stop, :shutdown, state}
311311
end
312312

313313
# Handle replication connection termination
@@ -316,7 +316,7 @@ defmodule Realtime.Tenants.Connect do
316316
%{replication_connection_reference: replication_connection_reference} = state
317317
) do
318318
Logger.warning("Replication connection has died")
319-
{:stop, :normal, state}
319+
{:stop, :shutdown, state}
320320
end
321321

322322
# Handle listen connection termination
@@ -325,7 +325,7 @@ defmodule Realtime.Tenants.Connect do
325325
%{listen_reference: listen_reference} = state
326326
) do
327327
Logger.warning("Listen has been terminated")
328-
{:stop, :normal, state}
328+
{:stop, :shutdown, state}
329329
end
330330

331331
# Ignore messages to avoid handle_info unmatched functions

lib/realtime/tenants/connect/backoff.ex

+10-9
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@ defmodule Realtime.Tenants.Connect.Backoff do
1010
@impl Realtime.Tenants.Connect.Piper
1111
def run(acc) do
1212
%{tenant_id: tenant_id} = acc
13-
connect_throttle_limit = Application.fetch_env!(:realtime, :connect_throttle_limit)
13+
connect_throttle_limit_per_second = Application.fetch_env!(:realtime, :connect_throttle_limit_per_second)
1414

1515
with {:ok, counter} <- start_connects_per_second_counter(tenant_id),
16-
{:ok, %{avg: avg}} when avg < connect_throttle_limit <- RateCounter.get(counter) do
17-
IO.inspect(avg)
16+
{:ok, %{avg: avg}} when avg <= connect_throttle_limit_per_second <- RateCounter.get(counter) do
1817
GenCounter.add(counter)
1918
{:ok, acc}
2019
else
@@ -24,14 +23,16 @@ defmodule Realtime.Tenants.Connect.Backoff do
2423

2524
defp start_connects_per_second_counter(tenant_id) do
2625
id = Tenants.connection_attempts_per_second_key(tenant_id)
27-
GenCounter.new(id)
2826

29-
res = RateCounter.new(id, idle_shutdown: :infinity, tick: 200)
27+
case RateCounter.get(id) do
28+
{:ok, _} ->
29+
:ok
3030

31-
case res do
32-
{:ok, _} -> {:ok, id}
33-
{:error, {:already_started, _}} -> {:ok, id}
34-
{:error, reason} -> {:error, reason}
31+
{:error, _} ->
32+
GenCounter.new(id)
33+
RateCounter.new(id, idle_shutdown: :infinity, tick: 100, idle_shutdown_ms: :timer.minutes(5))
3534
end
35+
36+
{:ok, id}
3637
end
3738
end

test/realtime/monitoring/prom_ex/plugins/tenants_test.exs

+4-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do
22
use Realtime.DataCase, async: false
33

44
alias Realtime.PromEx.Plugins.Tenants
5-
alias Realtime.Rpc
65
alias Realtime.Tenants.Connect
76

87
defmodule MetricsTest do
@@ -16,23 +15,14 @@ defmodule Realtime.PromEx.Plugins.TenantsTest do
1615
describe "pooling metrics" do
1716
setup do
1817
start_supervised!(MetricsTest)
19-
local_tenant = Containers.checkout_tenant()
20-
remote_tenant = Containers.checkout_tenant()
21-
22-
on_exit(fn ->
23-
Containers.checkin_tenant(local_tenant)
24-
Containers.checkin_tenant(remote_tenant)
25-
end)
26-
27-
{:ok, node} = Clustered.start()
28-
{:ok, _} = Rpc.enhanced_call(node, Connect, :lookup_or_start_connection, [remote_tenant.external_id])
18+
local_tenant = Containers.checkout_tenant(true)
19+
on_exit(fn -> Containers.checkin_tenant(local_tenant) end)
2920
{:ok, _} = Connect.lookup_or_start_connection(local_tenant.external_id)
30-
31-
%{local_tenant: local_tenant, remote_tenant: remote_tenant}
21+
:ok
3222
end
3323

3424
test "conneted based on Connect module information for local node only" do
35-
Process.sleep(100 * 2)
25+
Process.sleep(2000)
3626
assert PromEx.get_metrics(MetricsTest) =~ "realtime_tenants_connected 1"
3727
end
3828
end

test/realtime/tenants/connect/backoff_test.exs

+5-6
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ defmodule Realtime.Tenants.Connect.BackoffTest do
44

55
setup do
66
tenant_id = random_string()
7-
acc = %{tenant_id: tenant_id}
8-
{:ok, acc: acc}
7+
{:ok, acc: %{tenant_id: tenant_id}}
98
end
109

1110
test "does not apply backoff for a given tenant if never called", %{acc: acc} do
@@ -14,19 +13,19 @@ defmodule Realtime.Tenants.Connect.BackoffTest do
1413

1514
test "applies backoff if the user as called more than once during the configured space", %{acc: acc} do
1615
# emulate calls
17-
for _ <- 1..100 do
16+
for _ <- 1..10 do
1817
Backoff.run(acc)
19-
Process.sleep(5)
18+
Process.sleep(10)
2019
end
2120

2221
assert {:error, :tenant_create_backoff} = Backoff.run(acc)
2322
end
2423

2524
test "resets backoff after the configured space", %{acc: acc} do
2625
# emulate calls
27-
for _ <- 1..100 do
26+
for _ <- 1..10 do
2827
Backoff.run(acc)
29-
Process.sleep(5)
28+
Process.sleep(10)
3029
end
3130

3231
# emulate block

test/realtime/tenants/connect_test.exs

+5-5
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ defmodule Realtime.Tenants.ConnectTest do
189189
log =
190190
capture_log(fn ->
191191
Realtime.Tenants.suspend_tenant_by_external_id(tenant2.external_id)
192-
Process.sleep(100)
192+
Process.sleep(50)
193193
end)
194194

195195
refute log =~ "Tenant was suspended"
@@ -377,10 +377,10 @@ defmodule Realtime.Tenants.ConnectTest do
377377
test "respects backoff pipe", %{tenant: tenant} do
378378
log =
379379
capture_log(fn ->
380-
for _ <- 1..100 do
380+
for _ <- 1..10 do
381381
Connect.connect(tenant.external_id)
382+
Process.sleep(10)
382383
Connect.shutdown(tenant.external_id)
383-
Process.sleep(5)
384384
end
385385

386386
assert {:error, :tenant_create_backoff} = Connect.connect(tenant.external_id)
@@ -390,10 +390,10 @@ defmodule Realtime.Tenants.ConnectTest do
390390
end
391391

392392
test "after timer, is able to connect", %{tenant: tenant} do
393-
for _ <- 1..100 do
393+
for _ <- 1..10 do
394394
Connect.connect(tenant.external_id)
395+
Process.sleep(10)
395396
Connect.shutdown(tenant.external_id)
396-
Process.sleep(5)
397397
end
398398

399399
assert {:error, :tenant_create_backoff} = Connect.connect(tenant.external_id)

test/realtime/tenants/replication_connection_test.exs

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
defmodule Realtime.Tenants.ReplicationConnectionTest do
2+
alias Realtime.RateCounter
23
use Realtime.DataCase, async: false
34

45
alias Realtime.Api.Message
@@ -13,6 +14,12 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
1314

1415
tenant = Containers.checkout_tenant(true)
1516

17+
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
18+
name = "supabase_realtime_messages_replication_slot_test"
19+
Postgrex.query(db_conn, "SELECT pg_drop_replication_slot($1)", [name])
20+
Process.exit(db_conn, :normal)
21+
tenant |> Tenants.limiter_keys() |> Enum.each(&RateCounter.new(&1))
22+
1623
on_exit(fn ->
1724
Application.put_env(:realtime, :slot_name_suffix, slot)
1825
Containers.checkin_tenant(tenant)
@@ -117,7 +124,8 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
117124

118125
test "fails on existing replication slot", %{tenant: tenant} do
119126
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
120-
name = "supabase_realtime_messages_replication_slot_"
127+
name = "supabase_realtime_messages_replication_slot_test"
128+
121129
Postgrex.query!(db_conn, "SELECT pg_create_logical_replication_slot($1, 'test_decoding')", [name])
122130

123131
assert {:error, "Temporary Replication slot already exists and in use"} =

test/realtime_web/controllers/tenant_controller_test.exs

+9-26
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,15 @@ defmodule RealtimeWeb.TenantControllerTest do
9999
end
100100

101101
describe "upsert with post" do
102-
test "renders tenant when data is valid", %{conn: conn} do
103-
external_id = random_string()
104-
105-
port =
106-
5500..9000
107-
|> Enum.reject(&(&1 in Enum.map(:ets.tab2list(:test_ports), fn {port} -> port end)))
108-
|> Enum.random()
102+
setup [:with_tenant]
109103

104+
test "renders tenant when data is valid", %{conn: conn, tenant: tenant} do
105+
external_id = tenant.external_id
106+
port = Database.from_tenant(tenant, "realtime_test", :stop).port
110107
attrs = default_tenant_attrs(port)
111108
attrs = Map.put(attrs, "external_id", external_id)
112-
113-
Containers.initialize_no_tenant(external_id, port)
114-
on_exit(fn -> Containers.stop_container(external_id) end)
115-
116109
conn = post(conn, ~p"/api/tenants", tenant: attrs)
117-
assert %{"id" => _id, "external_id" => ^external_id} = json_response(conn, 201)["data"]
110+
assert %{"id" => _id, "external_id" => ^external_id} = json_response(conn, 200)["data"]
118111

119112
conn = get(conn, Routes.tenant_path(conn, :show, external_id))
120113
assert ^external_id = json_response(conn, 200)["data"]["external_id"]
@@ -166,23 +159,13 @@ defmodule RealtimeWeb.TenantControllerTest do
166159
describe "upsert with put" do
167160
setup [:with_tenant]
168161

169-
test "renders tenant when data is valid", %{conn: conn} do
170-
external_id = random_string()
171-
172-
port =
173-
5500..9000
174-
|> Enum.reject(&(&1 in Enum.map(:ets.tab2list(:test_ports), fn {port} -> port end)))
175-
|> Enum.random()
176-
177-
:ets.insert(:test_ports, {port})
178-
162+
test "renders tenant when data is valid", %{tenant: tenant, conn: conn} do
163+
external_id = tenant.external_id
164+
port = Database.from_tenant(tenant, "realtime_test", :stop).port
179165
attrs = default_tenant_attrs(port)
180166

181-
Containers.initialize_no_tenant(external_id, port)
182-
on_exit(fn -> Containers.stop_container(external_id) end)
183-
184167
conn = put(conn, ~p"/api/tenants/#{external_id}", tenant: attrs)
185-
assert %{"id" => _id, "external_id" => ^external_id} = json_response(conn, 201)["data"]
168+
assert %{"id" => _id, "external_id" => ^external_id} = json_response(conn, 200)["data"]
186169

187170
conn = get(conn, Routes.tenant_path(conn, :show, external_id))
188171
assert ^external_id = json_response(conn, 200)["data"]["external_id"]

test/support/containers.ex

+24-31
Original file line numberDiff line numberDiff line change
@@ -90,55 +90,48 @@ defmodule Containers do
9090
tenant = Enum.random(tenants)
9191
:ets.insert(:containers, {tenant.external_id, %{tenant: tenant, using?: true}})
9292

93-
settings = Database.from_tenant(tenant, "realtime_test", :stop)
94-
settings = %{settings | max_restarts: 0, ssl: false}
95-
{:ok, conn} = Database.connect_db(settings)
93+
capture_log(fn ->
94+
settings = Database.from_tenant(tenant, "realtime_test", :stop)
95+
settings = %{settings | max_restarts: 0, ssl: false}
96+
{:ok, conn} = Database.connect_db(settings)
9697

97-
Postgrex.transaction(conn, fn db_conn ->
98-
pid = Connect.whereis(tenant.external_id)
99-
if pid && Process.alive?(pid), do: Connect.shutdown(tenant.external_id)
98+
Postgrex.transaction(conn, fn db_conn ->
99+
Postgrex.query!(
100+
db_conn,
101+
"SELECT pg_terminate_backend(pid) from pg_stat_activity where application_name like 'realtime_%' and application_name != 'realtime_test'",
102+
[]
103+
)
100104

101-
tenant
102-
|> Tenants.limiter_keys()
103-
|> Enum.each(fn key ->
104105
RateCounter.stop(tenant.external_id)
105106
GenCounter.stop(tenant.external_id)
106-
RateCounter.new(key)
107-
GenCounter.new(key)
108-
end)
109107

110-
Postgrex.query!(
111-
db_conn,
112-
"SELECT pg_terminate_backend(pid) from pg_stat_activity where application_name like 'realtime_%' and application_name != 'realtime_test'",
113-
[]
114-
)
108+
Postgrex.query!(db_conn, "DROP SCHEMA realtime CASCADE", [])
109+
Postgrex.query!(db_conn, "CREATE SCHEMA realtime", [])
115110

116-
Postgrex.query!(db_conn, "DROP SCHEMA realtime CASCADE", [])
117-
Postgrex.query!(db_conn, "CREATE SCHEMA realtime", [])
111+
if Tenants.get_tenant_by_external_id(tenant.external_id) do
112+
Tenants.update_migrations_ran(tenant.external_id, 0)
113+
end
118114

119-
if Tenants.get_tenant_by_external_id(tenant.external_id) do
120-
Tenants.update_migrations_ran(tenant.external_id, 0)
115+
:ok
116+
end)
117+
118+
if run_migrations? do
119+
Migrations.run_migrations(tenant)
120+
{:ok, pid} = Database.connect(tenant, "realtime_test", :stop)
121+
:ok = Migrations.create_partitions(pid)
121122
end
122123

123-
:ok
124+
Process.sleep(1000)
124125
end)
125126

126-
if run_migrations? do
127-
Migrations.run_migrations(tenant)
128-
{:ok, pid} = Database.connect(tenant, "realtime_test", :stop)
129-
:ok = Migrations.create_partitions(pid)
130-
end
131-
132-
Process.sleep(1000)
133-
134127
tenant
135128
end
136129

137130
def checkin_tenant(tenant) do
138131
:ets.insert(:containers, {tenant.external_id, %{tenant: tenant, using?: false}})
139132
end
140133

141-
@spec stop_container(any()) :: {any(), non_neg_integer()}
134+
@spec stop_container(Tenant.t() | binary()) :: {any(), non_neg_integer()}
142135
def stop_container(%Tenant{} = tenant) do
143136
:ets.delete(:containers, tenant.external_id)
144137
pid = Connect.whereis(tenant.external_id)

0 commit comments

Comments
 (0)