Skip to content

Commit bda93d0

Browse files
committed
DUP: move Queries.register_device_with_interface to Ecto + a small TODO + yeet CQEx
Signed-off-by: Arnaldo Cesco <arnaldo.cesco@secomind.com>
1 parent 931fb8d commit bda93d0

File tree

2 files changed

+17
-30
lines changed
  • apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater

2 files changed

+17
-30
lines changed

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/impl.ex

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
3131
alias Astarte.Core.Triggers.SimpleTriggersProtobuf.Utils, as: SimpleTriggersProtobufUtils
3232
alias Astarte.Core.Triggers.SimpleTriggersProtobuf.AMQPTriggerTarget
3333
alias Astarte.DataAccess.Data
34-
alias Astarte.DataAccess.Database
3534
alias Astarte.DataAccess.Device, as: DeviceQueries
3635
alias Astarte.DataAccess.Interface, as: InterfaceQueries
3736
alias Astarte.DataAccess.Mappings
@@ -1228,8 +1227,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
12281227
end
12291228

12301229
def process_introspection(state, new_introspection_list, payload, message_id, timestamp) do
1231-
{:ok, db_client} = Database.connect(realm: state.realm)
1232-
12331230
new_state = execute_time_based_actions(state, timestamp)
12341231

12351232
timestamp_ms = div(timestamp, 10_000)
@@ -1289,7 +1286,7 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Impl do
12891286
:ok =
12901287
if interface_major == 0 do
12911288
Queries.register_device_with_interface(
1292-
db_client,
1289+
realm,
12931290
state.device_id,
12941291
interface_name,
12951292
0

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_updater/queries.ex

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
2828
alias Astarte.DataUpdaterPlant.DataUpdater.IndividualProperty
2929
alias Astarte.DataUpdaterPlant.DataUpdater.KvStore
3030
alias Astarte.DataUpdaterPlant.DataUpdater.Realm
31-
alias CQEx.Query, as: DatabaseQuery
3231
alias Astarte.DataUpdaterPlant.Repo
3332
import Ecto.Query
3433
require Logger
@@ -602,34 +601,25 @@ defmodule Astarte.DataUpdaterPlant.DataUpdater.Queries do
602601
end
603602
end
604603

605-
def register_device_with_interface(db_client, device_id, interface_name, interface_major) do
606-
key_insert_statement = """
607-
INSERT INTO kv_store (group, key)
608-
VALUES (:group, :key)
609-
"""
610-
611-
major_str = "v#{Integer.to_string(interface_major)}"
604+
def register_device_with_interface(realm, device_id, interface_name, interface_major) do
605+
keyspace_name = Realm.keyspace_name(realm)
612606
encoded_device_id = CoreDevice.encode_device_id(device_id)
613607

614-
insert_device_by_interface_query =
615-
DatabaseQuery.new()
616-
|> DatabaseQuery.statement(key_insert_statement)
617-
|> DatabaseQuery.put(:group, "devices-by-interface-#{interface_name}-#{major_str}")
618-
|> DatabaseQuery.put(:key, encoded_device_id)
619-
|> DatabaseQuery.consistency(:each_quorum)
620-
621-
insert_to_with_data_on_interface =
622-
DatabaseQuery.new()
623-
|> DatabaseQuery.statement(key_insert_statement)
624-
|> DatabaseQuery.put(
625-
:group,
626-
"devices-with-data-on-interface-#{interface_name}-#{major_str}"
627-
)
628-
|> DatabaseQuery.put(:key, encoded_device_id)
629-
|> DatabaseQuery.consistency(:each_quorum)
608+
devices_by_interface = %{
609+
"group" => "devices-by-interface-#{interface_name}-v#{interface_major}",
610+
"key" => encoded_device_id
611+
}
612+
613+
devices_on_interface = %{
614+
"group" => "devices-with-data-on-interface-#{interface_name}-v#{interface_major}",
615+
"key" => encoded_device_id
616+
}
617+
618+
opts = [prefix: keyspace_name, consistency: :each_quorum]
630619

631-
with {:ok, _result} <- DatabaseQuery.call(db_client, insert_device_by_interface_query),
632-
{:ok, _result} <- DatabaseQuery.call(db_client, insert_to_with_data_on_interface) do
620+
# TODO: what about a batch here?
621+
with {n, _} when is_integer(n) <- Repo.safe_insert(KvStore, devices_by_interface, opts),
622+
{m, _} when is_integer(m) <- Repo.safe_insert(KvStore, devices_on_interface, opts) do
633623
:ok
634624
else
635625
{:error, reason} ->

0 commit comments

Comments
 (0)