Skip to content

Commit 886430f

Browse files
committed
refactor(appengine): port insert_values_to_db's inserts to exandra
Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent 7a87c60 commit 886430f

File tree

2 files changed

+87
-105
lines changed

2 files changed

+87
-105
lines changed

apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ defmodule Astarte.AppEngine.API.Device do
278278

279279
Queries.insert_value_into_db(
280280
realm_name,
281-
client,
282281
device_id,
283282
interface_descriptor,
284283
endpoint_id,
@@ -454,7 +453,6 @@ defmodule Astarte.AppEngine.API.Device do
454453

455454
Queries.insert_value_into_db(
456455
realm_name,
457-
client,
458456
device_id,
459457
interface_descriptor,
460458
nil,
@@ -718,8 +716,7 @@ defmodule Astarte.AppEngine.API.Device do
718716
# TODO: we should probably allow delete for every path regardless of the interface type
719717
# just for maintenance reasons
720718
def delete_interface_values(realm_name, encoded_device_id, interface, no_prefix_path) do
721-
with {:ok, client} <- Database.connect(realm: realm_name),
722-
{:ok, device_id} <- Device.decode_device_id(encoded_device_id),
719+
with {:ok, device_id} <- Device.decode_device_id(encoded_device_id),
723720
{:ok, major_version} <-
724721
DeviceQueries.interface_version(realm_name, device_id, interface),
725722
{:ok, interface_row} <-
@@ -733,7 +730,6 @@ defmodule Astarte.AppEngine.API.Device do
733730

734731
Queries.insert_value_into_db(
735732
realm_name,
736-
client,
737733
device_id,
738734
interface_descriptor,
739735
endpoint_id,

apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex

Lines changed: 86 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
222222
# TODO Copy&pasted from data updater plant, make it a library
223223
def insert_value_into_db(
224224
realm_name,
225-
_db_client,
226225
device_id,
227226
%InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} =
228227
interface_descriptor,
@@ -266,8 +265,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
266265

267266
# TODO Copy&pasted from data updater plant, make it a library
268267
def insert_value_into_db(
269-
_realm_name,
270-
db_client,
268+
realm_name,
271269
device_id,
272270
%InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} =
273271
interface_descriptor,
@@ -278,37 +276,34 @@ defmodule Astarte.AppEngine.API.Device.Queries do
278276
timestamp,
279277
opts
280278
) do
281-
ttl_string = get_ttl_string(opts)
279+
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
280+
keyspace = keyspace_name(realm_name)
282281

283282
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
284283

285284
# TODO: :reception_timestamp_submillis is just a place holder right now
286-
insert_query =
287-
DatabaseQuery.new()
288-
|> DatabaseQuery.statement("""
289-
INSERT INTO #{interface_descriptor.storage}
290-
(device_id, interface_id, endpoint_id, path, reception_timestamp,
291-
#{CQLUtils.type_to_db_column_name(endpoint.value_type)})
292-
VALUES (:device_id, :interface_id, :endpoint_id, :path, :reception_timestamp,
293-
:value) #{ttl_string};
294-
""")
295-
|> DatabaseQuery.put(:device_id, device_id)
296-
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
297-
|> DatabaseQuery.put(:endpoint_id, endpoint_id)
298-
|> DatabaseQuery.put(:path, path)
299-
|> DatabaseQuery.put(:reception_timestamp, timestamp_ms)
300-
|> DatabaseQuery.put(:reception_timestamp_submillis, timestamp_submillis)
301-
|> DatabaseQuery.put(:value, to_db_friendly_type(value))
285+
interface_storage_attributes = %{
286+
value_column => to_db_friendly_type(value),
287+
device_id: device_id,
288+
interface_id: interface_descriptor.interface_id,
289+
endpoint_id: endpoint_id,
290+
path: path,
291+
reception_timestamp: timestamp_ms,
292+
reception_timestamp_submillis: timestamp_submillis
293+
}
302294

303-
DatabaseQuery.call!(db_client, insert_query)
295+
{1, _} =
296+
Repo.insert_all(interface_descriptor.storage, [interface_storage_attributes],
297+
prefix: keyspace,
298+
ttl: opts[:ttl]
299+
)
304300

305301
:ok
306302
end
307303

308304
# TODO Copy&pasted from data updater plant, make it a library
309305
def insert_value_into_db(
310-
_realm_name,
311-
db_client,
306+
realm_name,
312307
device_id,
313308
%InterfaceDescriptor{storage_type: :multi_interface_individual_datastream_dbtable} =
314309
interface_descriptor,
@@ -319,38 +314,28 @@ defmodule Astarte.AppEngine.API.Device.Queries do
319314
timestamp,
320315
opts
321316
) do
322-
ttl_string = get_ttl_string(opts)
317+
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
318+
keyspace = keyspace_name(realm_name)
323319
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
324320

325-
insert_query =
326-
DatabaseQuery.new()
327-
|> DatabaseQuery.statement("""
328-
INSERT INTO #{interface_descriptor.storage}
329-
(device_id, interface_id, endpoint_id, path, value_timestamp, reception_timestamp, reception_timestamp_submillis,
330-
#{CQLUtils.type_to_db_column_name(endpoint.value_type)})
331-
VALUES (:device_id, :interface_id, :endpoint_id, :path, :value_timestamp, :reception_timestamp,
332-
:reception_timestamp_submillis, :value) #{ttl_string};
333-
""")
334-
|> DatabaseQuery.put(:device_id, device_id)
335-
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
336-
|> DatabaseQuery.put(:endpoint_id, endpoint.endpoint_id)
337-
|> DatabaseQuery.put(:path, path)
338-
|> DatabaseQuery.put(:value_timestamp, timestamp_ms)
339-
|> DatabaseQuery.put(:reception_timestamp, timestamp_ms)
340-
|> DatabaseQuery.put(:reception_timestamp_submillis, timestamp_submillis)
341-
|> DatabaseQuery.put(:value, to_db_friendly_type(value))
342-
343-
# TODO: |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint))
344-
345-
DatabaseQuery.call!(db_client, insert_query)
321+
attributes = %{
322+
value_column => to_db_friendly_type(value),
323+
device_id: device_id,
324+
interface_id: interface_descriptor.interface_id,
325+
endpoint_id: endpoint.endpoint_id,
326+
path: path,
327+
value_timestamp: timestamp_ms,
328+
reception_timestamp: timestamp_ms,
329+
reception_timestamp_submillis: timestamp_submillis
330+
}
346331

332+
Repo.insert_all(interface_descriptor.storage, [attributes], prefix: keyspace, ttl: opts[:ttl])
347333
:ok
348334
end
349335

350336
# TODO Copy&pasted from data updater plant, make it a library
351337
def insert_value_into_db(
352338
realm_name,
353-
db_client,
354339
device_id,
355340
%InterfaceDescriptor{storage_type: :one_object_datastream_dbtable} = interface_descriptor,
356341
_endpoint_id,
@@ -360,7 +345,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
360345
timestamp,
361346
opts
362347
) do
363-
ttl_string = get_ttl_string(opts)
364348
keyspace = keyspace_name(realm_name)
365349
interface_id = interface_descriptor.interface_id
366350

@@ -371,72 +355,74 @@ defmodule Astarte.AppEngine.API.Device.Queries do
371355
)
372356
|> Repo.all(prefix: keyspace)
373357

374-
explicit_timestamp = do_interface_has_explicit_timestamp?(keyspace, interface_id)
375-
376-
# FIXME: new atoms are created here, we should avoid this. We need to replace CQEx.
377-
column_atoms =
378-
Enum.reduce(endpoint_rows, %{}, fn endpoint, column_atoms_acc ->
379-
endpoint_name =
380-
endpoint.endpoint
381-
|> String.split("/")
382-
|> List.last()
358+
explicit_timestamp? = do_interface_has_explicit_timestamp?(keyspace, interface_id)
383359

360+
column_meta =
361+
endpoint_rows
362+
|> Map.new(fn endpoint ->
363+
endpoint_name = endpoint.endpoint |> String.split("/") |> List.last()
384364
column_name = CQLUtils.endpoint_to_db_column_name(endpoint_name)
385-
386-
Map.put(column_atoms_acc, endpoint_name, String.to_atom(column_name))
365+
{endpoint_name, %{name: column_name, type: endpoint.value_type}}
387366
end)
388367

389-
{query_values, placeholders, query_columns} =
390-
Enum.reduce(value, {%{}, "", ""}, fn {obj_key, obj_value},
391-
{query_values_acc, placeholders_acc, query_acc} ->
392-
if column_atoms[obj_key] != nil do
393-
column_name = CQLUtils.endpoint_to_db_column_name(obj_key)
394-
395-
db_value = to_db_friendly_type(obj_value)
396-
next_query_values_acc = Map.put(query_values_acc, column_atoms[obj_key], db_value)
397-
next_placeholders_acc = "#{placeholders_acc} :#{to_string(column_atoms[obj_key])},"
398-
next_query_acc = "#{query_acc} #{column_name}, "
399-
400-
{next_query_values_acc, next_placeholders_acc, next_query_acc}
401-
else
402-
Logger.warning(
403-
"Unexpected object key #{inspect(obj_key)} with value #{inspect(obj_value)}."
404-
)
405-
406-
query_values_acc
407-
end
408-
end)
368+
{timestamp_ms, submillis} = split_ms_and_submillis(timestamp)
409369

410-
{query_columns, placeholders} =
411-
if explicit_timestamp do
412-
{"value_timestamp, #{query_columns}", ":value_timestamp, #{placeholders}"}
413-
else
414-
{query_columns, placeholders}
415-
end
370+
base_attributes = %{
371+
device_id: device_id,
372+
path: path
373+
}
416374

417-
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
375+
timestamp_attributes = timestamp_attributes(explicit_timestamp?, timestamp_ms, submillis)
376+
value_attributes = value_attributes(column_meta, value)
418377

419-
# TODO: :reception_timestamp_submillis is just a place holder right now
420-
insert_query =
421-
DatabaseQuery.new()
422-
|> DatabaseQuery.statement("""
423-
INSERT INTO #{interface_descriptor.storage} (device_id, path, #{query_columns} reception_timestamp, reception_timestamp_submillis)
424-
VALUES (:device_id, :path, #{placeholders} :reception_timestamp, :reception_timestamp_submillis) #{ttl_string};
425-
""")
426-
|> DatabaseQuery.put(:device_id, device_id)
427-
|> DatabaseQuery.put(:path, path)
428-
|> DatabaseQuery.put(:value_timestamp, timestamp_ms)
429-
|> DatabaseQuery.put(:reception_timestamp, timestamp_ms)
430-
|> DatabaseQuery.put(:reception_timestamp_submillis, timestamp_submillis)
431-
|> DatabaseQuery.merge(query_values)
378+
object_datastream =
379+
base_attributes
380+
|> Map.merge(timestamp_attributes)
381+
|> Map.merge(value_attributes)
432382

433-
# TODO: |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint))
383+
ttl = Keyword.get(opts, :ttl)
384+
# TODO: consistency = insert_consistency(interface_descriptor, endpoint)
385+
opts = [prefix: keyspace, ttl: ttl, returning: false]
434386

435-
DatabaseQuery.call!(db_client, insert_query)
387+
Repo.insert_all(interface_descriptor.storage, [object_datastream], opts)
436388

437389
:ok
438390
end
439391

392+
defp timestamp_attributes(true = _explicit_timestamp?, timestamp, submillis) do
393+
%{
394+
value_timestamp: timestamp,
395+
reception_timestamp: timestamp,
396+
reception_timestamp_submillis: submillis
397+
}
398+
end
399+
400+
defp timestamp_attributes(_nil_or_false_explicit_timestamp?, timestamp, submillis) do
401+
%{reception_timestamp: timestamp, reception_timestamp_submillis: submillis}
402+
end
403+
404+
defp value_attributes(column_meta, value) do
405+
value =
406+
value
407+
|> Enum.flat_map(fn {key, value} ->
408+
# filter map
409+
case Map.fetch(column_meta, key) do
410+
{:ok, meta} ->
411+
%{name: name, type: type} = meta
412+
data = %{type: type, value: value}
413+
[{name, data}]
414+
415+
:error ->
416+
Logger.warning("Unexpected object key #{inspect(key)} with value #{inspect(value)}.")
417+
418+
[]
419+
end
420+
end)
421+
422+
value
423+
|> Map.new(fn {column, data} -> {column, data.value} end)
424+
end
425+
440426
# TODO Copy&pasted from data updater plant, make it a library
441427
defp to_db_friendly_type(array) when is_list(array) do
442428
# If we have an array, we convert its elements to a db friendly type

0 commit comments

Comments
 (0)