Skip to content

Commit 307cd0c

Browse files
committed
chore(migrations): device migrations
- Added device migrations to add `purge_properties_compression_format` column to `devices` table in the database Signed-off-by: Luca Zaninotto <luca.zaninotto@secomind.com>
1 parent 1147b97 commit 307cd0c

File tree

6 files changed

+103
-37
lines changed

6 files changed

+103
-37
lines changed

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
2222
alias Astarte.Core.CQLUtils
2323
alias Astarte.DataUpdaterPlant.Config
2424
alias Astarte.Core.Device
25+
alias Astarte.Core.Device.Capabilities
2526
alias Astarte.Core.InterfaceDescriptor
2627
alias Astarte.Core.Mapping
2728
alias Astarte.Core.Mapping.EndpointsAutomaton
@@ -69,6 +70,10 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
6970
# TODO change this, we want extended device IDs to fall in the same process
7071
{realm, device_id} = sharding_key
7172

73+
capabilities = %Capabilities{
74+
purge_properties_compression_format: :zlib
75+
}
76+
7277
state = %State{
7378
realm: realm,
7479
device_id: device_id,
@@ -91,7 +96,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
9196
discard_messages: false,
9297
last_deletion_in_progress_refresh: 0,
9398
last_datastream_maximum_retention_refresh: 0,
94-
purge_properties_compression_format: :zlib
99+
capabilities: capabilities
95100
}
96101

97102
encoded_device_id = Device.encode_device_id(device_id)
@@ -105,9 +110,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
105110

106111
{:ok, ttl} = Queries.fetch_datastream_maximum_storage_retention(db_client)
107112

113+
{:ok, capabilities} = Queries.fetch_device_capabilities(db_client, device_id)
114+
108115
new_state =
109116
Map.merge(state, stats_and_introspection)
110117
|> Map.put(:datastream_maximum_storage_retention, ttl)
118+
|> Map.put(:capabilities, capabilities)
111119

112120
{:ok, new_state}
113121
end
@@ -1613,12 +1621,36 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
16131621
end
16141622

16151623
def handle_capabilities(state, payload, _timestamp) do
1624+
%{device_id: device_id} = state
1625+
16161626
with {:ok, bson_payload} <- decode_bson_payload(state, payload),
1617-
{:ok, new_state} <- maybe_purge_properties_compression(state, bson_payload) do
1627+
changeset = Capabilities.changeset(state.capabilities, bson_payload),
1628+
{:ok, capabilities} <- update_capabilities(changeset, state) do
1629+
{:ok, db_client} = Database.connect(realm: state.realm)
1630+
Queries.set_device_capabilities(db_client, device_id, capabilities)
1631+
1632+
new_state = %State{state | capabilities: capabilities}
16181633
{:ack, :ok, new_state}
16191634
end
16201635
end
16211636

1637+
defp update_capabilities(changeset, state) do
1638+
with {:error, error} <- Ecto.Changeset.apply_action(changeset, :update) do
1639+
Logger.warning(
1640+
"Error while updating capabilities: #{error}",
1641+
tag: "capabilities_error"
1642+
)
1643+
1644+
:telemetry.execute(
1645+
[:astarte, :data_updater_plant, :data_updater, :discarded_message],
1646+
%{},
1647+
%{realm: state.realm}
1648+
)
1649+
1650+
{:discard, error, state}
1651+
end
1652+
end
1653+
16221654
def handle_control(%State{discard_messages: true} = state, _, _, _) do
16231655
{:ack, :discard_messages, state}
16241656
end
@@ -1753,34 +1785,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
17531785
end
17541786
end
17551787

