diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/amqp_data_consumer.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/amqp_data_consumer.ex index 483df8737f..c6b566a056 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/amqp_data_consumer.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/amqp_data_consumer.ex @@ -29,6 +29,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do use GenServer alias AMQP.Channel + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.DataUpdaterPlant.Config alias Astarte.DataUpdaterPlant.DataUpdater @@ -185,6 +186,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do msg_type = Map.get(headers_map, @msg_type_header, headers_map) {timestamp, clean_meta} = Map.pop(no_headers_meta, :timestamp) + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) case handle_consume(msg_type, payload, headers_map, timestamp, clean_meta) do :ok -> @@ -391,6 +393,8 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do end defp handle_invalid_msg(payload, headers, timestamp, meta) do + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + Logger.warning( "Invalid AMQP message: #{inspect(Base.encode64(payload))} #{inspect(headers)} #{inspect(timestamp)} #{inspect(meta)}", tag: "data_consumer_invalid_msg" diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/deletion_scheduler.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/deletion_scheduler.ex index 9111305386..8ee67a6fd2 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/deletion_scheduler.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/deletion_scheduler.ex @@ -28,6 +28,7 @@ defmodule Astarte.DataUpdater.DeletionScheduler do alias Astarte.DataUpdaterPlant.DataUpdater.Queries alias Astarte.DataUpdaterPlant.DataUpdater alias Astarte.DataUpdaterPlant.Config + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.Core.Device alias Astarte.Core.CQLUtils @@ -56,7 +57,7 @@ defmodule Astarte.DataUpdater.DeletionScheduler do defp start_device_deletion! do retrieve_devices_to_delete!() |> Enum.each(fn %{realm_name: realm_name, encoded_device_id: encoded_device_id} -> - timestamp = now_us_x10_timestamp() + timestamp = now_us_x10_timestamp() |> DecimicrosecondDateTime.from_unix!(:decimicrosecond) # This must be a call, as we want to be sure this was completed :ok = DataUpdater.start_device_deletion(realm_name, encoded_device_id, timestamp) end) diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex index 56fd29ce0b..731c471a4e 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex @@ -19,6 +19,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do alias Astarte.Core.CQLUtils alias Astarte.DataUpdaterPlant.Config + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.Core.Device alias Astarte.Core.InterfaceDescriptor alias Astarte.Core.Mapping @@ -46,12 +47,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do alias Astarte.DataUpdaterPlant.TriggerPolicy.Queries, as: PolicyQueries require Logger + @epoch DecimicrosecondDateTime.from_unix!(0) + @paths_cache_size 32 - @interface_lifespan_decimicroseconds 60 * 10 * 1000 * 10000 - @device_triggers_lifespan_decimicroseconds 60 * 10 * 1000 * 10000 - @groups_lifespan_decimicroseconds 60 * 10 * 1000 * 10000 - @deletion_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000 - @datastream_maximum_retention_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000 + @interface_lifespan minute: 10 + @device_triggers_lifespan minute: 10 + @groups_lifespan minute: 10 + @deletion_refresh_lifespan minute: 10 + @datastream_maximum_retention_refresh_lifespan minute: 10 def init_state(realm, device_id, message_tracker) do MessageTracker.register_data_updater(message_tracker) @@ -73,13 +76,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do volatile_triggers: [], interface_exchanged_bytes: %{}, interface_exchanged_msgs: %{}, - last_seen_message: 0, - last_device_triggers_refresh: 0, - last_groups_refresh: 0, + last_seen_message: @epoch, + last_device_triggers_refresh: @epoch, + last_groups_refresh: @epoch, trigger_id_to_policy_name: %{}, discard_messages: false, - last_deletion_in_progress_refresh: 0, - last_datastream_maximum_retention_refresh: 0 + last_deletion_in_progress_refresh: @epoch, + last_datastream_maximum_retention_refresh: @epoch } encoded_device_id = Device.encode_device_id(device_id) @@ -110,8 +113,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do def handle_connection(state, ip_address_string, message_id, timestamp) do new_state = execute_time_based_actions(state, timestamp) - timestamp_ms = div(timestamp, 10_000) - ip_address_result = ip_address_string |> to_charlist() @@ -130,7 +131,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do Queries.set_device_connected!( new_state.realm, new_state.device_id, - DateTime.from_unix!(timestamp_ms, :millisecond), + timestamp, ip_address ) @@ -147,7 +148,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do new_state.realm, device_id_string, ip_address_string, - timestamp_ms + timestamp ) MessageTracker.ack_delivery(new_state.message_tracker, message_id) @@ -460,8 +461,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp execute_device_error_triggers(state, error_name, error_metadata \\ %{}, timestamp) do - timestamp_ms = div(timestamp, 10_000) - trigger_target_with_policy_list = Map.get(state.device_triggers, :on_device_error, []) |> Enum.map(fn target -> @@ -476,7 +475,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do device_id_string, error_name, error_metadata, - timestamp_ms + timestamp ) :ok @@ -510,9 +509,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do maybe_explicit_value_timestamp = if mapping.explicit_timestamp do - value_timestamp + %DecimicrosecondDateTime{datetime: value_timestamp} else - div(timestamp, 10000) + timestamp end execute_incoming_data_triggers( @@ -1014,15 +1013,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp is_still_valid?({:ok, expiry_date}, ttl) do - expiry_secs = DateTime.to_unix(expiry_date) - - now_secs = - DateTime.utc_now() - |> DateTime.to_unix() + # add 1 hour of tolerance for clock synchronization issues + now_with_tolerance = + DateTime.utc_now(:second) + |> DateTime.add(ttl) + |> DateTime.add(1, :hour) - # 3600 seconds is one hour - # this adds 1 hour of tolerance to clock synchronization issues - now_secs + ttl + 3600 < expiry_secs + DateTime.before?(now_with_tolerance, expiry_date) end defp validate_interface(interface) do @@ -1229,8 +1226,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do def process_introspection(state, new_introspection_list, payload, message_id, timestamp) do new_state = execute_time_based_actions(state, timestamp) - timestamp_ms = div(timestamp, 10_000) - {db_introspection_map, db_introspection_minor_map} = List.foldl(new_introspection_list, {%{}, %{}}, fn {interface, major, minor}, {introspection_map, @@ -1260,7 +1255,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do realm, device_id_string, payload, - timestamp_ms + timestamp ) # TODO: implement here object_id handling for a certain interface name. idea: introduce interface_family_id @@ -1315,7 +1310,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do interface_name, interface_major, minor, - timestamp_ms + timestamp ) end) @@ -1352,7 +1347,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do device_id_string, interface_name, interface_major, - timestamp_ms + timestamp ) end) @@ -1414,7 +1409,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do interface_major, old_minor, new_minor, - timestamp_ms + timestamp ) end @@ -1460,9 +1455,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do def handle_control(state, "/producer/properties", <<0, 0, 0, 0>>, message_id, timestamp) do new_state = execute_time_based_actions(state, timestamp) - timestamp_ms = div(timestamp, 10_000) - - :ok = prune_device_properties(new_state, "", timestamp_ms) + :ok = prune_device_properties(new_state, "", timestamp) MessageTracker.ack_delivery(new_state.message_tracker, message_id) @@ -1478,15 +1471,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do def handle_control(state, "/producer/properties", payload, message_id, timestamp) do new_state = execute_time_based_actions(state, timestamp) - timestamp_ms = div(timestamp, 10_000) - # TODO: check payload size, to avoid anoying crashes <<_size_header::size(32), zlib_payload::binary>> = payload case PayloadsDecoder.safe_inflate(zlib_payload) do {:ok, decoded_payload} -> - :ok = prune_device_properties(new_state, decoded_payload, timestamp_ms) + :ok = prune_device_properties(new_state, decoded_payload, timestamp) MessageTracker.ack_delivery(new_state.message_tracker, message_id) %{ @@ -1835,7 +1826,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp reload_groups_on_expiry(state, timestamp) do - if state.last_groups_refresh + @groups_lifespan_decimicroseconds <= timestamp do + next_refresh = DecimicrosecondDateTime.shift(state.last_groups_refresh, @groups_lifespan) + refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt + + if refresh? do # TODO this could be a bang! {:ok, groups} = Queries.get_device_groups(state.realm, state.device_id) @@ -1846,8 +1840,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp reload_device_triggers_on_expiry(state, timestamp) do - if state.last_device_triggers_refresh + @device_triggers_lifespan_decimicroseconds <= - timestamp do + next_refresh = + DecimicrosecondDateTime.shift(state.last_device_triggers_refresh, @device_triggers_lifespan) + + refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt + + if refresh? do any_device_id = SimpleTriggersProtobufUtils.any_device_object_id() any_interface_id = SimpleTriggersProtobufUtils.any_interface_object_id() @@ -1890,11 +1888,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp execute_time_based_actions(state, timestamp) do - if state.connected && state.last_seen_message > 0 do - # timestamps are handled as microseconds*10, so we need to divide by 10 when saving as a metric for a coherent data + if state.connected && DecimicrosecondDateTime.after?(state.last_seen_message, @epoch) do :telemetry.execute( [:astarte, :data_updater_plant, :service, :connected_devices], - %{duration: Integer.floor_div(timestamp - state.last_seen_message, 10)}, + %{ + duration: DecimicrosecondDateTime.diff(timestamp, state.last_seen_message, :microsecond) + }, %{realm: state.realm, status: :ok} ) end @@ -1909,8 +1908,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp reload_device_deletion_status_on_expiry(state, timestamp) do - if state.last_deletion_in_progress_refresh + @deletion_refresh_lifespan_decimicroseconds <= - timestamp do + next_refresh = + DecimicrosecondDateTime.shift( + state.last_deletion_in_progress_refresh, + @deletion_refresh_lifespan + ) + + refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt + + if refresh? do new_state = maybe_start_device_deletion(state, timestamp) %State{new_state | last_deletion_in_progress_refresh: timestamp} else @@ -1919,9 +1925,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp reload_datastream_maximum_storage_retention_on_expiry(state, timestamp) do - if state.last_datastream_maximum_retention_refresh + - @datastream_maximum_retention_refresh_lifespan_decimicroseconds <= - timestamp do + next_refresh = + DecimicrosecondDateTime.shift( + state.last_datastream_maximum_retention_refresh, + @datastream_maximum_retention_refresh_lifespan + ) + + refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt + + if refresh? do # TODO this could be a bang! case Queries.fetch_datastream_maximum_storage_retention(state.realm) do {:ok, ttl} -> @@ -2077,7 +2089,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do Mappings.fetch_interface_mappings_map(state.realm, interface_id), new_interfaces_by_expiry <- state.interfaces_by_expiry ++ - [{state.last_seen_message + @interface_lifespan_decimicroseconds, interface_name}], + [ + {DecimicrosecondDateTime.shift(state.last_seen_message, @interface_lifespan), + interface_name} + ], new_state <- %State{ state | interfaces: Map.put(state.interfaces, interface_name, interface_descriptor), @@ -2239,19 +2254,17 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do end defp set_device_disconnected(state, timestamp) do - timestamp_ms = div(timestamp, 10_000) - Queries.set_device_disconnected!( state.realm, state.device_id, - DateTime.from_unix!(timestamp_ms, :millisecond), + timestamp, state.total_received_msgs, state.total_received_bytes, state.interface_exchanged_msgs, state.interface_exchanged_bytes ) - maybe_execute_device_disconnected_trigger(state, timestamp_ms) + maybe_execute_device_disconnected_trigger(state, timestamp) %{state | connected: false} end @@ -2260,7 +2273,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do :ok end - defp maybe_execute_device_disconnected_trigger(state, timestamp_ms) do + defp maybe_execute_device_disconnected_trigger(state, timestamp) do trigger_target_with_policy_list = Map.get(state.device_triggers, :on_device_disconnection, []) |> Enum.map(fn target -> @@ -2273,7 +2286,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do trigger_target_with_policy_list, state.realm, device_id_string, - timestamp_ms + timestamp ) :telemetry.execute( diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/payloads_decoder.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/payloads_decoder.ex index 2cb57e6b4b..bf25e952dc 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/payloads_decoder.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/payloads_decoder.ex @@ -18,6 +18,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.PayloadsDecoder do require Logger + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.Core.Interface @max_uncompressed_payload_size 10_485_760 @@ -26,30 +27,30 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.PayloadsDecoder do Decode a BSON payload a returns a tuple containing the decoded value, the timestamp and metadata. reception_timestamp is used if no timestamp has been sent with the payload. """ - @spec decode_bson_payload(binary, integer) :: {map, integer, map} + @spec decode_bson_payload(binary, DecimicrosecondDateTime.t()) :: {map, DateTime.t(), map} def decode_bson_payload(payload, reception_timestamp) do + reception = DateTime.truncate(reception_timestamp.datetime, :millisecond) + if byte_size(payload) != 0 do case Cyanide.decode(payload) do - {:ok, %{"v" => bson_value, "t" => %DateTime{} = timestamp, "m" => %{} = metadata}} -> - bson_timestamp = DateTime.to_unix(timestamp, :millisecond) + {:ok, %{"v" => bson_value, "t" => %DateTime{} = bson_timestamp, "m" => %{} = metadata}} -> {bson_value, bson_timestamp, metadata} {:ok, %{"v" => bson_value, "m" => %{} = metadata}} -> - {bson_value, div(reception_timestamp, 10000), metadata} + {bson_value, reception, metadata} - {:ok, %{"v" => bson_value, "t" => %DateTime{} = timestamp}} -> - bson_timestamp = DateTime.to_unix(timestamp, :millisecond) + {:ok, %{"v" => bson_value, "t" => %DateTime{} = bson_timestamp}} -> {bson_value, bson_timestamp, %{}} {:ok, %{"v" => %Cyanide.Binary{data: <<>>}}} -> {nil, nil, nil} {:ok, %{"v" => bson_value}} -> - {bson_value, div(reception_timestamp, 10000), %{}} + {bson_value, reception, %{}} {:ok, %{} = bson_value} -> # Handling old format object aggregation - {bson_value, div(reception_timestamp, 10000), %{}} + {bson_value, reception, %{}} {:error, _reason} -> {:error, :undecodable_bson_payload} diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex index 4d44ee1a71..40dd14c79f 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex @@ -21,6 +21,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do alias Astarte.Core.Device, as: CoreDevice alias Astarte.Core.InterfaceDescriptor alias Astarte.Core.Mapping + alias Astarte.DataAccess.DateTime, as: MsDateTime alias Astarte.DataUpdaterPlant.Config alias Astarte.DataAccess.Realms.SimpleTrigger alias Astarte.DataAccess.Device.DeletionInProgress @@ -131,8 +132,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do %InterfaceDescriptor{interface_id: interface_id, storage: storage} = interface_descriptor %Mapping{endpoint_id: endpoint_id, value_type: value_type} = mapping keyspace_name = Realm.keyspace_name(realm) - timestamp = div(reception_timestamp, 10000) - reception_timestamp_submillis = rem(reception_timestamp, 10000) + {timestamp, reception_timestamp_submillis} = MsDateTime.split_submillis(reception_timestamp) column_name = CQLUtils.type_to_db_column_name(value_type) # TODO: :reception_timestamp_submillis is just a place holder right now @@ -170,8 +170,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do %InterfaceDescriptor{interface_id: interface_id, storage: storage} = interface_descriptor %Mapping{endpoint_id: endpoint_id, value_type: value_type} = mapping keyspace_name = Realm.keyspace_name(realm) - timestamp = div(reception_timestamp, 10000) - reception_timestamp_submillis = rem(reception_timestamp, 10000) + {timestamp, reception_timestamp_submillis} = MsDateTime.split_submillis(reception_timestamp) + value_timestamp = Ecto.Type.cast!(MsDateTime, value_timestamp) column_name = CQLUtils.type_to_db_column_name(value_type) # TODO: use received value_timestamp when needed @@ -211,8 +211,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do %InterfaceDescriptor{interface_id: interface_id, storage: storage} = interface_descriptor keyspace_name = Realm.keyspace_name(realm) - timestamp = div(reception_timestamp, 10000) - reception_timestamp_submillis = rem(reception_timestamp, 10000) + {timestamp, reception_timestamp_submillis} = MsDateTime.split_submillis(reception_timestamp) # TODO: we should cache endpoints by interface_id column_info = @@ -252,6 +251,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do insert_value = if explicit_timestamp? do + value_timestamp = Ecto.Type.cast!(MsDateTime, value_timestamp) + Map.put(insert_value, "value_timestamp", value_timestamp) else insert_value @@ -366,8 +367,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do %InterfaceDescriptor{interface_id: interface_id} = interface_descriptor %Mapping{endpoint_id: endpoint_id} = mapping keyspace_name = Realm.keyspace_name(realm) - timestamp = div(reception_timestamp, 10000) |> DateTime.from_unix!(:microsecond) - reception_timestamp_submillis = rem(reception_timestamp, 10000) + {timestamp, reception_timestamp_submillis} = MsDateTime.split_submillis(reception_timestamp) + value_timestamp = Ecto.Type.cast!(MsDateTime, value_timestamp) # TODO: :reception_timestamp_submillis is just a place holder right now entry = %{ @@ -377,7 +378,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do path: path, reception_timestamp: timestamp, reception_timestamp_submillis: reception_timestamp_submillis, - datetime_value: DateTime.from_unix!(value_timestamp, :microsecond) + datetime_value: value_timestamp } opts = @@ -457,7 +458,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do defp set_connection_info!(realm, device_id, timestamp, ip_address) do keyspace_name = Realm.keyspace_name(realm) - timestamp = Ecto.Type.cast!(:utc_datetime_usec, timestamp) + timestamp = Ecto.Type.cast!(MsDateTime, timestamp) %Device{device_id: device_id} |> Ecto.Changeset.change( @@ -516,7 +517,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do interface_exchanged_bytes ) do keyspace_name = Realm.keyspace_name(realm) - timestamp_ms = Ecto.Type.cast!(:utc_datetime_usec, timestamp_ms) + timestamp_ms = Ecto.Type.cast!(MsDateTime, timestamp_ms) %Device{device_id: device_id} |> Ecto.Changeset.change( @@ -800,10 +801,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do [ttl] when is_integer(ttl) -> expiry_datetime = - DateTime.utc_now() - |> DateTime.to_unix() - |> :erlang.+(ttl) - |> DateTime.from_unix!() + DateTime.utc_now(:second) + |> DateTime.add(ttl) {:ok, expiry_datetime} diff --git a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/triggers_handler.ex b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/triggers_handler.ex index 52bb3d188b..df251610e4 100644 --- a/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/triggers_handler.ex +++ b/apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/triggers_handler.ex @@ -20,6 +20,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do # https://hexdocs.pm/elixir/1.13.4/Bitwise.html import Bitwise require Logger + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.DataUpdaterPlant.Config @moduledoc """ @@ -507,7 +508,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandler do parent_trigger_id: parent_trigger_id, realm: realm, device_id: device_id, - timestamp: timestamp, + timestamp: DecimicrosecondDateTime.to_unix(timestamp, :millisecond), event: {event_type, event} } end diff --git a/apps/astarte_data_updater_plant/mix.exs b/apps/astarte_data_updater_plant/mix.exs index 7d14913ddc..634a66a877 100644 --- a/apps/astarte_data_updater_plant/mix.exs +++ b/apps/astarte_data_updater_plant/mix.exs @@ -68,9 +68,8 @@ defmodule Astarte.DataUpdaterPlant.Mixfile do defp astarte_required_modules(_) do [ - {:astarte_core, github: "astarte-platform/astarte_core", branch: "release-1.2"}, - {:astarte_data_access, - github: "astarte-platform/astarte_data_access", branch: "release-1.2"}, + {:astarte_core, github: "noaccOS/astarte_core", branch: "chore/decimicro"}, + {:astarte_data_access, github: "noaccOS/astarte_data_access", branch: "chore/decimicro"}, {:astarte_rpc, "~> 1.2"} ] end diff --git a/apps/astarte_data_updater_plant/mix.lock b/apps/astarte_data_updater_plant/mix.lock index f315e13e0d..c150a2f67e 100644 --- a/apps/astarte_data_updater_plant/mix.lock +++ b/apps/astarte_data_updater_plant/mix.lock @@ -1,8 +1,8 @@ %{ "amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"}, "amqp_client": {:hex, :amqp_client, "3.12.10", "dcc0d5d0037fa2b486c6eb8b52695503765b96f919e38ca864a7b300b829742d", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.10", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "16a23959899a82d9c2534ed1dcf1fa281d3b660fb7f78426b880647f0a53731f"}, - "astarte_core": {:git, "https://github.com/astarte-platform/astarte_core.git", "7ba3d8f672e54c55b0abe8fd7c4d00f40a4f023e", [branch: "release-1.2"]}, - "astarte_data_access": {:git, "https://github.com/astarte-platform/astarte_data_access.git", "04196a757aa8d2c8fa1596aedc8cc495cea14e9c", [branch: "release-1.2"]}, + "astarte_core": {:git, "https://github.com/noaccOS/astarte_core.git", "ca2c71612f421a68e77df85b820eb2c3e8d3aeab", [branch: "chore/decimicro"]}, + "astarte_data_access": {:git, "https://github.com/noaccOS/astarte_data_access.git", "43acce9685c38f2c7f929704316759f35b842cdb", [branch: "chore/decimicro"]}, "astarte_rpc": {:hex, :astarte_rpc, "1.2.0", "dcef7434bf3f19ff30ff0bc245ef2d3b0f9abeb369405713cfd28916a5635926", [:mix], [{:amqp, "~> 3.3", [hex: :amqp, repo: "hexpm", optional: false]}, {:castore, "~> 1.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:protobuf, "~> 0.12", [hex: :protobuf, repo: "hexpm", optional: false]}, {:skogsra, "~> 2.2", [hex: :skogsra, repo: "hexpm", optional: false]}], "hexpm", "8470ed2f116fa8c9d70845f295f3738529aff123d9d3f98cbfa37107314763cb"}, "castore": {:hex, :castore, "1.0.7", "b651241514e5f6956028147fe6637f7ac13802537e895a724f90bf3e36ddd1dd", [:mix], [], "hexpm", "da7785a4b0d2a021cd1292a60875a784b6caef71e76bf4917bdee1f390455cf5"}, "certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"}, diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater/payloads_decoder_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater/payloads_decoder_test.exs index 3562107ec7..d69383ed41 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater/payloads_decoder_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater/payloads_decoder_test.exs @@ -18,11 +18,13 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do use ExUnit.Case + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.DataUpdaterPlant.DataUpdater.PayloadsDecoder test "unset" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-03-19T14:15:32+00:00") timestamp = DateTime.to_unix(date_time, :millisecond) * 10000 + 123 + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) assert PayloadsDecoder.decode_bson_payload(<<>>, timestamp) == {nil, nil, nil} end @@ -30,6 +32,7 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "deprecated unset" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-03-19T14:15:32+00:00") timestamp = DateTime.to_unix(date_time, :millisecond) * 10000 + 123 + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) unset_payload = Base.decode64!("DQAAAAV2AAAAAAAAAA==") @@ -39,7 +42,8 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "individual value payloads without metadata and without timestamp" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-03-19T14:15:32+00:00") timestamp = DateTime.to_unix(date_time, :microsecond) * 10 + 123 - expected_timestamp = DateTime.to_unix(date_time, :millisecond) + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) + expected_timestamp = DateTime.add(date_time, 0, :millisecond) string_payload = Base.decode64!("FAAAAAJ2AAgAAAAjRTVEOTAwAAA=") @@ -60,7 +64,8 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "individual value payloads with timestamp and without metadata" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-02-19T14:15:32+00:00") rec_timestamp = DateTime.to_unix(date_time, :microsecond) * 10 + 123 - expected_timestamp = 1_521_464_570_595 + rec_timestamp = DecimicrosecondDateTime.from_unix!(rec_timestamp, :decimicrosecond) + expected_timestamp = DateTime.from_unix!(1_521_464_570_595, :millisecond) double_payload = Base.decode64!("GwAAAAF2AGZRYzaGqOE/CXQA4/JaPmIBAAAA") @@ -71,7 +76,8 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "individual value payloads with metadata and without timestamp" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-02-19T14:15:32+00:00") rec_timestamp = DateTime.to_unix(date_time, :microsecond) * 10 + 123 - expected_timestamp = DateTime.to_unix(date_time, :millisecond) + rec_timestamp = DecimicrosecondDateTime.from_unix!(rec_timestamp, :decimicrosecond) + expected_timestamp = DateTime.add(date_time, 0, :millisecond) double_payload = Base.decode64!("MAAAAANtAB0AAAACbWV0YTEAAgAAAGEAEG1ldGEyAAIAAAAAAXYAZlFjNoao4T8A") @@ -83,7 +89,8 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "deprecated object aggregation" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-03-19T14:15:32+00:00") timestamp = DateTime.to_unix(date_time, :microsecond) * 10 + 123 - expected_timestamp = DateTime.to_unix(date_time, :millisecond) + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) + expected_timestamp = DateTime.add(date_time, 0, :millisecond) object_payload = "SwAAAAViaW4ABAAAAAAAAQIDCHRlc3QxAAECdGVzdDIACgAAAMSnZcWCxYLDuAABdGVzdDMAAAAAAAAAFEAJdG0AaGcvSGIBAAAA" @@ -102,7 +109,8 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "object aggregation without timestamp and without metadata" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-03-19T14:15:32+00:00") timestamp = DateTime.to_unix(date_time, :microsecond) * 10 + 123 - expected_timestamp = DateTime.to_unix(date_time, :millisecond) + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) + expected_timestamp = DateTime.add(date_time, 0, :millisecond) object_payload = "UwAAAAN2AEsAAAAFYmluAAQAAAAAAAECAwh0ZXN0MQABAnRlc3QyAAoAAADEp2XFgsWCw7gAAXRlc3QzAAAAAAAAABRACXRtAGhnL0hiAQAAAAA=" @@ -121,7 +129,8 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "object aggregation with timestamp and without metadata" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-03-19T14:15:32+00:00") timestamp = DateTime.to_unix(date_time, :microsecond) * 10 + 123 - expected_timestamp = 1_521_464_570_595 + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) + expected_timestamp = DateTime.from_unix!(1_521_464_570_595, :millisecond) object_payload = "XgAAAAl0AOPyWj5iAQAAA3YASwAAAAViaW4ABAAAAAAAAQIDCHRlc3QxAAECdGVzdDIACgAAAMSnZcWCxYLDuAABdGVzdDMAAAAAAAAAFEAJdG0AaGcvSGIBAAAAAA==" @@ -140,7 +149,8 @@ defmodule Astarte.DataUpdaterPlant.PayloadsDecoderTest do test "object aggregation with timestamp and metadata" do {:ok, date_time, 0} = DateTime.from_iso8601("2018-03-19T14:15:32+00:00") timestamp = DateTime.to_unix(date_time, :microsecond) * 10 + 123 - expected_timestamp = 1_521_464_570_595 + timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) + expected_timestamp = DateTime.from_unix!(1_521_464_570_595, :millisecond) object_payload = "cAAAAANtAA8AAAAQbWV0YQACAAAAAAl0AOPyWj5iAQAAA3YASwAAAAViaW4ABAAAAAAAAQIDCHRlc3QxAAECdGVzdDIACgAAAMSnZcWCxYLDuAABdGVzdDMAAAAAAAAAFEAJdG0AaGcvSGIBAAAAAA==" diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs index 757962e0c5..bb3b8a275c 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs @@ -21,6 +21,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do import Mox alias Astarte.DataUpdaterPlant.Config + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.Core.Device alias Astarte.Core.Triggers.SimpleEvents.DeviceConnectedEvent alias Astarte.Core.Triggers.SimpleEvents.DeviceDisconnectedEvent @@ -140,7 +141,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) == :ok timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) DataUpdater.handle_connection( realm, @@ -658,7 +659,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do # Incoming data sub-test timestamp_us_x_10 = make_timestamp("2017-10-09T14:10:31+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) DataUpdater.handle_data( realm, @@ -854,7 +855,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) == {:error, :invalid_match_path} timestamp_us_x_10 = make_timestamp("2017-10-09T14:10:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) DataUpdater.handle_data( realm, @@ -922,7 +923,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do } timestamp_us_x_10 = make_timestamp("2017-10-09T14:15:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) # This should trigger matching_simple_trigger DataUpdater.handle_data( @@ -1026,7 +1027,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do ) == :ok timestamp_us_x_10 = make_timestamp("2017-10-09T14:15:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) # Introspection change subtest DataUpdater.handle_introspection( @@ -1177,7 +1178,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do :zlib.compress("com.test.LCDMonitor/time/to;com.test.LCDMonitor/weekSchedule/10/start") timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) DataUpdater.handle_control( realm, @@ -1377,7 +1378,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do {:ok, db_client} = Database.connect(realm: realm) timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) DataUpdater.handle_connection( realm, @@ -1427,7 +1428,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do assert old_device_introspection == nil timestamp_us_x_10 = make_timestamp("2017-10-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) DataUpdater.handle_introspection( realm, @@ -1601,7 +1602,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DatabaseTestHelper.insert_device(device_id) timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) # Make sure a process for the device exists DataUpdater.handle_connection( @@ -1644,7 +1645,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DatabaseTestHelper.insert_device(device_id) timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) # Make sure a process for the device exists DataUpdater.handle_connection( @@ -1719,9 +1720,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do end) timestamp_us_x_10 = make_timestamp("2017-10-09T15:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) - DataUpdater.start_device_deletion(realm, encoded_device_id, timestamp_ms) + DataUpdater.start_device_deletion(realm, encoded_device_id, timestamp_us_x_10) # Check DUP start ack in deleted_devices table dup_start_ack_statement = """ @@ -1828,7 +1829,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do DatabaseTestHelper.insert_device(device_id) timestamp_us_x_10 = make_timestamp("2017-12-09T14:00:32+00:00") - timestamp_ms = div(timestamp_us_x_10, 10_000) + timestamp_ms = DecimicrosecondDateTime.to_unix(timestamp_us_x_10, :millisecond) volatile_trigger_parent_id = :crypto.strong_rand_bytes(16) volatile_trigger_id = :crypto.strong_rand_bytes(16) @@ -1935,7 +1936,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do defp make_timestamp(timestamp_string) do {:ok, date_time, _} = DateTime.from_iso8601(timestamp_string) - DateTime.to_unix(date_time, :millisecond) * 10000 + timestamp = DateTime.to_unix(date_time, :millisecond) * 10000 + DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond) end defp gen_tracking_id() do diff --git a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs index 1585b345da..e93821605d 100644 --- a/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs +++ b/apps/astarte_data_updater_plant/test/astarte_data_updater_plant/triggers_handler_test.exs @@ -40,6 +40,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do alias AMQP.Channel alias AMQP.Connection alias AMQP.Queue + alias Astarte.Core.DecimicrosecondDateTime alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget alias Astarte.DataUpdaterPlant.Config alias Astarte.DataUpdaterPlant.TriggersHandler @@ -121,6 +122,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -171,6 +174,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -215,6 +220,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -262,6 +269,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -318,6 +327,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -367,6 +378,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -415,6 +428,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -470,6 +485,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -521,6 +538,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -571,6 +590,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -620,6 +641,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -680,6 +703,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -742,6 +767,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -818,6 +845,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -881,6 +910,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -944,6 +975,8 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do assert_receive {:event, payload, meta} + timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond) + assert %SimpleEvent{ device_id: @device_id, parent_trigger_id: ^parent_trigger_id, @@ -986,6 +1019,7 @@ defmodule Astarte.DataUpdaterPlant.TriggersHandlerTest do defp get_timestamp do DateTime.utc_now() |> DateTime.to_unix(:microsecond) + |> DecimicrosecondDateTime.from_unix!(:microsecond) end defp subscribe_to_queue(chan, queue_name) do