Skip to content

Commit cc6f174

Browse files
committed
refactor(appengine): use datetimes for data insertion
using integer timestamps, especially when doing calculations on them, is error prone and we've just recently resolved a bug that was caused by a double "microsecond to millisecond" conversion. elixir has something more solid than that: `DateTime`s. although currently unused, the submillis should now use the correct number of digits. an unused and nonfunctioning function in `IndividualProperty` has been removed Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent cffa1f4 commit cc6f174

File tree

5 files changed

+110
-62
lines changed

5 files changed

+110
-62
lines changed

apps/astarte_appengine_api/lib/astarte_appengine_api/datetime.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,14 @@ defmodule Astarte.AppEngine.API.DateTime do
4040

4141
@spec cast(t() | any()) :: {:ok, t()} | :error
4242
def cast(datetime_or_timestamp_or_any), do: load(datetime_or_timestamp_or_any)
43+
44+
def split_submillis(timestamp) do
45+
timestamp_ms = DateTime.truncate(timestamp, :millisecond)
46+
submillis = timestamp |> DateTime.to_unix(:microsecond) |> rem(1000)
47+
# `DateTime`s are microsecond precision, individual_properties's submillis
48+
# have one extra digit. Keep the unit consistent with DUP
49+
decimicrosecond_submillis = submillis * 10
50+
51+
{timestamp_ms, decimicrosecond_submillis}
52+
end
4353
end

apps/astarte_appengine_api/lib/astarte_appengine_api/device/device.ex

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,6 @@ defmodule Astarte.AppEngine.API.Device do
256256

257257
now = DateTime.utc_now()
258258

259-
timestamp_micro =
260-
now
261-
|> DateTime.to_unix(:microsecond)
262-
263259
db_max_ttl =
264260
if mapping.database_retention_policy == :use_ttl do
265261
min(realm_max_ttl, mapping.database_retention_ttl)
@@ -284,7 +280,7 @@ defmodule Astarte.AppEngine.API.Device do
284280
mapping,
285281
path,
286282
value,
287-
timestamp_micro,
283+
now,
288284
opts
289285
)
290286

@@ -411,10 +407,6 @@ defmodule Astarte.AppEngine.API.Device do
411407
) do
412408
now = DateTime.utc_now()
413409

414-
timestamp_micro =
415-
now
416-
|> DateTime.to_unix(:microsecond)
417-
418410
with {:ok, mappings} <-
419411
Mappings.fetch_interface_mappings(
420412
realm_name,
@@ -460,7 +452,7 @@ defmodule Astarte.AppEngine.API.Device do
460452
nil,
461453
path,
462454
value,
463-
timestamp_micro,
455+
now,
464456
opts
465457
)
466458

apps/astarte_appengine_api/lib/astarte_appengine_api/device/queries.ex

Lines changed: 28 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
2020
import Ecto.Query
2121

2222
alias Astarte.AppEngine.API.Config
23+
alias Astarte.AppEngine.API.DateTime, as: DateTimeMs
2324
alias Astarte.AppEngine.API.Device.DeletionInProgress
2425
alias Astarte.AppEngine.API.Device.DeviceStatus
2526
alias Astarte.AppEngine.API.Device.DevicesList
@@ -181,19 +182,17 @@ defmodule Astarte.AppEngine.API.Device.Queries do
181182

182183
keyspace = DataAccessRealm.keyspace_name(realm_name)
183184

184-
# Ecto expects microsecond precision
185-
{reception, reception_submillis} = split_datetime_to_ms_and_submillis(reception_timestamp)
186-
value_timestamp = value_timestamp |> DateTime.truncate(:millisecond) |> pad_usec()
185+
value =
186+
%IndividualProperty{
187+
device_id: device_id,
188+
interface_id: interface_descriptor.interface_id,
189+
endpoint_id: endpoint_id,
190+
path: path,
191+
reception: reception_timestamp,
192+
datetime_value: value_timestamp
193+
}
187194

