Skip to content

Commit b063ed9

Browse files
committed
fix: run migrations on tenants nearest region
1 parent ad36f07 commit b063ed9

File tree

5 files changed

+93
-14
lines changed

5 files changed

+93
-14
lines changed

lib/realtime/tenants.ex

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,11 @@ defmodule Realtime.Tenants do
9898

9999
connected_cluster when is_integer(connected_cluster) ->
100100
tenant = Cache.get_tenant_by_external_id(external_id)
101-
{:ok, db_conn} = Database.connect(tenant, "realtime_health_check")
102-
Process.alive?(db_conn) && GenServer.stop(db_conn)
103-
Migrations.run_migrations(tenant)
101+
result? = Migrations.run_migrations(tenant)
104102

105103
{:ok,
106104
%{
107-
healthy: true,
105+
healthy: result? == :ok || result? == :noop,
108106
db_connected: false,
109107
connected_cluster: connected_cluster,
110108
region: region,
@@ -475,10 +473,11 @@ defmodule Realtime.Tenants do
475473
@doc """
476474
Checks if migrations for a given tenant need to run.
477475
"""
478-
@spec run_migrations?(Tenant.t()) :: boolean()
479-
def run_migrations?(%Tenant{} = tenant) do
480-
tenant.migrations_ran < Enum.count(Migrations.migrations())
481-
end
476+
@spec run_migrations?(Tenant.t() | integer()) :: boolean()
477+
def run_migrations?(%Tenant{} = tenant), do: run_migrations?(tenant.migrations_ran)
478+
479+
def run_migrations?(migrations_ran) when is_integer(migrations_ran),
480+
do: migrations_ran < Enum.count(Migrations.migrations())
482481

483482
@doc """
484483
Broadcasts an operation event to the tenant's operations channel.

lib/realtime/tenants/migrations.ex

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ defmodule Realtime.Tenants.Migrations do
1111
alias Realtime.Repo
1212
alias Realtime.Api.Tenant
1313
alias Realtime.Api
14+
alias Realtime.Nodes
15+
alias Realtime.GenRpc
1416

1517
alias Realtime.Tenants.Migrations.{
1618
CreateRealtimeSubscriptionTable,
@@ -148,7 +150,7 @@ defmodule Realtime.Tenants.Migrations do
148150
{20_251_103_001_201, BroadcastSendIncludePayloadId}
149151
]
150152

151-
defstruct [:tenant_external_id, :settings]
153+
defstruct [:tenant_external_id, :settings, migrations_ran: 0]
152154

153155
@type t :: %__MODULE__{
154156
tenant_external_id: binary(),
@@ -161,14 +163,31 @@ defmodule Realtime.Tenants.Migrations do
161163
@spec run_migrations(Tenant.t()) :: :ok | :noop | {:error, any()}
162164
def run_migrations(%Tenant{} = tenant) do
163165
%{extensions: [%{settings: settings} | _]} = tenant
164-
attrs = %__MODULE__{tenant_external_id: tenant.external_id, settings: settings}
165166

167+
attrs = %__MODULE__{
168+
tenant_external_id: tenant.external_id,
169+
settings: settings,
170+
migrations_ran: tenant.migrations_ran
171+
}
172+
173+
region = Tenants.region(tenant)
174+
175+
node =
176+
case Nodes.node_from_region(region, tenant.external_id) do
177+
{:ok, node} -> node
178+
{:error, :not_available} -> node()
179+
end
180+
181+
GenRpc.call(node, __MODULE__, :start_migration, [attrs], tenant_id: tenant.external_id)
182+
end
183+
184+
def start_migration(attrs) do
166185
supervisor =
167-
{:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, tenant.external_id}}
186+
{:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, attrs.tenant_external_id}}
168187

169188
spec = {__MODULE__, attrs}
170189

171-
if Tenants.run_migrations?(tenant) do
190+
if Tenants.run_migrations?(attrs.migrations_ran) do
172191
case DynamicSupervisor.start_child(supervisor, spec) do
173192
:ignore -> :ok
174193
error -> error

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.68.4",
7+
version: "2.68.5",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
defmodule Realtime.Integration.RegionAwareMigrationsTest do
2+
use Realtime.DataCase, async: false
3+
use Mimic
4+
5+
alias Containers
6+
alias Realtime.Tenants
7+
alias Realtime.Tenants.Migrations
8+
9+
setup do
10+
{:ok, port} = Containers.checkout()
11+
12+
settings = [
13+
%{
14+
"type" => "postgres_cdc_rls",
15+
"settings" => %{
16+
"db_host" => "127.0.0.1",
17+
"db_name" => "postgres",
18+
"db_user" => "supabase_admin",
19+
"db_password" => "postgres",
20+
"db_port" => "#{port}",
21+
"poll_interval" => 100,
22+
"poll_max_changes" => 100,
23+
"poll_max_record_bytes" => 1_048_576,
24+
"region" => "eu-west-2",
25+
"publication" => "supabase_realtime_test",
26+
"ssl_enforced" => false
27+
}
28+
}
29+
]
30+
31+
tenant = tenant_fixture(%{extensions: settings})
32+
33+
{:ok, node} = Clustered.start(nil, extra_config: [{:realtime, :region, Tenants.region(tenant)}])
34+
35+
on_exit(fn -> Clustered.stop() end)
36+
37+
Process.sleep(1000)
38+
%{tenant: tenant, node: node}
39+
end
40+
41+
test "run_migrations routes to node in tenant's region with expected arguments", %{tenant: tenant, node: node} do
42+
assert tenant.migrations_ran == 0
43+
44+
Realtime.GenRpc
45+
|> Mimic.expect(:call, fn called_node, mod, func, args, opts ->
46+
assert called_node == node
47+
assert mod == Migrations
48+
assert func == :start_migration
49+
assert opts[:tenant_id] == tenant.external_id
50+
51+
arg = hd(args)
52+
assert arg.tenant_external_id == tenant.external_id
53+
assert arg.migrations_ran == tenant.migrations_ran
54+
assert arg.settings == hd(tenant.extensions).settings
55+
56+
call_original(Realtime.GenRpc, :call, [node, mod, func, args, opts])
57+
end)
58+
59+
assert :ok = Migrations.run_migrations(tenant)
60+
end
61+
end

test/realtime_web/controllers/tenant_controller_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ defmodule RealtimeWeb.TenantControllerTest do
419419

420420
conn = get(conn, ~p"/api/tenants/#{tenant.external_id}/health")
421421
data = json_response(conn, 200)["data"]
422-
Process.sleep(2000)
422+
Process.sleep(4000)
423423

424424
assert {:ok, %{rows: []}} = Postgrex.query(db_conn, "SELECT * FROM realtime.messages", [])
425425

0 commit comments

Comments
 (0)