Skip to content

Commit ec96d57

Browse files
committed
DUP: replace CQEx with Ecto for all SELECT queries
Signed-off-by: Arnaldo Cesco <arnaldo.cesco@secomind.com>
1 parent 774ef2c commit ec96d57

File tree

4 files changed

+318
-358
lines changed

4 files changed

+318
-358
lines changed

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater.ex

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
2121
alias Astarte.DataUpdaterPlant.AMQPDataConsumer
2222
alias Astarte.DataUpdaterPlant.DataUpdater.Server
2323
alias Astarte.DataUpdaterPlant.DataUpdater.Queries
24-
alias Astarte.DataAccess.Database
2524
alias Astarte.DataUpdaterPlant.MessageTracker
2625
require Logger
2726

@@ -235,8 +234,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater do
235234

236235
defp verify_device_exists(realm_name, encoded_device_id) do
237236
with {:ok, decoded_device_id} <- Device.decode_device_id(encoded_device_id),
238-
{:ok, client} <- Database.connect(realm: realm_name),
239-
{:ok, exists?} <- Queries.check_device_exists(client, decoded_device_id) do
237+
# TODO this could be a bang!
238+
{:ok, exists?} <- Queries.check_device_exists(realm_name, decoded_device_id) do
240239
if exists? do
241240
:ok
242241
else

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex

Lines changed: 68 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
8787
Logger.metadata(realm: realm, device_id: encoded_device_id)
8888
Logger.info("Created device process.", tag: "device_process_created")
8989

90-
{:ok, db_client} = Database.connect(realm: new_state.realm)
91-
9290
stats_and_introspection =
93-
Queries.retrieve_device_stats_and_introspection!(db_client, device_id)
91+
Queries.retrieve_device_stats_and_introspection!(new_state.realm, device_id)
9492

95-
{:ok, ttl} = Queries.fetch_datastream_maximum_storage_retention(db_client)
93+
# TODO this could be a bang!
94+
{:ok, ttl} = Queries.fetch_datastream_maximum_storage_retention(new_state.realm)
9695

9796
Map.merge(new_state, stats_and_introspection)
9897
|> Map.put(:datastream_maximum_storage_retention, ttl)
@@ -175,7 +174,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
175174

176175
new_state = execute_time_based_actions(state, timestamp, db_client)
177176

178-
Queries.maybe_refresh_device_connected!(db_client, new_state.device_id)
177+
Queries.maybe_refresh_device_connected!(db_client, new_state.realm, new_state.device_id)
179178

180179
MessageTracker.ack_delivery(new_state.message_tracker, message_id)
181180
Logger.info("Device heartbeat.", tag: "device_heartbeat")
@@ -506,7 +505,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
506505
:ok <- validate_path(path),
507506
maybe_descriptor <- Map.get(new_state.interfaces, interface),
508507
{:ok, interface_descriptor, new_state} <-
509-
maybe_handle_cache_miss(maybe_descriptor, interface, new_state, db_client),
508+
maybe_handle_cache_miss(maybe_descriptor, interface, new_state),
510509
:ok <- can_write_on_interface?(interface_descriptor),
511510
interface_id <- interface_descriptor.interface_id,
512511
{:ok, endpoint} <- resolve_path(path, interface_descriptor, new_state.mappings),
@@ -601,8 +600,9 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
601600
:ok
602601

