Skip to content

Commit 03eedff

Browse files
committed
refactor(appengine): merge_device_status
rewrite merge_device_status so that all its queries now use exandra. refactor some private functions in their own modules Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent c65f9b4 commit 03eedff

File tree

4 files changed

+415
-418
lines changed

4 files changed

+415
-418
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
defmodule Astarte.AppEngine.API.Device.Aliases do
19+
alias Astarte.DataAccess.Astarte.Realm
20+
alias Astarte.DataAccess.Realms.Device
21+
alias Astarte.DataAccess.Realms.Name
22+
alias Ecto.Changeset
23+
24+
alias Astarte.DataAccess.Repo
25+
26+
import Ecto.Query
27+
28+
require Logger
29+
30+
defstruct to_update: [], to_delete: []
31+
32+
@type input :: %{alias_tag => alias_value} | [alias]
33+
@type alias_tag :: String.t()
34+
@type alias_value :: String.t()
35+
@type alias :: {alias_tag, alias_value}
36+
@type t :: %__MODULE__{
37+
to_update: [alias],
38+
to_delete: [alias_tag]
39+
}
40+
41+
@spec validate(input() | nil, String.t(), Device.t()) :: {:ok, t()} | term()
42+
def validate(nil, _, _), do: {:ok, %__MODULE__{to_delete: [], to_update: []}}
43+
44+
def validate(aliases, realm_name, device) do
45+
with :ok <- validate_format(aliases) do
46+
{to_delete, to_update} = aliases |> Enum.split_with(fn {_key, value} -> is_nil(value) end)
47+
to_delete = to_delete |> Enum.map(fn {tag, nil} -> tag end)
48+
state = %__MODULE__{to_delete: to_delete, to_update: to_update}
49+
50+
with :ok <- validate_device_ownership(state, realm_name, device) do
51+
{:ok, state}
52+
end
53+
end
54+
end
55+
56+
@spec apply(Changeset.t(), t()) :: Changeset.t()
57+
def apply(changeset, aliases) do
58+
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases
59+
60+
changeset
61+
|> apply_delete(to_delete)
62+
|> apply_update(to_update)
63+
end
64+
65+
@spec validate_format(input()) :: :ok | {:error, :invalid_alias}
66+
defp validate_format(aliases) do
67+
Enum.find_value(aliases, :ok, fn
68+
{_tag, ""} ->
69+
:invalid_value
70+
71+
{"", _value} ->
72+
:invalid_tag
73+
74+
_valid_format_tag ->
75+
false
76+
end)
77+
|> case do
78+
:ok ->
79+
:ok
80+
81+
:invalid_tag ->
82+
Logger.warning("Alias key cannot be an empty string.", tag: :invalid_alias_empty_key)
83+
{:error, :invalid_alias}
84+
85+
:invalid_value ->
86+
Logger.warning("Alias value cannot be an empty string.", tag: :invalid_alias_empty_value)
87+
{:error, :invalid_alias}
88+
end
89+
end
90+
91+
@spec validate_device_ownership(t(), String.t(), Device.t()) :: :ok
92+
defp validate_device_ownership(aliases, realm_name, device) do
93+
keyspace = Realm.keyspace_name(realm_name)
94+
95+
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases
96+
97+
to_delete = device.aliases |> Map.take(to_delete) |> Enum.map(fn {_tag, value} -> value end)
98+
to_update = to_update |> Enum.map(fn {_tag, value} -> value end)
99+
100+
chunked_aliases = Enum.concat(to_delete, to_update) |> Enum.chunk_every(99)
101+
102+
results =
103+
for alias_chunk <- chunked_aliases do
104+
from(n in Name, where: n.object_type == 1 and n.object_name in ^alias_chunk)
105+
|> Repo.all(prefix: keyspace)
106+
end
107+
|> List.flatten()
108+
109+
invalid_name =
110+
results |> Enum.find(fn name -> name.object_uuid != device.device_id end)
111+
112+
if is_nil(invalid_name) do
113+
:ok
114+
else
115+
existing_aliases =
116+
Enum.find(device.aliases, fn {_tag, value} -> value == invalid_name.object_name end)
117+
118+
inconsistent? = !is_nil(existing_aliases)
119+
120+
if inconsistent? do
121+
{invalid_tag, _value} = existing_aliases
122+
123+
Logger.error("Inconsistent alias for #{invalid_tag}.",
124+
device_id: device.device_id,
125+
tag: "inconsistent_alias"
126+
)
127+
128+
{:error, :database_error}
129+
else
130+
{:error, :alias_already_in_use}
131+
end
132+
end
133+
end
134+
135+
@spec generate_batch_queries(t(), String.t(), Device.t()) :: [{String.t(), list()}]
136+
def generate_batch_queries(aliases, keyspace, device) do
137+
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases
138+
139+
{update_tags, update_values} = Enum.unzip(to_update)
140+
141+
all_tags = to_delete ++ update_tags
142+
143+
tags_to_delete =
144+
device.aliases
145+
|> Enum.filter(fn {tag, _value} -> tag in all_tags end)
146+
147+
# We delete both aliases we mean to delete, and also existing aliases we want to update
148+
# as the name is part of the primary key for the names table.
149+
# Queries are chunked to avoid hitting scylla's `max_clustering_key_restrictions_per_query`
150+
delete_queries =
151+
tags_to_delete
152+
|> Enum.map(fn {_tag, value} -> value end)
153+
|> Enum.chunk_every(99)
154+
|> Enum.map(fn alias_chunk ->
155+
query =
156+
from n in Name,
157+
prefix: ^keyspace,
158+
where: n.object_type == 1 and n.object_name in ^alias_chunk
159+
160+
Repo.to_sql(:delete_all, query)
161+
end)
162+
163+
insert_queries =
164+
update_values
165+
|> Enum.map(&update_batch_query(keyspace, device.device_id, &1))
166+
167+
delete_queries ++ insert_queries
168+
end
169+
170+
defp update_batch_query(keyspace, device_id, value) do
171+
names_table = Name.__schema__(:source)
172+
173+
query =
174+
"INSERT INTO #{keyspace}.#{names_table} (object_type, object_name, object_uuid) VALUES (1, ?, ?)"
175+
176+
params = [value, device_id]
177+
{query, params}
178+
end
179+
180+
@spec apply_delete(Changeset.t(), [alias]) :: Changeset.t()
181+
defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases),
182+
do: changeset
183+
184+
defp apply_delete(changeset, delete_aliases) when length(delete_aliases) == 0,
185+
do: changeset
186+
187+
defp apply_delete(changeset, delete_aliases) do
188+
aliases = changeset |> Changeset.fetch_field!(:aliases)
189+
190+
delete_tags = delete_aliases |> MapSet.new()
191+
192+
device_aliases = aliases |> Map.keys() |> MapSet.new()
193+
194+
if MapSet.subset?(delete_tags, device_aliases) do
195+
aliases = aliases |> Map.drop(delete_aliases)
196+
197+
changeset
198+
|> Changeset.put_change(:aliases, aliases)
199+
else
200+
Changeset.add_error(changeset, :aliases, "", reason: :alias_tag_not_found)
201+
end
202+
end
203+
204+
@spec apply_update(Changeset.t(), [alias]) :: Changeset.t()
205+
defp apply_update(%Changeset{valid?: false} = changeset, _update_aliases),
206+
do: changeset
207+
208+
defp apply_update(changeset, update_aliases) when length(update_aliases) == 0,
209+
do: changeset
210+
211+
defp apply_update(changeset, update_aliases) do
212+
aliases =
213+
changeset |> Changeset.fetch_field!(:aliases)
214+
215+
aliases = Map.merge(aliases, Map.new(update_aliases))
216+
217+
Changeset.put_change(changeset, :aliases, aliases)
218+
end
219+
end
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
defmodule Astarte.AppEngine.API.Device.Attributes do
19+
alias Ecto.Changeset
20+
21+
require Logger
22+
23+
defstruct to_update: %{}, to_delete: %{}
24+
25+
@type input :: %{attribute_tag => attribute_value} | [attribute]
26+
@type attribute_tag :: String.t()
27+
@type attribute_value :: String.t()
28+
@type attribute :: {attribute_tag, attribute_value}
29+
@type t ::
30+
%__MODULE__{
31+
to_update: [attribute],
32+
to_delete: [attribute_tag]
33+
}
34+
35+
@spec validate(input() | nil) :: {:ok, t()} | term()
36+
def validate(attributes) do
37+
attributes =
38+
case attributes do
39+
nil -> []
40+
attributes -> attributes
41+
end
42+
43+
with :ok <- validate_format(attributes) do
44+
{to_delete, to_update} =
45+
attributes
46+
|> Enum.split_with(fn {_key, value} -> is_nil(value) end)
47+
48+
to_delete = to_delete |> Enum.map(fn {key, nil} -> key end)
49+
50+
{:ok, %__MODULE__{to_delete: to_delete, to_update: to_update}}
51+
end
52+
end
53+
54+
@spec apply(Changeset.t(), t()) :: Changeset.t()
55+
def apply(changeset, attributes) do
56+
%__MODULE__{to_delete: to_delete, to_update: to_update} = attributes
57+
58+
changeset
59+
|> apply_delete(to_delete)
60+
|> apply_update(to_update)
61+
end
62+
63+
@spec validate_format(input()) :: :ok | {:error, :invalid_attributes}
64+
defp validate_format(attributes) do
65+
invalid_attribute? =
66+
Enum.any?(attributes, fn {attribute_key, _value} -> attribute_key == "" end)
67+
68+
if invalid_attribute? do
69+
Logger.warning("Attribute key cannot be an empty string.",
70+
tag: :invalid_attribute_empty_key
71+
)
72+
73+
{:error, :invalid_attributes}
74+
else
75+
:ok
76+
end
77+
end
78+
79+
@spec apply_delete(Changeset.t(), [attribute_tag]) :: Changeset.t()
80+
defp apply_delete(%Changeset{valid?: false} = changeset, _delete_attributes), do: changeset
81+
82+
defp apply_delete(changeset, delete_attributes) when length(delete_attributes) == 0,
83+
do: changeset
84+
85+
defp apply_delete(changeset, delete_attributes) do
86+
attributes = changeset |> Changeset.fetch_field!(:attributes)
87+
88+
attributes_to_delete = delete_attributes |> MapSet.new()
89+
90+
device_attributes = attributes |> Map.keys() |> MapSet.new()
91+
92+
if MapSet.subset?(attributes_to_delete, device_attributes) do
93+
attributes = attributes |> Map.drop(delete_attributes)
94+
95+
changeset
96+
|> Changeset.put_change(:attributes, attributes)
97+
else
98+
Changeset.add_error(changeset, :attributes, "", reason: :attribute_key_not_found)
99+
end
100+
end
101+
102+
@spec apply_update(Changeset.t(), [attribute]) :: Changeset.t()
103+
defp apply_update(%Changeset{valid?: false} = changeset, _update_attributes), do: changeset
104+
105+
defp apply_update(changeset, update_attributes) when length(update_attributes) == 0,
106+
do: changeset
107+
108+
defp apply_update(changeset, update_attributes) do
109+
attributes =
110+
changeset |> Changeset.fetch_field!(:attributes)
111+
112+
attributes = Map.merge(attributes, Map.new(update_attributes))
113+
114+
Changeset.put_change(changeset, :attributes, attributes)
115+
end
116+
end

0 commit comments

Comments
 (0)