188-
value = %IndividualProperty{
189-
device_id: device_id,
190-
interface_id: interface_descriptor.interface_id,
191-
endpoint_id: endpoint_id,
192-
path: path,
193-
reception_timestamp: reception,
194-
reception_timestamp_submillis: reception_submillis,
195-
datetime_value: value_timestamp
196-
}
195+
value = value |> IndividualProperty.prepare_for_db()
197196

198197
ttl = opts[:ttl]
199198

@@ -262,7 +261,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
262261
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
263262
keyspace = DataAccessRealm.keyspace_name(realm_name)
264263

265-
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
264+
{timestamp_ms, timestamp_submillis} = DateTimeMs.split_submillis(timestamp)
266265

267266
# TODO: :reception_timestamp_submillis is just a place holder right now
268267
interface_storage_attributes = %{
@@ -299,7 +298,7 @@ defmodule Astarte.AppEngine.API.Device.Queries do
299298
) do
300299
value_column = CQLUtils.type_to_db_column_name(endpoint.value_type)
301300
keyspace = DataAccessRealm.keyspace_name(realm_name)
302-
{timestamp_ms, timestamp_submillis} = split_ms_and_submillis(timestamp)
301+
{timestamp_ms, timestamp_submillis} = DateTimeMs.split_submillis(timestamp)
303302

304303
attributes = %{
305304
value_column => to_db_friendly_type(value),
@@ -312,7 +311,12 @@ defmodule Astarte.AppEngine.API.Device.Queries do
312311
reception_timestamp_submillis: timestamp_submillis
313312
}
314313

315-
Repo.insert_all(interface_descriptor.storage, [attributes], prefix: keyspace, ttl: opts[:ttl])
314+
{1, _} =
315+
Repo.insert_all(interface_descriptor.storage, [attributes],
316+
prefix: keyspace,
317+
ttl: opts[:ttl]
318+
)
319+
316320
:ok
317321
end
318322

@@ -348,14 +352,12 @@ defmodule Astarte.AppEngine.API.Device.Queries do
348352
{endpoint_name, %{name: column_name, type: endpoint.value_type}}
349353
end)
350354

351-
{timestamp_ms, submillis} = split_ms_and_submillis(timestamp)
352-
353355
base_attributes = %{
354356
device_id: device_id,
355357
path: path
356358
}
357359

358-
timestamp_attributes = timestamp_attributes(explicit_timestamp?, timestamp_ms, submillis)
360+
timestamp_attributes = timestamp_attributes(explicit_timestamp?, timestamp)
359361
value_attributes = value_attributes(column_meta, value)
360362

361363
object_datastream =
@@ -372,15 +374,21 @@ defmodule Astarte.AppEngine.API.Device.Queries do
372374
:ok
373375
end
374376

375-
defp timestamp_attributes(true = _explicit_timestamp?, timestamp, submillis) do
377+
defp timestamp_attributes(true = _explicit_timestamp?, timestamp) do
378+
{timestamp, submillis} =
379+
Astarte.AppEngine.API.DateTime.split_submillis(timestamp)
380+
376381
%{
377382
value_timestamp: timestamp,
378383
reception_timestamp: timestamp,
379384
reception_timestamp_submillis: submillis
380385
}
381386
end
382387

383-
defp timestamp_attributes(_nil_or_false_explicit_timestamp?, timestamp, submillis) do
388+
defp timestamp_attributes(_nil_or_false_explicit_timestamp?, timestamp) do
389+
{timestamp, submillis} =
390+
Astarte.AppEngine.API.DateTime.split_submillis(timestamp)
391+
384392
%{reception_timestamp: timestamp, reception_timestamp_submillis: submillis}
385393
end
386394

