Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions lib/realtime/nodes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ defmodule Realtime.Nodes do
@spec launch_node(String.t(), String.t() | nil, atom()) :: atom()
def launch_node(tenant_id, region, default) do
case region_nodes(region) do
[node] ->
Logger.warning("Only one region node (#{inspect(node)}) for #{region} using default #{inspect(default)}")

default

[] ->
Logger.warning("Zero region nodes for #{region} using #{inspect(default)}")
default
Expand Down
15 changes: 7 additions & 8 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,11 @@ defmodule Realtime.Tenants do

connected_cluster when is_integer(connected_cluster) ->
tenant = Cache.get_tenant_by_external_id(external_id)
{:ok, db_conn} = Database.connect(tenant, "realtime_health_check")
Process.alive?(db_conn) && GenServer.stop(db_conn)
Migrations.run_migrations(tenant)
result? = Migrations.run_migrations(tenant)

{:ok,
%{
healthy: true,
healthy: result? == :ok || result? == :noop,
db_connected: false,
connected_cluster: connected_cluster,
region: region,
Expand Down Expand Up @@ -475,10 +473,11 @@ defmodule Realtime.Tenants do
@doc """
Checks if migrations for a given tenant need to run.
"""
@spec run_migrations?(Tenant.t()) :: boolean()
def run_migrations?(%Tenant{} = tenant) do
tenant.migrations_ran < Enum.count(Migrations.migrations())
end
@spec run_migrations?(Tenant.t() | integer()) :: boolean()
def run_migrations?(%Tenant{} = tenant), do: run_migrations?(tenant.migrations_ran)

def run_migrations?(migrations_ran) when is_integer(migrations_ran),
do: migrations_ran < Enum.count(Migrations.migrations())

@doc """
Broadcasts an operation event to the tenant's operations channel.
Expand Down
39 changes: 28 additions & 11 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ defmodule Realtime.Tenants.Migrations do
alias Realtime.Repo
alias Realtime.Api.Tenant
alias Realtime.Api
alias Realtime.Nodes
alias Realtime.GenRpc

alias Realtime.Tenants.Migrations.{
CreateRealtimeSubscriptionTable,
Expand Down Expand Up @@ -148,7 +150,7 @@ defmodule Realtime.Tenants.Migrations do
{20_251_103_001_201, BroadcastSendIncludePayloadId}
]

defstruct [:tenant_external_id, :settings]
defstruct [:tenant_external_id, :settings, migrations_ran: 0]

@type t :: %__MODULE__{
tenant_external_id: binary(),
Expand All @@ -160,24 +162,39 @@ defmodule Realtime.Tenants.Migrations do
"""
@spec run_migrations(Tenant.t()) :: :ok | :noop | {:error, any()}
def run_migrations(%Tenant{} = tenant) do
%{extensions: [%{settings: settings} | _]} = tenant
attrs = %__MODULE__{tenant_external_id: tenant.external_id, settings: settings}
if Tenants.run_migrations?(tenant) do
%{extensions: [%{settings: settings} | _]} = tenant

supervisor =
{:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, tenant.external_id}}
attrs = %__MODULE__{
tenant_external_id: tenant.external_id,
settings: settings,
migrations_ran: tenant.migrations_ran
}

spec = {__MODULE__, attrs}
node =
case Nodes.get_node_for_tenant(tenant) do
{:ok, node, _} -> node
{:error, _} -> node()
end

if Tenants.run_migrations?(tenant) do
case DynamicSupervisor.start_child(supervisor, spec) do
:ignore -> :ok
error -> error
end
GenRpc.call(node, __MODULE__, :start_migration, [attrs], tenant_id: tenant.external_id)
else
:noop
end
end

def start_migration(attrs) do
supervisor =
{:via, PartitionSupervisor, {Realtime.Tenants.Migrations.DynamicSupervisor, attrs.tenant_external_id}}

spec = {__MODULE__, attrs}

case DynamicSupervisor.start_child(supervisor, spec) do
:ignore -> :ok
error -> error
end
end

def start_link(%__MODULE__{tenant_external_id: tenant_external_id} = attrs) do
name = {:via, Registry, {Unique, {__MODULE__, :host, tenant_external_id}}}
GenServer.start_link(__MODULE__, attrs, name: name)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.69.0",
version: "2.69.1",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
72 changes: 72 additions & 0 deletions test/integration/region_aware_migrations_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
defmodule Realtime.Integration.RegionAwareMigrationsTest do
use Realtime.DataCase, async: false
use Mimic

alias Containers
alias Realtime.Tenants
alias Realtime.Tenants.Migrations

setup do
{:ok, port} = Containers.checkout()

settings = [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "127.0.0.1",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"db_port" => "#{port}",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "ap-southeast-2",
"publication" => "supabase_realtime_test",
"ssl_enforced" => false
}
}
]

tenant = tenant_fixture(%{extensions: settings})
region = Application.get_env(:realtime, :region)

{:ok, node} =
Clustered.start(nil,
extra_config: [
{:realtime, :region, Tenants.region(tenant)},
{:realtime, :master_region, region}
]
)

Process.sleep(100)

on_exit(fn -> Clustered.stop() end)

%{tenant: tenant, node: node}
end

test "run_migrations routes to node in tenant's region with expected arguments", %{tenant: tenant, node: node} do
assert tenant.migrations_ran == 0

Realtime.GenRpc
|> Mimic.expect(:call, fn called_node, mod, func, args, opts ->
assert called_node == node
assert mod == Migrations
assert func == :start_migration
assert opts[:tenant_id] == tenant.external_id

arg = hd(args)
assert arg.tenant_external_id == tenant.external_id
assert arg.migrations_ran == tenant.migrations_ran
assert arg.settings == hd(tenant.extensions).settings

call_original(Realtime.GenRpc, :call, [node, mod, func, args, opts])
end)

assert :ok = Migrations.run_migrations(tenant)
Process.sleep(1000)
tenant = Realtime.Repo.reload!(tenant)
refute tenant.migrations_ran == 0
end
end
4 changes: 2 additions & 2 deletions test/realtime/nodes_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ defmodule Realtime.NodesTest do
assert region == expected_region
end

test "on existing tenant id, and a single node for a given region, returns default", %{
test "on existing tenant id, and a single node for a given region, returns single node", %{
tenant: tenant,
region: region
} do
Expand All @@ -117,7 +117,7 @@ defmodule Realtime.NodesTest do

expected_region = Tenants.region(tenant)

assert node == node()
assert node != node()
assert region == expected_region
end

Expand Down
2 changes: 1 addition & 1 deletion test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ defmodule RealtimeWeb.TenantControllerTest do

conn = get(conn, ~p"/api/tenants/#{tenant.external_id}/health")
data = json_response(conn, 200)["data"]
Process.sleep(2000)
Process.sleep(1000)

assert {:ok, %{rows: []}} = Postgrex.query(db_conn, "SELECT * FROM realtime.messages", [])

Expand Down
Loading