Skip to content

Commit e29c64f

Browse files
authored
Merge pull request #1114 from noaccOS/refactor/insert-value-to-db
refactor(appengine): port insert_values_to_db inserts to exandra
2 parents 2c9109b + 886430f commit e29c64f

File tree

2 files changed

+113
-112
lines changed

2 files changed

+113
-112
lines changed

apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ defmodule Astarte.AppEngine.API.Device do
278278

279279
Queries.insert_value_into_db(
280280
realm_name,
281-
client,
282281
device_id,
283282
interface_descriptor,
284283
endpoint_id,
@@ -454,7 +453,6 @@ defmodule Astarte.AppEngine.API.Device do
454453

455454
Queries.insert_value_into_db(
456455
realm_name,
457-
client,
458456
device_id,
459457
interface_descriptor,
460458
nil,
@@ -718,8 +716,7 @@ defmodule Astarte.AppEngine.API.Device do
718716
# TODO: we should probably allow delete for every path regardless of the interface type
719717
# just for maintenance reasons
720718
def delete_interface_values(realm_name, encoded_device_id, interface, no_prefix_path) do
721-
with {:ok, client} <- Database.connect(realm: realm_name),
722-
{:ok, device_id} <- Device.decode_device_id(encoded_device_id),
719+
with {:ok, device_id} <- Device.decode_device_id(encoded_device_id),
723720
{:ok, major_version} <-
724721
DeviceQueries.interface_version(realm_name, device_id, interface),
725722
{:ok, interface_row} <-
@@ -733,7 +730,6 @@ defmodule Astarte.AppEngine.API.Device do
733730

734731
Queries.insert_value_into_db(
735732
realm_name,
736-
client,
737733
device_id,
738734
interface_descriptor,
739735
endpoint_id,
@@ -1562,7 +1558,6 @@ defmodule Astarte.AppEngine.API.Device do
15621558

15631559
defp timestamp_column(explicit_timestamp?) do
15641560
case explicit_timestamp? do
1565-
nil -> :reception_timestamp
15661561
false -> :reception_timestamp
15671562
true -> :value_timestamp
15681563
end

apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex

Lines changed: 112 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,18 @@ defmodule Astarte.AppEngine.API.Device.Queries do
116116
end
117117

118118
def do_interface_has_explicit_timestamp?(keyspace, interface_id) do
119-
from(d in DatabaseEndpoint,
120-
where: [interface_id: ^interface_id],
121-
select: d.explicit_timestamp,
122-
limit: 1
123-
)
124-
|> Repo.one!(prefix: keyspace)
119+
interface_explicit_timestamp =
120+
from(d in DatabaseEndpoint,
121+
where: [interface_id: ^interface_id],
122+
select: d.explicit_timestamp,
123+
limit: 1
124+
)
125+
|> Repo.one!(prefix: keyspace)
126+
127+
# ensure boolean value
128+
with nil <- interface_explicit_timestamp do
129+
false
130+
end
125131
end
126132

127133
def fetch_datastream_maximum_storage_retention(realm_name) do
@@ -195,15 +201,17 @@ defmodule Astarte.AppEngine.API.Device.Queries do
195201
:reception_timestamp_submillis, :datetime_value) #{ttl_string};
196202
"""
197203

204+
{reception_ms, reception_submillis} = split_ms_and_submillis(reception_timestamp)
205+
198206
insert_query =
199207
DatabaseQuery.new()
200208
|> DatabaseQuery.statement(insert_statement)
201209
|> DatabaseQuery.put(:device_id, device_id)
202210
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
203211
|> DatabaseQuery.put(:endpoint_id, endpoint_id)
204212
|> DatabaseQuery.put(:path, path)
205-
|> DatabaseQuery.put(:reception_timestamp, div(reception_timestamp, 1000))
206-
|> DatabaseQuery.put(:reception_timestamp_submillis, rem(reception_timestamp, 1000))
213+
|> DatabaseQuery.put(:reception_timestamp, reception_ms)
214+
|> DatabaseQuery.put(:reception_timestamp_submillis, reception_submillis)
207215
|> DatabaseQuery.put(:datetime_value, value_timestamp)
208216

209217
DatabaseQuery.call!(db_client, insert_query)
@@ -214,7 +222,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
214222
# TODO Copy&pasted from data updater plant, make it a library
215223
def insert_value_into_db(
216224
realm_name,
217-
_db_client,
218225
device_id,
219226
%InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} =
220227
interface_descriptor,
@@ -258,8 +265,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
258265

259266
# TODO Copy&pasted from data updater plant, make it a library
260267
def insert_value_into_db(
261-
_realm_name,
262-
db_client,
268+
realm_name,
263269
device_id,
264270
%InterfaceDescriptor{storage_type: :multi_interface_individual_properties_dbtable} =
265271
interface_descriptor,
@@ -270,35 +276,34 @@ defmodule Astarte.AppEngine.API.Device.Queries do
270276
timestamp,
271277
opts
272278
) do
273-
ttl_string = get_ttl_string(opts)
279+
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
280+
keyspace = keyspace_name(realm_name)
281+
282+
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
274283

275284
# TODO: :reception_timestamp_submillis is just a place holder right now
276-
insert_query =
277-
DatabaseQuery.new()
278-
|> DatabaseQuery.statement("""
279-
INSERT INTO #{interface_descriptor.storage}
280-
(device_id, interface_id, endpoint_id, path, reception_timestamp,
281-
#{CQLUtils.type_to_db_column_name(endpoint.value_type)})
282-
VALUES (:device_id, :interface_id, :endpoint_id, :path, :reception_timestamp,
283-
:value) #{ttl_string};
284-
""")
285-
|> DatabaseQuery.put(:device_id, device_id)
286-
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
287-
|> DatabaseQuery.put(:endpoint_id, endpoint_id)
288-
|> DatabaseQuery.put(:path, path)
289-
|> DatabaseQuery.put(:reception_timestamp, div(timestamp, 1000))
290-
|> DatabaseQuery.put(:reception_timestamp_submillis, div(timestamp, 100))
291-
|> DatabaseQuery.put(:value, to_db_friendly_type(value))
285+
interface_storage_attributes = %{
286+
value_column => to_db_friendly_type(value),
287+
device_id: device_id,
288+
interface_id: interface_descriptor.interface_id,
289+
endpoint_id: endpoint_id,
290+
path: path,
291+
reception_timestamp: timestamp_ms,
292+
reception_timestamp_submillis: timestamp_submillis
293+
}
292294

293-
DatabaseQuery.call!(db_client, insert_query)
295+
{1, _} =
296+
Repo.insert_all(interface_descriptor.storage, [interface_storage_attributes],
297+
prefix: keyspace,
298+
ttl: opts[:ttl]
299+
)
294300

295301
:ok
296302
end
297303

298304
# TODO Copy&pasted from data updater plant, make it a library
299305
def insert_value_into_db(
300-
_realm_name,
301-
db_client,
306+
realm_name,
302307
device_id,
303308
%InterfaceDescriptor{storage_type: :multi_interface_individual_datastream_dbtable} =
304309
interface_descriptor,
@@ -309,37 +314,28 @@ defmodule Astarte.AppEngine.API.Device.Queries do
309314
timestamp,
310315
opts
311316
) do
312-
ttl_string = get_ttl_string(opts)
313-
314-
insert_query =
315-
DatabaseQuery.new()
316-
|> DatabaseQuery.statement("""
317-
INSERT INTO #{interface_descriptor.storage}
318-
(device_id, interface_id, endpoint_id, path, value_timestamp, reception_timestamp, reception_timestamp_submillis,
319-
#{CQLUtils.type_to_db_column_name(endpoint.value_type)})
320-
VALUES (:device_id, :interface_id, :endpoint_id, :path, :value_timestamp, :reception_timestamp,
321-
:reception_timestamp_submillis, :value) #{ttl_string};
322-
""")
323-
|> DatabaseQuery.put(:device_id, device_id)
324-
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
325-
|> DatabaseQuery.put(:endpoint_id, endpoint.endpoint_id)
326-
|> DatabaseQuery.put(:path, path)
327-
|> DatabaseQuery.put(:value_timestamp, div(timestamp, 1000))
328-
|> DatabaseQuery.put(:reception_timestamp, div(timestamp, 1000))
329-
|> DatabaseQuery.put(:reception_timestamp_submillis, rem(timestamp, 1000))
330-
|> DatabaseQuery.put(:value, to_db_friendly_type(value))
331-
332-
# TODO: |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint))
317+
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
318+
keyspace = keyspace_name(realm_name)
319+
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
333320

334-
DatabaseQuery.call!(db_client, insert_query)
321+
attributes = %{
322+
value_column => to_db_friendly_type(value),
323+
device_id: device_id,
324+
interface_id: interface_descriptor.interface_id,
325+
endpoint_id: endpoint.endpoint_id,
326+
path: path,
327+
value_timestamp: timestamp_ms,
328+
reception_timestamp: timestamp_ms,
329+
reception_timestamp_submillis: timestamp_submillis
330+
}
335331

332+
Repo.insert_all(interface_descriptor.storage, [attributes], prefix: keyspace, ttl: opts[:ttl])
336333
:ok
337334
end
338335

339336
# TODO Copy&pasted from data updater plant, make it a library
340337
def insert_value_into_db(
341338
realm_name,
342-
db_client,
343339
device_id,
344340
%InterfaceDescriptor{storage_type: :one_object_datastream_dbtable} = interface_descriptor,
345341
_endpoint_id,
@@ -349,7 +345,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
349345
timestamp,
350346
opts
351347
) do
352-
ttl_string = get_ttl_string(opts)
353348
keyspace = keyspace_name(realm_name)
354349
interface_id = interface_descriptor.interface_id
355350

@@ -360,70 +355,74 @@ defmodule Astarte.AppEngine.API.Device.Queries do
360355
)
361356
|> Repo.all(prefix: keyspace)
362357

363-
explicit_timestamp = do_interface_has_explicit_timestamp?(keyspace, interface_id)
364-
365-
# FIXME: new atoms are created here, we should avoid this. We need to replace CQEx.
366-
column_atoms =
367-
Enum.reduce(endpoint_rows, %{}, fn endpoint, column_atoms_acc ->
368-
endpoint_name =
369-
endpoint.endpoint
370-
|> String.split("/")
371-
|> List.last()
358+
explicit_timestamp? = do_interface_has_explicit_timestamp?(keyspace, interface_id)
372359

360+
column_meta =
361+
endpoint_rows
362+
|> Map.new(fn endpoint ->
363+
endpoint_name = endpoint.endpoint |> String.split("/") |> List.last()
373364
column_name = CQLUtils.endpoint_to_db_column_name(endpoint_name)
374-
375-
Map.put(column_atoms_acc, endpoint_name, String.to_atom(column_name))
365+
{endpoint_name, %{name: column_name, type: endpoint.value_type}}
376366
end)
377367

378-
{query_values, placeholders, query_columns} =
379-
Enum.reduce(value, {%{}, "", ""}, fn {obj_key, obj_value},
380-
{query_values_acc, placeholders_acc, query_acc} ->
381-
if column_atoms[obj_key] != nil do
382-
column_name = CQLUtils.endpoint_to_db_column_name(obj_key)
383-
384-
db_value = to_db_friendly_type(obj_value)
385-
next_query_values_acc = Map.put(query_values_acc, column_atoms[obj_key], db_value)
386-
next_placeholders_acc = "#{placeholders_acc} :#{to_string(column_atoms[obj_key])},"
387-
next_query_acc = "#{query_acc} #{column_name}, "
388-
389-
{next_query_values_acc, next_placeholders_acc, next_query_acc}
390-
else
391-
Logger.warning(
392-
"Unexpected object key #{inspect(obj_key)} with value #{inspect(obj_value)}."
393-
)
368+
{timestamp_ms, submillis} = split_ms_and_submillis(timestamp)
394369

395-
query_values_acc
396-
end
397-
end)
370+
base_attributes = %{
371+
device_id: device_id,
372+
path: path
373+
}
398374

399-
{query_columns, placeholders} =
400-
if explicit_timestamp do
401-
{"value_timestamp, #{query_columns}", ":value_timestamp, #{placeholders}"}
402-
else
403-
{query_columns, placeholders}
404-
end
375+
timestamp_attributes = timestamp_attributes(explicit_timestamp?, timestamp_ms, submillis)
376+
value_attributes = value_attributes(column_meta, value)
405377

406-
# TODO: :reception_timestamp_submillis is just a place holder right now
407-
insert_query =
408-
DatabaseQuery.new()
409-
|> DatabaseQuery.statement("""
410-
INSERT INTO #{interface_descriptor.storage} (device_id, path, #{query_columns} reception_timestamp, reception_timestamp_submillis)
411-
VALUES (:device_id, :path, #{placeholders} :reception_timestamp, :reception_timestamp_submillis) #{ttl_string};
412-
""")
413-
|> DatabaseQuery.put(:device_id, device_id)
414-
|> DatabaseQuery.put(:path, path)
415-
|> DatabaseQuery.put(:value_timestamp, div(timestamp, 1000))
416-
|> DatabaseQuery.put(:reception_timestamp, div(timestamp, 1000))
417-
|> DatabaseQuery.put(:reception_timestamp_submillis, rem(timestamp, 1000))
418-
|> DatabaseQuery.merge(query_values)
378+
object_datastream =
379+
base_attributes
380+
|> Map.merge(timestamp_attributes)
381+
|> Map.merge(value_attributes)
419382

420-
# TODO: |> DatabaseQuery.consistency(insert_consistency(interface_descriptor, endpoint))
383+
ttl = Keyword.get(opts, :ttl)
384+
# TODO: consistency = insert_consistency(interface_descriptor, endpoint)
385+
opts = [prefix: keyspace, ttl: ttl, returning: false]
421386

422-
DatabaseQuery.call!(db_client, insert_query)
387+
Repo.insert_all(interface_descriptor.storage, [object_datastream], opts)
423388

424389
:ok
425390
end
426391

392+
defp timestamp_attributes(true = _explicit_timestamp?, timestamp, submillis) do
393+
%{
394+
value_timestamp: timestamp,
395+
reception_timestamp: timestamp,
396+
reception_timestamp_submillis: submillis
397+
}
398+
end
399+
400+
defp timestamp_attributes(_nil_or_false_explicit_timestamp?, timestamp, submillis) do
401+
%{reception_timestamp: timestamp, reception_timestamp_submillis: submillis}
402+
end
403+
404+
defp value_attributes(column_meta, value) do
405+
value =
406+
value
407+
|> Enum.flat_map(fn {key, value} ->
408+
# filter map
409+
case Map.fetch(column_meta, key) do
410+
{:ok, meta} ->
411+
%{name: name, type: type} = meta
412+
data = %{type: type, value: value}
413+
[{name, data}]
414+
415+
:error ->
416+
Logger.warning("Unexpected object key #{inspect(key)} with value #{inspect(value)}.")
417+
418+
[]
419+
end
420+
end)
421+
422+
value
423+
|> Map.new(fn {column, data} -> {column, data.value} end)
424+
end
425+
427426
# TODO Copy&pasted from data updater plant, make it a library
428427
defp to_db_friendly_type(array) when is_list(array) do
429428
# If we have an array, we convert its elements to a db friendly type
@@ -1030,4 +1029,11 @@ defmodule Astarte.AppEngine.API.Device.Queries do
10301029
{:error, :database_error}
10311030
end
10321031
end
1032+
1033+
defp split_ms_and_submillis(timestamp_micro) do
1034+
timestamp_ms = div(timestamp_micro, 1000)
1035+
timestamp_submillis = rem(timestamp_micro, 1000)
1036+
1037+
{timestamp_ms, timestamp_submillis}
1038+
end
10331039
end

0 commit comments

Comments
 (0)