@@ -988,29 +996,4 @@ defmodule Astarte.AppEngine.API.Device.Queries do
988996
{:error, :database_error}
989997
end
990998
end
991-
992-
defp split_ms_and_submillis(timestamp_micro) do
993-
timestamp_ms = div(timestamp_micro, 1000)
994-
timestamp_submillis = rem(timestamp_micro, 1000)
995-
996-
{timestamp_ms, timestamp_submillis}
997-
end
998-
999-
defp split_datetime_to_ms_and_submillis(datetime) do
1000-
datetime_ms = datetime |> DateTime.truncate(:millisecond) |> pad_usec()
1001-
1002-
{usec, _} = datetime.microsecond
1003-
submillis = usec |> rem(1000)
1004-
1005-
{datetime_ms, submillis}
1006-
end
1007-
1008-
defp pad_usec(nil), do: nil
1009-
1010-
defp pad_usec(timestamp) do
1011-
case timestamp.microsecond do
1012-
{_, 6} -> timestamp
1013-
{usec, _} -> %{timestamp | microsecond: {usec, 6}}
1014-
end
1015-
end
1016999
end
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
defmodule Astarte.AppEngine.API.Realms.IndividualDatastream do
20+
use TypedEctoSchema
21+
alias Astarte.AppEngine.API.DateTime, as: DateTimeMs
22+
alias Astarte.AppEngine.API.UUID
23+
24+
@primary_key false
25+
typed_schema "individual_datastreams" do
26+
field :reception, DateTimeMs, virtual: true
27+
field :device_id, UUID, primary_key: true
28+
field :interface_id, UUID, primary_key: true
29+
field :endpoint_id, UUID, primary_key: true
30+
field :path, :string, primary_key: true
31+
field :value_timestamp, DateTimeMs, primary_key: true
32+
field :reception_timestamp, DateTimeMs, primary_key: true
33+
field :reception_timestamp_submillis, :integer, primary_key: true
34+
field :binaryblob_value, :binary
35+
field :binaryblobarray_value, {:array, :binary}
36+
field :boolean_value, :boolean
37+
field :booleanarray_value, {:array, :boolean}
38+
field :datetime_value, DateTimeMs
39+
field :datetimearray_value, {:array, DateTimeMs}
40+
field :double_value, :float
41+
field :doublearray_value, {:array, :float}
42+
field :integer_value, :integer
43+
field :integerarray_value, {:array, :integer}
44+
field :longinteger_value, :integer
45+
field :longintegerarray_value, {:array, :integer}
46+
field :string_value, :string
47+
field :stringarray_value, {:array, :string}
48+
end
49+
50+
def prepare_for_db(%{reception: nil} = individual_datastream), do: individual_datastream
51+
52+
def prepare_for_db(individual_datastream) do
53+
{reception_ms, submillis} = DateTimeMs.split_submillis(individual_datastream.reception)
54+
55+
%{
56+
individual_datastream
57+
| reception_timestamp: reception_ms,
58+
reception_timestamp_submillis: submillis
59+
}
60+
end
61+
end

apps/astarte_appengine_api/lib/astarte_appengine_api/realms/individual_property.ex

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,15 @@ defmodule Astarte.AppEngine.API.Realms.IndividualProperty do
4646
field :datetimearray_value, {:array, DateTimeMs}
4747
end
4848

49-
def reception(individual_property) do
50-
nanos =
51-
individual_property.reception_timestamp_submillis
52-
|> Kernel.||(0)
53-
|> Kernel.*(100)
49+
def prepare_for_db(%{reception: nil} = individual_property), do: individual_property
5450

55-
individual_property.reception_timestamp
56-
|> DateTime.add(nanos, :nanosecond)
51+
def prepare_for_db(individual_property) do
52+
{reception_ms, submillis} = DateTimeMs.split_submillis(individual_property.reception)
53+
54+
%{
55+
individual_property
56+
| reception_timestamp: reception_ms,
57+
reception_timestamp_submillis: submillis
58+
}
5759
end
5860
end

0 commit comments

Comments
 (0)