603602
is_still_valid?(
603+
# TODO this is now a bang!
604604
Queries.fetch_path_expiry(
605-
db_client,
605+
new_state.realm,
606606
new_state.device_id,
607607
interface_descriptor,
608608
endpoint,
@@ -660,6 +660,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
660660
insert_result =
661661
Queries.insert_value_into_db(
662662
db_client,
663+
new_state.realm,
663664
new_state.device_id,
664665
interface_descriptor,
665666
endpoint,
@@ -1255,11 +1256,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
12551256
end)
12561257

12571258
any_interface_id = SimpleTriggersProtobufUtils.any_interface_object_id()
1259+
realm = new_state.realm
12581260

12591261
%{device_triggers: device_triggers} =
1260-
populate_triggers_for_object!(new_state, db_client, any_interface_id, :any_interface)
1262+
populate_triggers_for_object!(state, any_interface_id, :any_interface)
12611263

1262-
realm = new_state.realm
12631264
device_id_string = Device.encode_device_id(new_state.device_id)
12641265

12651266
on_introspection_target_with_policy_list =
@@ -1390,7 +1391,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
13901391
end
13911392
end)
13921393

1393-
{:ok, old_minors} = Queries.fetch_device_introspection_minors(db_client, state.device_id)
1394+
# TODO this could be a bang!
1395+
{:ok, old_minors} = Queries.fetch_device_introspection_minors(state.realm, state.device_id)
13941396

13951397
readded_introspection = Enum.to_list(added_interfaces)
13961398

@@ -1534,8 +1536,8 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
15341536

15351537
new_state = execute_time_based_actions(state, timestamp, db_client)
15361538

1537-
with :ok <- send_control_consumer_properties(state, db_client),
1538-
{:ok, new_state} <- resend_all_properties(state, db_client),
1539+
with :ok <- send_control_consumer_properties(state),
1540+
{:ok, new_state} <- resend_all_properties(state),
15391541
:ok <- Queries.set_pending_empty_cache(db_client, new_state.device_id, false) do
15401542
MessageTracker.ack_delivery(state.message_tracker, message_id)
15411543

@@ -1852,17 +1854,18 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
18521854
{:ok, %{state | device_triggers: updated_device_triggers}}
18531855
end
18541856

1855-
defp reload_groups_on_expiry(state, timestamp, db_client) do
1857+
defp reload_groups_on_expiry(state, timestamp) do
18561858
if state.last_groups_refresh + @groups_lifespan_decimicroseconds <= timestamp do
1857-
{:ok, groups} = Queries.get_device_groups(db_client, state.device_id)
1859+
# TODO this could be a bang!
1860+
{:ok, groups} = Queries.get_device_groups(state.realm, state.device_id)
18581861

18591862
%{state | last_groups_refresh: timestamp, groups: groups}
18601863
else
18611864
state
18621865
end
18631866
end
18641867

1865-
defp reload_device_triggers_on_expiry(state, timestamp, db_client) do
1868+
defp reload_device_triggers_on_expiry(state, timestamp) do
18661869
if state.last_device_triggers_refresh + @device_triggers_lifespan_decimicroseconds <=
18671870
timestamp do
18681871
any_device_id = SimpleTriggersProtobufUtils.any_device_object_id()
@@ -1879,31 +1882,30 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
18791882
|> Map.put(:last_device_triggers_refresh, timestamp)
18801883
|> Map.put(:device_triggers, %{})
18811884
|> forget_any_interface_data_triggers()
1882-
|> populate_triggers_for_object!(db_client, any_device_id, :any_device)
1883-
|> populate_triggers_for_object!(db_client, state.device_id, :device)
1884-
|> populate_triggers_for_object!(db_client, any_interface_id, :any_interface)
1885+
|> populate_triggers_for_object!(any_device_id, :any_device)
1886+
|> populate_triggers_for_object!(state.device_id, :device)
1887+
|> populate_triggers_for_object!(any_interface_id, :any_interface)
18851888
|> populate_triggers_for_object!(
1886-
db_client,
18871889
device_and_any_interface_object_id,
18881890
:device_and_any_interface
18891891
)
1890-
|> populate_group_device_triggers!(db_client)
1891-
|> populate_group_and_any_interface_triggers!(db_client)
1892+
|> populate_group_device_triggers!()
1893+
|> populate_group_and_any_interface_triggers!()
18921894
else
18931895
state
18941896
end
18951897
end
18961898

1897-
defp populate_group_device_triggers!(state, db_client) do
1899+
defp populate_group_device_triggers!(state) do
18981900
Enum.map(state.groups, &SimpleTriggersProtobufUtils.get_group_object_id/1)
1899-
|> Enum.reduce(state, &populate_triggers_for_object!(&2, db_client, &1, :group))
1901+
|> Enum.reduce(state, &populate_triggers_for_object!(&2, &1, :group))
19001902
end
19011903

1902-
defp populate_group_and_any_interface_triggers!(state, db_client) do
1904+
defp populate_group_and_any_interface_triggers!(state) do
19031905
Enum.map(state.groups, &SimpleTriggersProtobufUtils.get_group_and_any_interface_object_id/1)
19041906
|> Enum.reduce(
19051907
state,
1906-
&populate_triggers_for_object!(&2, db_client, &1, :group_and_any_interface)
1908+
&populate_triggers_for_object!(&2, &1, :group_and_any_interface)
19071909
)
19081910
end
19091911

@@ -1919,11 +1921,11 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
19191921

19201922
state
19211923
|> Map.put(:last_seen_message, timestamp)
1922-
|> reload_groups_on_expiry(timestamp, db_client)
1924+
|> reload_groups_on_expiry(timestamp)
19231925
|> purge_expired_interfaces(timestamp)
1924-
|> reload_device_triggers_on_expiry(timestamp, db_client)
1926+
|> reload_device_triggers_on_expiry(timestamp)
19251927
|> reload_device_deletion_status_on_expiry(timestamp, db_client)
1926-
|> reload_datastream_maximum_storage_retention_on_expiry(timestamp, db_client)
1928+
|> reload_datastream_maximum_storage_retention_on_expiry(timestamp)
19271929
end
19281930

19291931
defp reload_device_deletion_status_on_expiry(state, timestamp, db_client) do
@@ -1936,11 +1938,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
19361938
end
19371939
end
19381940

1939-
defp reload_datastream_maximum_storage_retention_on_expiry(state, timestamp, db_client) do
1941+
defp reload_datastream_maximum_storage_retention_on_expiry(state, timestamp) do
19401942
if state.last_datastream_maximum_retention_refresh +
19411943
@datastream_maximum_retention_refresh_lifespan_decimicroseconds <=
19421944
timestamp do
1943-
case Queries.fetch_datastream_maximum_storage_retention(db_client) do
1945+
# TODO this could be a bang!
1946+
case Queries.fetch_datastream_maximum_storage_retention(state.realm) do
19441947
{:ok, ttl} ->
19451948
%State{
19461949
state
@@ -2081,7 +2084,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
20812084
}
20822085
end
20832086

2084-
defp maybe_handle_cache_miss(nil, interface_name, state, db_client) do
2087+
defp maybe_handle_cache_miss(nil, interface_name, state) do
20852088
with {:ok, major_version} <-
20862089
DeviceQueries.interface_version(state.realm, state.device_id, interface_name),
20872090
{:ok, interface_row} <-
@@ -2108,7 +2111,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
21082111
new_state <-
21092112
populate_triggers_for_object!(
21102113
new_state,
2111-
db_client,
21122114
interface_descriptor.interface_id,
21132115
:interface
21142116
),
@@ -2120,14 +2122,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
21202122
new_state =
21212123
populate_triggers_for_object!(
21222124
new_state,
2123-
db_client,
21242125
device_and_interface_object_id,
21252126
:device_and_interface
21262127
),
21272128
new_state =
21282129
populate_triggers_for_group_and_interface!(
21292130
new_state,
2130-
db_client,
21312131
interface_id
21322132
) do
21332133
# TODO: make everything with-friendly
@@ -2152,18 +2152,18 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
21522152
end
21532153
end
21542154

2155-
defp maybe_handle_cache_miss(interface_descriptor, _interface_name, state, _db_client) do
2155+
defp maybe_handle_cache_miss(interface_descriptor, _interface_name, state) do
21562156
{:ok, interface_descriptor, state}
21572157
end
21582158

2159-
defp populate_triggers_for_group_and_interface!(state, db_client, interface_id) do
2159+
defp populate_triggers_for_group_and_interface!(state, interface_id) do
21602160
Enum.map(
21612161
state.groups,
21622162
&SimpleTriggersProtobufUtils.get_group_and_interface_object_id(&1, interface_id)
21632163
)
21642164
|> Enum.reduce(
21652165
state,
2166-
&populate_triggers_for_object!(&2, db_client, &1, :group_and_interface)
2166+
&populate_triggers_for_object!(&2, &1, :group_and_interface)
21672167
)
21682168
end
21692169

@@ -2186,8 +2186,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
21862186
maybe_handle_cache_miss(
21872187
Map.get(state.interfaces, interface),
21882188
interface,
2189-
state,
2190-
db_client
2189+
state
21912190
) do
21922191
cond do
21932192
interface_descriptor.type != :properties ->
@@ -2210,7 +2209,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
22102209
each_interface_mapping(state.mappings, interface_descriptor, fn mapping ->
22112210
endpoint_id = mapping.endpoint_id
22122211

2213-
Queries.query_all_endpoint_paths!(db, state.device_id, interface_descriptor, endpoint_id)
2212+
Queries.all_device_owned_property_endpoint_paths!(
2213+
state.realm,
2214+
state.device_id,
2215+
interface_descriptor,
2216+
endpoint_id
2217+
)
22142218
|> Enum.each(fn path_row ->
22152219
path = path_row[:path]
22162220

@@ -2400,25 +2404,24 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
24002404
end
24012405
end
24022406

2403-
defp populate_triggers_for_object!(state, client, object_id, object_type) do
2407+
defp populate_triggers_for_object!(state, object_id, object_type) do
2408+
%{realm: realm} = state
2409+
24042410
object_type_int = SimpleTriggersProtobufUtils.object_type_to_int!(object_type)
24052411

2406-
simple_triggers_rows = Queries.query_simple_triggers!(client, object_id, object_type_int)
2412+
simple_triggers = Queries.query_simple_triggers!(realm, object_id, object_type_int)
24072413

24082414
new_state =
2409-
Enum.reduce(simple_triggers_rows, state, fn row, state_acc ->
2410-
trigger_id = row[:simple_trigger_id]
2411-
parent_trigger_id = row[:parent_trigger_id]
2412-
2413-
simple_trigger =
2414-
SimpleTriggersProtobufUtils.deserialize_simple_trigger(row[:trigger_data])
2415+
Enum.reduce(simple_triggers, state, fn simple_trigger, state_acc ->
2416+
trigger_data =
2417+
SimpleTriggersProtobufUtils.deserialize_simple_trigger(simple_trigger.trigger_data)
24152418

24162419
trigger_target =
2417-
SimpleTriggersProtobufUtils.deserialize_trigger_target(row[:trigger_target])
2418-
|> Map.put(:simple_trigger_id, trigger_id)
2419-
|> Map.put(:parent_trigger_id, parent_trigger_id)
2420+
SimpleTriggersProtobufUtils.deserialize_trigger_target(simple_trigger.trigger_target)
2421+
|> Map.put(:simple_trigger_id, simple_trigger.simple_trigger_id)
2422+
|> Map.put(:parent_trigger_id, simple_trigger.parent_trigger_id)
24202423

2421-
load_trigger(state_acc, simple_trigger, trigger_target)
2424+
load_trigger(state_acc, trigger_data, trigger_target)
24222425
end)
24232426

24242427
Enum.reduce(new_state.volatile_triggers, new_state, fn {{obj_id, obj_type},
@@ -2702,16 +2705,16 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
27022705
end)
27032706
end
27042707

2705-
defp send_control_consumer_properties(state, db_client) do
2708+
defp send_control_consumer_properties(state) do
27062709
Logger.debug("Device introspection: #{inspect(state.introspection)}.")
27072710

27082711
abs_paths_list =
27092712
Enum.flat_map(state.introspection, fn {interface, _} ->
27102713
descriptor = Map.get(state.interfaces, interface)
27112714

2712-
case maybe_handle_cache_miss(descriptor, interface, state, db_client) do
2715+
case maybe_handle_cache_miss(descriptor, interface, state) do
27132716
{:ok, interface_descriptor, new_state} ->
2714-
gather_interface_properties(new_state, db_client, interface_descriptor)
2717+
gather_interface_property_paths(new_state.realm, interface_descriptor)
27152718

27162719
{:error, :interface_loading_failed} ->
27172720
Logger.warning("Failed #{interface} interface loading.")
@@ -2726,32 +2729,31 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
27262729
end
27272730
end
27282731

2729-
defp gather_interface_properties(
2730-
%State{device_id: device_id, mappings: mappings} = _state,
2731-
db_client,
2732+
defp gather_interface_property_paths(
2733+
%State{device_id: device_id, mappings: mappings, realm: realm} = _state,
27322734
%InterfaceDescriptor{type: :properties, ownership: :server} = interface_descriptor
27332735
) do
27342736
reduce_interface_mapping(mappings, interface_descriptor, [], fn mapping, i_acc ->
2735-
Queries.retrieve_endpoint_values(db_client, device_id, interface_descriptor, mapping)
2736-
|> Enum.reduce(i_acc, fn [{:path, path}, {_, _value}], acc ->
2737+
Queries.retrieve_property_values(realm, device_id, interface_descriptor, mapping)
2738+
|> Enum.reduce(i_acc, fn [{:path, path}, {_, _}], acc ->
27372739
["#{interface_descriptor.name}#{path}" | acc]
27382740
end)
27392741
end)
27402742
end
27412743

2742-
defp gather_interface_properties(_state, _db, %InterfaceDescriptor{} = _descriptor) do
2744+
defp gather_interface_property_paths(_state, %InterfaceDescriptor{} = _descriptor) do
27432745
[]
27442746
end
27452747

2746-
defp resend_all_properties(state, db_client) do
2748+
defp resend_all_properties(state) do
27472749
Logger.debug("Device introspection: #{inspect(state.introspection)}")
27482750

27492751
Enum.reduce_while(state.introspection, {:ok, state}, fn {interface, _}, {:ok, state_acc} ->
27502752
maybe_descriptor = Map.get(state_acc.interfaces, interface)
27512753

27522754
with {:ok, interface_descriptor, new_state} <-
2753-
maybe_handle_cache_miss(maybe_descriptor, interface, state_acc, db_client),
2754-
:ok <- resend_all_interface_properties(new_state, db_client, interface_descriptor) do
2755+
maybe_handle_cache_miss(maybe_descriptor, interface, state_acc),
2756+
:ok <- resend_all_interface_properties(new_state, interface_descriptor) do
27552757
{:cont, {:ok, new_state}}
27562758
else
27572759
{:error, :interface_loading_failed} ->
@@ -2766,13 +2768,12 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
27662768

27672769
defp resend_all_interface_properties(
27682770
%State{realm: realm, device_id: device_id, mappings: mappings} = _state,
2769-
db_client,
27702771
%InterfaceDescriptor{type: :properties, ownership: :server} = interface_descriptor
27712772
) do
27722773
encoded_device_id = Device.encode_device_id(device_id)
27732774

27742775
each_interface_mapping(mappings, interface_descriptor, fn mapping ->
2775-
Queries.retrieve_endpoint_values(db_client, device_id, interface_descriptor, mapping)
2776+
Queries.retrieve_property_values(realm, device_id, interface_descriptor, mapping)
27762777
|> Enum.reduce_while(:ok, fn [{:path, path}, {_, value}], _acc ->
27772778
case send_value(realm, encoded_device_id, interface_descriptor.name, path, value) do
27782779
{:ok, _bytes} ->
@@ -2786,7 +2787,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
27862787
end)
27872788
end
27882789

2789-
defp resend_all_interface_properties(_state, _db, %InterfaceDescriptor{} = _descriptor) do
2790+
defp resend_all_interface_properties(_state, %InterfaceDescriptor{} = _descriptor) do
27902791
:ok
27912792
end
27922793

0 commit comments

Comments
 (0)