Skip to content

Commit 72aa77f

Browse files
authored
Merge pull request #1087 from noaccOS/refactor/appengine-device-update-exandra
refactor(appengine): port Device update queries to exandra
2 parents 2b0acf1 + 9e36095 commit 72aa77f

File tree

2 files changed

+39
-84
lines changed

2 files changed

+39
-84
lines changed

apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ defmodule Astarte.AppEngine.API.Device do
7777
changeset = DeviceStatus.changeset(device_status, device_status_merge),
7878
{:ok, updated_device_status} <- Ecto.Changeset.apply_action(changeset, :update),
7979
credentials_inhibited_change = Map.get(changeset.changes, :credentials_inhibited),
80-
:ok <- change_credentials_inhibited(client, device_id, credentials_inhibited_change),
80+
:ok <- change_credentials_inhibited(realm_name, device_id, credentials_inhibited_change),
8181
aliases_change = Map.get(changeset.changes, :aliases, %{}),
8282
attributes_change = Map.get(changeset.changes, :attributes, %{}),
8383
:ok <- update_aliases(realm_name, client, device_id, aliases_change),
@@ -114,7 +114,7 @@ defmodule Astarte.AppEngine.API.Device do
114114
end
115115

116116
{attribute_key, attribute_value}, _acc ->
117-
case Queries.insert_attribute(client, device_id, attribute_key, attribute_value) do
117+
case Queries.insert_attribute(realm_name, device_id, attribute_key, attribute_value) do
118118
:ok ->
119119
{:cont, :ok}
120120

@@ -154,13 +154,13 @@ defmodule Astarte.AppEngine.API.Device do
154154
|> Enum.into(%{})
155155
end
156156

157-
defp change_credentials_inhibited(_client, _device_id, nil) do
157+
defp change_credentials_inhibited(_realm_name, _device_id, nil) do
158158
:ok
159159
end
160160

161-
defp change_credentials_inhibited(client, device_id, credentials_inhibited)
161+
defp change_credentials_inhibited(realm_name, device_id, credentials_inhibited)
162162
when is_boolean(credentials_inhibited) do
163-
Queries.set_inhibit_credentials_request(client, device_id, credentials_inhibited)
163+
Queries.set_inhibit_credentials_request(realm_name, device_id, credentials_inhibited)
164164
end
165165

