Skip to content

Commit d5c6578

Browse files
committed
feat(capabilities): Add capabilites endpoint
- Devices can publish on `capabilities` endpoint - Adds the `purge_properties_compression_format` capability, setting whether the device should recive the purge properties compressed with zlib or in plain text Signed-off-by: Luca Zaninotto <luca.zaninotto@secomind.com>
1 parent 7e10975 commit d5c6578

10 files changed

Lines changed: 223 additions & 19 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1212
`booleanarray`, `longintegerarray`, `stringarray`, `datetimearray`, `binaryblobarray`.
1313
- [astarte_export] Added a new command for exporting by device_id.
1414
`mix astarte.export $REALM $FILE_XML $DEVICE_ID`
15+
- [astarte_data_updater_plant] Added the `capabilities` endpoint for devices,
16+
setting whether the device should get the purge properties compressed with
17+
`zlib` or in plaintext
1518

1619
## [1.2.1] - Unreleased
1720
### Changed

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex

Lines changed: 97 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017 - 2023 SECO Mind Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
2222
alias Astarte.Core.CQLUtils
2323
alias Astarte.DataUpdaterPlant.Config
2424
alias Astarte.Core.Device
25+
alias Astarte.Core.Device.Capabilities
2526
alias Astarte.Core.InterfaceDescriptor
2627
alias Astarte.Core.Mapping
2728
alias Astarte.Core.Mapping.EndpointsAutomaton
@@ -69,6 +70,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
6970
# TODO change this, we want extended device IDs to fall in the same process
7071
{realm, device_id} = sharding_key
7172

73+
capabilities = %Capabilities{
74+
purge_properties_compression_format: :zlib
75+
}
76+
7277
state = %State{
7378
realm: realm,
7479
device_id: device_id,
@@ -90,7 +95,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
9095
trigger_id_to_policy_name: %{},
9196
discard_messages: false,
9297
last_deletion_in_progress_refresh: 0,
93-
last_datastream_maximum_retention_refresh: 0
98+
last_datastream_maximum_retention_refresh: 0,
99+
capabilities: capabilities
94100
}
95101

96102
encoded_device_id = Device.encode_device_id(device_id)
@@ -104,9 +110,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
104110

105111
{:ok, ttl} = Queries.fetch_datastream_maximum_storage_retention(db_client)
106112

113+
{:ok, capabilities} = Queries.fetch_device_capabilities(db_client, device_id)
114+
107115
new_state =
108116
Map.merge(state, stats_and_introspection)
109117
|> Map.put(:datastream_maximum_storage_retention, ttl)
118+
|> Map.put(:capabilities, capabilities)
110119

111120
{:ok, new_state}
112121
end
@@ -141,6 +150,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
141150
%{@control_path_header => control_path} = headers
142151
handle_control(state, control_path, payload, timestamp)
143152

153+
"capabilities" ->
154+
handle_capabilities(state, payload, timestamp)
155+
144156
_ ->
145157
# Ack all messages for now
146158
{:ack, :ok, state}
@@ -1608,6 +1620,37 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
16081620
{:ack, :ok, final_state}
16091621
end
16101622

1623+
def handle_capabilities(state, payload, _timestamp) do
1624+
%{device_id: device_id} = state
1625+
1626+
with {:ok, bson_payload} <- decode_bson_payload(state, payload),
1627+
changeset = Capabilities.changeset(state.capabilities, bson_payload),
1628+
{:ok, capabilities} <- update_capabilities(changeset, state) do
1629+
{:ok, db_client} = Database.connect(realm: state.realm)
1630+
Queries.set_device_capabilities(db_client, device_id, capabilities)
1631+
1632+
new_state = %State{state | capabilities: capabilities}
1633+
{:ack, :ok, new_state}
1634+
end
1635+
end
1636+
1637+
defp update_capabilities(changeset, state) do
1638+
with {:error, error} <- Ecto.Changeset.apply_action(changeset, :update) do
1639+
Logger.warning(
1640+
"Error while updating capabilities: #{error}",
1641+
tag: "capabilities_error"
1642+
)
1643+
1644+
:telemetry.execute(
1645+
[:astarte, :data_updater_plant, :data_updater, :discarded_message],
1646+
%{},
1647+
%{realm: state.realm}
1648+
)
1649+
1650+
{:discard, error, state}
1651+
end
1652+
end
1653+
16111654
def handle_control(%State{discard_messages: true} = state, _, _, _) do
16121655
{:ack, :discard_messages, state}
16131656
end
@@ -1641,9 +1684,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
16411684

16421685
# TODO: check payload size, to avoid anoying crashes
16431686

1644-
<<_size_header::size(32), zlib_payload::binary>> = payload
1645-
1646-
case PayloadsDecoder.safe_inflate(zlib_payload) do
1687+
case inflate_purge_properties_payload(payload) do
16471688
{:ok, decoded_payload} ->
16481689
:ok = prune_device_properties(new_state, decoded_payload, timestamp_ms)
16491690

