Skip to content

Commit 60b662b

Browse files
committed
refactor(appengine): use datetimes for data insertion
using integer timestamps, especially when doing calculations on them, is error prone and we've just recently resolved a bug that was caused by a double "microsecond to millisecond" conversion. elixir has something more solid than that: `DateTime`s. although currently unused, the submillis should now use the correct number of digits. an unused and nonfunctioning function in `IndividualProperty` has been removed Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent a99ddfa commit 60b662b

File tree

5 files changed

+120
-63
lines changed

5 files changed

+120
-63
lines changed

apps/astarte_appengine_api/lib/astarte_appengine_api/datetime.ex

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,24 @@ defmodule Astarte.AppEngine.API.DateTime do
4040

4141
@spec cast(t() | any()) :: {:ok, t()} | :error
4242
def cast(datetime_or_timestamp_or_any), do: load(datetime_or_timestamp_or_any)
43+
44+
def split_submillis(timestamp) do
45+
timestamp_ms = DateTime.truncate(timestamp, :millisecond)
46+
47+
submillis =
48+
case timestamp.microsecond do
49+
{_, 0} ->
50+
0
51+
52+
{usec, precision} ->
53+
invalid_digits = 6 - precision
54+
microseconds = Integer.floor_div(usec, 10 ** invalid_digits) |> rem(1000)
55+
56+
# `DateTime`s are microsecond precision, individual_properties's submillis
57+
# have one extra digit. Keep the unit consistent with DUP
58+
_decimicroseconds = microseconds * 10
59+
end
60+
61+
{timestamp_ms, submillis}
62+
end
4363
end

apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,6 @@ defmodule Astarte.AppEngine.API.Device do
256256

257257
now = DateTime.utc_now()
258258

259-
timestamp_micro =
260-
now
261-
|> DateTime.to_unix(:microsecond)
262-
263259
db_max_ttl =
264260
if mapping.database_retention_policy == :use_ttl do
265261
min(realm_max_ttl, mapping.database_retention_ttl)
@@ -284,7 +280,7 @@ defmodule Astarte.AppEngine.API.Device do
284280
mapping,
285281
path,
286282
value,
287-
timestamp_micro,
283+
now,
288284
opts
289285
)
290286

@@ -411,10 +407,6 @@ defmodule Astarte.AppEngine.API.Device do
411407
) do
412408
now = DateTime.utc_now()
413409

414-
timestamp_micro =
415-
now
416-
|> DateTime.to_unix(:microsecond)
417-
418410
with {:ok, mappings} <-
419411
Mappings.fetch_interface_mappings(
420412
realm_name,
@@ -460,7 +452,7 @@ defmodule Astarte.AppEngine.API.Device do
460452
nil,
461453
path,
462454
value,
463-
timestamp_micro,
455+
now,
464456
opts
465457
)
466458

apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex

Lines changed: 28 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
2020
import Ecto.Query
2121

2222
alias Astarte.AppEngine.API.Config
23+
alias Astarte.AppEngine.API.DateTime, as: DateTimeMs
2324
alias Astarte.AppEngine.API.Device.DeletionInProgress
2425
alias Astarte.AppEngine.API.Device.DeviceStatus
2526
alias Astarte.AppEngine.API.Device.DevicesList
@@ -178,19 +179,17 @@ defmodule Astarte.AppEngine.API.Device.Queries do
178179

179180
keyspace = keyspace_name(realm_name)
180181

181-
# Ecto expects microsecond precision
182-
{reception, reception_submillis} = split_datetime_to_ms_and_submillis(reception_timestamp)
183-
value_timestamp = value_timestamp |> DateTime.truncate(:millisecond) |> pad_usec()
182+
value =
183+
%IndividualProperty{
184+
device_id: device_id,
185+
interface_id: interface_descriptor.interface_id,
186+
endpoint_id: endpoint_id,
187+
path: path,
188+
reception: reception_timestamp,
189+
datetime_value: value_timestamp
190+
}
184191

185-
value = %IndividualProperty{
186-
device_id: device_id,
187-
interface_id: interface_descriptor.interface_id,
188-
endpoint_id: endpoint_id,
189-
path: path,
190-
reception_timestamp: reception,
191-
reception_timestamp_submillis: reception_submillis,
192-
datetime_value: value_timestamp
193-
}
192+
value = value |> IndividualProperty.prepare_for_db()
194193

195194
ttl = opts[:ttl]
196195

@@ -258,8 +257,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
258257
) do
259258
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
260259
keyspace = keyspace_name(realm_name)
261-
262-
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
260+
{timestamp_ms, timestamp_submillis} = DateTimeMs.split_submillis(timestamp)
263261

264262
# TODO: :reception_timestamp_submillis is just a place holder right now
265263
interface_storage_attributes = %{
@@ -296,7 +294,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
296294
) do
297295
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
298296
keyspace = keyspace_name(realm_name)
299-
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
297+
{timestamp_ms, timestamp_submillis} = DateTimeMs.split_submillis(timestamp)
300298

301299
attributes = %{
302300
value_column => to_db_friendly_type(value),
@@ -309,7 +307,12 @@ defmodule Astarte.AppEngine.API.Device.Queries do
309307
reception_timestamp_submillis: timestamp_submillis
310308
}
311309

312-
Repo.insert_all(interface_descriptor.storage, [attributes], prefix: keyspace, ttl: opts[:ttl])
310+
{1, _} =
311+
Repo.insert_all(interface_descriptor.storage, [attributes],
312+
prefix: keyspace,
313+
ttl: opts[:ttl]
314+
)
315+
313316
:ok
314317
end
315318

