Skip to content

Commit 18519f0

Browse files
committed
fix: run migrations on tenants nearest region
1 parent 05df771 commit 18519f0

File tree

7 files changed

+109
-28
lines changed

7 files changed

+109
-28
lines changed

lib/realtime/nodes.ex

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,6 @@ defmodule Realtime.Nodes do
9494
@spec launch_node(String.t(), String.t() | nil, atom()) :: atom()
9595
def launch_node(tenant_id, region, default) do
9696
case region_nodes(region) do
97-
[node] ->
98-
Logger.warning("Only one region node (#{inspect(node)}) for #{region} using default #{inspect(default)}")
99-
100-
default
101-
10297
[] ->
10398
Logger.warning("Zero region nodes for #{region} using #{inspect(default)}")
10499
default

lib/realtime/tenants.ex

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,11 @@ defmodule Realtime.Tenants do
9898

9999
connected_cluster when is_integer(connected_cluster) ->
100100
tenant = Cache.get_tenant_by_external_id(external_id)
101-
{:ok, db_conn} = Database.connect(tenant, "realtime_health_check")
102-
Process.alive?(db_conn) && GenServer.stop(db_conn)
103-
Migrations.run_migrations(tenant)
101+
result? = Migrations.run_migrations(tenant)
104102

105103
{:ok,
106104
%{
107-
healthy: true,
105+
healthy: result? == :ok || result? == :noop,
108106
db_connected: false,
109107
connected_cluster: connected_cluster,
110108
region: region,
@@ -475,10 +473,11 @@ defmodule Realtime.Tenants do
475473
@doc """
476474
Checks if migrations for a given tenant need to run.
477475
"""
478-
@spec run_migrations?(Tenant.t()) :: boolean()
479-
def run_migrations?(%Tenant{} = tenant) do
480-
tenant.migrations_ran < Enum.count(Migrations.migrations())
481-
end
476+
@spec run_migrations?(Tenant.t() | integer()) :: boolean()
477+
def run_migrations?(%Tenant{} = tenant), do: run_migrations?(tenant.migrations_ran)
478+
479+
def run_migrations?(migrations_ran) when is_integer(migrations_ran),
480+
do: migrations_ran < Enum.count(Migrations.migrations())
482481

483482
@doc """
484483
Broadcasts an operation event to the tenant's operations channel.

lib/realtime/tenants/migrations.ex

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ defmodule Realtime.Tenants.Migrations do
1111
alias Realtime.Repo
1212
alias Realtime.Api.Tenant
1313
alias Realtime.Api
14+
alias Realtime.Nodes
15+
alias Realtime.GenRpc
1416

1517
alias Realtime.Tenants.Migrations.{
1618
CreateRealtimeSubscriptionTable,
@@ -148,7 +150,7 @@ defmodule Realtime.Tenants.Migrations do
148150
{20_251_103_001_201, BroadcastSendIncludePayloadId}
149151
]
150152

151-
defstruct [:tenant_external_id, :settings]
153+
defstruct [:tenant_external_id, :settings, migrations_ran: 0]
152154

153155
@type t :: %__MODULE__{
154156
tenant_external_id: binary(),
@@ -160,24 +162,39 @@ defmodule Realtime.Tenants.Migrations do
160162
"""
161163
@spec run_migrations(Tenant.t()) :: :ok | :noop | {:error, any()}
162164
def run_migrations(%Tenant{} = tenant) do
163-
%{extensions: [%{settings: settings} | _]} = tenant
164-
attrs = %__MODULE__{tenant_external_id: tenant.external_id, settings: settings}
165+
if Tenants.run_migrations?(tenant) do
166+
%{extensions: [%{settings: settings} | _]} = tenant
165167

166-
supervisor =
167-
{:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, tenant.external_id}}
168+
attrs = %__MODULE__{
169+
tenant_external_id: tenant.external_id,
170+
settings: settings,
171+
migrations_ran: tenant.migrations_ran
172+
}
168173

169-
spec = {__MODULE__, attrs}
174+
node =
175+
case Nodes.get_node_for_tenant(tenant) do
176+
{:ok, node, _} -> node
177+
{:error, _} -> node()
178+
end
170179

171-
if Tenants.run_migrations?(tenant) do
172-
case DynamicSupervisor.start_child(supervisor, spec) do
173-
:ignore -> :ok
174-
error -> error
175-
end
180+
GenRpc.call(node, __MODULE__, :start_migration, [attrs], tenant_id: tenant.external_id)
176181
else
177182
:noop
178183
end
179184
end
180185

186+
def start_migration(attrs) do
187+
supervisor =
188+
{:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, attrs.tenant_external_id}}
189+
190+
spec = {__MODULE__, attrs}
191+
192+
case DynamicSupervisor.start_child(supervisor, spec) do
193+
:ignore -> :ok
194+
error -> error
195+
end
196+
end
197+
181198
def start_link(%__MODULE__{tenant_external_id: tenant_external_id} = attrs) do
182199
name = {:via, Registry, {Unique, {__MODULE__, :host, tenant_external_id}}}
183200
GenServer.start_link(__MODULE__, attrs, name: name)

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.69.0",
7+
version: "2.69.1",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
defmodule Realtime.Integration.RegionAwareMigrationsTest do
2+
use Realtime.DataCase, async: false
3+
use Mimic
4+
5+
alias Containers
6+
alias Realtime.Tenants
7+
alias Realtime.Tenants.Migrations
8+
9+
setup do
10+
{:ok, port} = Containers.checkout()
11+
12+
settings = [
13+
%{
14+
"type" => "postgres_cdc_rls",
15+
"settings" => %{
16+
"db_host" => "127.0.0.1",
17+
"db_name" => "postgres",
18+
"db_user" => "supabase_admin",
19+
"db_password" => "postgres",
20+
"db_port" => "#{port}",
21+
"poll_interval" => 100,
22+
"poll_max_changes" => 100,
23+
"poll_max_record_bytes" => 1_048_576,
24+
"region" => "ap-southeast-2",
25+
"publication" => "supabase_realtime_test",
26+
"ssl_enforced" => false
27+
}
28+
}
29+
]
30+
31+
tenant = tenant_fixture(%{extensions: settings})
32+
region = Application.get_env(:realtime, :region)
33+
34+
{:ok, node} =
35+
Clustered.start(nil,
36+
extra_config: [
37+
{:realtime, :region, Tenants.region(tenant)},
38+
{:realtime, :master_region, region}
39+
]
40+
)
41+
42+
on_exit(fn -> Clustered.stop() end)
43+
44+
%{tenant: tenant, node: node}
45+
end
46+
47+
test "run_migrations routes to node in tenant's region with expected arguments", %{tenant: tenant, node: node} do
48+
assert tenant.migrations_ran == 0
49+
50+
Realtime.GenRpc
51+
|> Mimic.expect(:call, fn called_node, mod, func, args, opts ->
52+
assert called_node == node
53+
assert mod == Migrations
54+
assert func == :start_migration
55+
assert opts[:tenant_id] == tenant.external_id
56+
57+
arg = hd(args)
58+
assert arg.tenant_external_id == tenant.external_id
59+
assert arg.migrations_ran == tenant.migrations_ran
60+
assert arg.settings == hd(tenant.extensions).settings
61+
62+
call_original(Realtime.GenRpc, :call, [node, mod, func, args, opts])
63+
end)
64+
65+
assert :ok = Migrations.run_migrations(tenant)
66+
Process.sleep(1000)
67+
tenant = Realtime.Repo.reload!(tenant)
68+
refute tenant.migrations_ran == 0
69+
end
70+
end

test/realtime/nodes_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ defmodule Realtime.NodesTest do
108108
assert region == expected_region
109109
end
110110

111-
test "on existing tenant id, and a single node for a given region, returns default", %{
111+
test "on existing tenant id, and a single node for a given region, returns single node", %{
112112
tenant: tenant,
113113
region: region
114114
} do
@@ -117,7 +117,7 @@ defmodule Realtime.NodesTest do
117117

118118
expected_region = Tenants.region(tenant)
119119

120-
assert node == node()
120+
assert node != node()
121121
assert region == expected_region
122122
end
123123

test/realtime_web/controllers/tenant_controller_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ defmodule RealtimeWeb.TenantControllerTest do
419419

420420
conn = get(conn, ~p"/api/tenants/#{tenant.external_id}/health")
421421
data = json_response(conn, 200)["data"]
422-
Process.sleep(2000)
422+
Process.sleep(1000)
423423

424424
assert {:ok, %{rows: []}} = Postgrex.query(db_conn, "SELECT * FROM realtime.messages", [])
425425

0 commit comments

Comments
 (0)