Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
use GenServer

alias AMQP.Channel
alias Astarte.Core.DecimicrosecondDateTime
alias Astarte.DataUpdaterPlant.Config
alias Astarte.DataUpdaterPlant.DataUpdater

Expand Down Expand Up @@ -185,6 +186,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
msg_type = Map.get(headers_map, @msg_type_header, headers_map)

{timestamp, clean_meta} = Map.pop(no_headers_meta, :timestamp)
timestamp = DecimicrosecondDateTime.from_unix!(timestamp, :decimicrosecond)

case handle_consume(msg_type, payload, headers_map, timestamp, clean_meta) do
:ok ->
Expand Down Expand Up @@ -391,6 +393,8 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
end

defp handle_invalid_msg(payload, headers, timestamp, meta) do
timestamp = DecimicrosecondDateTime.to_unix(timestamp, :millisecond)

Logger.warning(
"Invalid AMQP message: #{inspect(Base.encode64(payload))} #{inspect(headers)} #{inspect(timestamp)} #{inspect(meta)}",
tag: "data_consumer_invalid_msg"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule Astarte.DataUpdater.DeletionScheduler do
alias Astarte.DataUpdaterPlant.DataUpdater.Queries
alias Astarte.DataUpdaterPlant.DataUpdater
alias Astarte.DataUpdaterPlant.Config
alias Astarte.Core.DecimicrosecondDateTime
alias Astarte.Core.Device
alias Astarte.Core.CQLUtils

Expand Down Expand Up @@ -56,7 +57,7 @@ defmodule Astarte.DataUpdater.DeletionScheduler do
defp start_device_deletion! do
retrieve_devices_to_delete!()
|> Enum.each(fn %{realm_name: realm_name, encoded_device_id: encoded_device_id} ->
timestamp = now_us_x10_timestamp()
timestamp = now_us_x10_timestamp() |> DecimicrosecondDateTime.from_unix!(:decimicrosecond)
# This must be a call, as we want to be sure this was completed
:ok = DataUpdater.start_device_deletion(realm_name, encoded_device_id, timestamp)
end)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
alias Astarte.Core.CQLUtils
alias Astarte.DataUpdaterPlant.Config
alias Astarte.Core.DecimicrosecondDateTime
alias Astarte.Core.Device
alias Astarte.Core.InterfaceDescriptor
alias Astarte.Core.Mapping
Expand Down Expand Up @@ -46,12 +47,14 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
alias Astarte.DataUpdaterPlant.TriggerPolicy.Queries, as: PolicyQueries
require Logger

@epoch DecimicrosecondDateTime.from_unix!(0)

@paths_cache_size 32
@interface_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@device_triggers_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@groups_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@deletion_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@datastream_maximum_retention_refresh_lifespan_decimicroseconds 60 * 10 * 1000 * 10000
@interface_lifespan minute: 10
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems rather ad hoc, I'd suggest using the standard :timer.minutes(10) and update DecimicrosecondDateTime.shift consequently.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It mimics DateTime.shift. Although the duration type is not yet available, we can mimic its functionality if we're just compiling a constant minute: 10 value

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't like, I'd argue for having two variables for each lifespan, as in

@interface_lifespan_amount 10
@interface_lifespan_unit :minute

# usage
DecimicrosecondDateTime.add(datetime, @interface_lifespan_amount, @interface_lifespan_unit)

DateTime.add expects a unit anyway, IMO it's easier to understand if we give it in minutes ourselves

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not familiar with DateTime.shift's game, now I see why. Looks ok

@device_triggers_lifespan minute: 10
@groups_lifespan minute: 10
@deletion_refresh_lifespan minute: 10
@datastream_maximum_retention_refresh_lifespan minute: 10

def init_state(realm, device_id, message_tracker) do
MessageTracker.register_data_updater(message_tracker)
Expand All @@ -73,13 +76,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
volatile_triggers: [],
interface_exchanged_bytes: %{},
interface_exchanged_msgs: %{},
last_seen_message: 0,
last_device_triggers_refresh: 0,
last_groups_refresh: 0,
last_seen_message: @epoch,
last_device_triggers_refresh: @epoch,
last_groups_refresh: @epoch,
trigger_id_to_policy_name: %{},
discard_messages: false,
last_deletion_in_progress_refresh: 0,
last_datastream_maximum_retention_refresh: 0
last_deletion_in_progress_refresh: @epoch,
last_datastream_maximum_retention_refresh: @epoch
}