@@ -1727,6 +1768,40 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
17271768
{:discard, :unexpected_control_message, new_state, {:continue, continue_arg}}
17281769
end
17291770

1771+
defp decode_bson_payload(state, payload) do
1772+
with {:error, error} <- Cyanide.decode(payload) do
1773+
Logger.warning(
1774+
"error parsing capabilities BSON message: #{error}",
1775+
tag: "capabilities_error"
1776+
)
1777+
1778+
:telemetry.execute(
1779+
[:astarte, :data_updater_plant, :data_updater, :discarded_message],
1780+
%{},
1781+
%{realm: state.realm}
1782+
)
1783+
1784+
{:discard, error, state}
1785+
end
1786+
end
1787+
1788+
defp inflate_purge_properties_payload(<<255, 255, 255, 255, payload::binary>>) do
1789+
_ =
1790+
Logger.debug("Received plaintext purge properties properties payload",
1791+
tag: "purge_properties"
1792+
)
1793+
1794+
{:ok, payload}
1795+
end
1796+
1797+
defp inflate_purge_properties_payload(<<_size_header::size(32), zlib_payload::binary>>) do
1798+
Logger.debug("Received zlib compressed purge properties properties payload",
1799+
tag: "purge_properties"
1800+
)
1801+
1802+
PayloadsDecoder.safe_inflate(zlib_payload)
1803+
end
1804+
17301805
defp delete_volatile_trigger(
17311806
state,
17321807
{obj_id, _obj_type},
@@ -2673,7 +2748,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
26732748

26742749
# TODO: use the returned byte count in stats
26752750
with {:ok, _bytes} <-
2676-
send_consumer_properties_payload(state.realm, state.device_id, abs_paths_list) do
2751+
send_consumer_properties_payload(
2752+
state.realm,
2753+
state.device_id,
2754+
abs_paths_list,
2755+
state.purge_properties_compression_format
2756+
) do
26772757
:ok
26782758
end
26792759
end
@@ -2742,15 +2822,22 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
27422822
:ok
27432823
end
27442824

2745-
defp send_consumer_properties_payload(realm, device_id, abs_paths_list) do
2825+
defp send_consumer_properties_payload(realm, device_id, abs_paths_list, compression_format) do
27462826
topic = "#{realm}/#{Device.encode_device_id(device_id)}/control/consumer/properties"
27472827

27482828
uncompressed_payload = Enum.join(abs_paths_list, ";")
27492829

2750-
payload_size = byte_size(uncompressed_payload)
2751-
compressed_payload = :zlib.compress(uncompressed_payload)
2830+
payload =
2831+
case compression_format do
2832+
:zlib ->
2833+
payload_size = byte_size(uncompressed_payload)
2834+
compressed_payload = :zlib.compress(uncompressed_payload)
27522835

2753-
payload = <<payload_size::unsigned-big-integer-size(32), compressed_payload::binary>>
2836+
<<payload_size::unsigned-big-integer-size(32), compressed_payload::binary>>
2837+
2838+
:plaintext ->
2839+
<<255, 255, 255, 255>> <> uncompressed_payload
2840+
end
27542841

27552842
case VMQPlugin.publish(topic, payload, 2) do
27562843
{:ok, %{local_matches: local, remote_matches: remote}} when local + remote == 1 ->

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
2020
alias Astarte.Core.CQLUtils
2121
alias Astarte.Core.Device
22+
alias Astarte.Core.Device.Capabilities
2223
alias Astarte.Core.InterfaceDescriptor
2324
alias Astarte.Core.Mapping
2425
alias Astarte.DataUpdaterPlant.Config
@@ -450,6 +451,53 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
450451
end
451452
end
452453

454+
def fetch_device_capabilities(db_client, device_id) do
455+
device_capabilities_statement = """
456+
SELECT purge_properties_compression_format
457+
FROM devices
458+
WHERE device_id=:device_id
459+
"""
460+
461+
device_capabilities_query =
462+
DatabaseQuery.new()
463+
|> DatabaseQuery.statement(device_capabilities_statement)
464+
|> DatabaseQuery.put(:device_id, device_id)
465+
466+
with {:ok, result} <- DatabaseQuery.call(db_client, device_capabilities_query) do
467+
capabilities_row =
468+
result
469+
|> DatabaseResult.head()
470+
|> Enum.into(%{})
471+
472+
capabilities =
473+
%Capabilities{}
474+
|> Capabilities.changeset(capabilities_row)
475+
|> Ecto.Changeset.apply_changes()
476+
477+
{:ok, capabilities}
478+
end
479+
end
480+
481+
def set_device_capabilities(
482+
db_client,
483+
device_id,
484+
%Capabilities{purge_properties_compression_format: format} = _capabilities
485+
) do
486+
device_capabilities_statement = """
487+
UPDATE devices
488+
SET purge_properties_compression_format=:format
489+
WHERE device_id=:device_id
490+
"""
491+
492+
device_capabilities_query =
493+
DatabaseQuery.new()
494+
|> DatabaseQuery.statement(device_capabilities_statement)
495+
|> DatabaseQuery.put(:format, format)
496+
|> DatabaseQuery.put(:device_id, device_id)
497+
498+
DatabaseQuery.call!(db_client, device_capabilities_query)
499+
end
500+
453501
def set_device_connected!(db_client, device_id, timestamp_ms, ip_address) do
454502
set_connection_info!(db_client, device_id, timestamp_ms, ip_address)
455503

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/state.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017 - 2023 SECO Mind Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -46,6 +46,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.State do
4646
:trigger_id_to_policy_name,
4747
:discard_messages,
4848
:last_deletion_in_progress_refresh,
49-
:last_datastream_maximum_retention_refresh
49+
:last_datastream_maximum_retention_refresh,
50+
:capabilities
5051
]
5152
end

