Skip to content

Commit e2275cd

Browse files
authored
fix: Selectively run migrations (#1353)
* Prevents migrations from running if the tenant no longer requires them. * Set up partition size for Connect module * Reduce test flakiness and usage of `dev_tenant` in integration tests
1 parent 5ddfbc5 commit e2275cd

37 files changed

+835
-745
lines changed

config/runtime.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ slot_name_suffix = System.get_env("SLOT_NAME_SUFFIX")
1515
migration_partition_slots =
1616
System.get_env("MIGRATION_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer()
1717

18+
connect_partition_slots =
19+
System.get_env("CONNECT_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer()
20+
1821
if !(db_version in [nil, "ipv6", "ipv4"]),
1922
do: raise("Invalid IP version, please set either ipv6 or ipv4")
2023

@@ -35,6 +38,7 @@ socket_options =
3538

3639
config :realtime,
3740
migration_partition_slots: migration_partition_slots,
41+
connect_partition_slots: connect_partition_slots,
3842
tenant_max_bytes_per_second: System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer(),
3943
tenant_max_channels_per_client: System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer(),
4044
tenant_max_concurrent_users: System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer(),

config/test.exs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,10 @@ config :realtime,
3535
secure_channels: true,
3636
db_enc_key: "1234567890123456",
3737
jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"),
38-
api_jwt_secret: System.get_env("API_JWT_SECRET"),
38+
api_jwt_secret: System.get_env("API_JWT_SECRET", "secret"),
3939
metrics_jwt_secret: "test",
4040
prom_poll_rate: 5_000
4141

42-
config :joken,
43-
current_time_adapter: RealtimeWeb.Joken.CurrentTime.Mock
44-
4542
# Print only errors during test
4643
config :logger,
4744
compile_time_purge_matching: [[module: Postgrex], [module: DBConnection]],

lib/realtime/api/tenant.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ defmodule Realtime.Api.Tenant do
2626
field(:events_per_second_rolling, :float, virtual: true)
2727
field(:events_per_second_now, :integer, virtual: true)
2828
field(:private_only, :boolean, default: false)
29+
field(:migrations_ran, :integer, default: 0)
2930

3031
has_many(:extensions, Realtime.Api.Extensions,
3132
foreign_key: :tenant_external_id,
@@ -73,7 +74,8 @@ defmodule Realtime.Api.Tenant do
7374
:max_channels_per_client,
7475
:max_joins_per_second,
7576
:suspend,
76-
:private_only
77+
:private_only,
78+
:migrations_ran
7779
])
7880
|> validate_required([
7981
:external_id,

lib/realtime/application.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ defmodule Realtime.Application do
5151
region = Application.get_env(:realtime, :region)
5252
:syn.join(RegionNodes, region, self(), node: node())
5353
migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots)
54+
connect_partition_slots = Application.get_env(:realtime, :connect_partition_slots)
5455

5556
children =
5657
[
@@ -75,7 +76,10 @@ defmodule Realtime.Application do
7576
name: Realtime.Tenants.Migrations.DynamicSupervisor,
7677
partitions: migration_partition_slots},
7778
{PartitionSupervisor,
78-
child_spec: DynamicSupervisor, strategy: :one_for_one, name: Realtime.Tenants.Connect.DynamicSupervisor},
79+
child_spec: DynamicSupervisor,
80+
strategy: :one_for_one,
81+
name: Realtime.Tenants.Connect.DynamicSupervisor,
82+
partitions: connect_partition_slots},
7983
{PartitionSupervisor,
8084
child_spec: DynamicSupervisor,
8185
strategy: :one_for_one,

lib/realtime/tenants.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,27 @@ defmodule Realtime.Tenants do
295295
|> tap(fn _ -> Cache.distributed_invalidate_tenant_cache(external_id) end)
296296
end
297297

298+
@doc """
Checks if migrations for a given tenant need to run.

The tenant is looked up through `Cache.get_tenant_by_external_id/1` and its
`migrations_ran` counter is compared with the number of entries returned by
`Migrations.migrations/0`.

Returns `true` when the tenant has recorded fewer migrations than are currently
defined. It also returns `true` when no tenant is found or when the counter is not
an integer: in both cases nothing proves the tenant is up to date, so the decision
is left to the migration run itself instead of raising here or silently skipping.
"""
@spec run_migrations?(binary()) :: boolean()
def run_migrations?(external_id) do
  case Cache.get_tenant_by_external_id(external_id) do
    # Only compare real integers: in Elixir term ordering `nil < 3` is `false`,
    # which would report "nothing to run" for a tenant without a counter.
    %Tenant{migrations_ran: migrations_ran} when is_integer(migrations_ran) ->
      migrations_ran < length(Migrations.migrations())

    _missing_tenant_or_counter ->
      true
  end
end
306+
307+
@doc """
Updates the `migrations_ran` field for a tenant.

The tenant is read via `Cache.get_tenant_by_external_id/1`, changed through
`Tenant.changeset/2` and persisted with `Repo.update!/1`. After the update the
cached entry for `external_id` is invalidated with
`Cache.distributed_invalidate_tenant_cache/1`.

Returns the updated tenant struct (not an `{:ok, tenant}` tuple). Because the bang
variant of the update is used, a failed update raises instead of returning an
error tuple.
"""
@spec update_migrations_ran(binary(), integer()) :: Tenant.t()
def update_migrations_ran(external_id, count) do
  external_id
  |> Cache.get_tenant_by_external_id()
  |> Tenant.changeset(%{migrations_ran: count})
  |> Repo.update!()
  # `tap/2` keeps the updated tenant as the return value; the cache call is a side effect.
  |> tap(fn _ -> Cache.distributed_invalidate_tenant_cache(external_id) end)
end
318+
298319
defp broadcast_operation_event(action, external_id),
299320
do: Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations:" <> external_id, action)
300321
end

lib/realtime/tenants/connect.ex

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,13 @@ defmodule Realtime.Tenants.Connect do
125125
end
126126

127127
@doc """
128-
Returns the pid of the tenant Connection process
128+
Returns the pid of the tenant Connection process, or `nil` when none is registered
129129
"""
130-
@spec whereis(binary()) :: pid | nil
130+
@spec whereis(binary()) :: pid() | nil
131131
def whereis(tenant_id) do
132132
case :syn.lookup(__MODULE__, tenant_id) do
133-
{pid, _} -> pid
134-
:undefined -> nil
133+
{pid, _} when is_pid(pid) -> pid
134+
_ -> nil
135135
end
136136
end
137137

@@ -141,8 +141,12 @@ defmodule Realtime.Tenants.Connect do
141141
@spec shutdown(binary()) :: :ok | nil
142142
def shutdown(tenant_id) do
143143
case whereis(tenant_id) do
144-
pid when is_pid(pid) -> GenServer.stop(pid)
145-
_ -> :ok
144+
pid when is_pid(pid) ->
145+
send(pid, :shutdown_connect)
146+
:ok
147+
148+
_ ->
149+
:ok
146150
end
147151
end
148152

@@ -196,7 +200,7 @@ defmodule Realtime.Tenants.Connect do
196200
def handle_continue(:run_migrations, state) do
197201
%{tenant: tenant, db_conn_pid: db_conn_pid} = state
198202

199-
with :ok <- Migrations.run_migrations(tenant),
203+
with res when res in [:ok, :noop] <- Migrations.run_migrations(tenant),
200204
:ok <- Migrations.create_partitions(db_conn_pid) do
201205
{:noreply, state, {:continue, :start_listen_and_replication}}
202206
else
@@ -273,42 +277,19 @@ defmodule Realtime.Tenants.Connect do
273277
{:noreply, %{state | connected_users_bucket: connected_users_bucket}}
274278
end
275279

276-
def handle_info(:shutdown, state) do
277-
%{
278-
db_conn_pid: db_conn_pid,
279-
replication_connection_pid: replication_connection_pid,
280-
listen_pid: listen_pid
281-
} = state
282-
280+
def handle_info(:shutdown_no_connected_users, state) do
283281
Logger.info("Tenant has no connected users, database connection will be terminated")
284-
:ok = GenServer.stop(db_conn_pid, :normal, 500)
285-
286-
replication_connection_pid && Process.alive?(replication_connection_pid) &&
287-
GenServer.stop(replication_connection_pid, :normal, 500)
288-
289-
listen_pid && Process.alive?(listen_pid) &&
290-
GenServer.stop(listen_pid, :normal, 500)
291-
292-
{:stop, :normal, state}
282+
shutdown_connect_process(state)
293283
end
294284

295285
def handle_info(:suspend_tenant, state) do
296-
%{
297-
db_conn_pid: db_conn_pid,
298-
replication_connection_pid: replication_connection_pid,
299-
listen_pid: listen_pid
300-
} = state
301-
302286
Logger.warning("Tenant was suspended, database connection will be terminated")
303-
:ok = GenServer.stop(db_conn_pid, :normal, 500)
304-
305-
replication_connection_pid && Process.alive?(replication_connection_pid) &&
306-
GenServer.stop(replication_connection_pid, :normal, 500)
307-
308-
listen_pid && Process.alive?(listen_pid) &&
309-
GenServer.stop(listen_pid, :normal, 500)
287+
shutdown_connect_process(state)
288+
end
310289

311-
{:stop, :normal, state}
290+
def handle_info(:shutdown_connect, state) do
291+
Logger.warning("Shutdowning tenant connection")
292+
shutdown_connect_process(state)
312293
end
313294

314295
# Handle database connection termination
@@ -371,7 +352,7 @@ defmodule Realtime.Tenants.Connect do
371352
@connected_users_bucket_shutdown,
372353
check_connected_user_interval
373354
) do
374-
Process.send_after(self(), :shutdown, check_connected_user_interval)
355+
Process.send_after(self(), :shutdown_no_connected_users, check_connected_user_interval)
375356
end
376357

377358
defp send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) do
@@ -381,4 +362,22 @@ defmodule Realtime.Tenants.Connect do
381362

382363
defp tenant_suspended?(%Tenant{suspend: true}), do: {:error, :tenant_suspended}
383364
defp tenant_suspended?(_), do: :ok
365+
366+
# Stops the processes referenced by the Connect state and then stops the Connect
# process itself with reason `:normal`.
#
# The database connection is stopped with reason `:shutdown`; the replication
# connection and the listener are stopped with reason `:normal`. Every stop gets a
# 500 ms timeout and is skipped when the pid is missing or no longer alive, so a
# connection that already went away does not make the shutdown path crash.
defp shutdown_connect_process(state) do
  %{
    db_conn_pid: db_conn_pid,
    replication_connection_pid: replication_connection_pid,
    listen_pid: listen_pid
  } = state

  stop_if_alive(db_conn_pid, :shutdown)
  stop_if_alive(replication_connection_pid, :normal)
  stop_if_alive(listen_pid, :normal)

  {:stop, :normal, state}
end

# Stops `pid` with `reason` (500 ms timeout) only when it is a live process.
defp stop_if_alive(pid, reason) when is_pid(pid) do
  if Process.alive?(pid), do: GenServer.stop(pid, reason, 500)
  :ok
end

# Nothing to stop when the state holds no pid for this process.
defp stop_if_alive(_pid, _reason), do: :ok
384383
end

lib/realtime/tenants/connect/check_connection.ex

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
1010
%{tenant: tenant} = acc
1111

1212
case Database.check_tenant_connection(tenant) do
13-
{:ok, conn} -> {:ok, %{acc | db_conn_pid: conn, db_conn_reference: Process.monitor(conn)}}
14-
{:error, error} -> {:error, error}
13+
{:ok, conn} ->
14+
Process.link(conn)
15+
db_conn_reference = Process.monitor(conn)
16+
{:ok, %{acc | db_conn_pid: conn, db_conn_reference: db_conn_reference}}
17+
18+
{:error, error} ->
19+
{:error, error}
1520
end
1621
end
1722
end

lib/realtime/tenants/migrations.ex

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Realtime.Tenants.Migrations do
88

99
import Realtime.Logs
1010

11+
alias Realtime.Tenants
1112
alias Realtime.Database
1213
alias Realtime.Registry.Unique
1314
alias Realtime.Repo
@@ -149,7 +150,7 @@ defmodule Realtime.Tenants.Migrations do
149150
@doc """
150151
Run migrations for the given tenant.
151152
"""
152-
@spec run_migrations(Tenant.t()) :: :ok | {:error, any()}
153+
@spec run_migrations(Tenant.t()) :: :ok | :noop | {:error, any()}
153154
def run_migrations(%Tenant{} = tenant) do
154155
%{extensions: [%{settings: settings} | _]} = tenant
155156
attrs = %__MODULE__{tenant_external_id: tenant.external_id, settings: settings}
@@ -159,9 +160,13 @@ defmodule Realtime.Tenants.Migrations do
159160

160161
spec = {__MODULE__, attrs}
161162

162-
case DynamicSupervisor.start_child(supervisor, spec) do
163-
:ignore -> :ok
164-
error -> error
163+
if Tenants.run_migrations?(tenant.external_id) do
164+
case DynamicSupervisor.start_child(supervisor, spec) do
165+
:ignore -> :ok
166+
error -> error
167+
end
168+
else
169+
:noop
165170
end
166171
end
167172

@@ -174,8 +179,12 @@ defmodule Realtime.Tenants.Migrations do
174179
Logger.metadata(external_id: tenant_external_id, project: tenant_external_id)
175180

176181
case migrate(settings) do
177-
{:ok, _} -> :ignore
178-
{:error, error} -> {:stop, error}
182+
:ok ->
183+
Tenants.update_migrations_ran(tenant_external_id, Enum.count(@migrations))
184+
:ignore
185+
186+
{:error, error} ->
187+
{:stop, error}
179188
end
180189
end
181190

@@ -199,9 +208,9 @@ defmodule Realtime.Tenants.Migrations do
199208

200209
try do
201210
opts = [all: true, prefix: "realtime", dynamic_repo: repo]
202-
res = Ecto.Migrator.run(Repo, @migrations, :up, opts)
211+
Ecto.Migrator.run(Repo, @migrations, :up, opts)
203212

204-
{:ok, res}
213+
:ok
205214
rescue
206215
error ->
207216
log_error("MigrationsFailedToRun", error)
@@ -244,4 +253,6 @@ defmodule Realtime.Tenants.Migrations do
244253

245254
:ok
246255
end
256+
257+
@doc """
Returns the migrations stored in the `@migrations` module attribute.

This is the same value that is handed to `Ecto.Migrator.run/4` when a tenant is
migrated. It is exposed so other modules can count how many migrations exist.
"""
@spec migrations() :: list()
def migrations, do: @migrations
247258
end

lib/realtime_web/channels/user_socket.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ defmodule RealtimeWeb.UserSocket do
3030
%{uri: %{host: host}, x_headers: headers} = opts
3131

3232
{:ok, external_id} = Database.get_external_id(host)
33-
33+
external_id = String.upcase(external_id)
3434
Logger.metadata(external_id: external_id, project: external_id)
3535
Logger.put_process_level(self(), :error)
3636

lib/realtime_web/controllers/tenant_controller.ex

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,8 @@ defmodule RealtimeWeb.TenantController do
147147
_e, acc -> acc
148148
end)
149149

150-
with {:ok, %Tenant{} = tenant} <-
151-
Api.create_tenant(%{tenant_params | "extensions" => extensions}),
152-
:ok <- Migrations.run_migrations(tenant) do
150+
with {:ok, %Tenant{} = tenant} <- Api.create_tenant(%{tenant_params | "extensions" => extensions}),
151+
res when res in [:ok, :noop] <- Migrations.run_migrations(tenant) do
153152
Logger.metadata(external_id: tenant.external_id, project: tenant.external_id)
154153

155154
conn

0 commit comments

Comments
 (0)