Skip to content

Commit 85068c9

Browse files
refactoring(exandra): CQEx to Exandra
Signed-off-by: Gabriele Ghio <gabriele.ghio@secomind.com>
1 parent 812af66 commit 85068c9

File tree

8 files changed

+131
-40
lines changed

8 files changed

+131
-40
lines changed
Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,22 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2017 - 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
119
[
220
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
3-
import_deps: [:plug, :skogsra]
21+
import_deps: [:plug, :skogsra, :ecto]
422
]

apps/astarte_trigger_engine/config/config.exs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017 Ispirata Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -37,4 +37,8 @@ config :astarte_trigger_engine, :events_consumer, Astarte.TriggerEngine.EventsCo
3737

3838
config :astarte_trigger_engine, :amqp_adapter, ExRabbitPool.RabbitMQ
3939

40+
config :astarte_trigger_engine, ecto_repos: [Astarte.TriggerEngine.Repo]
41+
42+
config :astarte_trigger_engine, Astarte.TriggerEngine.Repo, []
43+
4044
import_config "#{config_env()}.exs"

apps/astarte_trigger_engine/lib/astarte_trigger_engine/application.ex

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017 Ispirata Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -40,13 +40,13 @@ defmodule Astarte.TriggerEngine.Application do
4040
Config.validate!()
4141
DataAccessConfig.validate!()
4242

43-
xandra_options =
44-
Config.xandra_options!()
45-
|> Keyword.put(:name, :xandra)
43+
xandra_options = Config.xandra_options!()
44+
xandra_cluster_options = Keyword.put(xandra_options, :name, :xandra)
4645

4746
children = [
4847
Astarte.TriggerEngineWeb.Telemetry,
49-
{Xandra.Cluster, xandra_options},
48+
{Xandra.Cluster, xandra_cluster_options},
49+
{Astarte.TriggerEngine.Repo, xandra_options},
5050
DeliverySupervisor
5151
]
5252

apps/astarte_trigger_engine/lib/astarte_trigger_engine/events_consumer.ex

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017-2024 SECO Mind Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -17,14 +17,18 @@
1717
#
1818

1919
defmodule Astarte.TriggerEngine.EventsConsumer do
20+
require Logger
21+
22+
import Ecto.Query
23+
24+
alias Astarte.Core.CQLUtils
2025
alias Astarte.Core.Triggers.SimpleEvents.IncomingIntrospectionEvent
2126
alias Astarte.Core.Triggers.SimpleEvents.InterfaceVersion
2227
alias Astarte.Core.Triggers.SimpleEvents.SimpleEvent
2328
alias Astarte.Core.Triggers.Trigger
24-
alias Astarte.DataAccess.Database
25-
alias CQEx.Query, as: DatabaseQuery
26-
alias CQEx.Result, as: DatabaseResult
27-
require Logger
29+
alias Astarte.TriggerEngine.Config
30+
alias Astarte.TriggerEngine.Repo
31+
alias Astarte.TriggerEngine.KvStore
2832

2933
defmodule Behaviour do
3034
@callback consume(payload :: binary, headers :: map) :: :ok | {:error, reason :: atom}
@@ -312,27 +316,27 @@ defmodule Astarte.TriggerEngine.EventsConsumer do
312316
end
313317

