Skip to content

Commit ee888e8

Browse files
committed
refactor(dup): use DateTime internally
implement a new type, DecimicrosecondDateTime to allow using DateTimes with one extra precision digit compared to std DateTimes. Functions from the original DateTime are implemented as they're needed. timestamps are preserved in telemetry. Signed-off-by: Francesco Noacco <francesco.noacco@secomind.com>
1 parent f94c478 commit ee888e8

File tree

9 files changed

+140
-79
lines changed

9 files changed

+140
-79
lines changed

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/amqp_data_consumer.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
2929
use GenServer
3030

3131
alias AMQP.Channel
32+
alias Astarte.Core.DecimicrosecondDateTime
3233
alias Astarte.DataUpdaterPlant.Config
3334
alias Astarte.DataUpdaterPlant.DataUpdater
3435

@@ -185,6 +186,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
185186
msg_type = Map.get(headers_map, @msg_type_header, headers_map)
186187

187188
{timestamp, clean_meta} = Map.pop(no_headers_meta, :timestamp)
189+
timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond)
188190

189191
case handle_consume(msg_type, payload, headers_map, timestamp, clean_meta) do
190192
:ok ->
@@ -391,6 +393,8 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
391393
end
392394

393395
defp handle_invalid_msg(payload, headers, timestamp, meta) do
396+
timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond)
397+
394398
Logger.warning(
395399
"Invalid AMQP message: #{inspect(Base.encode64(payload))} #{inspect(headers)} #{inspect(timestamp)} #{inspect(meta)}",
396400
tag: "data_consumer_invalid_msg"

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/deletion_scheduler.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ defmodule Astarte.DataUpdater.DeletionScheduler do
2828
alias Astarte.DataUpdaterPlant.DataUpdater.Queries
2929
alias Astarte.DataUpdaterPlant.DataUpdater
3030
alias Astarte.DataUpdaterPlant.Config
31+
alias Astarte.Core.DecimicrosecondDateTime
3132
alias Astarte.Core.Device
3233
alias Astarte.Core.CQLUtils
3334

@@ -56,7 +57,7 @@ defmodule Astarte.DataUpdater.DeletionScheduler do
5657
defp start_device_deletion! do
5758
retrieve_devices_to_delete!()
5859
|> Enum.each(fn %{realm_name: realm_name, encoded_device_id: encoded_device_id} ->
59-
timestamp = now_us_x10_timestamp()
60+
timestamp = now_us_x10_timestamp() |> DecimicrosecondDateTime.from_unix!(:decimicrosecond)
6061
# This must be a call, as we want to be sure this was completed
6162
:ok = DataUpdater.start_device_deletion(realm_name, encoded_device_id, timestamp)
6263
end)

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex

Lines changed: 64 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
2020
alias Astarte.Core.CQLUtils
2121
alias Astarte.DataUpdaterPlant.Config
22+
alias Astarte.Core.DecimicrosecondDateTime
2223
alias Astarte.Core.Device
2324
alias Astarte.Core.InterfaceDescriptor
2425
alias Astarte.Core.Mapping
@@ -46,12 +47,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
4647
alias Astarte.DataUpdaterPlant.TriggerPolicy.Queries, as: PolicyQueries
4748
require Logger
4849

50+
@epoch DecimicrosecondDateTime.from_unix!(0)
51+
4952
@paths_cache_size 32
50-
@interface_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
51-
@device_triggers_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
52-
@groups_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
53-
@deletion_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
54-
@datastream_maximum_retention_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
53+
@interface_lifespan minute: 10
54+
@device_triggers_lifespan minute: 10
55+
@groups_lifespan minute: 10
56+
@deletion_refresh_lifespan minute: 10
57+
@datastream_maximum_retention_refresh_lifespan minute: 10
5558

5659
def init_state(realm, device_id, message_tracker) do
5760
MessageTracker.register_data_updater(message_tracker)
@@ -73,13 +76,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
7376
volatile_triggers: [],
7477
interface_exchanged_bytes: %{},
7578
interface_exchanged_msgs: %{},
76-
last_seen_message: 0,
77-
last_device_triggers_refresh: 0,
78-
last_groups_refresh: 0,
79+
last_seen_message: @epoch,
80+
last_device_triggers_refresh: @epoch,
81+
last_groups_refresh: @epoch,
7982
trigger_id_to_policy_name: %{},
8083
discard_messages: false,
81-
last_deletion_in_progress_refresh: 0,
82-
last_datastream_maximum_retention_refresh: 0
84+
last_deletion_in_progress_refresh: @epoch,
85+
last_datastream_maximum_retention_refresh: @epoch
8386
}
8487

