Skip to content

Commit acdcf58

Browse files
authored
Merge pull request #1100 from davidebriani/port-pairing-edit-queries-to-ecto
Pairing: port remaining CQEx queries to Ecto
2 parents e29c64f + 158e894 commit acdcf58

File tree

8 files changed

+83
-214
lines changed

8 files changed

+83
-214
lines changed

apps/astarte_pairing/lib/astarte_pairing/engine.ex

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ defmodule Astarte.Pairing.Engine do
2727
alias Astarte.Pairing.Config
2828
alias Astarte.Pairing.CredentialsSecret
2929
alias Astarte.Pairing.Queries
30-
alias CQEx.Client
31-
alias Astarte.Core.CQLUtils
3230

3331
require Logger
3432

@@ -71,19 +69,9 @@ defmodule Astarte.Pairing.Engine do
7169
)
7270

7371
:telemetry.execute([:astarte, :pairing, :get_credentials], %{}, %{realm: realm})
74-
keyspace_name = CQLUtils.realm_name_to_keyspace_name(realm, Config.astarte_instance_id!())
75-
76-
cqex_options =
77-
Config.cqex_options!()
78-
|> Keyword.put(:keyspace, keyspace_name)
7972

8073
with {:ok, device_id} <- Device.decode_device_id(hardware_id, allow_extended_id: true),
8174
{:ok, ip_tuple} <- parse_ip(device_ip),
82-
{:ok, client} <-
83-
Client.new(
84-
Config.cassandra_node!(),
85-
cqex_options
86-
),
8775
{:ok, device} <- Queries.fetch_device(realm, device_id),
8876
{:authorized?, true} <-
8977
{:authorized?, CredentialsSecret.verify(credentials_secret, device.credentials_secret)},
@@ -93,10 +81,10 @@ defmodule Astarte.Pairing.Engine do
9381
encoded_device_id <- Device.encode_device_id(device_id),
9482
{:ok, %{cert: cert, aki: _aki, serial: _serial} = cert_data} <-
9583
CFSSLCredentials.get_certificate(csr, realm, encoded_device_id),
96-
:ok <-
84+
{:ok, _device} <-
9785
Queries.update_device_after_credentials_request(
98-
client,
99-
device_id,
86+
realm,
87+
device,
10088
cert_data,
10189
ip_tuple,
10290
device.first_credentials_request
@@ -109,9 +97,6 @@ defmodule Astarte.Pairing.Engine do
10997
{:credentials_inhibited?, true} ->
11098
{:error, :credentials_request_inhibited}
11199

112-
{:error, :shutdown} ->
113-
{:error, :realm_not_found}
114-
115100
{:error, reason} ->
116101
{:error, reason}
117102
end
@@ -169,7 +154,8 @@ defmodule Astarte.Pairing.Engine do
169154
:ok <- verify_can_register_device(realm, device_id),
170155
credentials_secret <- CredentialsSecret.generate(),
171156
secret_hash <- CredentialsSecret.hash(credentials_secret),
172-
:ok <- Queries.register_device(realm, device_id, hardware_id, secret_hash, opts) do
157+
{:ok, _device} <-
158+
Queries.register_device(realm, device_id, hardware_id, secret_hash, opts) do
173159
{:ok, credentials_secret}
174160
else
175161
{:error, :shutdown} ->

apps/astarte_pairing/lib/astarte_pairing/queries.ex

Lines changed: 48 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ defmodule Astarte.Pairing.Queries do
2121
This module is responsible for the interaction with the database.
2222
"""
2323

24-
alias CQEx.Client
25-
alias CQEx.Query
2624
alias Astarte.Core.CQLUtils
2725
alias Astarte.Pairing.Config
2826
alias Astarte.Pairing.Astarte.Realm
@@ -32,7 +30,6 @@ defmodule Astarte.Pairing.Queries do
3230
require Logger
3331
import Ecto.Query
3432

35-
@protocol_revision 1
3633
@keyspace_does_not_exist_regex ~r/Keyspace (.*) does not exist/
3734

3835
def get_agent_public_key_pems(realm_name) do
@@ -104,9 +101,8 @@ defmodule Astarte.Pairing.Queries do
104101

105102
do_register_unconfirmed_device(
106103
realm_name,
107-
device_id,
104+
device,
108105
credentials_secret,
109-
device.first_registration,
110106
opts
111107
)
112108
else
@@ -123,14 +119,10 @@ defmodule Astarte.Pairing.Queries do
123119
end
124120

125121
def unregister_device(realm_name, device_id) do
126-
with :ok <- verify_already_registered_device(realm_name, device_id),
127-
:ok <- do_unregister_device(realm_name, device_id) do
122+
with {:ok, device} <- fetch_device(realm_name, device_id),
123+
{:ok, _device} <- do_unregister_device(realm_name, device) do
128124
:ok
129125
else
130-
%{acc: _acc, msg: msg} ->
131-
Logger.warning("DB error: #{inspect(msg)}")
132-
{:error, :database_error}
133-
134126
{:error, reason} ->
135127
Logger.warning("Unregister error: #{inspect(reason)}")
136128
{:error, reason}
@@ -147,44 +139,16 @@ defmodule Astarte.Pairing.Queries do
147139
end
148140
end
149141

150-
defp verify_already_registered_device(realm_name, device_id) do
151-
case fetch_device(realm_name, device_id) do
152-
{:ok, _device} -> :ok
153-
{:error, :device_not_found} -> {:error, :device_not_registered}
154-
{:error, reason} -> {:error, reason}
155-
end
156-
end
157-
158-
defp do_unregister_device(realm_name, device_id) do
159-
statement = """
160-
INSERT INTO devices
161-
(device_id, first_credentials_request, credentials_secret)
162-
VALUES (:device_id, :first_credentials_request, :credentials_secret)
163-
"""
164-
165-
query =
166-
Query.new()
167-
|> Query.statement(statement)
168-
|> Query.put(:device_id, device_id)
169-
|> Query.put(:first_credentials_request, nil)
170-
|> Query.put(:credentials_secret, nil)
171-
|> Query.consistency(:quorum)
172-
142+
defp do_unregister_device(realm_name, %Device{} = device) do
173143
keyspace_name =
174144
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
175145

176-
cqex_options =
177-
Config.cqex_options!()
178-
|> Keyword.put(:keyspace, keyspace_name)
179-
180-
with {:ok, client} <-
181-
CQEx.Client.new(
182-
Config.cassandra_node!(),
183-
cqex_options
184-
),
185-
{:ok, _res} <- Query.call(client, query) do
186-
:ok
187-
end
146+
device
147+
|> Ecto.Changeset.change(
148+
first_credentials_request: nil,
149+
credentials_secret: nil
150+
)
151+
|> Repo.update(prefix: keyspace_name, consistency: :quorum)
188152
end
189153

190154
def fetch_device(realm_name, device_id) do
@@ -202,54 +166,36 @@ defmodule Astarte.Pairing.Queries do
202166
end
203167
end
204168

205-
def update_device_after_credentials_request(client, device_id, cert_data, device_ip, nil) do
169+
def update_device_after_credentials_request(realm_name, device, cert_data, device_ip, nil) do
206170
first_credentials_request_timestamp = DateTime.utc_now()
207171

208172
update_device_after_credentials_request(
209-
client,
210-
device_id,
173+
realm_name,
174+
device,
211175
cert_data,
212176
device_ip,
213177
first_credentials_request_timestamp
214178
)
215179
end
216180

217181
def update_device_after_credentials_request(
218-
client,
219-
device_id,
182+
realm_name,
183+
%Device{} = device,
220184
%{serial: serial, aki: aki} = _cert_data,
221185
device_ip,
222186
%DateTime{} = first_credentials_request_timestamp
223187
) do
224-
statement = """
225-
UPDATE devices
226-
SET cert_aki=:cert_aki, cert_serial=:cert_serial, last_credentials_request_ip=:last_credentials_request_ip,
227-
first_credentials_request=:first_credentials_request
228-
WHERE device_id=:device_id
229-
"""
230-
231-
query =
232-
Query.new()
233-
|> Query.statement(statement)
234-
|> Query.put(:device_id, device_id)
235-
|> Query.put(:cert_aki, aki)
236-
|> Query.put(:cert_serial, serial)
237-
|> Query.put(:last_credentials_request_ip, device_ip)
238-
|> Query.put(
239-
:first_credentials_request,
240-
first_credentials_request_timestamp |> DateTime.to_unix(:millisecond)
241-
)
242-
|> Query.put(:protocol_revision, @protocol_revision)
243-
|> Query.consistency(:quorum)
244-
245-
case Query.call(client, query) do
246-
{:ok, _res} ->
247-
:ok
188+
keyspace_name =
189+
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
248190

249-
error ->
250-
Logger.warning("DB error: #{inspect(error)}")
251-
{:error, :database_error}
252-
end
191+
device
192+
|> Ecto.Changeset.change(%{
193+
cert_aki: aki,
194+
cert_serial: serial,
195+
last_credentials_request_ip: device_ip,
196+
first_credentials_request: first_credentials_request_timestamp
197+
})
198+
|> Repo.update(prefix: keyspace_name, consistency: :quorum)
253199
end
254200

255201
def fetch_device_registration_limit(realm_name) do
@@ -300,113 +246,52 @@ defmodule Astarte.Pairing.Queries do
300246
%DateTime{} = registration_timestamp,
301247
opts
302248
) do
303-
statement = """
304-
INSERT INTO devices
305-
(device_id, first_registration, credentials_secret, inhibit_credentials_request,
306-
protocol_revision, total_received_bytes, total_received_msgs, introspection,
307-
introspection_minor)
308-
VALUES
309-
(:device_id, :first_registration, :credentials_secret, :inhibit_credentials_request,
310-
:protocol_revision, :total_received_bytes, :total_received_msgs, :introspection,
311-
:introspection_minor)
312-
"""
313-
314249
{introspection, introspection_minor} =
315250
opts
316251
|> Keyword.get(:initial_introspection, [])
317252
|> build_initial_introspection_maps()
318253

319-
query =
320-
Query.new()
321-
|> Query.statement(statement)
322-
|> Query.put(:device_id, device_id)
323-
|> Query.put(:first_registration, registration_timestamp |> DateTime.to_unix(:millisecond))
324-
|> Query.put(:credentials_secret, credentials_secret)
325-
|> Query.put(:inhibit_credentials_request, false)
326-
|> Query.put(:protocol_revision, 0)
327-
|> Query.put(:total_received_bytes, 0)
328-
|> Query.put(:total_received_msgs, 0)
329-
|> Query.put(:introspection, introspection)
330-
|> Query.put(:introspection_minor, introspection_minor)
331-
|> Query.consistency(:quorum)
332-
333254
keyspace_name =
334255
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
335256

336-
cqex_options =
337-
Config.cqex_options!()
338-
|> Keyword.put(:keyspace, keyspace_name)
339-
340-
with {:ok, client} <-
341-
Client.new(
342-
Config.cassandra_node!(),
343-
cqex_options
344-
),
345-
{:ok, _res} <- Query.call(client, query) do
346-
:ok
347-
else
348-
error ->
349-
Logger.warning("DB error: #{inspect(error)}")
350-
{:error, :database_error}
351-
end
257+
%Device{}
258+
|> Ecto.Changeset.change(%{
259+
device_id: device_id,
260+
first_registration: registration_timestamp,
261+
credentials_secret: credentials_secret,
262+
inhibit_credentials_request: false,
263+
protocol_revision: 0,
264+
total_received_bytes: 0,
265+
total_received_msgs: 0,
266+
introspection: introspection,
267+
introspection_minor: introspection_minor
268+
})
269+
|> Repo.insert(prefix: keyspace_name, consistency: :quorum)
352270
end
353271

354272
defp do_register_unconfirmed_device(
355273
realm_name,
356-
device_id,
274+
%Device{} = device,
357275
credentials_secret,
358-
%DateTime{} = registration_timestamp,
359276
opts
360277
) do
361-
statement = """
362-
UPDATE devices
363-
SET
364-
first_registration = :first_registration,
365-
credentials_secret = :credentials_secret,
366-
inhibit_credentials_request = :inhibit_credentials_request,
367-
protocol_revision = :protocol_revision,
368-
introspection = :introspection,
369-
introspection_minor = :introspection_minor
370-
371-
WHERE device_id = :device_id
372-
"""
373-
374278
{introspection, introspection_minor} =
375279
opts
376280
|> Keyword.get(:initial_introspection, [])
377281
|> build_initial_introspection_maps()
378282

379-
query =
380-
Query.new()
381-
|> Query.statement(statement)
382-
|> Query.put(:device_id, device_id)
383-
|> Query.put(:first_registration, registration_timestamp |> DateTime.to_unix(:millisecond))
384-
|> Query.put(:credentials_secret, credentials_secret)
385-
|> Query.put(:inhibit_credentials_request, false)
386-
|> Query.put(:protocol_revision, 0)
387-
|> Query.put(:introspection, introspection)
388-
|> Query.put(:introspection_minor, introspection_minor)
389-
|> Query.consistency(:quorum)
390-
391283
keyspace_name =
392284
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
393285

394-
cqex_options =
395-
Config.cqex_options!()
396-
|> Keyword.put(:keyspace, keyspace_name)
397-
398-
with {:ok, client} <-
399-
Client.new(
400-
Config.cassandra_node!(),
401-
cqex_options
402-
),
403-
{:ok, _res} <- Query.call(client, query) do
404-
:ok
405-
else
406-
error ->
407-
Logger.warning("DB error: #{inspect(error)}")
408-
{:error, :database_error}
409-
end
286+
device
287+
|> Ecto.Changeset.change(%{
288+
credentials_secret: credentials_secret,
289+
inhibit_credentials_request: false,
290+
protocol_revision: 0,
291+
introspection: introspection,
292+
introspection_minor: introspection_minor
293+
})
294+
|> Repo.update(prefix: keyspace_name, consistency: :quorum)
410295
end
411296

412297
defp build_initial_introspection_maps(initial_introspection) do

apps/astarte_pairing/test/astarte_pairing/engine_test.exs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ defmodule Astarte.Pairing.EngineTest do
2020
use ExUnit.Case
2121

2222
alias Astarte.Core.Device
23-
alias Astarte.Pairing.Config
24-
alias Astarte.Core.CQLUtils
2523
alias Astarte.Pairing.CredentialsSecret
2624
alias Astarte.Pairing.DatabaseTestHelper
2725
alias Astarte.Pairing.Engine
@@ -226,7 +224,7 @@ defmodule Astarte.Pairing.EngineTest do
226224
test "fails with never registered device_id" do
227225
device_id = DatabaseTestHelper.unregistered_128_bit_hw_id()
228226

229-
assert {:error, :device_not_registered} = Engine.unregister_device(@test_realm, device_id)
227+
assert {:error, :device_not_found} = Engine.unregister_device(@test_realm, device_id)
230228
end
231229

232230
test "succeeds with registered and confirmed device_id, and makes it possible to register it again" do

0 commit comments

Comments
 (0)