Skip to content

Commit 8cc9b48

Browse files
authored
fix(housekeeping): Fix migrator for replicated astarte keyspaces (#1879)
The previous logic had a two-step process for Astarte keyspace creation and setup. Unfortunately, the consistency level is ignored for many operations (such as `CREATE`), so we have two options at our disposal: (1) re-use a single connection so that all queries are made against the same ScyllaDB replica, or (2) after each operation, run a `SELECT` with the desired consistency, retrying with backoff. The first method was chosen as it looks less invasive and less error-prone, but a final `SELECT` with the desired consistency is preserved to guarantee that the data has been replicated before returning. As a logic shift was needed anyway, the chance was taken to remove the custom logic from the housekeeping entrypoint and move it into the normal supervision tree, in order to avoid future "application not started" curses. Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent 0bd45eb commit 8cc9b48

File tree

10 files changed

+322
-337
lines changed

10 files changed

+322
-337
lines changed

apps/astarte_housekeeping/Dockerfile

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ RUN apt-get -qq install openssl ca-certificates
4646
ARG BUILD_ENV=prod
4747

4848
COPY --from=builder /app/_build/$BUILD_ENV/rel/astarte_housekeeping .
49-
COPY --from=builder /app/entrypoint.sh .
5049

5150
# Change to non-root user
5251
USER nobody
5352

54-
ENTRYPOINT ["/bin/bash", "entrypoint.sh"]
55-
CMD ["start"]
53+
CMD ["./bin/astarte_housekeeping", "start"]

apps/astarte_housekeeping/entrypoint.sh

Lines changed: 0 additions & 7 deletions
This file was deleted.

apps/astarte_housekeeping/lib/astarte_housekeeping.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,10 @@
1717
#
1818

1919
defmodule Astarte.Housekeeping do
20+
alias Astarte.DataAccess.Config, as: DataAccessConfig
2021
alias Astarte.Housekeeping.Config
22+
alias Astarte.Housekeeping.ReleaseTasks
2123
alias Astarte.RPC.Config, as: RPCConfig
22-
alias Astarte.DataAccess.Config, as: DataAccessConfig
2324

2425
use Application
2526

@@ -34,7 +35,8 @@ defmodule Astarte.Housekeeping do
3435

3536
children = [
3637
Astarte.HousekeepingWeb.Telemetry,
37-
Astarte.Housekeeping.BackendSupervisor
38+
Astarte.Housekeeping.BackendSupervisor,
39+
ReleaseTasks
3840
]
3941

4042
# make amqp supervisors logs less verbose

apps/astarte_housekeeping/lib/astarte_housekeeping/migrator.ex

Lines changed: 35 additions & 161 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,20 @@ defmodule Astarte.Housekeeping.Migrator do
2626
alias Astarte.Housekeeping.Queries
2727
@query_timeout 60_000
2828

29-
def run_astarte_keyspace_migrations do
29+
def run_astarte_keyspace_migrations(conn) do
3030
_ = Logger.info("Starting to migrate Astarte keyspace.", tag: "astarte_migration_started")
3131

32-
with :ok <- ensure_astarte_kv_store(),
33-
{:ok, astarte_schema_version} <- get_astarte_schema_version(),
34-
:ok <- migrate_astarte_keyspace_from_version(astarte_schema_version) do
32+
with {:ok, astarte_schema_version} <- Queries.get_astarte_schema_version(conn),
33+
:ok <- migrate_astarte_keyspace_from_version(conn, astarte_schema_version) do
3534
:ok
3635
end
3736
end
3837

39-
def run_realms_migrations do
38+
def run_realms_migrations(conn) do
4039
_ = Logger.info("Starting to migrate Realms.", tag: "realms_migration_started")
4140

42-
with {:ok, realms} <- Queries.list_realms(),
43-
:ok <- migrate_realms(realms) do
41+
with {:ok, realms} <- Queries.list_realms(conn),
42+
:ok <- migrate_realms(conn, realms) do
4443
:ok
4544
end
4645
end
@@ -63,107 +62,20 @@ defmodule Astarte.Housekeeping.Migrator do
6362
version
6463
end
6564

66-
defp migrate_realms([]) do
65+
defp migrate_realms(_conn, []) do
6766
_ = Logger.info("Finished migrating Realms.", tag: "realms_migration_finished")
6867
:ok
6968
end
7069

71-
defp migrate_realms([realm | tail]) do
70+
defp migrate_realms(conn, [realm | tail]) do
7271
_ = Logger.info("Starting to migrate realm.", tag: "realm_migration_started", realm: realm)
7372

74-
with {:ok, realm_astarte_schema_version} <- get_realm_astarte_schema_version(realm),
75-
:ok <- migrate_realm_from_version(realm, realm_astarte_schema_version) do
76-
migrate_realms(tail)
73+
with {:ok, realm_astarte_schema_version} <- Queries.get_realm_schema_version(conn, realm),
74+
:ok <- migrate_realm_from_version(conn, realm, realm_astarte_schema_version) do
75+
migrate_realms(conn, tail)
7776
end
7877
end
7978

80-
defp ensure_astarte_kv_store do
81-
query = """
82-
SELECT table_name
83-
FROM system_schema.tables
84-
WHERE keyspace_name='#{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}' AND table_name='kv_store'
85-
"""
86-
87-
consistency = Consistency.domain_model(:read)
88-
89-
with {:ok, %Xandra.Page{} = page} <-
90-
Xandra.Cluster.execute(:xandra, query, %{}, consistency: consistency) do
91-
if Enum.count(page) == 1 do
92-
:ok
93-
else
94-
create_astarte_kv_store()
95-
end
96-
else
97-
{:error, %Xandra.Error{} = err} ->
98-
_ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error")
99-
{:error, :database_error}
100-
101-
{:error, %Xandra.ConnectionError{} = err} ->
102-
_ =
103-
Logger.warning("Database connection error: #{inspect(err)}.",
104-
tag: "database_connection_error"
105-
)
106-
107-
{:error, :database_connection_error}
108-
end
109-
end
110-
111-
defp create_astarte_kv_store do
112-
query = """
113-
CREATE TABLE #{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}.kv_store (
114-
group varchar,
115-
key varchar,
116-
value blob,
117-
118-
PRIMARY KEY ((group), key)
119-
);
120-
"""
121-
122-
case Xandra.Cluster.execute(:xandra, query, %{},
123-
consistency: Consistency.domain_model(:write),
124-
timeout: @query_timeout
125-
) do
126-
{:ok, %Xandra.SchemaChange{}} ->
127-
:ok
128-
129-
{:error, %Xandra.Error{} = err} ->
130-
_ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error")
131-
{:error, :database_error}
132-
133-
{:error, %Xandra.ConnectionError{} = err} ->
134-
_ =
135-
Logger.warning("Database connection error: #{inspect(err)}.",
136-
tag: "database_connection_error"
137-
)
138-
139-
{:error, :database_connection_error}
140-
end
141-
end
142-
143-
defp get_astarte_schema_version do
144-
Xandra.Cluster.run(:xandra, fn conn ->
145-
with :ok <-
146-
use_keyspace(
147-
conn,
148-
"#{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}"
149-
) do
150-
get_keyspace_astarte_schema_version(conn)
151-
end
152-
end)
153-
end
154-
155-
defp get_realm_astarte_schema_version(realm_name) do
156-
Xandra.Cluster.run(:xandra, fn conn ->
157-
with :ok <-
158-
use_keyspace(
159-
conn,
160-
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
161-
) do
162-
get_keyspace_astarte_schema_version(conn)
163-
end
164-
end)
165-
end
166-
16779
defp use_keyspace(conn, keyspace) do
16880
case Xandra.execute(conn, "USE #{keyspace}", %{}, timeout: @query_timeout) do
16981
{:ok, %Xandra.SetKeyspace{}} ->
@@ -183,86 +95,48 @@ defmodule Astarte.Housekeeping.Migrator do
18395
end
18496
end
18597

186-
defp get_keyspace_astarte_schema_version(keyspace_conn) do
187-
query = """
188-
SELECT blobAsBigint(value)
189-
FROM kv_store
190-
WHERE group='astarte' AND key='schema_version'
191-
"""
192-
193-
consistency = Consistency.domain_model(:read)
194-
195-
with {:ok, %Xandra.Page{} = page} <-
196-
Xandra.execute(keyspace_conn, query, %{}, consistency: consistency) do
197-
case Enum.to_list(page) do
198-
[%{"system.blobasbigint(value)" => schema_version}] ->
199-
{:ok, schema_version}
200-
201-
[] ->
202-
# If no entry is found, we assume we're at version 0
203-
{:ok, 0}
204-
end
205-
else
206-
{:error, %Xandra.Error{} = err} ->
207-
_ = Logger.warning("Database error: #{inspect(err)}.", tag: "database_error")
208-
{:error, :database_error}
209-
210-
{:error, %Xandra.ConnectionError{} = err} ->
211-
_ =
212-
Logger.warning("Database connection error: #{inspect(err)}.",
213-
tag: "database_connection_error"
214-
)
215-
216-
{:error, :database_connection_error}
217-
end
218-
end
219-
220-
defp migrate_astarte_keyspace_from_version(current_schema_version) do
98+
defp migrate_astarte_keyspace_from_version(conn, current_schema_version) do
22199
_ = Logger.info("Astarte schema version is #{current_schema_version}")
222100

223101
migrations =
224102
astarte_migrations_path()
225103
|> collect_migrations()
226104
|> filter_migrations(current_schema_version)
227105

228-
Xandra.Cluster.run(:xandra, [timeout: :infinity], fn conn ->
229-
with :ok <-
230-
use_keyspace(
231-
conn,
232-
"#{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}"
233-
),
234-
:ok <- execute_migrations(conn, migrations) do
235-
_ = Logger.info("Finished migrating Astarte keyspace.", tag: "astarte_migration_finished")
106+
with :ok <-
107+
use_keyspace(
108+
conn,
109+
"#{CQLUtils.realm_name_to_keyspace_name("astarte", Config.astarte_instance_id!())}"
110+
),
111+
:ok <- execute_migrations(conn, migrations) do
112+
_ = Logger.info("Finished migrating Astarte keyspace.", tag: "astarte_migration_finished")
236113

237-
:ok
238-
end
239-
end)
114+
:ok
115+
end
240116
end
241117

242-
defp migrate_realm_from_version(realm_name, current_schema_version) do
118+
defp migrate_realm_from_version(conn, realm_name, current_schema_version) do
243119
_ = Logger.info("Realm schema version is #{current_schema_version}", realm: realm_name)
244120

245121
migrations =
246122
realm_migrations_path()
247123
|> collect_migrations()
248124
|> filter_migrations(current_schema_version)
249125

250-
Xandra.Cluster.run(:xandra, [timeout: :infinity], fn conn ->
251-
with :ok <-
252-
use_keyspace(
253-
conn,
254-
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
255-
),
256-
:ok <- execute_migrations(conn, migrations) do
257-
_ =
258-
Logger.info("Finished migrating realm.",
259-
tag: "realm_migration_finished",
260-
realm: realm_name
261-
)
126+
with :ok <-
127+
use_keyspace(
128+
conn,
129+
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
130+
),
131+
:ok <- execute_migrations(conn, migrations) do
132+
_ =
133+
Logger.info("Finished migrating realm.",
134+
tag: "realm_migration_finished",
135+
realm: realm_name
136+
)
262137

263-
:ok
264-
end
265-
end)
138+
:ok
139+
end
266140
end
267141

268142
defp astarte_migrations_path do

0 commit comments

Comments
 (0)