8588
encoded_device_id = Device.encode_device_id(device_id)
@@ -110,8 +113,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
110113
def handle_connection(state, ip_address_string, message_id, timestamp) do
111114
new_state = execute_time_based_actions(state, timestamp)
112115

113-
timestamp_ms = div(timestamp, 10_000)
114-
115116
ip_address_result =
116117
ip_address_string
117118
|> to_charlist()
@@ -130,7 +131,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
130131
Queries.set_device_connected!(
131132
new_state.realm,
132133
new_state.device_id,
133-
DateTime.from_unix!(timestamp_ms, :millisecond),
134+
timestamp,
134135
ip_address
135136
)
136137

@@ -147,7 +148,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
147148
new_state.realm,
148149
device_id_string,
149150
ip_address_string,
150-
timestamp_ms
151+
timestamp
151152
)
152153

153154
MessageTracker.ack_delivery(new_state.message_tracker, message_id)
@@ -460,8 +461,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
460461
end
461462

462463
defp execute_device_error_triggers(state, error_name, error_metadata \\ %{}, timestamp) do
463-
timestamp_ms = div(timestamp, 10_000)
464-
465464
trigger_target_with_policy_list =
466465
Map.get(state.device_triggers, :on_device_error, [])
467466
|> Enum.map(fn target ->
@@ -476,7 +475,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
476475
device_id_string,
477476
error_name,
478477
error_metadata,
479-
timestamp_ms
478+
timestamp
480479
)
481480

482481
:ok
@@ -510,9 +509,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
510509

511510
maybe_explicit_value_timestamp =
512511
if mapping.explicit_timestamp do
513-
value_timestamp
512+
%DecimicrosecondDateTime{datetime: value_timestamp}
514513
else
515-
div(timestamp, 10000)
514+
timestamp
516515
end
517516

518517
execute_incoming_data_triggers(
@@ -1227,8 +1226,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
12271226
def process_introspection(state, new_introspection_list, payload, message_id, timestamp) do
12281227
new_state = execute_time_based_actions(state, timestamp)
12291228

1230-
timestamp_ms = div(timestamp, 10_000)
1231-
12321229
{db_introspection_map, db_introspection_minor_map} =
12331230
List.foldl(new_introspection_list, {%{}, %{}}, fn {interface, major, minor},
12341231
{introspection_map,
@@ -1258,7 +1255,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
12581255
realm,
12591256
device_id_string,
12601257
payload,
1261-
timestamp_ms
1258+
timestamp
12621259
)
12631260

12641261
# TODO: implement here object_id handling for a certain interface name. idea: introduce interface_family_id
@@ -1313,7 +1310,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
13131310
interface_name,
13141311
interface_major,
13151312
minor,
1316-
timestamp_ms
1313+
timestamp
13171314
)
13181315
end)
13191316

@@ -1350,7 +1347,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
13501347
device_id_string,
13511348
interface_name,
13521349
interface_major,
1353-
timestamp_ms
1350+
timestamp
13541351
)
13551352
end)
13561353

@@ -1412,7 +1409,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
14121409
interface_major,
14131410
old_minor,
14141411
new_minor,
1415-
timestamp_ms
1412+
timestamp
14161413
)
14171414
end
14181415

@@ -1458,9 +1455,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
14581455
def handle_control(state, "/producer/properties", <<0, 0, 0, 0>>, message_id, timestamp) do
14591456
new_state = execute_time_based_actions(state, timestamp)
14601457