encoded_device_id = Device.encode_device_id(device_id)
Expand Down Expand Up @@ -110,8 +113,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
def handle_connection(state, ip_address_string, message_id, timestamp) do
new_state = execute_time_based_actions(state, timestamp)

timestamp_ms = div(timestamp, 10_000)

ip_address_result =
ip_address_string
|> to_charlist()
Expand All @@ -130,7 +131,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
Queries.set_device_connected!(
new_state.realm,
new_state.device_id,
DateTime.from_unix!(timestamp_ms, :millisecond),
timestamp,
ip_address
)

Expand All @@ -147,7 +148,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
new_state.realm,
device_id_string,
ip_address_string,
timestamp_ms
timestamp
)

MessageTracker.ack_delivery(new_state.message_tracker, message_id)
Expand Down Expand Up @@ -460,8 +461,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp execute_device_error_triggers(state, error_name, error_metadata \\ %{}, timestamp) do
timestamp_ms = div(timestamp, 10_000)

trigger_target_with_policy_list =
Map.get(state.device_triggers, :on_device_error, [])
|> Enum.map(fn target ->
Expand All @@ -476,7 +475,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
device_id_string,
error_name,
error_metadata,
timestamp_ms
timestamp
)

:ok
Expand Down Expand Up @@ -510,9 +509,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do

maybe_explicit_value_timestamp =
if mapping.explicit_timestamp do
value_timestamp
%DecimicrosecondDateTime{datetime: value_timestamp}
else
div(timestamp, 10000)
timestamp
end

execute_incoming_data_triggers(
Expand Down Expand Up @@ -1014,15 +1013,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp is_still_valid?({:ok, expiry_date}, ttl) do
expiry_secs = DateTime.to_unix(expiry_date)

now_secs =
DateTime.utc_now()
|> DateTime.to_unix()
# add 1 hour of tolerance for clock synchronization issues
now_with_tolerance =
DateTime.utc_now(:second)
|> DateTime.add(ttl)
|> DateTime.add(1, :hour)

# 3600 seconds is one hour
# this adds 1 hour of tolerance to clock synchronization issues
now_secs + ttl + 3600 < expiry_secs
DateTime.before?(now_with_tolerance, expiry_date)
end

defp validate_interface(interface) do
Expand Down Expand Up @@ -1229,8 +1226,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
def process_introspection(state, new_introspection_list, payload, message_id, timestamp) do
new_state = execute_time_based_actions(state, timestamp)

timestamp_ms = div(timestamp, 10_000)

{db_introspection_map, db_introspection_minor_map} =
List.foldl(new_introspection_list, {%{}, %{}}, fn {interface, major, minor},
{introspection_map,
Expand Down Expand Up @@ -1260,7 +1255,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
realm,
device_id_string,
payload,
timestamp_ms
timestamp
)

# TODO: implement here object_id handling for a certain interface name. idea: introduce interface_family_id
Expand Down Expand Up @@ -1315,7 +1310,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
interface_name,
interface_major,
minor,
timestamp_ms
timestamp
)
end)

Expand Down Expand Up @@ -1352,7 +1347,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
device_id_string,
interface_name,
interface_major,
timestamp_ms
timestamp
)
end)

Expand Down Expand Up @@ -1414,7 +1409,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
interface_major,
old_minor,
new_minor,
timestamp_ms
timestamp
)
end

Expand Down Expand Up @@ -1460,9 +1455,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
def handle_control(state, "/producer/properties", <<0, 0, 0, 0>>, message_id, timestamp) do
new_state = execute_time_based_actions(state, timestamp)

timestamp_ms = div(timestamp, 10_000)

:ok = prune_device_properties(new_state, "", timestamp_ms)
:ok = prune_device_properties(new_state, "", timestamp)

MessageTracker.ack_delivery(new_state.message_tracker, message_id)

Expand All @@ -1478,15 +1471,13 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
def handle_control(state, "/producer/properties", payload, message_id, timestamp) do
new_state = execute_time_based_actions(state, timestamp)

timestamp_ms = div(timestamp, 10_000)

# TODO: check payload size, to avoid anoying crashes

<<_size_header::size(32), zlib_payload::binary>> = payload

case PayloadsDecoder.safe_inflate(zlib_payload) do
{:ok, decoded_payload} ->
:ok = prune_device_properties(new_state, decoded_payload, timestamp_ms)
:ok = prune_device_properties(new_state, decoded_payload, timestamp)
MessageTracker.ack_delivery(new_state.message_tracker, message_id)

