Skip to content

Commit 972a81d

Browse files
authored
Merge pull request #1115 from noaccOS/refactor/insert_path_into_db
refactor(appengine): port insert_path_into_db to exandra
2 parents c0d2111 + 4faad21 commit 972a81d

File tree

3 files changed

+105
-74
lines changed

3 files changed

+105
-74
lines changed

apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ defmodule Astarte.AppEngine.API.Device do
3636
alias Astarte.Core.Mapping
3737
alias Astarte.Core.Mapping.EndpointsAutomaton
3838
alias Astarte.Core.Mapping.ValueType
39-
alias Astarte.DataAccess.Database
4039
alias Astarte.DataAccess.Mappings
4140
alias Astarte.DataAccess.Device, as: DeviceQueries
4241
alias Astarte.DataAccess.Interface, as: InterfaceQueries
@@ -228,7 +227,6 @@ defmodule Astarte.AppEngine.API.Device do
228227
end
229228

230229
defp update_individual_interface_values(
231-
client,
232230
realm_name,
233231
device_id,
234232
interface_descriptor,
@@ -256,8 +254,10 @@ defmodule Astarte.AppEngine.API.Device do
256254
) do
257255
realm_max_ttl = Queries.fetch_datastream_maximum_storage_retention(realm_name)
258256

257+
now = DateTime.utc_now()
258+
259259
timestamp_micro =
260-
DateTime.utc_now()
260+
now
261261
|> DateTime.to_unix(:microsecond)
262262

263263
db_max_ttl =
@@ -290,13 +290,13 @@ defmodule Astarte.AppEngine.API.Device do
290290

291291
if interface_descriptor.type == :datastream do
292292
Queries.insert_path_into_db(
293-
client,
293+
realm_name,
294294
device_id,
295295
interface_descriptor,
296296
endpoint_id,
297297
path,
298-
div(timestamp_micro, 1000),
299-
timestamp_micro,
298+
now,
299+
now,
300300
opts
301301
)
302302
end
@@ -403,15 +403,16 @@ defmodule Astarte.AppEngine.API.Device do
403403
end
404404

405405
defp update_object_interface_values(
406-
client,
407406
realm_name,
408407
device_id,
409408
interface_descriptor,
410409
path,
411410
raw_value
412411
) do
412+
now = DateTime.utc_now()
413+
413414
timestamp_micro =
414-
DateTime.utc_now()
415+
now
415416
|> DateTime.to_unix(:microsecond)
416417

417418
with {:ok, mappings} <-
@@ -464,13 +465,13 @@ defmodule Astarte.AppEngine.API.Device do
464465
)
465466

466467
Queries.insert_path_into_db(
467-
client,
468+
realm_name,
468469
device_id,
469470
interface_descriptor,
470471
endpoint_id,
471472
path,
472-
div(timestamp_micro, 1000),
473-
timestamp_micro,
473+
now,
474+
now,
474475
opts
475476
)
476477