1756-
defp maybe_purge_properties_compression(state, bson) do
1757-
case Map.fetch(bson, "purge_properties_compression_format") do
1758-
# capability not found in bson, the user did not set it
1759-
:error ->
1760-
{:ok, state}
1761-
1762-
{:ok, "plaintext"} ->
1763-
{:ok, Map.put(state, :purge_properties_compression_format, :plaintext)}
1764-
1765-
{:ok, "zlib"} ->
1766-
{:ok, Map.put(state, :purge_properties_compression_format, :zlib)}
1767-
1768-
invalid ->
1769-
Logger.warning(
1770-
"Invalid capabilities purge_properties_compression_format value, ignoring: #{invalid}",
1771-
tag: "capabilities_error"
1772-
)
1773-
1774-
:telemetry.execute(
1775-
[:astarte, :data_updater_plant, :data_updater, :discarded_message],
1776-
%{},
1777-
%{realm: state.realm}
1778-
)
1779-
1780-
{:discard, :capabilities_error, state}
1781-
end
1782-
end
1783-
17841788
defp inflate_purge_properties_payload(<<255, 255, 255, 255, payload::binary>>) do
17851789
_ =
17861790
Logger.debug("Received plaintext purge properties properties payload",

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
2020
alias Astarte.Core.CQLUtils
2121
alias Astarte.Core.Device
22+
alias Astarte.Core.Device.Capabilities
2223
alias Astarte.Core.InterfaceDescriptor
2324
alias Astarte.Core.Mapping
2425
alias Astarte.DataUpdaterPlant.Config
@@ -450,6 +451,53 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
450451
end
451452
end
452453

454+
def fetch_device_capabilities(db_client, device_id) do
455+
device_capabilities_statement = """
456+
SELECT purge_properties_compression_format
457+
FROM devices
458+
WHERE device_id=:device_id
459+
"""
460+
461+
device_capabilities_query =
462+
DatabaseQuery.new()
463+
|> DatabaseQuery.statement(device_capabilities_statement)
464+
|> DatabaseQuery.put(:device_id, device_id)
465+
466+
with {:ok, result} <- DatabaseQuery.call(db_client, device_capabilities_query) do
467+
capabilities_row =
468+
result
469+
|> DatabaseResult.head()
470+
|> Enum.into(%{})
471+
472+
capabilities =
473+
%Capabilities{}
474+
|> Capabilities.changeset(capabilities_row)
475+
|> Ecto.Changeset.apply_changes()
476+
477+
{:ok, capabilities}
478+
end
479+
end
480+
481+
def set_device_capabilities(
482+
db_client,
483+
device_id,
484+
%Capabilities{purge_properties_compression_format: format} = _capabilities
485+
) do
486+
device_capabilities_statement = """
487+
UPDATE devices
488+
SET purge_properties_compression_format=:format
489+
WHERE device_id=:device_id
490+
"""
491+
492+
device_capabilities_query =
493+
DatabaseQuery.new()
494+
|> DatabaseQuery.statement(device_capabilities_statement)
495+
|> DatabaseQuery.put(:format, format)
496+
|> DatabaseQuery.put(:device_id, device_id)
497+
498+
DatabaseQuery.call!(db_client, device_capabilities_query)
499+
end
500+
453501
def set_device_connected!(db_client, device_id, timestamp_ms, ip_address) do
454502
set_connection_info!(db_client, device_id, timestamp_ms, ip_address)
455503

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/state.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.State do
4747
:discard_messages,
4848
:last_deletion_in_progress_refresh,
4949
:last_datastream_maximum_retention_refresh,
50-
:purge_properties_compression_format
50+
:capabilities
5151
]
5252
end

apps/astarte_data_updater_plant/test/astarte_data_updater_plant/data_updater_test.exs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
2222

2323
alias Astarte.DataUpdaterPlant.Config
2424
alias Astarte.Core.Device
25+
alias Astarte.Core.Device.Capabilities
2526
alias Astarte.Core.Triggers.SimpleEvents.DeviceConnectedEvent
2627
alias Astarte.Core.Triggers.SimpleEvents.IncomingDataEvent
2728
alias Astarte.Core.Triggers.SimpleEvents.PathRemovedEvent
@@ -1761,8 +1762,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
17611762