%{
Expand Down Expand Up @@ -1835,7 +1826,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp reload_groups_on_expiry(state, timestamp) do
if state.last_groups_refresh + @groups_lifespan_decimicroseconds <= timestamp do
next_refresh = DecimicrosecondDateTime.shift(state.last_groups_refresh, @groups_lifespan)
refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt

if refresh? do
# TODO this could be a bang!
{:ok, groups} = Queries.get_device_groups(state.realm, state.device_id)

Expand All @@ -1846,8 +1840,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp reload_device_triggers_on_expiry(state, timestamp) do
if state.last_device_triggers_refresh + @device_triggers_lifespan_decimicroseconds <=
timestamp do
next_refresh =
DecimicrosecondDateTime.shift(state.last_device_triggers_refresh, @device_triggers_lifespan)

refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt

if refresh? do
any_device_id = SimpleTriggersProtobufUtils.any_device_object_id()

any_interface_id = SimpleTriggersProtobufUtils.any_interface_object_id()
Expand Down Expand Up @@ -1890,11 +1888,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp execute_time_based_actions(state, timestamp) do
if state.connected && state.last_seen_message > 0 do
# timestamps are handled as microseconds*10, so we need to divide by 10 when saving as a metric for a coherent data
if state.connected && DecimicrosecondDateTime.after?(state.last_seen_message, @epoch) do
:telemetry.execute(
[:astarte, :data_updater_plant, :service, :connected_devices],
%{duration: Integer.floor_div(timestamp - state.last_seen_message, 10)},
%{
duration: DecimicrosecondDateTime.diff(timestamp, state.last_seen_message, :microsecond)
},
%{realm: state.realm, status: :ok}
)
end
Expand All @@ -1909,8 +1908,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp reload_device_deletion_status_on_expiry(state, timestamp) do
if state.last_deletion_in_progress_refresh + @deletion_refresh_lifespan_decimicroseconds <=
timestamp do
next_refresh =
DecimicrosecondDateTime.shift(
state.last_deletion_in_progress_refresh,
@deletion_refresh_lifespan
)

refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt

if refresh? do
new_state = maybe_start_device_deletion(state, timestamp)
%State{new_state | last_deletion_in_progress_refresh: timestamp}
else
Expand All @@ -1919,9 +1925,15 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp reload_datastream_maximum_storage_retention_on_expiry(state, timestamp) do
if state.last_datastream_maximum_retention_refresh +
@datastream_maximum_retention_refresh_lifespan_decimicroseconds <=
timestamp do
next_refresh =
DecimicrosecondDateTime.shift(
state.last_datastream_maximum_retention_refresh,
@datastream_maximum_retention_refresh_lifespan
)

refresh? = DecimicrosecondDateTime.compare(next_refresh, timestamp) != :gt

if refresh? do
# TODO this could be a bang!
case Queries.fetch_datastream_maximum_storage_retention(state.realm) do
{:ok, ttl} ->
Expand Down Expand Up @@ -2077,7 +2089,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
Mappings.fetch_interface_mappings_map(state.realm, interface_id),
new_interfaces_by_expiry <-
state.interfaces_by_expiry ++
[{state.last_seen_message + @interface_lifespan_decimicroseconds, interface_name}],
[
{DecimicrosecondDateTime.shift(state.last_seen_message, @interface_lifespan),
interface_name}
],
new_state <- %State{
state
| interfaces: Map.put(state.interfaces, interface_name, interface_descriptor),
Expand Down Expand Up @@ -2239,19 +2254,17 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
end

defp set_device_disconnected(state, timestamp) do
timestamp_ms = div(timestamp, 10_000)

Queries.set_device_disconnected!(
state.realm,
state.device_id,
DateTime.from_unix!(timestamp_ms, :millisecond),
timestamp,
state.total_received_msgs,
state.total_received_bytes,
state.interface_exchanged_msgs,
state.interface_exchanged_bytes
)

maybe_execute_device_disconnected_trigger(state, timestamp_ms)
maybe_execute_device_disconnected_trigger(state, timestamp)

%{state | connected: false}
end
Expand All @@ -2260,7 +2273,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
:ok
end

defp maybe_execute_device_disconnected_trigger(state, timestamp_ms) do
defp maybe_execute_device_disconnected_trigger(state, timestamp) do
trigger_target_with_policy_list =
Map.get(state.device_triggers, :on_device_disconnection, [])
|> Enum.map(fn target ->
Expand All @@ -2273,7 +2286,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
trigger_target_with_policy_list,
state.realm,
device_id_string,
timestamp_ms
timestamp
)

:telemetry.execute(
Expand Down
Loading
Loading