Skip to content

Commit bb1c2b6

Browse files
authored
fix: fix tenant cache handling (#1665)
1 parent 5017618 commit bb1c2b6

File tree

17 files changed

+150
-86
lines changed

17 files changed

+150
-86
lines changed

lib/realtime/api.ex

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ defmodule Realtime.Api do
119119
%Tenant{}
120120
|> Tenant.changeset(attrs)
121121
|> Repo.insert()
122+
|> case do
123+
{:ok, tenant} ->
124+
Cache.global_cache_update(tenant)
125+
{:ok, tenant}
126+
127+
error ->
128+
error
129+
end
122130
else
123131
call(:create_tenant, [attrs], tenant_id)
124132
end
@@ -144,7 +152,7 @@ defmodule Realtime.Api do
144152

145153
case updated do
146154
{:ok, tenant} ->
147-
maybe_invalidate_cache(changeset)
155+
maybe_update_cache(tenant, changeset)
148156
maybe_trigger_disconnect(changeset)
149157
maybe_restart_db_connection(changeset)
150158
Logger.debug("Tenant updated: #{inspect(tenant, pretty: true)}")
@@ -216,7 +224,12 @@ defmodule Realtime.Api do
216224
tenant
217225
|> Tenant.changeset(%{migrations_ran: count})
218226
|> Repo.update()
219-
|> tap(fn _ -> Cache.distributed_invalidate_tenant_cache(external_id) end)
227+
|> tap(fn result ->
228+
case result do
229+
{:ok, tenant} -> Cache.global_cache_update(tenant)
230+
_ -> :ok
231+
end
232+
end)
220233
else
221234
call(:update_migrations_ran, [external_id, count], external_id)
222235
end
@@ -241,12 +254,11 @@ defmodule Realtime.Api do
241254
|> Map.put(:events_per_second_now, current)
242255
end
243256

244-
defp maybe_invalidate_cache(%Changeset{changes: changes, valid?: true, data: %{external_id: external_id}})
245-
when changes != %{} do
246-
Tenants.Cache.distributed_invalidate_tenant_cache(external_id)
257+
defp maybe_update_cache(tenant, %Changeset{changes: changes, valid?: true}) when changes != %{} do
258+
Tenants.Cache.global_cache_update(tenant)
247259
end
248260

249-
defp maybe_invalidate_cache(_changeset), do: nil
261+
defp maybe_update_cache(_tenant, _changeset), do: :ok
250262

251263
defp maybe_trigger_disconnect(%Changeset{data: %{external_id: external_id}} = changeset)
252264
when requires_disconnect(changeset) do

lib/realtime/context_cache.ex

Lines changed: 0 additions & 21 deletions
This file was deleted.

lib/realtime/tenants/cache.ex

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule Realtime.Tenants.Cache do
55
require Cachex.Spec
66
require Logger
77

8+
alias Realtime.GenRpc
89
alias Realtime.Tenants
910

1011
def child_spec(_) do
@@ -16,32 +17,42 @@ defmodule Realtime.Tenants.Cache do
1617
}
1718
end
1819

19-
def get_tenant_by_external_id(keyword), do: apply_repo_fun(__ENV__.function, [keyword])
20+
def get_tenant_by_external_id(tenant_id) do
21+
case Cachex.fetch(__MODULE__, cache_key(tenant_id), fn _key ->
22+
case Tenants.get_tenant_by_external_id(tenant_id) do
23+
nil -> {:ignore, nil}
24+
tenant -> {:commit, tenant}
25+
end
26+
end) do
27+
{:commit, value} -> value
28+
{:ok, value} -> value
29+
{:ignore, value} -> value
30+
end
31+
end
32+
33+
defp cache_key(tenant_id), do: {:get_tenant_by_external_id, tenant_id}
2034

2135
@doc """
2236
Invalidates the cache for a tenant in the local node
2337
"""
24-
def invalidate_tenant_cache(tenant_id), do: Cachex.del(__MODULE__, {{:get_tenant_by_external_id, 1}, [tenant_id]})
38+
def invalidate_tenant_cache(tenant_id), do: Cachex.del(__MODULE__, cache_key(tenant_id))
39+
40+
def distributed_invalidate_tenant_cache(tenant_id) when is_binary(tenant_id) do
41+
GenRpc.multicast(__MODULE__, :invalidate_tenant_cache, [tenant_id])
42+
end
2543

