Skip to content

Commit e962a57

Browse files
h0lybytefilipecabacoedgurgelFudster
authored
🔄 Sync with upstream changes (#11)
* fix: runtime setup error (supabase#1520) * fix: use primary instead of replica on rename_settings_field (supabase#1521) * feat: upgrade cowboy & ranch (supabase#1523) * fix: Fix GenRpc to not try to connect to nodes that are not alive (supabase#1525) * fix: enable presence on track message (supabase#1527) currently the user would need to have enabled from the beginning of the channel. this will enable users to enable presence later in the flow by sending a track message which will enable presence messages for them * fix: set cowboy active_n=100 as cowboy 2.12.0 (supabase#1530) cowboy 2.13.0 set the default active_n=1 * fix: provide error_code metadata on RealtimeChannel.Logging (supabase#1531) * feat: disable UTF8 validation on websocket frames (supabase#1532) Currently all text frames as handled only with JSON which already requires UTF-8 * fix: move DB setup to happen after Connect.init (supabase#1533) This change reduces the impact of slow DB setup impacting other tenants trying to connect at the same time that landed on the same partition * fix: handle wal bloat (supabase#1528) Verify that replication connection is able to reconnect when faced with WAL bloat issues * feat: replay realtime.messages (supabase#1526) A new index was created on inserted_at DESC, topic WHERE private IS TRUE AND extension = "broadast" The hardcoded limit is 25 for now. * feat: gen_rpc pub sub adapter (supabase#1529) Add a PubSub adapter that uses gen_rpc to send messages to other nodes. It uses :gen_rpc.abcast/3 instead of :erlang.send/2 The adapter works very similarly to the PG2 adapter. It consists of multiple workers that forward to the local node using PubSub.local_broadcast. The way to choose the worker to be used is based on the sending process just like PG2 adapter does The number of workers is controlled by `:pool_size` or `:broadcast_pool_size`. This distinction exists because Phoenix.PubSub uses `:pool_size` to define how many partitions the PubSub registry will use. It's possible to control them separately by using `:broadcast_pool_size` * fix: ensure message id doesn't raise on non-map payloads (supabase#1534) * fix: match error on Connect (supabase#1536) --------- Co-authored-by: Eduardo Gurgel Pinho <eduardo.gurgel@supabase.io> * feat: websocket max heap size configuration (supabase#1538) * fix: set max process heap size to 500MB instead of 8GB * feat: set websocket transport max heap size WEBSOCKET_MAX_HEAP_SIZE can be used to configure it * fix: update gen_rpc to fix gen_rpc_dispatcher issues (supabase#1537) Issues: * Single gen_rpc_dispatcher that can be a bottleneck if the connecting takes some time * Many calls can land on the dispatcher but the node might be gone already. If we don't validate the node it might keep trying to connect until it times out instead of quickly giving up due to not being an actively connected node. * fix: improve ErlSysMon logging for processes (supabase#1540) Include initial_call, ancestors, registered_name, message_queue_len and total_heap_size Also bump long_schedule and long_gc * fix: make pubsub adapter configurable (supabase#1539) * fix: specify that only private channels are allowed when replaying (supabase#1543) messages * fix: rate limit connect module (supabase#1541) On bad connection, we rate limit the Connect module so we prevent abuses and too much logging of errors --------- Co-authored-by: Filipe Cabaço <filipe@supabase.io> Co-authored-by: Eduardo Gurgel <eduardo.gurgel@supabase.io> Co-authored-by: Bradley Haljendi <5642609+Fudster@users.noreply.github.com>
1 parent 98b113d commit e962a57

File tree

8 files changed

+116
-21
lines changed

8 files changed

+116
-21
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ This is the list of operational codes that can help you understand your deployme
243243
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
244244
| ConnectionRateLimitReached | The number of connected clients as reached its limit |
245245
| ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
246+
| DatabaseConnectionRateLimitReached | The rate of attempts to connect to tenants database has reached the limit |
246247
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients has reached the channel limits |
247248
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
248249
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |

lib/realtime/tenants.ex

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,32 @@ defmodule Realtime.Tenants do
328328
%RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
329329
end
330330

331+
@connect_per_second_default 10
332+
@doc "RateCounter arguments for counting connect per second."
333+
@spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
334+
def connect_per_second_rate(%Tenant{external_id: external_id}) do
335+
connect_per_second_rate(external_id)
336+
end
337+
338+
def connect_per_second_rate(tenant_id) do
339+
opts = [
340+
max_bucket_len: 10,
341+
limit: [
342+
value: @connect_per_second_default,
343+
measurement: :sum,
344+
log_fn: fn ->
345+
Logger.critical(
346+
"DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database",
347+
external_id: tenant_id,
348+
project: tenant_id
349+
)
350+
end
351+
]
352+
]
353+
354+
%RateCounter.Args{id: {:database, :connect, tenant_id}, opts: opts}
355+
end
356+
331357
defp pool_size(%{extensions: [%{settings: settings} | _]}) do
332358
Database.pool_size_by_application_name("realtime_connect", settings)
333359
end

lib/realtime/tenants/connect.ex

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,17 @@ defmodule Realtime.Tenants.Connect do
1111

1212
use Realtime.Logs
1313

14-
alias Realtime.Tenants.Rebalancer
1514
alias Realtime.Api.Tenant
15+
alias Realtime.GenCounter
16+
alias Realtime.RateCounter
1617
alias Realtime.Rpc
1718
alias Realtime.Tenants
1819
alias Realtime.Tenants.Connect.CheckConnection
1920
alias Realtime.Tenants.Connect.GetTenant
2021
alias Realtime.Tenants.Connect.Piper
2122
alias Realtime.Tenants.Connect.RegisterProcess
2223
alias Realtime.Tenants.Migrations
24+
alias Realtime.Tenants.Rebalancer
2325
alias Realtime.Tenants.ReplicationConnection
2426
alias Realtime.UsersCounter
2527

@@ -39,11 +41,8 @@ defmodule Realtime.Tenants.Connect do
3941
@doc "Check if Connect has finished setting up connections"
4042
def ready?(tenant_id) do
4143
case whereis(tenant_id) do
42-
pid when is_pid(pid) ->
43-
GenServer.call(pid, :ready?)
44-
45-
_ ->
46-
false
44+
pid when is_pid(pid) -> GenServer.call(pid, :ready?)
45+
_ -> false
4746
end
4847
end
4948

@@ -55,24 +54,29 @@ defmodule Realtime.Tenants.Connect do
5554
| {:error, :tenant_database_unavailable}
5655
| {:error, :initializing}
5756
| {:error, :tenant_database_connection_initializing}
58-
| {:error, :tenant_db_too_many_connections}
57+
| {:error, :connect_rate_limit_reached}
5958
| {:error, :rpc_error, term()}
6059
def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do
61-
case get_status(tenant_id) do
62-
{:ok, conn} ->
63-
{:ok, conn}
60+
rate_args = Tenants.connect_per_second_rate(tenant_id)
61+
RateCounter.new(rate_args)
6462

65-
{:error, :tenant_database_unavailable} ->
66-
{:error, :tenant_database_unavailable}
63+
with {:ok, %{limit: %{triggered: false}}} <- RateCounter.get(rate_args),
64+
{:ok, conn} <- get_status(tenant_id) do
65+
{:ok, conn}
66+
else
67+
{:ok, %{limit: %{triggered: true}}} ->
68+
{:error, :connect_rate_limit_reached}
6769

6870
{:error, :tenant_database_connection_initializing} ->
71+
GenCounter.add(rate_args.id)
6972
call_external_node(tenant_id, opts)
7073

7174
{:error, :initializing} ->
7275
{:error, :tenant_database_unavailable}
7376

74-
{:error, :tenant_db_too_many_connections} ->
75-
{:error, :tenant_db_too_many_connections}
77+
{:error, reason} ->
78+
GenCounter.add(rate_args.id)
79+
{:error, reason}
7680
end
7781
end
7882

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ defmodule RealtimeWeb.RealtimeChannel do
167167
msg = "Database can't accept more connections, Realtime won't connect"
168168
log_error(socket, "DatabaseLackOfConnections", msg)
169169

170+
{:error, :connect_rate_limit_reached} ->
171+
msg = "Too many database connections attempts per second"
172+
log_error(socket, "DatabaseConnectionRateLimitReached", msg)
173+
170174
{:error, :unable_to_set_policies, error} ->
171175
log_error(socket, "UnableToSetPolicies", error)
172176
{:error, %{reason: "Realtime was unable to connect to the project database"}}
@@ -213,6 +217,9 @@ defmodule RealtimeWeb.RealtimeChannel do
213217
{:error, :invalid_replay_params} ->
214218
log_error(socket, "UnableToReplayMessages", "Replay params are not valid")
215219

220+
{:error, :invalid_replay_channel} ->
221+
log_error(socket, "UnableToReplayMessages", "Replay is not allowed for public channels")
222+
216223
{:error, error} ->
217224
log_error(socket, "UnknownErrorOnChannel", error)
218225
{:error, %{reason: "Unknown Error on Channel"}}
@@ -790,7 +797,7 @@ defmodule RealtimeWeb.RealtimeChannel do
790797
end
791798

792799
defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?) do
793-
{:error, :invalid_replay_params}
800+
{:error, :invalid_replay_channel}
794801
end
795802

796803
defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?)

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.51.3",
7+
version: "2.51.5",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/realtime/tenants/connect_test.exs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,53 @@ defmodule Realtime.Tenants.ConnectTest do
515515
assert capture_log(fn -> assert {:error, :rpc_error, _} = Connect.lookup_or_start_connection("tenant") end) =~
516516
"project=tenant external_id=tenant [error] ErrorOnRpcCall"
517517
end
518+
519+
test "rate limit connect when too many connections against bad database", %{tenant: tenant} do
520+
extension = %{
521+
"type" => "postgres_cdc_rls",
522+
"settings" => %{
523+
"db_host" => "127.0.0.1",
524+
"db_name" => "postgres",
525+
"db_user" => "supabase_admin",
526+
"db_password" => "postgres",
527+
"poll_interval" => 100,
528+
"poll_max_changes" => 100,
529+
"poll_max_record_bytes" => 1_048_576,
530+
"region" => "us-east-1",
531+
"ssl_enforced" => true
532+
}
533+
}
534+
535+
{:ok, tenant} = update_extension(tenant, extension)
536+
537+
log =
538+
capture_log(fn ->
539+
res =
540+
for _ <- 1..50 do
541+
Process.sleep(200)
542+
Connect.lookup_or_start_connection(tenant.external_id)
543+
end
544+
545+
assert Enum.any?(res, fn {_, res} -> res == :connect_rate_limit_reached end)
546+
end)
547+
548+
assert log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
549+
end
550+
551+
test "rate limit connect will not trigger if connection is successful", %{tenant: tenant} do
552+
log =
553+
capture_log(fn ->
554+
res =
555+
for _ <- 1..20 do
556+
Process.sleep(500)
557+
Connect.lookup_or_start_connection(tenant.external_id)
558+
end
559+
560+
refute Enum.any?(res, fn {_, res} -> res == :tenant_db_too_many_connections end)
561+
end)
562+
563+
refute log =~ "DatabaseConnectionRateLimitReached: Too many connection attempts against the tenant database"
564+
end
518565
end
519566

