Commit 352e375

h0lybyte, filipecabaco, edgurgel, kevcodez, and Fudster authored
🔄 Sync with upstream changes (#13)

* fix: runtime setup error (supabase#1520)
* fix: use primary instead of replica on rename_settings_field (supabase#1521)
* feat: upgrade cowboy & ranch (supabase#1523)
* fix: Fix GenRpc to not try to connect to nodes that are not alive (supabase#1525)
* fix: enable presence on track message (supabase#1527)

  Currently the user needs to have enabled presence from the beginning of the channel. This change lets users enable presence later in the flow by sending a track message, which enables presence messages for them.

* fix: set cowboy active_n=100 as cowboy 2.12.0 (supabase#1530)

  cowboy 2.13.0 set the default active_n=1

* fix: provide error_code metadata on RealtimeChannel.Logging (supabase#1531)
* feat: disable UTF8 validation on websocket frames (supabase#1532)

  Currently all text frames are handled only as JSON, which already requires UTF-8.

* fix: move DB setup to happen after Connect.init (supabase#1533)

  This change reduces the impact of a slow DB setup on other tenants that try to connect at the same time and land on the same partition.

* fix: handle wal bloat (supabase#1528)

  Verify that the replication connection is able to reconnect when faced with WAL bloat issues.

* feat: replay realtime.messages (supabase#1526)

  A new index was created on inserted_at DESC, topic WHERE private IS TRUE AND extension = "broadcast".
  The hardcoded limit is 25 for now.

* feat: gen_rpc pub sub adapter (supabase#1529)

  Add a PubSub adapter that uses gen_rpc to send messages to other nodes. It uses :gen_rpc.abcast/3 instead of :erlang.send/2.

  The adapter works very similarly to the PG2 adapter. It consists of multiple workers that forward to the local node using PubSub.local_broadcast. The worker is chosen based on the sending process, just as the PG2 adapter does.

  The number of workers is controlled by `:pool_size` or `:broadcast_pool_size`. This distinction exists because Phoenix.PubSub uses `:pool_size` to define how many partitions the PubSub registry will use. It's possible to control them separately by using `:broadcast_pool_size`.

* fix: ensure message id doesn't raise on non-map payloads (supabase#1534)
* fix: match error on Connect (supabase#1536)

  ---------
  Co-authored-by: Eduardo Gurgel Pinho <eduardo.gurgel@supabase.io>

* feat: websocket max heap size configuration (supabase#1538)

  * fix: set max process heap size to 500MB instead of 8GB
  * feat: set websocket transport max heap size

    WEBSOCKET_MAX_HEAP_SIZE can be used to configure it

* fix: update gen_rpc to fix gen_rpc_dispatcher issues (supabase#1537)

  Issues:
  * A single gen_rpc_dispatcher can be a bottleneck if connecting takes some time.
  * Many calls can land on the dispatcher when the node might already be gone. If we don't validate the node, it might keep trying to connect until it times out instead of quickly giving up because the node is not actively connected.

* fix: improve ErlSysMon logging for processes (supabase#1540)

  Include initial_call, ancestors, registered_name, message_queue_len and total_heap_size.
  Also bump long_schedule and long_gc.

* fix: make pubsub adapter configurable (supabase#1539)
* fix: specify that only private channels are allowed when replaying messages (supabase#1543)
* fix: rate limit connect module (supabase#1541)

  On bad connections, we rate limit the Connect module to prevent abuse and excessive error logging.

* build: automatically cancel old tests/build on new push (supabase#1545)

  Currently, whenever you push a commit to your branch, the old builds keep running and a new build is started. Once a new commit is added, the old test results no longer matter, so those builds are a waste of CI resources. Cancelling them also reduces confusion from multiple builds running in parallel for the same branch and possibly blocking merges.

  With this small change, whenever a new commit is added, the previous build is immediately cancelled and only the build for the latest commit runs.

* fix: move message queue data to off-heap for gen_rpc pub sub workers (supabase#1548)
* fix: rate limit Connect.lookup_or_start_connection on error only (supabase#1549)
* fix: increase connect error rate window to 30 seconds (supabase#1550)
* fix: set a lower fullsweep_after flag for GenRpcPubSub workers (supabase#1551)
* fix: hardcode presence limit (supabase#1552)
* fix: further decrease limit on presence events (supabase#1553)
* fix: bump up realtime (supabase#1554)
* fix: lower rate limit to 100 events per second (supabase#1556)
* fix: move connect rate limit to socket (supabase#1555)

  * fix: reduce max_frame_size to 5MB
  * fix: fullsweep_after=100 on gen rpc pub sub workers

  ---------
  Co-authored-by: Eduardo Gurgel Pinho <eduardo.gurgel@supabase.io>

* fix: collect global metrics without tenant tagging (supabase#1557)
* feat: presence payload size (supabase#1559)

  * Also tweak buckets to account all the way to 3000KB
  * Start tagging the payload size metrics with message_type. message_type can be presence, broadcast or postgres_changes

* fix: use GenRpc for Realtime.Latency pings (supabase#1560)
* Fastlane for phoenix presence_diff (supabase#1558)

  It uses a fork of Phoenix for the time being.

  * fix: count presence_diff events on MessageDispatcher
  * fix: remove traces from console during development

---------
Co-authored-by: Filipe Cabaço <filipe@supabase.io>
Co-authored-by: Eduardo Gurgel <eduardo.gurgel@supabase.io>
Co-authored-by: Kevin Grüneberg <k.grueneberg1994@gmail.com>
Co-authored-by: Bradley Haljendi <5642609+Fudster@users.noreply.github.com>
1 parent 28576e7 commit 352e375
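
The gen_rpc PubSub adapter notes in the commit message say that the worker is picked from the sending process and that the pool size comes from `:pool_size` or `:broadcast_pool_size`. A minimal sketch of that idea follows, assuming a `phash2`-based pick. The module name, function names and worker naming scheme are hypothetical, and this is not the adapter's actual code.

```elixir
defmodule WorkerPickSketch do
  @moduledoc "Hypothetical sketch: choose one of N broadcast workers per sending process."

  # :erlang.phash2/2 returns 0..pool_size-1, so add 1 to get a 1-based index.
  # The same pid always maps to the same index for a given pool size.
  def worker_index(pid, pool_size) when is_pid(pid) and pool_size > 0 do
    :erlang.phash2(pid, pool_size) + 1
  end

  # Assumed naming scheme: adapter name followed by the worker index.
  def worker_name(adapter_name, pid, pool_size) do
    :"#{adapter_name}#{worker_index(pid, pool_size)}"
  end

  # Prefer :broadcast_pool_size, fall back to :pool_size, then to a single worker.
  def pool_size(opts) do
    Keyword.get(opts, :broadcast_pool_size) || Keyword.get(opts, :pool_size, 1)
  end
end

size = WorkerPickSketch.pool_size(broadcast_pool_size: 10, pool_size: 4)
IO.inspect(WorkerPickSketch.worker_name(:sketch_adapter, self(), size))
```

Hashing on the sender keeps messages from one process on one worker, which preserves their relative order on the way out.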

34 files changed: +523 -144 lines changed

.github/workflows/tests.yml

Lines changed: 4 additions & 0 deletions
@@ -16,6 +16,10 @@ on:
     branches:
       - main

+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
 jobs:
   tests:
     name: Tests

Makefile

Lines changed: 2 additions & 2 deletions
@@ -9,10 +9,10 @@ PORT ?= 4000
 # Common commands

 dev: ## Start a dev server
-	ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server
+	ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=us-east-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5369 GEN_RPC_TCP_CLIENT_PORT=5469 iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server

 dev.orange: ## Start another dev server (orange) on port 4001
-	ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server
+	ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=eu-west-1 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" GEN_RPC_TCP_SERVER_PORT=5469 GEN_RPC_TCP_CLIENT_PORT=5369 iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server

 seed: ## Seed the database
 	DB_ENC_KEY="1234567890123456" FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 mix run priv/repo/dev_seeds.exs

config/dev.exs

Lines changed: 3 additions & 1 deletion
@@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime
 # Disable caching to ensure the rendered spec is refreshed
 config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache

-config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
+# Disabled but can print to stdout with:
+# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
+config :opentelemetry, traces_exporter: :none

 config :mix_test_watch, clear: true

config/runtime.exs

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ janitor_children_timeout = Env.get_integer("JANITOR_CHILDREN_TIMEOUT", :timer.se
 janitor_schedule_timer = Env.get_integer("JANITOR_SCHEDULE_TIMER_IN_MS", :timer.hours(4))
 platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly
 broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
-pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "pg2") |> String.to_atom()
+pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
 websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))

 no_channel_timeout_in_ms =
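
The `websocket_max_heap_size` context line above divides a byte value by the VM word size, because the BEAM expresses `:max_heap_size` in words rather than bytes. A small sketch of that conversion, with a hypothetical module name:

```elixir
defmodule HeapSizeSketch do
  # Convert a byte budget into machine words, the unit used by :max_heap_size.
  # On a 64-bit VM the word size is 8 bytes; on a 32-bit VM it is 4.
  def bytes_to_words(bytes) when is_integer(bytes) and bytes >= 0 do
    div(bytes, :erlang.system_info(:wordsize))
  end
end

words = HeapSizeSketch.bytes_to_words(50_000_000)
IO.puts("50_000_000 bytes is #{words} words on this VM")
```

On a 64-bit VM the default of 50_000_000 bytes works out to 6_250_000 words.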

lib/extensions/postgres_cdc_rls/replication_poller.ex

Lines changed: 1 addition & 1 deletion
@@ -183,7 +183,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
           change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
         topic = "realtime:postgres:" <> tenant_id

-        RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher)
+        RealtimeWeb.TenantBroadcaster.pubsub_broadcast(tenant_id, topic, change, MessageDispatcher, :postgres_changes)
       end

       {:ok, rows_count}

lib/realtime/gen_rpc/pub_sub.ex

Lines changed: 5 additions & 1 deletion
@@ -65,7 +65,11 @@ defmodule Realtime.GenRpcPubSub.Worker do
   def start_link({pubsub, worker}), do: GenServer.start_link(__MODULE__, pubsub, name: worker)

   @impl true
-  def init(pubsub), do: {:ok, pubsub}
+  def init(pubsub) do
+    Process.flag(:message_queue_data, :off_heap)
+    Process.flag(:fullsweep_after, 100)
+    {:ok, pubsub}
+  end

   @impl true
   def handle_info({:ftl, topic, message, dispatcher}, pubsub) do
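
The two `Process.flag/2` calls added in `init/1` can be checked from outside the process with `Process.info/2`. A minimal, self-contained sketch, assuming a throwaway GenServer (the `FlagSketch.Worker` module is hypothetical and stands in for the real worker):

```elixir
defmodule FlagSketch.Worker do
  use GenServer

  def start_link(state), do: GenServer.start_link(__MODULE__, state)

  @impl true
  def init(state) do
    # Store queued messages outside the process heap so a long mailbox
    # is not copied and scanned on every garbage collection.
    Process.flag(:message_queue_data, :off_heap)
    # Force a full-sweep collection after at most 100 generational collections.
    Process.flag(:fullsweep_after, 100)
    {:ok, state}
  end
end

{:ok, pid} = FlagSketch.Worker.start_link(:demo)
{:message_queue_data, queue_mode} = Process.info(pid, :message_queue_data)
{:garbage_collection, gc_opts} = Process.info(pid, :garbage_collection)
IO.inspect({queue_mode, Keyword.fetch!(gc_opts, :fullsweep_after)})
```

Because `start_link/1` returns only after `init/1` has finished, the flags are already in effect when they are inspected.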

lib/realtime/monitoring/latency.ex

Lines changed: 4 additions & 4 deletions
@@ -7,7 +7,7 @@ defmodule Realtime.Latency do
   use Realtime.Logs

   alias Realtime.Nodes
-  alias Realtime.Rpc
+  alias Realtime.GenRpc

   defmodule Payload do
     @moduledoc false
@@ -33,7 +33,7 @@ defmodule Realtime.Latency do
     }
   end

-  @every 5_000
+  @every 15_000
   def start_link(args) do
     GenServer.start_link(__MODULE__, args, name: __MODULE__)
   end
@@ -76,7 +76,7 @@ defmodule Realtime.Latency do
       Task.Supervisor.async(Realtime.TaskSupervisor, fn ->
         {latency, response} =
           :timer.tc(fn ->
-            Rpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
+            GenRpc.call(n, __MODULE__, :pong, [pong_timeout], timeout: timer_timeout)
           end)

         latency_ms = latency / 1_000
@@ -85,7 +85,7 @@ defmodule Realtime.Latency do
         from_node = Nodes.short_node_id_from_name(Node.self())

         case response do
-          {:badrpc, reason} ->
+          {:error, :rpc_error, reason} ->
             log_error(
               "RealtimeNodeDisconnected",
               "Unable to connect to #{short_name} from #{region}: #{reason}"

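The latency hunks above time a remote call with `:timer.tc/1`, convert microseconds to milliseconds, and now match `{:error, :rpc_error, reason}` instead of `{:badrpc, reason}`. The sketch below mirrors that flow with a local function in place of the remote call. `PingSketch` and its return tags are hypothetical, and treating every other response as success is an assumption made for illustration.

```elixir
defmodule PingSketch do
  # Run `fun`, returning {latency_in_ms, classified_response}.
  def timed(fun) when is_function(fun, 0) do
    # :timer.tc/1 returns {elapsed_microseconds, result}.
    {micros, response} = :timer.tc(fun)
    {micros / 1_000, classify(response)}
  end

  # Error shape taken from the diff above.
  defp classify({:error, :rpc_error, reason}), do: {:disconnected, reason}
  # Assumption: anything else counts as a successful reply.
  defp classify(other), do: {:ok, other}
end

IO.inspect(PingSketch.timed(fn -> {:error, :rpc_error, :timeout} end))
```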
lib/realtime/monitoring/prom_ex/plugins/tenant.ex

Lines changed: 22 additions & 3 deletions
@@ -36,20 +36,21 @@ defmodule Realtime.PromEx.Plugins.Tenant do
         event_name: [:realtime, :tenants, :payload, :size],
         measurement: :size,
         description: "Tenant payload size",
-        tags: [:tenant],
+        tags: [:tenant, :message_type],
         unit: :byte,
         reporter_options: [
-          buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
+          buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
         ]
       ),
       distribution(
         [:realtime, :payload, :size],
         event_name: [:realtime, :tenants, :payload, :size],
         measurement: :size,
         description: "Payload size",
+        tags: [:message_type],
         unit: :byte,
         reporter_options: [
-          buckets: [100, 250, 500, 1000, 2000, 3000, 5000, 10_000, 25_000]
+          buckets: [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]
         ]
       )
     ]
@@ -157,20 +158,38 @@ defmodule Realtime.PromEx.Plugins.Tenant do
         description: "Sum of messages sent on a Realtime Channel.",
         tags: [:tenant]
       ),
+      sum(
+        [:realtime, :channel, :global, :events],
+        event_name: [:realtime, :rate_counter, :channel, :events],
+        measurement: :sum,
+        description: "Global sum of messages sent on a Realtime Channel."
+      ),
       sum(
         [:realtime, :channel, :presence_events],
         event_name: [:realtime, :rate_counter, :channel, :presence_events],
         measurement: :sum,
         description: "Sum of presence messages sent on a Realtime Channel.",
         tags: [:tenant]
       ),
+      sum(
+        [:realtime, :channel, :global, :presence_events],
+        event_name: [:realtime, :rate_counter, :channel, :presence_events],
+        measurement: :sum,
+        description: "Global sum of presence messages sent on a Realtime Channel."
+      ),
       sum(
         [:realtime, :channel, :db_events],
         event_name: [:realtime, :rate_counter, :channel, :db_events],
         measurement: :sum,
         description: "Sum of db messages sent on a Realtime Channel.",
         tags: [:tenant]
       ),
+      sum(
+        [:realtime, :channel, :global, :db_events],
+        event_name: [:realtime, :rate_counter, :channel, :db_events],
+        measurement: :sum,
+        description: "Global sum of db messages sent on a Realtime Channel."
+      ),
       sum(
         [:realtime, :channel, :joins],
         event_name: [:realtime, :rate_counter, :channel, :joins],
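
The new bucket list in the first hunk extends the payload size histogram from 25_000 bytes up to 3_000_000 bytes. As a rough illustration of why that matters, the sketch below returns the smallest bucket upper bound that holds a given size. `BucketSketch` is hypothetical and only approximates how a histogram reporter assigns values; it is not PromEx or Telemetry.Metrics code.

```elixir
defmodule BucketSketch do
  # Bucket upper bounds copied from the new reporter_options in the diff above.
  @buckets [250, 500, 1000, 3000, 5000, 10_000, 25_000, 100_000, 500_000, 1_000_000, 3_000_000]

  # Smallest upper bound that holds `size`, or :infinity for the overflow bucket.
  def bucket_for(size) when is_integer(size) and size >= 0 do
    Enum.find(@buckets, :infinity, fn bound -> size <= bound end)
  end
end

IO.inspect(Enum.map([100, 26_000, 4_000_000], &BucketSketch.bucket_for/1))
```

With the old list, which stopped at 25_000, a 26_000-byte payload could only be counted in the overflow bucket. With the new list it falls under the 100_000 bound.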

lib/realtime/monitoring/prom_ex/plugins/tenants.ex

Lines changed: 9 additions & 0 deletions
@@ -21,6 +21,15 @@ defmodule Realtime.PromEx.Plugins.Tenants do
         unit: {:microsecond, :millisecond},
         tags: [:success, :tenant, :mechanism],
         reporter_options: [buckets: [10, 250, 5000, 15_000]]
+      ),
+      distribution(
+        [:realtime, :global, :rpc],
+        event_name: [:realtime, :rpc],
+        description: "Global Latency of rpc calls",
+        measurement: :latency,
+        unit: {:microsecond, :millisecond},
+        tags: [:success, :mechanism],
+        reporter_options: [buckets: [10, 250, 5000, 15_000]]
       )
     ])
   end

lib/realtime/tenants.ex

Lines changed: 7 additions & 7 deletions
@@ -328,18 +328,18 @@ defmodule Realtime.Tenants do
     %RateCounter.Args{id: {:channel, :authorization_errors, external_id}, opts: opts}
   end

-  @connect_per_second_default 10
+  @connect_errors_per_second_default 10
   @doc "RateCounter arguments for counting connect per second."
-  @spec connect_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
-  def connect_per_second_rate(%Tenant{external_id: external_id}) do
-    connect_per_second_rate(external_id)
+  @spec connect_errors_per_second_rate(Tenant.t() | String.t()) :: RateCounter.Args.t()
+  def connect_errors_per_second_rate(%Tenant{external_id: external_id}) do
+    connect_errors_per_second_rate(external_id)
   end

-  def connect_per_second_rate(tenant_id) do
+  def connect_errors_per_second_rate(tenant_id) do
     opts = [
-      max_bucket_len: 10,
+      max_bucket_len: 30,
       limit: [
-        value: @connect_per_second_default,
+        value: @connect_errors_per_second_default,
         measurement: :sum,
         log_fn: fn ->
           Logger.critical(
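
The hunk above widens `max_bucket_len` from 10 to 30 while keeping a limit of 10 measured as a `:sum`, which matches the commit message's "increase connect error rate window to 30 seconds". The sketch below shows one way such a sliding window could behave. It assumes one bucket entry per second and a strict "greater than" comparison. Both are assumptions, and `RateWindowSketch` is hypothetical rather than Realtime's `RateCounter`.

```elixir
defmodule RateWindowSketch do
  @max_bucket_len 30
  @limit 10

  # Push the count seen in the latest tick and drop ticks older than the window.
  def tick(bucket, count) when is_list(bucket) and is_integer(count) and count >= 0 do
    Enum.take([count | bucket], @max_bucket_len)
  end

  # The limit trips when the sum over the whole window exceeds it (assumed comparison).
  def limited?(bucket), do: Enum.sum(bucket) > @limit
end

bucket = Enum.reduce([4, 4, 4], [], fn count, acc -> RateWindowSketch.tick(acc, count) end)
IO.inspect(RateWindowSketch.limited?(bucket))
```

A longer window means a burst of errors keeps the limit tripped for longer before the old ticks age out.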