@@ -514,8 +515,7 @@ defmodule Astarte.AppEngine.API.Device do
514515
raw_value,
515516
_params
516517
) do
517-
with {:ok, client} <- Database.connect(realm: realm_name),
518-
{:ok, device_id} <- Device.decode_device_id(encoded_device_id),
518+
with {:ok, device_id} <- Device.decode_device_id(encoded_device_id),
519519
{:ok, major_version} <-
520520
DeviceQueries.interface_version(realm_name, device_id, interface),
521521
{:ok, interface_row} <-
@@ -525,7 +525,6 @@ defmodule Astarte.AppEngine.API.Device do
525525
path <- "/" <> no_prefix_path do
526526
if interface_descriptor.aggregation == :individual do
527527
update_individual_interface_values(
528-
client,
529528
realm_name,
530529
device_id,
531530
interface_descriptor,
@@ -534,7 +533,6 @@ defmodule Astarte.AppEngine.API.Device do
534533
)
535534
else
536535
update_object_interface_values(
537-
client,
538536
realm_name,
539537
device_id,
540538
interface_descriptor,

apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex

Lines changed: 35 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,21 @@ defmodule Astarte.AppEngine.API.Device.Queries do
2323
alias Astarte.AppEngine.API.Device.DeletionInProgress
2424
alias Astarte.AppEngine.API.Device.DeviceStatus
2525
alias Astarte.AppEngine.API.Device.DevicesList
26-
alias Astarte.AppEngine.API.Device.InterfaceValuesOptions
2726
alias Astarte.AppEngine.API.Device.InterfaceInfo
27+
alias Astarte.AppEngine.API.Realms.IndividualProperty
2828
alias Astarte.AppEngine.API.KvStore
2929
alias Astarte.AppEngine.API.Name
3030
alias Astarte.AppEngine.API.Repo
3131
alias Astarte.Core.CQLUtils
3232
alias Astarte.Core.Device
3333
alias Astarte.Core.InterfaceDescriptor
34-
alias CQEx.Query, as: DatabaseQuery
35-
alias CQEx.Result, as: DatabaseResult
3634
alias Astarte.AppEngine.API.Realm
3735
alias Astarte.AppEngine.API.Devices.Device, as: DatabaseDevice
3836
alias Astarte.AppEngine.API.Endpoint, as: DatabaseEndpoint
3937

4038
require CQEx
4139
require Logger
4240

43-
def first_result_row(values) do
44-
DatabaseResult.head(values)
45-
end
46-
4741
def retrieve_interfaces_list(realm_name, device_id) do
4842
keyspace = keyspace_name(realm_name)
4943

@@ -165,17 +159,8 @@ defmodule Astarte.AppEngine.API.Device.Queries do
165159
|> Repo.all()
166160
end
167161

168-
defp get_ttl_string(opts) do
169-
with {:ok, value} when is_integer(value) <- Keyword.fetch(opts, :ttl) do
170-
"USING TTL #{to_string(value)}"
171-
else
172-
_any_error ->
173-
""
174-
end
175-
end
176-
177162
def insert_path_into_db(
178-
db_client,
163+
realm_name,
179164
device_id,
180165
%InterfaceDescriptor{storage_type: storage_type} = interface_descriptor,
181166
endpoint_id,
@@ -191,30 +176,25 @@ defmodule Astarte.AppEngine.API.Device.Queries do
191176
# TODO: use received value_timestamp when needed
192177
# TODO: :reception_timestamp_submillis is just a place holder right now
193178

194-
ttl_string = get_ttl_string(opts)
179+
keyspace = keyspace_name(realm_name)
195180

196-
insert_statement = """
197-
INSERT INTO individual_properties
198-
(device_id, interface_id, endpoint_id, path,
199-
reception_timestamp, reception_timestamp_submillis, datetime_value)
200-
VALUES (:device_id, :interface_id, :endpoint_id, :path, :reception_timestamp,
201-
:reception_timestamp_submillis, :datetime_value) #{ttl_string};
202-
"""
181+
# Ecto expects microsecond precision
182+
{reception, reception_submillis} = split_datetime_to_ms_and_submillis(reception_timestamp)
183+
value_timestamp = value_timestamp |> DateTime.truncate(:millisecond) |> pad_usec()
203184

204-
{reception_ms, reception_submillis} = split_ms_and_submillis(reception_timestamp)
185+
value = %IndividualProperty{
186+
device_id: device_id,
187+
interface_id: interface_descriptor.interface_id,
188+
endpoint_id: endpoint_id,
189+
path: path,
190+
reception_timestamp: reception,
191+
reception_timestamp_submillis: reception_submillis,
192+
datetime_value: value_timestamp
193+
}
205194

206-
insert_query =
207-
DatabaseQuery.new()
208-
|> DatabaseQuery.statement(insert_statement)
209-
|> DatabaseQuery.put(:device_id, device_id)
210-
|> DatabaseQuery.put(:interface_id, interface_descriptor.interface_id)
211-
|> DatabaseQuery.put(:endpoint_id, endpoint_id)
212-
|> DatabaseQuery.put(:path, path)
213-
|> DatabaseQuery.put(:reception_timestamp, reception_ms)
214-
|> DatabaseQuery.put(:reception_timestamp_submillis, reception_submillis)
215-
|> DatabaseQuery.put(:datetime_value, value_timestamp)
195+
ttl = opts[:ttl]
216196

217-
DatabaseQuery.call!(db_client, insert_query)
197+
Repo.insert!(value, prefix: keyspace, ttl: ttl)
218198

219199
:ok
220200
end
@@ -723,28 +703,6 @@ defmodule Astarte.AppEngine.API.Device.Queries do
723703
{count, values}
724704
end
725705

726-
def get_results_count(_client, _count_query, %InterfaceValuesOptions{downsample_to: nil}) do
727-
# Count will be ignored since there's no downsample_to
728-
nil
729-
end
730-
731-
def get_results_count(client, count_query, opts) do
732-
with {:ok, result} <- DatabaseQuery.call(client, count_query),
733-
[{_count_key, count}] <- DatabaseResult.head(result) do
734-
limit = opts.limit || Config.max_results_limit!()
735-
736-
min(count, limit)
737-
else
738-
error ->
739-
_ =
740-
Logger.warning("Can't retrieve count for #{inspect(count_query)}: #{inspect(error)}.",
741-
tag: "db_error"
742-
)
743-
744-
nil
745-
end
746-
end
747-
748706
def all_properties_for_endpoint!(
749707
realm_name,
750708
device_id,
@@ -1036,4 +994,22 @@ defmodule Astarte.AppEngine.API.Device.Queries do
1036994

1037995
{timestamp_ms, timestamp_submillis}
1038996
end
997+
998+
defp split_datetime_to_ms_and_submillis(datetime) do
999+
datetime_ms = datetime |> DateTime.truncate(:millisecond) |> pad_usec()
1000+
1001+
{usec, _} = datetime.microsecond
1002+
submillis = usec |> rem(1000)
1003+
1004+
{datetime_ms, submillis}
1005+
end
1006+
1007+
defp pad_usec(nil), do: nil
1008+
1009+
defp pad_usec(timestamp) do
1010+
case timestamp.microsecond do
1011+
{_, 6} -> timestamp
1012+
{usec, _} -> %{timestamp | microsecond: {usec, 6}}
1013+
end
1014+
end
10391015
end
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
defmodule Astarte.AppEngine.API.Realms.IndividualProperty do
20+
use TypedEctoSchema
21+
alias Astarte.AppEngine.API.UUID
22+
23+
@primary_key false
24+
typed_schema "individual_properties" do
25+
field :reception, :utc_datetime_usec, virtual: true
26+
field :device_id, UUID, primary_key: true
27+
field :interface_id, UUID, primary_key: true
28+
field :endpoint_id, UUID, primary_key: true
29+
field :path, :string, primary_key: true
30+
field :reception_timestamp, :utc_datetime_usec
31+
field :reception_timestamp_submillis, :integer
32+
field :double_value, :float
33+
field :integer_value, :integer
34+
field :boolean_value, :boolean
35+
field :longinteger_value, :integer
36+
field :string_value, :string
37+
field :binaryblob_value, :binary
38+
field :datetime_value, :utc_datetime_usec
39+
field :doublearray_value, {:array, :float}
40+
field :integerarray_value, {:array, :integer}
41+
field :booleanarray_value, {:array, :boolean}
42+
field :longintegerarray_value, {:array, :integer}
43+
field :stringarray_value, {:array, :string}
44+
field :binaryblobarray_value, {:array, :binary}
45+
field :datetimearray_value, {:array, :utc_datetime_usec}
46+
end
47+
48+
def reception(individual_property) do
49+
nanos =
50+
individual_property.reception_timestamp_submillis
51+
|> Kernel.||(0)
52+
|> Kernel.*(100)
53+
54+
individual_property.reception_timestamp
55+
|> DateTime.add(nanos, :nanosecond)
56+
end
57+
end

0 commit comments

Comments
 (0)