1461-
timestamp_ms = div(timestamp, 10_000)
1462-
1463-
:ok = prune_device_properties(new_state, "", timestamp_ms)
1458+
:ok = prune_device_properties(new_state, "", timestamp)
14641459

14651460
MessageTracker.ack_delivery(new_state.message_tracker, message_id)
14661461

@@ -1476,15 +1471,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
14761471
def handle_control(state, "/producer/properties", payload, message_id, timestamp) do
14771472
new_state = execute_time_based_actions(state, timestamp)
14781473

1479-
timestamp_ms = div(timestamp, 10_000)
1480-
14811474
# TODO: check payload size, to avoid anoying crashes
14821475

14831476
<<_size_header::size(32), zlib_payload::binary>> = payload
14841477

14851478
case PayloadsDecoder.safe_inflate(zlib_payload) do
14861479
{:ok, decoded_payload} ->
1487-
:ok = prune_device_properties(new_state, decoded_payload, timestamp_ms)
1480+
:ok = prune_device_properties(new_state, decoded_payload, timestamp)
14881481
MessageTracker.ack_delivery(new_state.message_tracker, message_id)
14891482

14901483
%{
@@ -1833,7 +1826,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
18331826
end
18341827

18351828
defp reload_groups_on_expiry(state, timestamp) do
1836-
if state.last_groups_refresh + @groups_lifespan_decimicroseconds <= timestamp do
1829+
next_refresh = DecimicrosecondDateTime.shift(state.last_groups_refresh, @groups_lifespan)
1830+
refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt
1831+
1832+
if refresh? do
18371833
# TODO this could be a bang!
18381834
{:ok, groups} = Queries.get_device_groups(state.realm, state.device_id)
18391835

@@ -1844,8 +1840,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
18441840
end
18451841

18461842
defp reload_device_triggers_on_expiry(state, timestamp) do
1847-
if state.last_device_triggers_refresh + @device_triggers_lifespan_decimicroseconds <=
1848-
timestamp do
1843+
next_refresh =
1844+
DecimicrosecondDateTime.shift(state.last_device_triggers_refresh, @device_triggers_lifespan)
1845+
1846+
refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt
1847+
1848+
if refresh? do
18491849
any_device_id = SimpleTriggersProtobufUtils.any_device_object_id()
18501850

18511851
any_interface_id = SimpleTriggersProtobufUtils.any_interface_object_id()
@@ -1888,11 +1888,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
18881888
end
18891889

18901890
defp execute_time_based_actions(state, timestamp) do
1891-
if state.connected && state.last_seen_message > 0 do
1891+
if state.connected && DecimicrosecondDateTime.after?(state.last_seen_message, @epoch) do
18921892
# timestamps are handled as microseconds*10, so we need to divide by 10 when saving as a metric for a coherent data
18931893
:telemetry.execute(
18941894
[:astarte, :data_updater_plant, :service, :connected_devices],
1895-
%{duration: Integer.floor_div(timestamp - state.last_seen_message, 10)},
1895+
%{
1896+
duration: DecimicrosecondDateTime.diff(timestamp, state.last_seen_message, :microsecond)
1897+
},
18961898
%{realm: state.realm, status: :ok}
18971899
)
18981900
end
@@ -1907,8 +1909,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
19071909
end
19081910

19091911
defp reload_device_deletion_status_on_expiry(state, timestamp) do
1910-
if state.last_deletion_in_progress_refresh + @deletion_refresh_lifespan_decimicroseconds <=
1911-
timestamp do
1912+
next_refresh =
1913+
DecimicrosecondDateTime.shift(
1914+
state.last_deletion_in_progress_refresh,
1915+
@deletion_refresh_lifespan
1916+
)
1917+
1918+
refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt
1919+
1920+
if refresh? do
19121921
new_state = maybe_start_device_deletion(state, timestamp)
19131922
%State{new_state | last_deletion_in_progress_refresh: timestamp}
19141923
else
@@ -1917,9 +1926,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
19171926
end
19181927

19191928
defp reload_datastream_maximum_storage_retention_on_expiry(state, timestamp) do
1920-
if state.last_datastream_maximum_retention_refresh +
1921-
@datastream_maximum_retention_refresh_lifespan_decimicroseconds <=
1922-
timestamp do
1929+
next_refresh =
1930+
DecimicrosecondDateTime.shift(
1931+
state.last_datastream_maximum_retention_refresh,
1932+
@datastream_maximum_retention_refresh_lifespan
1933+
)
1934+
1935+
refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt
1936+
1937+
if refresh? do
19231938
# TODO this could be a bang!
19241939
case Queries.fetch_datastream_maximum_storage_retention(state.realm) do
19251940
{:ok, ttl} ->
@@ -2075,7 +2090,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
20752090
Mappings.fetch_interface_mappings_map(state.realm, interface_id),
20762091
new_interfaces_by_expiry <-
20772092
state.interfaces_by_expiry ++
2078-
[{state.last_seen_message + @interface_lifespan_decimicroseconds, interface_name}],
2093+
[
2094+
{DecimicrosecondDateTime.shift(state.last_seen_message, @interface_lifespan),
2095+
interface_name}
2096+
],
20792097
new_state <- %State{
20802098
state
20812099
| interfaces: Map.put(state.interfaces, interface_name, interface_descriptor),
@@ -2237,19 +2255,17 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
22372255
end
22382256

22392257
defp set_device_disconnected(state, timestamp) do
2240-
timestamp_ms = div(timestamp, 10_000)
2241-
22422258
Queries.set_device_disconnected!(
22432259
state.realm,
22442260
state.device_id,
2245-
DateTime.from_unix!(timestamp_ms, :millisecond),
2261+
timestamp,
22462262
state.total_received_msgs,
22472263
state.total_received_bytes,
22482264
state.interface_exchanged_msgs,
22492265
state.interface_exchanged_bytes
22502266
)
22512267

2252-
maybe_execute_device_disconnected_trigger(state, timestamp_ms)
2268+
maybe_execute_device_disconnected_trigger(state, timestamp)
22532269

22542270
%{state | connected: false}
22552271
end
@@ -2258,7 +2274,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
22582274
:ok
22592275
end
22602276

2261-
defp maybe_execute_device_disconnected_trigger(state, timestamp_ms) do
2277+
defp maybe_execute_device_disconnected_trigger(state, timestamp) do
22622278
trigger_target_with_policy_list =
22632279
Map.get(state.device_triggers, :on_device_disconnection, [])
22642280
|> Enum.map(fn target ->
@@ -2271,7 +2287,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
22712287
trigger_target_with_policy_list,
22722288
state.realm,
22732289
device_id_string,
2274-
timestamp_ms
2290+
timestamp
22752291
)
22762292

22772293
:telemetry.execute(

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/payloads_decoder.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
defmodule Astarte.DataUpdaterPlant.DataUpdater.PayloadsDecoder do
2020
require Logger
21+
alias Astarte.Core.DecimicrosecondDateTime
2122
alias Astarte.Core.Interface
2223

2324
@max_uncompressed_payload_size 10_485_760
@@ -26,9 +27,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.PayloadsDecoder do
2627
Decode a BSON payload a returns a tuple containing the decoded value, the timestamp and metadata.
2728
reception_timestamp is used if no timestamp has been sent with the payload.
2829
"""
29-
@spec decode_bson_payload(binary, integer) :: {map, DateTime.t(), map}
30+
@spec decode_bson_payload(binary, DecimicrosecondDateTime.t()) :: {map, DateTime.t(), map}
3031
def decode_bson_payload(payload, reception_timestamp) do
31-
reception = reception_timestamp |> div(10000) |> DateTime.from_unix!(:millisecond)
32+
reception = DateTime.truncate(reception_timestamp.datetime, :millisecond)
3233

3334
if byte_size(payload) != 0 do
3435
case Cyanide.decode(payload) do

0 commit comments

Comments
 (0)