17621763
DatabaseTestHelper.insert_device(device_id)
17631764

1764-
assert %State{purge_properties_compression_format: :zlib} =
1765-
dump_state(realm, encoded_device_id)
1765+
state = dump_state(realm, encoded_device_id)
1766+
assert %Capabilities{purge_properties_compression_format: :zlib} = state.capabilities
17661767

17671768
timestamp_us_x_10 = make_timestamp("2025-01-20T14:00:32+00:00")
17681769

@@ -1776,8 +1777,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdaterTest do
17761777
timestamp_us_x_10
17771778
)
17781779

1779-
assert %State{purge_properties_compression_format: :plaintext} =
1780-
dump_state(realm, encoded_device_id)
1780+
new_state = dump_state(realm, encoded_device_id)
1781+
assert %Capabilities{purge_properties_compression_format: :plaintext} = new_state.capabilities
17811782
end
17821783

17831784
defp retrieve_endpoint_id(client, interface_name, interface_major, path) do

apps/astarte_data_updater_plant/test/support/database_test_helper.exs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,16 @@ defmodule Astarte.DataUpdaterPlant.DatabaseTestHelper do
6363
last_pairing_ip inet,
6464
last_seen_ip inet,
6565
groups map<text, timeuuid>,
66+
purge_properties_compression_format varchar,
6667
6768
PRIMARY KEY (device_id)
6869
);
6970
"""
7071

7172
@insert_device """
72-
INSERT INTO #{CQLUtils.realm_name_to_keyspace_name("autotestrealm", Config.astarte_instance_id!())}.devices (device_id, connected, last_connection, last_disconnection, first_pairing, last_seen_ip, last_pairing_ip, total_received_msgs, total_received_bytes, introspection, groups)
73+
INSERT INTO #{CQLUtils.realm_name_to_keyspace_name("autotestrealm", Config.astarte_instance_id!())}.devices (device_id, connected, last_connection, last_disconnection, first_pairing, last_seen_ip, last_pairing_ip, total_received_msgs, total_received_bytes, introspection, groups, purge_properties_compression_format)
7374
VALUES (:device_id, false, :last_connection, :last_disconnection, :first_pairing,
74-
:last_seen_ip, :last_pairing_ip, :total_received_msgs, :total_received_bytes, :introspection, :groups);
75+
:last_seen_ip, :last_pairing_ip, :total_received_msgs, :total_received_bytes, :introspection, :groups, :purge_properties_compression_format);
7576
"""
7677

7778
@create_interfaces_table """
@@ -803,6 +804,13 @@ defmodule Astarte.DataUpdaterPlant.DatabaseTestHelper do
803804
groups = Keyword.get(opts, :groups, [])
804805
groups_map = for group <- groups, do: {group, UUID.uuid1()}
805806

807+
compression_format =
808+
case Keyword.get(opts, :purge_properties_compression_format) do
809+
nil -> "zlib"
810+
:zlib -> "zlib"
811+
:plaintext -> "plaintext"
812+
end
813+
806814
query =
807815
DatabaseQuery.new()
808816
|> DatabaseQuery.statement(@insert_device)
@@ -816,6 +824,7 @@ defmodule Astarte.DataUpdaterPlant.DatabaseTestHelper do
816824
|> DatabaseQuery.put(:total_received_bytes, total_received_bytes)
817825
|> DatabaseQuery.put(:introspection, introspection)
818826
|> DatabaseQuery.put(:groups, groups_map)
827+
|> DatabaseQuery.put(:purge_properties_compression_format, compression_format)
819828

820829
DatabaseQuery.call(client, query)
821830
end
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
ALTER TABLE devices
2+
ADD (
3+
purge_properties_compression_format tinyint
4+
);

0 commit comments

Comments
 (0)