Skip to content

Commit e846ddd

Browse files
committed
refactor(appengine): port Device to exandra part 5
final effort in porting all cqex select queries in Device to exandra - select in `delete_attribute` - select in `insert_value_into_db` Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent 4fe008e commit e846ddd

File tree

2 files changed

+28
-46
lines changed

2 files changed

+28
-46
lines changed

apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ defmodule Astarte.AppEngine.API.Device do
8181
aliases_change = Map.get(changeset.changes, :aliases, %{}),
8282
attributes_change = Map.get(changeset.changes, :attributes, %{}),
8383
:ok <- update_aliases(realm_name, client, device_id, aliases_change),
84-
:ok <- update_attributes(client, device_id, attributes_change) do
84+
:ok <- update_attributes(realm_name, client, device_id, attributes_change) do
8585
# Manually merge aliases since changesets don't perform maps deep merge
8686
merged_aliases = merge_data(device_status.aliases, updated_device_status.aliases)
8787
merged_attributes = merge_data(device_status.attributes, updated_device_status.attributes)
@@ -95,7 +95,7 @@ defmodule Astarte.AppEngine.API.Device do
9595
end
9696
end
9797

98-
defp update_attributes(client, device_id, attributes) do
98+
defp update_attributes(realm_name, client, device_id, attributes) do
9999
Enum.reduce_while(attributes, :ok, fn
100100
{"", _attribute_value}, _acc ->
101101
Logger.warning("Attribute key cannot be an empty string.",
@@ -105,7 +105,7 @@ defmodule Astarte.AppEngine.API.Device do
105105
{:halt, {:error, :invalid_attributes}}
106106

107107
{attribute_key, nil}, _acc ->
108-
case Queries.delete_attribute(client, device_id, attribute_key) do
108+
case Queries.delete_attribute(realm_name, client, device_id, attribute_key) do
109109
:ok ->
110110
{:cont, :ok}
111111

@@ -276,6 +276,7 @@ defmodule Astarte.AppEngine.API.Device do
276276
end
277277

278278
Queries.insert_value_into_db(
279+
realm_name,
279280
client,
280281
device_id,
281282
interface_descriptor,
@@ -451,6 +452,7 @@ defmodule Astarte.AppEngine.API.Device do
451452
end
452453

453454
Queries.insert_value_into_db(
455+
realm_name,
454456
client,
455457
device_id,
456458
interface_descriptor,
@@ -729,6 +731,7 @@ defmodule Astarte.AppEngine.API.Device do
729731
Queries.retrieve_mapping(realm_name, interface_descriptor.interface_id, endpoint_id)
730732

731733
Queries.insert_value_into_db(
734+
realm_name,
732735
client,
733736
device_id,
734737
interface_descriptor,

apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex

Lines changed: 22 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ defmodule Astarte.AppEngine.API.Device.Queries do
107107

108108
def interface_has_explicit_timestamp?(realm_name, interface_id) do
109109
keyspace = keyspace_name(realm_name)
110+
do_interface_has_explicit_timestamp?(keyspace, interface_id)
111+
end
110112

113+
def do_interface_has_explicit_timestamp?(keyspace, interface_id) do
111114
from(d in DatabaseEndpoint,
112115
where: [interface_id: ^interface_id],
113116
select: d.explicit_timestamp,
@@ -205,6 +208,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
205208

206209
# TODO Copy&pasted from data updater plant, make it a library
207210
def insert_value_into_db(
211+
_realm_name,
208212
db_client,
209213
device_id,
210214
%InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} =
@@ -243,6 +247,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
243247

244248
# TODO Copy&pasted from data updater plant, make it a library
245249
def insert_value_into_db(
250+
_realm_name,
246251
db_client,
247252
device_id,
248253
%InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} =
@@ -281,6 +286,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
281286

282287
# TODO Copy&pasted from data updater plant, make it a library
283288
def insert_value_into_db(
289+
_realm_name,
284290
db_client,
285291
device_id,
286292
%InterfaceDescriptor{storage_type: :multi_interface_individual_datastream_dbtable} =
@@ -321,6 +327,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
321327

322328
# TODO Copy&pasted from data updater plant, make it a library
323329
def insert_value_into_db(
330+
realm_name,
324331
db_client,
325332
device_id,
326333
%InterfaceDescriptor{storage_type: :one_object_datastream_dbtable} = interface_descriptor,
@@ -332,32 +339,23 @@ defmodule Astarte.AppEngine.API.Device.Queries do
332339
opts
333340
) do
334341
ttl_string = get_ttl_string(opts)
342+
keyspace = keyspace_name(realm_name)
343+
interface_id = interface_descriptor.interface_id
335344

336-
endpoint_query =
337-
DatabaseQuery.new()
338-
|> DatabaseQuery.statement(
339-
"SELECT endpoint, value_type FROM endpoints WHERE interface_id=:interface_id;"
340-
)
341-
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
342-
343-
endpoint_rows = DatabaseQuery.call!(db_client, endpoint_query)
344-
345-
explicit_timestamp_query =
346-
DatabaseQuery.new()
347-
|> DatabaseQuery.statement(
348-
"SELECT explicit_timestamp FROM endpoints WHERE interface_id=:interface_id LIMIT 1;"
345+
endpoint_rows =
346+
from(DatabaseEndpoint,
347+
where: [interface_id: ^interface_id],
348+
select: [:endpoint, :value_type]
349349
)
350-
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
350+
|> Repo.all(prefix: keyspace)
351351

352-
[explicit_timestamp: explicit_timestamp] =
353-
DatabaseQuery.call!(db_client, explicit_timestamp_query)
354-
|> DatabaseResult.head()
352+
explicit_timestamp = do_interface_has_explicit_timestamp?(keyspace, interface_id)
355353

356354
# FIXME: new atoms are created here, we should avoid this. We need to replace CQEx.
357355
column_atoms =
358356
Enum.reduce(endpoint_rows, %{}, fn endpoint, column_atoms_acc ->
359357
endpoint_name =
360-
endpoint[:endpoint]
358+
endpoint.endpoint
361359
|> String.split("/")
362360
|> List.last()
363361

@@ -571,21 +569,13 @@ defmodule Astarte.AppEngine.API.Device.Queries do
571569
end
572570
end
573571

574-
def delete_attribute(client, device_id, attribute_key) do
575-
retrieve_attribute_statement = """
576-
SELECT attributes FROM devices WHERE device_id = :device_id
577-
"""
578-
579-
retrieve_attribute_query =
580-
DatabaseQuery.new()
581-
|> DatabaseQuery.statement(retrieve_attribute_statement)
582-
|> DatabaseQuery.put(:device_id, device_id)
583-
|> DatabaseQuery.consistency(:quorum)
572+
def delete_attribute(realm_name, client, device_id, attribute_key) do
573+
keyspace = keyspace_name(realm_name)
574+
query = from(d in DatabaseDevice, select: d.attributes)
575+
opts = [prefix: keyspace, consistency: :quorum]
584576

585-
with {:ok, result} <- DatabaseQuery.call(client, retrieve_attribute_query),
586-
[attributes: attributes] <- DatabaseResult.head(result),
587-
{^attribute_key, _attribute_value} <-
588-
Enum.find(attributes || [], fn m -> match?({^attribute_key, _}, m) end) do
577+
with {:ok, attributes} <- Repo.fetch(query, device_id, opts),
578+
{:ok, _} <- get_value(attributes, attribute_key, :attribute_key_not_found) do
589579
delete_attribute_statement = """
590580
DELETE attributes[:attribute_key]
591581
FROM devices
@@ -611,17 +601,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
611601
_ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error")
612602
{:error, :database_error}
613603
end
614-
else
615-
nil ->
616-
{:error, :attribute_key_not_found}
617-
618-
%{acc: _, msg: error_message} ->
619-
_ = Logger.warning("Database error: #{error_message}.", tag: "db_error")
620-
{:error, :database_error}
621-
622-
{:error, reason} ->
623-
_ = Logger.warning("Database error, reason: #{inspect(reason)}.", tag: "db_error")
624-
{:error, :database_error}
625604
end
626605
end
627606

0 commit comments

Comments
 (0)