520567
describe "shutdown/1" do

test/realtime_web/channels/realtime_channel_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
153153

154154
assert {
155155
:error,
156-
%{reason: "UnableToReplayMessages: Replay params are not valid"}
156+
%{reason: "UnableToReplayMessages: Replay is not allowed for public channels"}
157157
} = subscribe_and_join(socket, "realtime:test", %{"config" => config})
158158

159159
refute_receive _any

test/realtime_web/controllers/broadcast_controller_test.exs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
272272
} do
273273
request_events_key = Tenants.requests_per_second_key(tenant)
274274
broadcast_events_key = Tenants.events_per_second_key(tenant)
275+
connect_events_key = Tenants.connect_per_second_rate(tenant).id
275276
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
276277

277278
messages_to_send =
@@ -290,7 +291,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do
290291

291292
GenCounter
292293
|> expect(:add, fn ^request_events_key -> :ok end)
293-
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
294+
|> expect(:add, length(messages), fn
295+
^broadcast_events_key -> :ok
296+
^connect_events_key -> :ok
297+
end)
294298

295299
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
296300

@@ -326,6 +330,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
326330
} do
327331
request_events_key = Tenants.requests_per_second_key(tenant)
328332
broadcast_events_key = Tenants.events_per_second_key(tenant)
333+
connect_events_key = Tenants.connect_per_second_rate(tenant).id
329334
expect(TenantBroadcaster, :pubsub_broadcast, 6, fn _, _, _, _ -> :ok end)
330335