2644
@doc """
27-
Broadcasts a message to invalidate the tenant cache to all connected nodes
45+
Update the cache for a tenant
2846
"""
29-
@spec distributed_invalidate_tenant_cache(String.t()) :: boolean()
30-
def distributed_invalidate_tenant_cache(tenant_id) when is_binary(tenant_id) do
31-
nodes = [Node.self() | Node.list()]
32-
results = :erpc.multicall(nodes, __MODULE__, :invalidate_tenant_cache, [tenant_id], 1000)
33-
34-
results
35-
|> Enum.map(fn
36-
{res, _} ->
37-
res
38-
39-
exception ->
40-
Logger.error("Failed to invalidate tenant cache: #{inspect(exception)}")
41-
:error
42-
end)
43-
|> Enum.all?(&(&1 == :ok))
47+
def update_cache(tenant) do
48+
Cachex.put(__MODULE__, cache_key(tenant.external_id), tenant)
4449
end
4550

46-
defp apply_repo_fun(arg1, arg2), do: Realtime.ContextCache.apply_fun(Tenants, arg1, arg2)
51+
@doc """
52+
Update the cache for a tenant in all nodes
53+
"""
54+
@spec global_cache_update(Realtime.Api.Tenant.t()) :: :ok
55+
def global_cache_update(tenant) do
56+
GenRpc.multicast(__MODULE__, :update_cache, [tenant])
57+
end
4758
end

lib/realtime_web/controllers/tenant_controller.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ defmodule RealtimeWeb.TenantController do
195195
with %Tenant{} = tenant <- Api.get_tenant_by_external_id(tenant_id, use_replica: false),
196196
_ <- Tenants.suspend_tenant_by_external_id(tenant_id),
197197
true <- Api.delete_tenant_by_external_id(tenant_id),
198-
true <- Cache.distributed_invalidate_tenant_cache(tenant_id),
198+
:ok <- Cache.distributed_invalidate_tenant_cache(tenant_id),
199199
:ok <- PostgresCdc.stop_all(tenant, stop_all_timeout),
200200
:ok <- Database.replication_slot_teardown(tenant) do
201201
send_resp(conn, 204, "")

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.69.1",
7+
version: "2.69.2",
88
elixir: "~> 1.18",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -419,11 +419,7 @@ defmodule Realtime.Integration.RtChannelTest do
419419
test "broadcast to another tenant does not get mixed up", %{tenant: tenant, serializer: serializer} do
420420
other_tenant = Containers.checkout_tenant(run_migrations: true)
421421

422-
Cachex.put!(
423-
Realtime.Tenants.Cache,
424-
{{:get_tenant_by_external_id, 1}, [other_tenant.external_id]},
425-
{:cached, other_tenant}
426-
)
422+
Realtime.Tenants.Cache.update_cache(other_tenant)
427423

428424
{socket, _} = get_connection(tenant, serializer)
429425
config = %{broadcast: %{self: false}, private: false}
@@ -2406,7 +2402,7 @@ defmodule Realtime.Integration.RtChannelTest do
24062402
|> Realtime.Api.Tenant.changeset(%{limit => value})
24072403
|> Realtime.Repo.update!()
24082404

2409-
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})
2405+
Realtime.Tenants.Cache.update_cache(tenant)
24102406
end
24112407

24122408
defp assert_process_down(pid, timeout \\ 1000) do

test/realtime/api_test.exs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ defmodule Realtime.ApiTest do
5555

5656
external_id = random_string()
5757

58+
expect(Realtime.Tenants.Cache, :global_cache_update, fn tenant ->
59+
assert tenant.external_id == external_id
60+
end)
61+
5862
valid_attrs = %{
5963
external_id: external_id,
6064
name: external_id,
@@ -89,6 +93,7 @@ defmodule Realtime.ApiTest do
8993
end
9094

9195
test "invalid data returns error changeset" do
96+
reject(&Realtime.Tenants.Cache.global_cache_update/1)
9297
assert {:error, %Ecto.Changeset{}} = Api.create_tenant(%{external_id: nil, jwt_secret: nil, name: nil})
9398
end
9499
end
@@ -197,10 +202,14 @@ defmodule Realtime.ApiTest do
197202

198203
test "valid data and tenant data change will not restart the database connection" do
199204
tenant = Containers.checkout_tenant(run_migrations: true)
200-
expect(Realtime.Tenants.Cache, :distributed_invalidate_tenant_cache, fn _ -> :ok end)
205+
206+
expect(Realtime.Tenants.Cache, :global_cache_update, fn tenant ->
207+
assert tenant.max_concurrent_users == 101
208+
end)
209+
201210
{:ok, old_pid} = Connect.lookup_or_start_connection(tenant.external_id)
202211

203-
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{max_concurrent_users: 100})
212+
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{max_concurrent_users: 101})
204213
refute_receive {:DOWN, _, :process, ^old_pid, :shutdown}, 500
205214
assert Process.alive?(old_pid)
206215
assert {:ok, new_pid} = Connect.lookup_or_start_connection(tenant.external_id)
@@ -241,12 +250,15 @@ defmodule Realtime.ApiTest do
241250
end
242251