@@ -345,14 +348,12 @@ defmodule Astarte.AppEngine.API.Device.Queries do
345348
{endpoint_name, %{name: column_name, type: endpoint.value_type}}
346349
end)
347350

348-
{timestamp_ms, submillis} = split_ms_and_submillis(timestamp)
349-
350351
base_attributes = %{
351352
device_id: device_id,
352353
path: path
353354
}
354355

355-
timestamp_attributes = timestamp_attributes(explicit_timestamp?, timestamp_ms, submillis)
356+
timestamp_attributes = timestamp_attributes(explicit_timestamp?, timestamp)
356357
value_attributes = value_attributes(column_meta, value)
357358

358359
object_datastream =
@@ -369,15 +370,21 @@ defmodule Astarte.AppEngine.API.Device.Queries do
369370
:ok
370371
end
371372

372-
defp timestamp_attributes(true = _explicit_timestamp?, timestamp, submillis) do
373+
defp timestamp_attributes(true = _explicit_timestamp?, timestamp) do
374+
{timestamp, submillis} =
375+
Astarte.AppEngine.API.DateTime.split_submillis(timestamp)
376+
373377
%{
374378
value_timestamp: timestamp,
375379
reception_timestamp: timestamp,
376380
reception_timestamp_submillis: submillis
377381
}
378382
end
379383

380-
defp timestamp_attributes(_nil_or_false_explicit_timestamp?, timestamp, submillis) do
384+
defp timestamp_attributes(_nil_or_false_explicit_timestamp?, timestamp) do
385+
{timestamp, submillis} =
386+
Astarte.AppEngine.API.DateTime.split_submillis(timestamp)
387+
381388
%{reception_timestamp: timestamp, reception_timestamp_submillis: submillis}
382389
end
383390

@@ -985,29 +992,4 @@ defmodule Astarte.AppEngine.API.Device.Queries do
985992
{:error, :database_error}
986993
end
987994
end
988-
989-
defp split_ms_and_submillis(timestamp_micro) do
990-
timestamp_ms = div(timestamp_micro, 1000)
991-
timestamp_submillis = rem(timestamp_micro, 1000)
992-
993-
{timestamp_ms, timestamp_submillis}
994-
end
995-
996-
defp split_datetime_to_ms_and_submillis(datetime) do
997-
datetime_ms = datetime |> DateTime.truncate(:millisecond) |> pad_usec()
998-
999-
{usec, _} = datetime.microsecond
1000-
submillis = usec |> rem(1000)
1001-
1002-
{datetime_ms, submillis}
1003-
end
1004-
1005-
defp pad_usec(nil), do: nil
1006-
1007-
defp pad_usec(timestamp) do
1008-
case timestamp.microsecond do
1009-
{_, 6} -> timestamp
1010-
{usec, _} -> %{timestamp | microsecond: {usec, 6}}
1011-
end
1012-
end
1013995
end
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
defmodule Astarte.AppEngine.API.Realms.IndividualDatastream do
20+
use TypedEctoSchema
21+
alias Astarte.AppEngine.API.DateTime, as: DateTimeMs
22+
alias Astarte.AppEngine.API.UUID
23+
24+
@primary_key false
25+
typed_schema "individual_datastreams" do
26+
field :reception, DateTimeMs, virtual: true
27+
field :device_id, UUID, primary_key: true
28+
field :interface_id, UUID, primary_key: true
29+
field :endpoint_id, UUID, primary_key: true
30+
field :path, :string, primary_key: true
31+
field :value_timestamp, DateTimeMs, primary_key: true
32+
field :reception_timestamp, DateTimeMs, primary_key: true
33+
field :reception_timestamp_submillis, :integer, primary_key: true
34+
field :binaryblob_value, :binary
35+
field :binaryblobarray_value, {:array, :binary}
36+
field :boolean_value, :boolean
37+
field :booleanarray_value, {:array, :boolean}
38+
field :datetime_value, DateTimeMs
39+
field :datetimearray_value, {:array, DateTimeMs}
40+
field :double_value, :float
41+
field :doublearray_value, {:array, :float}
42+
field :integer_value, :integer
43+
field :integerarray_value, {:array, :integer}
44+
field :longinteger_value, :integer
45+
field :longintegerarray_value, {:array, :integer}
46+
field :string_value, :string
47+
field :stringarray_value, {:array, :string}
48+
end
49+
50+
def prepare_for_db(%{reception: nil} = individual_datastream), do: individual_datastream
51+
52+
def prepare_for_db(individual_datastream) do
53+
{reception_ms, submillis} = DateTimeMs.split_submillis(individual_datastream.reception)
54+
55+
%{
56+
individual_datastream
57+
| reception_timestamp: reception_ms,
58+
reception_timestamp_submillis: submillis
59+
}
60+
end
61+
end

apps/astarte_appengine_api/lib/astarte_appengine_api/realms/individual_property.ex

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ defmodule Astarte.AppEngine.API.Realms.IndividualProperty do
4646
field :datetimearray_value, {:array, DateTimeMs}
4747
end
4848

49-
def reception(individual_property) do
50-
nanos =
51-
individual_property.reception_timestamp_submillis
52-
|> Kernel.||(0)
53-
|> Kernel.*(100)
49+
def prepare_for_db(%{reception: nil} = individual_property), do: individual_property
5450

55-
individual_property.reception_timestamp
56-
|> DateTime.add(nanos, :nanosecond)
51+
def prepare_for_db(individual_property) do
52+
{reception_ms, submillis} = DateTimeMs.split_submillis(individual_property.reception)
53+
54+
%{
55+
individual_property
56+
| reception_timestamp: reception_ms,
57+
reception_timestamp_submillis: submillis
58+
}
5759
end
5860
end

0 commit comments

Comments
 (0)