diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/aliases.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/aliases.ex new file mode 100644 index 0000000000..b4fb9c77ea --- /dev/null +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/aliases.ex @@ -0,0 +1,162 @@ +# +# This file is part of Astarte. +# +# Copyright 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Astarte.AppEngine.API.Device.Aliases do + alias Astarte.DataAccess.Realms.Device + alias Ecto.Changeset + + alias Astarte.AppEngine.API.Device.Queries + + require Logger + + defstruct to_update: [], to_delete: [] + + @type input :: %{alias_tag => alias_value} | [alias] + @type alias_tag :: String.t() + @type alias_value :: String.t() + @type alias :: {alias_tag, alias_value} + @type t :: %__MODULE__{ + to_update: [alias], + to_delete: [alias_tag] + } + + @spec validate(input() | nil, String.t(), Device.t()) :: {:ok, t()} | term() + def validate(nil, _, _), do: {:ok, %__MODULE__{to_delete: [], to_update: []}} + + def validate(aliases, realm_name, device) do + with :ok <- validate_format(aliases) do + {to_delete, to_update} = aliases |> Enum.split_with(fn {_key, value} -> is_nil(value) end) + to_delete = to_delete |> Enum.map(fn {tag, nil} -> tag end) + state = %__MODULE__{to_delete: to_delete, to_update: to_update} + + with :ok <- validate_device_ownership(state, realm_name, device) do + {:ok, state} + end + end + end + + @spec apply(Changeset.t(), t()) :: Changeset.t() + def apply(changeset, aliases) do + %__MODULE__{to_delete: to_delete, to_update: to_update} = aliases + + changeset + |> apply_delete(to_delete) + |> apply_update(to_update) + end + + @spec validate_format(input()) :: :ok | {:error, :invalid_alias} + defp validate_format(aliases) do + Enum.find_value(aliases, :ok, fn + {_tag, ""} -> + :invalid_value + + {"", _value} -> + :invalid_tag + + _valid_format_tag -> + false + end) + |> case do + :ok -> + :ok + + :invalid_tag -> + Logger.warning("Alias key cannot be an empty string.", tag: :invalid_alias_empty_key) + {:error, :invalid_alias} + + :invalid_value -> + Logger.warning("Alias value cannot be an empty string.", tag: :invalid_alias_empty_value) + {:error, :invalid_alias} + end + end + + @spec validate_device_ownership(t(), String.t(), Device.t()) :: :ok + defp validate_device_ownership(aliases, realm_name, device) do + %__MODULE__{to_delete: to_delete, to_update: to_update} = aliases + + to_delete = device.aliases |> Map.take(to_delete) |> Enum.map(fn {_tag, value} -> value end) + to_update = to_update |> Enum.map(fn {_tag, value} -> value end) + + all_aliases = to_delete ++ to_update + + invalid_name = + Queries.find_all_aliases(realm_name, all_aliases) + |> Enum.find(fn name -> name.object_uuid != device.device_id end) + + if is_nil(invalid_name) do + :ok + else + existing_aliases = + Enum.find(device.aliases, fn {_tag, value} -> value == invalid_name.object_name end) + + inconsistent? = !is_nil(existing_aliases) + + if inconsistent? do + {invalid_tag, _value} = existing_aliases + + Logger.error("Inconsistent alias for #{invalid_tag}.", + device_id: device.device_id, + tag: "inconsistent_alias" + ) + + {:error, :database_error} + else + {:error, :alias_already_in_use} + end + end + end + + @spec apply_delete(Changeset.t(), [alias]) :: Changeset.t() + defp apply_delete(%Changeset{valid?: false} = changeset, _delete_aliases), + do: changeset + + defp apply_delete(changeset, delete_aliases) when length(delete_aliases) == 0, + do: changeset + + defp apply_delete(changeset, delete_aliases) do + aliases = changeset |> Changeset.fetch_field!(:aliases) + + delete_tags = delete_aliases |> MapSet.new() + + device_aliases = aliases |> Map.keys() |> MapSet.new() + + if MapSet.subset?(delete_tags, device_aliases) do + aliases = aliases |> Map.drop(delete_aliases) + + changeset + |> Changeset.put_change(:aliases, aliases) + else + Changeset.add_error(changeset, :aliases, "", reason: :alias_tag_not_found) + end + end + + @spec apply_update(Changeset.t(), [alias]) :: Changeset.t() + defp apply_update(%Changeset{valid?: false} = changeset, _update_aliases), + do: changeset + + defp apply_update(changeset, update_aliases) when length(update_aliases) == 0, + do: changeset + + defp apply_update(changeset, update_aliases) do + aliases = + changeset |> Changeset.fetch_field!(:aliases) + + aliases = Map.merge(aliases, Map.new(update_aliases)) + + Changeset.put_change(changeset, :aliases, aliases) + end +end diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/attributes.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/attributes.ex new file mode 100644 index 0000000000..13b2f8df8b --- /dev/null +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/attributes.ex @@ -0,0 +1,116 @@ +# +# This file is part of Astarte. +# +# Copyright 2025 SECO Mind Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +defmodule Astarte.AppEngine.API.Device.Attributes do + alias Ecto.Changeset + + require Logger + + defstruct to_update: [], to_delete: [] + + @type input :: %{attribute_tag => attribute_value} | [attribute] + @type attribute_tag :: String.t() + @type attribute_value :: String.t() + @type attribute :: {attribute_tag, attribute_value} + @type t :: + %__MODULE__{ + to_update: [attribute], + to_delete: [attribute_tag] + } + + @spec validate(input() | nil) :: {:ok, t()} | term() + def validate(attributes) do + attributes = + case attributes do + nil -> [] + attributes -> attributes + end + + with :ok <- validate_format(attributes) do + {to_delete, to_update} = + attributes + |> Enum.split_with(fn {_key, value} -> is_nil(value) end) + + to_delete = to_delete |> Enum.map(fn {key, nil} -> key end) + + {:ok, %__MODULE__{to_delete: to_delete, to_update: to_update}} + end + end + + @spec apply(Changeset.t(), t()) :: Changeset.t() + def apply(changeset, attributes) do + %__MODULE__{to_delete: to_delete, to_update: to_update} = attributes + + changeset + |> apply_delete(to_delete) + |> apply_update(to_update) + end + + @spec validate_format(input()) :: :ok | {:error, :invalid_attributes} + defp validate_format(attributes) do + invalid_attribute? = + Enum.any?(attributes, fn {attribute_key, _value} -> attribute_key == "" end) + + if invalid_attribute? do + Logger.warning("Attribute key cannot be an empty string.", + tag: :invalid_attribute_empty_key + ) + + {:error, :invalid_attributes} + else + :ok + end + end + + @spec apply_delete(Changeset.t(), [attribute_tag]) :: Changeset.t() + defp apply_delete(%Changeset{valid?: false} = changeset, _delete_attributes), do: changeset + + defp apply_delete(changeset, delete_attributes) when length(delete_attributes) == 0, + do: changeset + + defp apply_delete(changeset, delete_attributes) do + attributes = changeset |> Changeset.fetch_field!(:attributes) + + attributes_to_delete = delete_attributes |> MapSet.new() + + device_attributes = attributes |> Map.keys() |> MapSet.new() + + if MapSet.subset?(attributes_to_delete, device_attributes) do + attributes = attributes |> Map.drop(delete_attributes) + + changeset + |> Changeset.put_change(:attributes, attributes) + else + Changeset.add_error(changeset, :attributes, "", reason: :attribute_key_not_found) + end + end + + @spec apply_update(Changeset.t(), [attribute]) :: Changeset.t() + defp apply_update(%Changeset{valid?: false} = changeset, _update_attributes), do: changeset + + defp apply_update(changeset, update_attributes) when length(update_attributes) == 0, + do: changeset + + defp apply_update(changeset, update_attributes) do + attributes = + changeset |> Changeset.fetch_field!(:attributes) + + attributes = Map.merge(attributes, Map.new(update_attributes)) + + Changeset.put_change(changeset, :attributes, attributes) + end +end diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex index 2ad0e092ba..a2f24755c1 100644 --- a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex @@ -20,6 +20,8 @@ defmodule Astarte.AppEngine.API.Device do The Device context. """ alias Astarte.AppEngine.API.DataTransmitter + alias Astarte.AppEngine.API.Device.Aliases + alias Astarte.AppEngine.API.Device.Attributes alias Astarte.AppEngine.API.Device.AstarteValue alias Astarte.AppEngine.API.Device.DevicesListOptions alias Astarte.AppEngine.API.Device.DeviceStatus @@ -70,97 +72,60 @@ defmodule Astarte.AppEngine.API.Device do end def merge_device_status(realm_name, encoded_device_id, device_status_merge) do + aliases = device_status_merge["aliases"] + attributes = device_status_merge["attributes"] + with {:ok, device_id} <- Device.decode_device_id(encoded_device_id), - {:ok, device_status} <- Queries.retrieve_device_status(realm_name, device_id), - changeset = DeviceStatus.changeset(device_status, device_status_merge), - {:ok, updated_device_status} <- Ecto.Changeset.apply_action(changeset, :update), - credentials_inhibited_change = Map.get(changeset.changes, :credentials_inhibited), - :ok <- change_credentials_inhibited(realm_name, device_id, credentials_inhibited_change), - aliases_change = Map.get(changeset.changes, :aliases, %{}), - attributes_change = Map.get(changeset.changes, :attributes, %{}), - :ok <- update_aliases(realm_name, device_id, aliases_change), - :ok <- update_attributes(realm_name, device_id, attributes_change) do - # Manually merge aliases since changesets don't perform maps deep merge - merged_aliases = merge_data(device_status.aliases, updated_device_status.aliases) - merged_attributes = merge_data(device_status.attributes, updated_device_status.attributes) - - updated_map = - updated_device_status - |> Map.put(:aliases, merged_aliases) - |> Map.put(:attributes, merged_attributes) - - {:ok, updated_map} + {:ok, device} <- Queries.retrieve_device_for_status(realm_name, device_id), + {:ok, aliases} <- Aliases.validate(aliases, realm_name, device), + {:ok, attributes} <- Attributes.validate(attributes) do + do_merge_device_status(realm_name, device_status_merge, device, aliases, attributes) end end - defp update_attributes(realm_name, device_id, attributes) do - Enum.reduce_while(attributes, :ok, fn {attribute_key, attribute_value}, _ -> - case {attribute_key, attribute_value} do - {"", _attribute_value} -> - Logger.warning("Attribute key cannot be an empty string.", - tag: :invalid_attribute_empty_key - ) - - {:halt, {:error, :invalid_attributes}} - - {attribute_key, nil} -> - case Queries.delete_attribute(realm_name, device_id, attribute_key) do - :ok -> - {:cont, :ok} - - {:error, reason} -> - {:halt, {:error, reason}} - end - - {attribute_key, attribute_value} -> - case Queries.insert_attribute(realm_name, device_id, attribute_key, attribute_value) do - :ok -> - {:cont, :ok} - - {:error, reason} -> - {:halt, {:error, reason}} - end + defp do_merge_device_status(realm_name, device_status_merge, device, aliases, attributes) do + params = + case Map.fetch(device_status_merge, "credentials_inhibited") do + {:ok, credentials_inhibited} -> %{credentials_inhibited: credentials_inhibited} + :error -> %{} end - end) - end - defp update_aliases(realm_name, device_id, aliases) do - Enum.reduce_while(aliases, :ok, fn - {_alias_key, ""}, _acc -> - Logger.warning("Alias value cannot be an empty string.", tag: :invalid_alias_empty_value) - {:halt, {:error, :invalid_alias}} + changeset = + DeviceStatus.from_db_row(device) + |> Changeset.cast(params, [:credentials_inhibited]) + |> Aliases.apply(aliases) + |> Attributes.apply(attributes) - {"", _alias_value}, _acc -> - Logger.warning("Alias key cannot be an empty string.", tag: :invalid_alias_empty_key) - {:halt, {:error, :invalid_alias}} + case Changeset.apply_action(changeset, :update) do + {:ok, status} -> + %Aliases{to_delete: alias_tags_to_delete, to_update: aliases_to_update} = aliases - {alias_key, nil}, _acc -> - case Queries.delete_alias(realm_name, device_id, alias_key) do - :ok -> {:cont, :ok} - {:error, reason} -> {:halt, {:error, reason}} - end + merge_device_status_result = + Queries.merge_device_status( + realm_name, + device, + changeset.changes, + alias_tags_to_delete, + aliases_to_update + ) - {alias_key, alias_value}, _acc -> - case Queries.insert_alias(realm_name, device_id, alias_key, alias_value) do - :ok -> {:cont, :ok} - {:error, reason} -> {:halt, {:error, reason}} + with :ok <- merge_device_status_result do + deletion_in_progress? = Queries.deletion_in_progress?(realm_name, device.device_id) + {:ok, %{status | deletion_in_progress: deletion_in_progress?}} end - end) - end - - defp merge_data(old_data, new_data) when is_map(old_data) and is_map(new_data) do - Map.merge(old_data, new_data) - |> Enum.reject(fn {_, v} -> v == nil end) - |> Enum.into(%{}) - end - defp change_credentials_inhibited(_realm_name, _device_id, nil) do - :ok + {:error, changeset} -> + {:error, sanitize_error(changeset)} + end end - defp change_credentials_inhibited(realm_name, device_id, credentials_inhibited) - when is_boolean(credentials_inhibited) do - Queries.set_inhibit_credentials_request(realm_name, device_id, credentials_inhibited) + defp sanitize_error(changeset) do + # if there is a custom error, return it: it was created by Aliases.apply or Attributes.apply + Enum.find_value(changeset.errors, changeset, fn + {:aliases, {"", [reason: reason]}} -> reason + {:attributes, {"", [reason: reason]}} -> reason + _ -> false + end) end @doc """ diff --git a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex index 5ac444bee4..67c39534a2 100644 --- a/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex +++ b/apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex @@ -471,7 +471,12 @@ defmodule Astarte.AppEngine.API.Device.Queries do Repo.fetch(query, device_id, error: :device_not_found) end - defp deletion_in_progress?(keyspace, device_id) do + def deletion_in_progress?(realm_name, device_id) do + keyspace = keyspace_name(realm_name) + do_deletion_in_progress?(keyspace, device_id) + end + + defp do_deletion_in_progress?(keyspace, device_id) do case Repo.fetch(DeletionInProgress, device_id, prefix: keyspace) do {:ok, _} -> true {:error, _} -> false @@ -540,6 +545,112 @@ defmodule Astarte.AppEngine.API.Device.Queries do Repo.fetch_one(query, consistency: :quorum, error: :device_not_found) end + def find_all_aliases(realm_name, alias_list) do + keyspace = keyspace_name(realm_name) + + # Queries are chunked to avoid hitting scylla's `max_clustering_key_restrictions_per_query` + alias_list + |> Enum.chunk_every(99) + |> Enum.map(&from(n in Name, where: n.object_type == 1 and n.object_name in ^&1)) + |> Enum.map(&Repo.all(&1, prefix: keyspace)) + |> List.flatten() + end + + def merge_device_status(_, _, device_status_changes, _, _) + when map_size(device_status_changes) == 0, + do: :ok + + def merge_device_status(realm_name, device, changes, alias_tags_to_delete, aliases_to_update) do + keyspace = keyspace_name(realm_name) + + device_query = merge_device_status_device_query(keyspace, device.device_id, changes) + + aliases_queries = + merge_device_status_aliases_queries( + keyspace, + device, + alias_tags_to_delete, + aliases_to_update + ) + + queries = [device_query | aliases_queries] + + case Exandra.execute_batch(Repo, %Exandra.Batch{queries: queries}, consistency: :each_quorum) do + :ok -> + :ok + + {:error, reason} -> + Logger.warning("Database error, reason: #{inspect(reason)}", tag: "db_error") + {:error, :database_error} + end + end + + defp merge_device_status_device_query(keyspace, device_id, changes) do + changes = + case Map.fetch(changes, :credentials_inhibited) do + {:ok, inhibit_credentials_request} -> + changes + |> Map.delete(:credentials_inhibited) + |> Map.put(:inhibit_credentials_request, inhibit_credentials_request) + + :error -> + changes + end + |> Keyword.new() + + device_query = + from DatabaseDevice, + prefix: ^keyspace, + where: [device_id: ^device_id], + update: [set: ^changes] + + Repo.to_sql(:update_all, device_query) + end + + defp merge_device_status_aliases_queries( + keyspace, + device, + alias_tags_to_delete, + aliases_to_update + ) do + {update_tags, update_values} = Enum.unzip(aliases_to_update) + + all_tags = alias_tags_to_delete ++ update_tags + + tags_to_delete = + device.aliases + |> Enum.filter(fn {tag, _value} -> tag in all_tags end) + + # We delete both aliases we mean to delete, and also existing aliases we want to update + # as the name is part of the primary key for the names table. + # Queries are chunked to avoid hitting scylla's `max_clustering_key_restrictions_per_query` + delete_queries = + tags_to_delete + |> Enum.map(fn {_tag, value} -> value end) + |> Enum.chunk_every(99) + |> Enum.map(fn alias_chunk -> + query = + from n in Name, + prefix: ^keyspace, + where: n.object_type == 1 and n.object_name in ^alias_chunk + + Repo.to_sql(:delete_all, query) + end) + + insert_queries = + update_values + |> Enum.map( + &%Name{ + object_name: &1, + object_type: 1, + object_uuid: device.device_id + } + ) + |> Enum.map(&Repo.insert_to_sql(&1, prefix: keyspace)) + + delete_queries ++ insert_queries + end + def insert_attribute(realm_name, device_id, attribute_key, attribute_value) do keyspace = keyspace_name(realm_name) new_attribute = %{attribute_key => attribute_value} @@ -939,7 +1050,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do groups -> groups |> Map.keys() end - deletion_in_progress? = deletion_in_progress?(keyspace, device_id) + deletion_in_progress? = do_deletion_in_progress?(keyspace, device_id) device_id = Device.encode_device_id(device_id) connected = connected || false