243252
test "valid data and change to tenant data will refresh cache", %{tenants: [tenant | _]} do
253+
expect(Realtime.Tenants.Cache, :global_cache_update, fn tenant ->
254+
assert tenant.name == "new_name"
255+
end)
256+
244257
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{name: "new_name"})
245-
assert %Tenant{name: "new_name"} = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant.external_id)
246258
end
247259

248260
test "valid data and no changes to tenant will not refresh cache", %{tenants: [tenant | _]} do
249-
reject(&Realtime.Tenants.Cache.distributed_invalidate_tenant_cache/1)
261+
reject(&Realtime.Tenants.Cache.global_cache_update/1)
250262
assert {:ok, %Tenant{}} = Api.update_tenant_by_external_id(tenant.external_id, %{name: tenant.name})
251263
end
252264
end
@@ -367,8 +379,13 @@ defmodule Realtime.ApiTest do
367379
describe "update_migrations_ran/1" do
368380
test "updates migrations_ran to the count of all migrations" do
369381
tenant = tenant_fixture(%{migrations_ran: 0})
370-
Api.update_migrations_ran(tenant.external_id, 1)
371-
tenant = Repo.reload!(tenant)
382+
383+
expect(Realtime.Tenants.Cache, :global_cache_update, fn tenant ->
384+
assert tenant.migrations_ran == 1
385+
:ok
386+
end)
387+
388+
assert {:ok, tenant} = Api.update_migrations_ran(tenant.external_id, 1)
372389
assert tenant.migrations_ran == 1
373390
end
374391
end

test/realtime/tenants/authorization_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ defmodule Realtime.Tenants.AuthorizationTest do
282282
def rls_context(context) do
283283
tenant = Containers.checkout_tenant(run_migrations: true)
284284
# Warm cache to avoid Cachex and Ecto.Sandbox ownership issues
285-
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})
285+
Realtime.Tenants.Cache.update_cache(tenant)
286286

287287
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
288288
topic = context[:topic] || random_string()
@@ -323,6 +323,6 @@ defmodule Realtime.Tenants.AuthorizationTest do
323323
{:ok, tenant} = Realtime.Api.update_tenant_by_external_id(tenant.external_id, %{extensions: extensions})
324324

325325
# Warm cache to avoid Cachex and Ecto.Sandbox ownership issues
326-
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})
326+
Realtime.Tenants.Cache.update_cache(tenant)
327327
end
328328
end

test/realtime/tenants/batch_broadcast_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ defmodule Realtime.Tenants.BatchBroadcastTest do
1616

1717
setup do
1818
tenant = Containers.checkout_tenant(run_migrations: true)
19-
Cachex.put!(Realtime.Tenants.Cache, {{:get_tenant_by_external_id, 1}, [tenant.external_id]}, {:cached, tenant})
19+
Realtime.Tenants.Cache.update_cache(tenant)
2020
{:ok, tenant: tenant}
2121
end
2222

test/realtime/tenants/cache_test.exs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ defmodule Realtime.Tenants.CacheTest do
2121
assert %Api.Tenant{name: "new name"} = Tenants.get_tenant_by_external_id(external_id)
2222
assert %Api.Tenant{name: "tenant"} = Cache.get_tenant_by_external_id(external_id)
2323
end
24+
25+
test "does not cache when tenant is not found" do
26+
assert Cache.get_tenant_by_external_id("not found") == nil
27+
28+
assert Cachex.exists?(Cache, {:get_tenant_by_external_id, "not found"}) == {:ok, false}
29+
end
2430
end
2531

2632
describe "invalidate_tenant_cache/1" do
@@ -40,6 +46,18 @@ defmodule Realtime.Tenants.CacheTest do
4046
end
4147
end
4248