314318
defp retrieve_trigger_configuration(realm_name, trigger_id) do
319+
keyspace_name =
320+
CQLUtils.realm_name_to_keyspace_name(realm_name, Config.astarte_instance_id!())
321+
315322
query =
316-
DatabaseQuery.new()
317-
|> DatabaseQuery.statement(
318-
"SELECT value FROM kv_store WHERE group='triggers' AND key=:trigger_id;"
319-
)
320-
|> DatabaseQuery.put(:trigger_id, trigger_id)
321-
322-
with {:ok, client} <- Database.connect(realm: realm_name),
323-
{:ok, result} <- DatabaseQuery.call(client, query),
324-
[value: trigger_data] <- DatabaseResult.head(result),
325-
trigger <- Trigger.decode(trigger_data),
323+
from kvstore in KvStore,
324+
prefix: ^keyspace_name,
325+
where: kvstore.group == "triggers" and kvstore.key == ^trigger_id,
326+
select: kvstore.value
327+
328+
with encoded_trigger when encoded_trigger != nil <- Repo.one(query),
329+
trigger <- Trigger.decode(encoded_trigger),
326330
{:ok, action} <- Jason.decode(trigger.action) do
327331
{:ok, %{action: action, trigger_name: trigger.name}}
328332
else
329-
{:error, :database_connection_error} ->
330-
Logger.warning("Database connection error.")
331-
{:error, :database_connection_error}
332-
333-
error ->
334-
Logger.warning("Error while processing event: #{inspect(error)}")
333+
nil ->
334+
Logger.warning("Trigger not found: #{inspect(trigger_id)}")
335335
{:error, :trigger_not_found}
336+
337+
_ ->
338+
Logger.warning("Error while decoding trigger: #{inspect(trigger_id)}")
339+
{:error, :trigger_decoding_error}
336340
end
337341
end
338342
end
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# This file is part of Astarte.
2+
#
3+
# Copyright 2025 SECO Mind Srl
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
defmodule Astarte.TriggerEngine.KvStore do
19+
use TypedEctoSchema
20+
21+
@primary_key false
22+
typed_schema "kv_store" do
23+
field :group, :string, primary_key: true
24+
field :key, :string, primary_key: true
25+
field :value, :binary
26+
end
27+
end
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#
2+
# This file is part of Astarte.
3+
#
4+
# Copyright 2025 SECO Mind Srl
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
defmodule Astarte.TriggerEngine.Repo do
20+
use Ecto.Repo, otp_app: :astarte_trigger_engine, adapter: Exandra
21+
alias Astarte.TriggerEngine.Config
22+
23+
@impl Ecto.Repo
24+
def init(_context, config) do
25+
config =
26+
Config.xandra_options!()
27+
|> Keyword.merge(config)
28+
29+
{:ok, config}
30+
end
31+
end

apps/astarte_trigger_engine/mix.exs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017-2021 Ispirata Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -80,21 +80,24 @@ defmodule Astarte.TriggerEngine.Mixfile do
8080
{:cyanide, "~> 2.0"},
8181
{:httpoison, "~> 1.6"},
8282
{:jason, "~> 1.2"},
83-
{:excoveralls, "~> 0.15", only: :test},
84-
# hex.pm package and esl/ex_rabbit_pool do not support amqp version 2.1.
85-
# This fork is supporting amqp ~> 2.0 and also ~> 3.0.
86-
{:ex_rabbit_pool, github: "leductam/ex_rabbit_pool"},
8783
{:plug_cowboy, "~> 2.1"},
8884
{:telemetry_metrics_prometheus_core, "~> 0.4"},
8985
{:telemetry_metrics, "~> 0.4"},
9086
{:telemetry_poller, "~> 0.4"},
91-
{:mox, "~> 0.5", only: :test},
87+
{:typed_ecto_schema, "~> 0.4"},
88+
{:ecto, "~> 3.12"},
89+
{:exandra, "~> 0.12"},
9290
{:pretty_log, "~> 0.1"},
9391
{:telemetry, "~> 0.4"},
94-
{:xandra, "~> 0.13"},
92+
{:xandra, "~> 0.19"},
9593
{:skogsra, "~> 2.2"},
9694
{:observer_cli, "~> 1.5"},
95+
# hex.pm package and esl/ex_rabbit_pool do not support amqp version 2.1.
96+
# This fork is supporting amqp ~> 2.0 and also ~> 3.0.
97+
{:ex_rabbit_pool, github: "leductam/ex_rabbit_pool"},
9798
{:dialyxir, "~> 1.0", only: [:dev, :ci], runtime: false},
99+
{:excoveralls, "~> 0.15", only: :test},
100+
{:mox, "~> 0.5", only: :test},
98101
# Workaround for Elixir 1.15 / ssl_verify_fun issue
99102
# See also: https://github.com/deadtrickster/ssl_verify_fun.erl/pull/27
100103
{:ssl_verify_fun, "~> 1.1.0", manager: :rebar3, override: true}

0 commit comments

Comments
 (0)