Skip to content

Commit 4eb210e

Browse files
committed
refactor(appengine): merge_device_status
rewrite merge_device_status so that all its queries now use exandra. refactor some private functions in their own modules Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent 30521c8 commit 4eb210e

File tree

4 files changed

+402
-387
lines changed

4 files changed

+402
-387
lines changed
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
defmodule Astarte.AppEngine.API.Device.Aliases do
19+
alias Astarte.DataAccess.Astarte.Realm
20+
alias Astarte.DataAccess.Realms.Device
21+
alias Astarte.DataAccess.Realms.Name
22+
alias Ecto.Changeset
23+
24+
alias Astarte.DataAccess.Repo
25+
26+
import Ecto.Query
27+
28+
require Logger
29+
30+
defstruct to_update: [], to_delete: []
31+
32+
@type input :: %{alias_tag => alias_value} | [alias]
33+
@type alias_tag :: String.t()
34+
@type alias_value :: String.t()
35+
@type alias :: {alias_tag, alias_value}
36+
@type t :: %__MODULE__{
37+
to_update: [alias],
38+
to_delete: [alias_tag]
39+
}
40+
41+
@spec validate(input() | nil, String.t(), Device.t()) :: {:ok, t()} | term()
42+
def validate(nil, _, _), do: {:ok, %__MODULE__{to_delete: [], to_update: []}}
43+
44+
def validate(aliases, realm_name, device) do
45+
with :ok <- validate_format(aliases) do
46+
{to_delete, to_update} = aliases |> Enum.split_with(fn {_key, value} -> is_nil(value) end)
47+
to_delete = to_delete |> Enum.map(fn {tag, nil} -> tag end)
48+
state = %__MODULE__{to_delete: to_delete, to_update: to_update}
49+
50+
with :ok <- validate_device_ownership(state, realm_name, device) do
51+
{:ok, state}
52+
end
53+
end
54+
end
55+
56+
@spec apply(Changeset.t(), t()) :: Changeset.t()
57+
def apply(changeset, aliases) do
58+
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases
59+
60+
changeset
61+
|> apply_delete(to_delete)
62+
|> apply_update(to_update)
63+
end
64+
65+
@spec validate_format(input()) :: :ok | {:error, :invalid_alias}
66+
defp validate_format(aliases) do
67+
Enum.find_value(aliases, :ok, fn
68+
{_tag, ""} ->
69+
:invalid_value
70+
71+
{"", _value} ->
72+
:invalid_tag
73+
74+
_valid_format_tag ->
75+
false
76+
end)
77+
|> case do
78+
:ok ->
79+
:ok
80+
81+
:invalid_tag ->
82+
Logger.warning("Alias key cannot be an empty string.", tag: :invalid_alias_empty_key)
83+
{:error, :invalid_alias}
84+
85+
:invalid_value ->
86+
Logger.warning("Alias value cannot be an empty string.", tag: :invalid_alias_empty_value)
87+
{:error, :invalid_alias}
88+
end
89+
end
90+
91+
@spec validate_device_ownership(t(), String.t(), Device.t()) :: :ok
92+
defp validate_device_ownership(aliases, realm_name, device) do
93+
keyspace = Realm.keyspace_name(realm_name)
94+
95+
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases
96+
97+
to_delete = device.aliases |> Map.take(to_delete) |> Enum.map(fn {_tag, value} -> value end)
98+
to_update = to_update |> Enum.map(fn {_tag, value} -> value end)
99+
100+
chunked_aliases = Enum.concat(to_delete, to_update) |> Enum.chunk_every(99)
101+
102+
results =
103+
for alias_chunk <- chunked_aliases do
104+
from(n in Name, where: n.object_type == 1 and n.object_name in ^alias_chunk)
105+
|> Repo.all(prefix: keyspace)
106+
end
107+
|> List.flatten()
108+
109+
invalid_name =
110+
results |> Enum.find(fn name -> name.object_uuid != device.device_id end)
111+
112+
if is_nil(invalid_name) do
113+
:ok
114+
else
115+
existing_aliases =
116+
Enum.find(device.aliases, fn {_tag, value} -> value == invalid_name.object_name end)
117+
118+
inconsistent? = !is_nil(existing_aliases)
119+
120+
if inconsistent? do
121+
{invalid_tag, _value} = existing_aliases
122+
123+
Logger.error("Inconsistent alias for #{invalid_tag}.",
124+
device_id: device.device_id,
125+
tag: "inconsistent_alias"
126+
)
127+
128+
{:error, :database_error}
129+
else
130+
{:error, :alias_already_in_use}
131+
end
132+
end
133+
end
134+
135+
@spec generate_batch_queries(t(), String.t(), Device.t()) :: [{String.t(), list()}]
136+
def generate_batch_queries(aliases, keyspace, device) do
137+
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases
138+
139+
{update_tags, update_values} = Enum.unzip(to_update)
140+
141+
all_tags = to_delete ++ update_tags
142+
143+
tags_to_delete =
144+
device.aliases
145+
|> Enum.filter(fn {tag, _value} -> tag in all_tags end)
146+
147+
# we delete both aliases we mean to delete, and also existing aliases we want to update
148+
# as the name is part of the primary key for the names table
149+
delete_queries =
150+
tags_to_delete
151+
|> Enum.map(fn {_tag, value} -> value end)
152+
|> Enum.chunk_every(99)
153+
|> Enum.map(fn alias_chunk ->
154+
query =
155+
from n in Name,
156+
prefix: ^keyspace,
157+
where: n.object_type == 1 and n.object_name in ^alias_chunk
158+
159+
Repo.to_sql(:delete_all, query)
160+
end)
161+
162+
insert_queries =
163+
update_values
164+
|> Enum.map(&update_batch_query(keyspace, device.device_id, &1))
165+
166+
delete_queries ++ insert_queries
167+
end
168+
169+
defp update_batch_query(keyspace, device_id, value) do
170+
names_table = %Name{}.__meta__.source
171+
172+
query =
173+
"INSERT INTO #{keyspace}.#{names_table} (object_type, object_name, object_uuid) VALUES (1, ?, ?)"
174+
175+
params = [value, device_id]
176+
{query, params}
177+
end
178+
179+
@spec apply_delete(Changeset.t(), [alias]) :: Changeset.t()
180+
defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases),
181+
do: changeset
182+
183+
defp apply_delete(changeset, delete_aliases) when length(delete_aliases) == 0,
184+
do: changeset
185+
186+
defp apply_delete(changeset, delete_aliases) do
187+
aliases = changeset |> Changeset.fetch_field!(:aliases)
188+
189+
delete_tags = delete_aliases |> MapSet.new()
190+
191+
device_aliases = aliases |> Map.keys() |> MapSet.new()
192+
193+
if MapSet.subset?(delete_tags, device_aliases) do
194+
aliases = aliases |> Map.drop(delete_aliases)
195+
196+
changeset
197+
|> Changeset.put_change(:aliases, aliases)
198+
else
199+
Changeset.add_error(changeset, :aliases, "", reason: :alias_tag_not_found)
200+
end
201+
end
202+
203+
@spec apply_update(Changeset.t(), [alias]) :: Changeset.t()
204+
defp apply_update(%Changeset{valid?: false} = changeset, _update_aliases),
205+
do: changeset
206+
207+
defp apply_update(changeset, update_aliases) when length(update_aliases) == 0,
208+
do: changeset
209+
210+
defp apply_update(changeset, update_aliases) do
211+
aliases =
212+
changeset |> Changeset.fetch_field!(:aliases)
213+
214+
aliases = Map.merge(aliases, Map.new(update_aliases))
215+
216+
Changeset.put_change(changeset, :aliases, aliases)
217+
end
218+
end
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
defmodule Astarte.AppEngine.API.Device.Attributes do
19+
alias Ecto.Changeset
20+
21+
require Logger
22+
23+
defstruct to_update: %{}, to_delete: %{}
24+
25+
@type input :: %{alias_tag => alias_value} | [alias]
26+
@type alias_tag :: String.t()
27+
@type alias_value :: String.t()
28+
@type alias :: {alias_tag, alias_value}
29+
@type t :: %__MODULE__{
30+
to_update: [alias],
31+
to_delete: [alias]
32+
}
33+
34+
@spec validate(input() | nil) :: {:ok, t()} | term()
35+
def validate(attributes) do
36+
attributes =
37+
case attributes do
38+
nil -> []
39+
attributes -> attributes
40+
end
41+
42+
with :ok <- validate_format(attributes) do
43+
# :ok <- validate_device_ownership(aliases, realm_name, device_id) do
44+
{to_delete, to_update} =
45+
attributes
46+
|> Enum.split_with(fn {_key, value} -> is_nil(value) end)
47+
48+
{:ok, %__MODULE__{to_delete: to_delete, to_update: to_update}}
49+
end
50+
end
51+
52+
@spec apply(Changeset.t(), t()) :: Changeset.t()
53+
def apply(changeset, aliases) do
54+
%__MODULE__{to_delete: to_delete, to_update: to_update} = aliases
55+
56+
changeset
57+
|> apply_delete(to_delete)
58+
|> apply_update(to_update)
59+
end
60+
61+
@spec validate_format(input()) :: :ok | {:error, :invalid_alias}
62+
defp validate_format(attributes) do
63+
invalid_attribute? =
64+
Enum.any?(attributes, fn {attribute_key, _value} -> attribute_key == "" end)
65+
66+
if invalid_attribute? do
67+
Logger.warning("Attribute key cannot be an empty string.",
68+
tag: :invalid_attribute_empty_key
69+
)
70+
71+
{:error, :invalid_attributes}
72+
else
73+
:ok
74+
end
75+
end
76+
77+
@spec apply_delete(Changeset.t(), [alias]) :: Changeset.t()
78+
defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases), do: changeset
79+
80+
defp apply_delete(changeset, delete_attributes) when length(delete_attributes) == 0,
81+
do: changeset
82+
83+
defp apply_delete(changeset, delete_attributes) do
84+
attributes = changeset |> Changeset.fetch_field!(:attributes)
85+
86+
{delete_keys, _delete_values} = Enum.unzip(delete_attributes)
87+
attributes_to_delete = delete_keys |> MapSet.new()
88+
89+
device_attributes = attributes |> Map.keys() |> MapSet.new()
90+
91+
if MapSet.subset?(attributes_to_delete, device_attributes) do
92+
attributes = attributes |> Map.drop(delete_keys)
93+
94+
changeset
95+
|> Changeset.put_change(:attributes, attributes)
96+
else
97+
Changeset.add_error(changeset, :attributes, "", reason: :attribute_key_not_found)
98+
end
99+
end
100+
101+
@spec apply_update(Changeset.t(), [alias]) :: Changeset.t()
102+
defp apply_update(%Changeset{valid?: false} = changeset, _update_attributes), do: changeset
103+
104+
defp apply_update(changeset, update_attributes) when length(update_attributes) == 0,
105+
do: changeset
106+
107+
defp apply_update(changeset, update_attributes) do
108+
attributes =
109+
changeset |> Changeset.fetch_field!(:attributes)
110+
111+
attributes = Map.merge(attributes, Map.new(update_attributes))
112+
113+
# Works because inserts in CQL are upserts
114+
Changeset.put_change(changeset, :attributes, attributes)
115+
end
116+
end

0 commit comments

Comments
 (0)