166166
@doc """

apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex

Lines changed: 34 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -546,32 +546,19 @@ defmodule Astarte.AppEngine.API.Device.Queries do
546546
Repo.fetch_one(query, consistency: :quorum, error: :device_not_found)
547547
end
548548

549-
def insert_attribute(client, device_id, attribute_key, attribute_value) do
550-
insert_attribute_statement = """
551-
UPDATE devices
552-
SET attributes[:attribute_key] = :attribute_value
553-
WHERE device_id = :device_id
554-
"""
549+
def insert_attribute(realm_name, device_id, attribute_key, attribute_value) do
550+
keyspace = keyspace_name(realm_name)
551+
new_attribute = %{attribute_key => attribute_value}
555552

556553
query =
557-
DatabaseQuery.new()
558-
|> DatabaseQuery.statement(insert_attribute_statement)
559-
|> DatabaseQuery.put(:attribute_key, attribute_key)
560-
|> DatabaseQuery.put(:attribute_value, attribute_value)
561-
|> DatabaseQuery.put(:device_id, device_id)
562-
|> DatabaseQuery.consistency(:each_quorum)
554+
from d in DatabaseDevice,
555+
prefix: ^keyspace,
556+
where: d.device_id == ^device_id,
557+
update: [set: [attributes: fragment("attributes + ?", ^new_attribute)]]
563558

564-
with {:ok, _result} <- DatabaseQuery.call(client, query) do
565-
:ok
566-
else
567-
%{acc: _, msg: error_message} ->
568-
_ = Logger.warning("Database error: #{error_message}.", tag: "db_error")
569-
{:error, :database_error}
559+
Repo.update_all(query, [], consistency: :each_quorum)
570560

571-
{:error, reason} ->
572-
_ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error")
573-
{:error, :database_error}
574-
end
561+
:ok
575562
end
576563

577564
def delete_attribute(realm_name, client, device_id, attribute_key) do
@@ -610,46 +597,35 @@ defmodule Astarte.AppEngine.API.Device.Queries do
610597
end
611598

612599
def insert_alias(realm_name, client, device_id, alias_tag, alias_value) do
600+
keyspace = keyspace_name(realm_name)
601+
names_table = Name.__schema__(:source)
602+
613603
insert_alias_to_names_statement = """
614-
INSERT INTO names
604+
INSERT INTO #{keyspace}.#{names_table}
615605
(object_name, object_type, object_uuid)
616-
VALUES (:alias, 1, :device_id)
606+
VALUES (?, 1, ?)
617607
"""
618608

619-
insert_alias_to_names_query =
620-
DatabaseQuery.new()
621-
|> DatabaseQuery.statement(insert_alias_to_names_statement)
622-
|> DatabaseQuery.put(:alias, alias_value)
623-
|> DatabaseQuery.put(:device_id, device_id)
624-
|> DatabaseQuery.consistency(:each_quorum)
625-
|> DatabaseQuery.convert()
609+
insert_alias_to_names_params = [alias_value, device_id]
610+
insert_alias_to_names_query = {insert_alias_to_names_statement, insert_alias_to_names_params}
626611

627-
insert_alias_to_device_statement = """
628-
UPDATE devices
629-
SET aliases[:alias_tag] = :alias
630-
WHERE device_id = :device_id
631-
"""
612+
new_alias = %{alias_tag => alias_value}
632613

633-
insert_alias_to_device_query =
634-
DatabaseQuery.new()
635-
|> DatabaseQuery.statement(insert_alias_to_device_statement)
636-
|> DatabaseQuery.put(:alias_tag, alias_tag)
637-
|> DatabaseQuery.put(:alias, alias_value)
638-
|> DatabaseQuery.put(:device_id, device_id)
639-
|> DatabaseQuery.consistency(:each_quorum)
640-
|> DatabaseQuery.convert()
614+
insert_alias_to_device =
615+
from DatabaseDevice,
616+
prefix: ^keyspace,
617+
where: [device_id: ^device_id],
618+
update: [set: [aliases: fragment("aliases + ?", ^new_alias)]]
619+
620+
insert_alias_to_device_query = Repo.to_sql(:update_all, insert_alias_to_device)
641621

642622
insert_batch =
643-
CQEx.cql_query_batch(
644-
consistency: :each_quorum,
645-
mode: :logged,
646-
queries: [insert_alias_to_names_query, insert_alias_to_device_query]
647-
)
623+
%Exandra.Batch{queries: [insert_alias_to_names_query, insert_alias_to_device_query]}
648624

649625
with {:existing, {:error, :device_not_found}} <-
650626
{:existing, device_alias_to_device_id(realm_name, alias_value)},
651627
:ok <- try_delete_alias(realm_name, client, device_id, alias_tag),
652-
{:ok, _result} <- DatabaseQuery.call(client, insert_batch) do
628+
:ok <- Exandra.execute_batch(Repo, insert_batch, consistency: :each_quorum) do
653629
:ok
654630
else
655631
{:existing, {:ok, _device_uuid}} ->
@@ -660,14 +636,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
660636

661637
{:error, :device_not_found} ->
662638
{:error, :device_not_found}
663-
664-
%{acc: _, msg: error_message} ->
665-
_ = Logger.warning("Database error: #{error_message}.", tag: "db_error")
666-
{:error, :database_error}
667-
668-
{:error, reason} ->
669-
_ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error")
670-
{:error, :database_error}
671639
end
672640
end
673641

@@ -744,31 +712,18 @@ defmodule Astarte.AppEngine.API.Device.Queries do
744712
end
745713
end
746714

747-
def set_inhibit_credentials_request(client, device_id, inhibit_credentials_request) do
748-
statement = """
749-
UPDATE devices
750-
SET inhibit_credentials_request = :inhibit_credentials_request
751-
WHERE device_id = :device_id
752-
"""
715+
def set_inhibit_credentials_request(realm_name, device_id, inhibit_credentials_request) do
716+
keyspace = keyspace_name(realm_name)
753717

754718
query =
755-
DatabaseQuery.new()
756-
|> DatabaseQuery.statement(statement)
757-
|> DatabaseQuery.put(:inhibit_credentials_request, inhibit_credentials_request)
758-
|> DatabaseQuery.put(:device_id, device_id)
759-
|> DatabaseQuery.consistency(:each_quorum)
719+
from DatabaseDevice,
720+
prefix: ^keyspace,
721+
update: [set: [inhibit_credentials_request: ^inhibit_credentials_request]],
722+
where: [device_id: ^device_id]
760723

761-
with {:ok, _result} <- DatabaseQuery.call(client, query) do
762-
:ok
763-
else
764-
%{acc: _, msg: error_message} ->
765-
_ = Logger.warning("Database error: #{error_message}.", tag: "db_error")
766-
{:error, :database_error}
724+
Repo.update_all(query, [], consistency: :each_quorum)
767725

768-
{:error, reason} ->
769-
_ = Logger.warning("Update failed, reason: #{inspect(reason)}.", tag: "db_error")
770-
{:error, :database_error}
771-
end
726+
:ok
772727
end
773728

774729
def retrieve_object_datastream_values(realm_name, device_id, interface_row, path, columns, opts) do

0 commit comments

Comments
 (0)