apps/astarte_data_updater_plant/mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017 - 2023 SECO Mind Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.

apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017 - 2023 SECO Mind Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
2222

2323
alias Astarte.DataUpdaterPlant.Config
2424
alias Astarte.Core.Device
25+
alias Astarte.Core.Device.Capabilities
2526
alias Astarte.Core.Triggers.SimpleEvents.DeviceConnectedEvent
2627
alias Astarte.Core.Triggers.SimpleEvents.IncomingDataEvent
2728
alias Astarte.Core.Triggers.SimpleEvents.PathRemovedEvent
@@ -40,6 +41,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
4041
alias Astarte.DataAccess.Database
4142
alias Astarte.DataUpdaterPlant.AMQPTestHelper
4243
alias Astarte.DataUpdaterPlant.DatabaseTestHelper
44+
alias Astarte.DataUpdaterPlant.DataUpdater.State
4345
alias Astarte.Core.CQLUtils
4446
alias CQEx.Query, as: DatabaseQuery
4547
alias CQEx.Result, as: DatabaseResult
@@ -1750,6 +1752,35 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
17501752
assert [%{"dup_end_ack" => true}] = dup_end_ack_result
17511753
end
17521754

1755+
test "capabilities message correctly handled" do
1756+
AMQPTestHelper.clean_queue()
1757+
1758+
realm = "autotestrealm"
1759+
1760+
device_id = Device.random_device_id()
1761+
encoded_device_id = Device.encode_device_id(device_id)
1762+
1763+
DatabaseTestHelper.insert_device(device_id)
1764+
1765+
state = dump_state(realm, encoded_device_id)
1766+
assert %Capabilities{purge_properties_compression_format: :zlib} = state.capabilities
1767+
1768+
timestamp_us_x_10 = make_timestamp("2025-01-20T14:00:32+00:00")
1769+
1770+
payload = Cyanide.encode!(%{"purge_properties_compression_format" => "plaintext"})
1771+
1772+
handle_capabilities(
1773+
realm,
1774+
encoded_device_id,
1775+
payload,
1776+
gen_tracking_id(),
1777+
timestamp_us_x_10
1778+
)
1779+
1780+
new_state = dump_state(realm, encoded_device_id)
1781+
assert %Capabilities{purge_properties_compression_format: :plaintext} = new_state.capabilities
1782+
end
1783+
17531784
defp retrieve_endpoint_id(client, interface_name, interface_major, path) do
17541785
query =
17551786
DatabaseQuery.new()
@@ -1882,6 +1913,27 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
18821913
ensure_message_has_been_handled(realm, device_id)
18831914
end
18841915

1916+
defp handle_capabilities(realm, encoded_device_id, payload, _tracking_id, timestamp) do
1917+
{:ok, device_id} = Astarte.Core.Device.decode_device_id(encoded_device_id)
1918+
1919+
headers =
1920+
headers_fixture(realm, encoded_device_id, x_astarte_msg_type: "capabilities")
1921+
1922+
publish_opts = [
1923+
headers: headers,
1924+
message_id: generate_message_id(realm, encoded_device_id, timestamp),
1925+
timestamp: timestamp,
1926+
sharding_key: {realm, device_id}
1927+
]
1928+
1929+
{:ok, pid} = Mississippi.Consumer.MessageTracker.get_message_tracker({realm, device_id})
1930+
:erlang.trace(pid, true, [:receive])
1931+
1932+
:ok = Mississippi.Producer.EventsProducer.publish(payload, publish_opts)
1933+
1934+
assert_receive {:trace, ^pid, :receive, {_, {_, _}, {:ack_delivery, _message}}}
1935+
end
1936+
18851937
defp handle_control(realm, encoded_device_id, control_path, value, _tracking_id, timestamp) do
18861938
{:ok, device_id} = Astarte.Core.Device.decode_device_id(encoded_device_id)
18871939

0 commit comments

Comments
 (0)