331336
channels =
@@ -354,7 +359,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do
354359

355360
GenCounter
356361
|> expect(:add, fn ^request_events_key -> :ok end)
357-
|> expect(:add, length(messages), fn ^broadcast_events_key -> :ok end)
362+
|> expect(:add, length(messages), fn
363+
^broadcast_events_key -> :ok
364+
^connect_events_key -> :ok
365+
end)
358366

359367
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
360368

@@ -408,6 +416,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do
408416
} do
409417
request_events_key = Tenants.requests_per_second_key(tenant)
410418
broadcast_events_key = Tenants.events_per_second_key(tenant)
419+
connect_events_key = Tenants.connect_per_second_rate(tenant).id
411420
expect(TenantBroadcaster, :pubsub_broadcast, 5, fn _, _, _, _ -> :ok end)
412421

413422
messages_to_send =
@@ -428,7 +437,9 @@ defmodule RealtimeWeb.BroadcastControllerTest do
428437

429438
GenCounter
430439
|> expect(:add, fn ^request_events_key -> :ok end)
431-
|> expect(:add, length(messages_to_send), fn ^broadcast_events_key -> :ok end)
440+
# remove the one message that won't be broadcasted for this user
441+
|> expect(:add, 1, fn ^connect_events_key -> :ok end)
442+
|> expect(:add, length(messages) - 1, fn ^broadcast_events_key -> :ok end)
432443

433444
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
434445

@@ -482,7 +493,6 @@ defmodule RealtimeWeb.BroadcastControllerTest do
482493

483494
GenCounter
484495
|> expect(:add, fn ^request_events_key -> 1 end)
485-
|> reject(:add, 1)
486496

487497
conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages})
488498

0 commit comments

Comments
 (0)