49+
describe "update_cache/1" do
50+
test "updates the cache given a tenant", %{tenant: tenant} do
51+
external_id = tenant.external_id
52+
assert %Api.Tenant{name: "tenant"} = Cache.get_tenant_by_external_id(external_id)
53+
# Update a tenant
54+
updated_tenant = %{tenant | name: "updated name"}
55+
# Update cache
56+
Cache.update_cache(updated_tenant)
57+
assert %Api.Tenant{name: "updated name"} = Cache.get_tenant_by_external_id(external_id)
58+
end
59+
end
60+
4361
describe "distributed_invalidate_tenant_cache/1" do
4462
setup do
4563
{:ok, node} = Clustered.start()
@@ -53,30 +71,61 @@ defmodule Realtime.Tenants.CacheTest do
5371
dummy_name = random_string()
5472

5573
# Ensure cache has the values
56-
Cachex.put!(
57-
Realtime.Tenants.Cache,
58-
{{:get_tenant_by_external_id, 1}, [external_id]},
59-
{:cached, %{tenant | name: dummy_name}}
60-
)
61-
62-
Rpc.enhanced_call(node, Cachex, :put!, [
63-
Realtime.Tenants.Cache,
64-
{{:get_tenant_by_external_id, 1}, [external_id]},
65-
{:cached, %{tenant | name: dummy_name}}
66-
])
74+
Realtime.Tenants.Cache.update_cache(%{tenant | name: dummy_name})
75+
76+
Rpc.enhanced_call(node, Realtime.Tenants.Cache, :update_cache, [%{tenant | name: dummy_name}])
6777

6878
# Cache showing old value
69-
assert %Api.Tenant{name: ^dummy_name} = Cache.get_tenant_by_external_id(external_id)
70-
assert %Api.Tenant{name: ^dummy_name} = Rpc.enhanced_call(node, Cache, :get_tenant_by_external_id, [external_id])
79+
assert {:ok, %Api.Tenant{name: ^dummy_name}} = Cachex.get(Cache, {:get_tenant_by_external_id, external_id})
80+
81+
assert {:ok, %Api.Tenant{name: ^dummy_name}} =
82+
Rpc.enhanced_call(node, Cachex, :get, [Cache, {:get_tenant_by_external_id, external_id}])
7183

7284
# Invalidate cache
73-
assert true = Cache.distributed_invalidate_tenant_cache(external_id)
85+
assert :ok = Cache.distributed_invalidate_tenant_cache(external_id)
7486

87+
# wait for cache to be invalidated in both nodes
88+
Process.sleep(200)
7589
# Cache showing new value
7690
assert %Api.Tenant{name: ^expected_name} = Cache.get_tenant_by_external_id(external_id)
7791

7892
assert %Api.Tenant{name: ^expected_name} =
7993
Rpc.enhanced_call(node, Cache, :get_tenant_by_external_id, [external_id])
8094
end
8195
end
96+
97+
describe "global_cache_update/1" do
98+
setup do
99+
{:ok, node} = Clustered.start()
100+
%{node: node}
101+
end
102+
103+
test "update the cache given a tenant_id", %{node: node} do
104+
external_id = "dev_tenant"
105+
%Api.Tenant{name: expected_name} = tenant = Tenants.get_tenant_by_external_id(external_id)
106+
107+
dummy_name = random_string()
108+
109+
# Ensure cache has the values
110+
Realtime.Tenants.Cache.update_cache(%{tenant | name: dummy_name})
111+
112+
Rpc.enhanced_call(node, Cache, :update_cache, [%{tenant | name: dummy_name}])
113+
114+
# Cache showing old value
115+
assert %Api.Tenant{name: ^dummy_name} = Cache.get_tenant_by_external_id(external_id)
116+
assert %Api.Tenant{name: ^dummy_name} = Rpc.enhanced_call(node, Cache, :get_tenant_by_external_id, [external_id])
117+
118+
# Update cache
119+
assert :ok = Cache.global_cache_update(tenant)
120+
121+
# wait for cache to be updated in both nodes
122+
Process.sleep(200)
123+
124+
# Cache showing new value
125+
assert {:ok, %Api.Tenant{name: ^expected_name}} = Cachex.get(Cache, {:get_tenant_by_external_id, external_id})
126+
127+
assert {:ok, %Api.Tenant{name: ^expected_name}} =
128+
Rpc.enhanced_call(node, Cachex, :get, [Cache, {:get_tenant_by_external_id, external_id}])
129+
end
130+
end
82131
end

0 commit comments

Comments
 (0)