Skip to content

Commit 3227b71

Browse files
authored
refactor: move AMQP events producer and supervisor into astarte events library (#1522)
Extracted the AMQP events producer and its supervision tree into a separate library to improve modularity and reuse across services. The original files in astarte_data_updater_plant have been removed and all references updated to point to the external library. Signed-off-by: Eddy Babetto <eddy.babetto@secomind.com>
1 parent b57b7c8 commit 3227b71

9 files changed

Lines changed: 46 additions & 71 deletions

File tree

apps/astarte_data_updater_plant/config/config.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ config :astarte_data_updater_plant, ecto_repos: [Astarte.DataAccess.Repo]
2121

2222
config :astarte_data_updater_plant, Astarte.DataAccess.Repo, []
2323

24+
config :astarte_events, :connection_backoff, 10_000
25+
2426
import_config "#{config_env()}.exs"

apps/astarte_data_updater_plant/config/test.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ config :astarte_data_updater_plant, :astarte_instance_id, "test"
1212
config :astarte_data_updater_plant,
1313
:vernemq_plugin_rpc_client,
1414
Astarte.DataUpdaterPlant.RPC.VMQPlugin.ClientMock
15+
16+
config :astarte_events, :connection_backoff, 0

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_pipeline_supervisor.ex

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
defmodule Astarte.DataUpdaterPlant.DataPipelineSupervisor do
2020
use Supervisor
2121

22-
alias Astarte.DataUpdaterPlant.ProducersSupervisor
2322
alias Astarte.DataUpdaterPlant.ConsumersSupervisor
2423

2524
def start_link(init_arg) do
@@ -50,7 +49,6 @@ defmodule Astarte.DataUpdaterPlant.DataPipelineSupervisor do
5049
members: :auto,
5150
distribution_strategy: Horde.UniformDistribution
5251
]},
53-
ProducersSupervisor,
5452
ConsumersSupervisor,
5553
Astarte.DataUpdaterPlant.RPC.Supervisor
5654
]

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/producers_supervisor.ex

Lines changed: 0 additions & 55 deletions
This file was deleted.

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/triggers_handler.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
defmodule Astarte.DataUpdaterPlant.TriggersHandler do
2020
import Bitwise, only: [<<<: 2]
2121
require Logger
22-
alias Astarte.DataUpdaterPlant.AMQPEventsProducer
22+
alias Astarte.Events.AMQPEventsProducer
2323
alias Astarte.DataUpdaterPlant.Config
2424

2525
@moduledoc """

apps/astarte_data_updater_plant/test/astarte_data_updater_plant/amqp_events_producer_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPEventsProducerTest do
2222
use Mimic
2323

2424
alias AMQP.Channel
25-
alias Astarte.DataUpdaterPlant.AMQPEventsProducer
25+
alias Astarte.Events.AMQPEventsProducer
2626

2727
@tag :regression
2828
test "events producer reconnects in case of error" do

libs/astarte_events/lib/application.ex

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,23 @@
1818

1919
defmodule Astarte.Events.Application do
2020
use Application
21+
alias Astarte.Events.AMQPEventsProducer
22+
alias Astarte.Events.AMQPTriggers.VHostSupervisor
23+
alias Astarte.Events.Config
2124

2225
def start(_type, _args) do
26+
events_pool =
27+
Supervisor.child_spec(
28+
{ExRabbitPool.PoolSupervisor,
29+
rabbitmq_config: Config.amqp_options!(), connection_pools: [Config.events_pool_config!()]},
30+
id: :events_producer_pool
31+
)
32+
2333
children = [
34+
events_pool,
35+
AMQPEventsProducer,
2436
{Registry, keys: :unique, name: Astarte.Events.AMQPTriggers.Registry},
25-
Astarte.Events.AMQPTriggers.VHostSupervisor
37+
VHostSupervisor
2638
]
2739

2840
opts = [strategy: :rest_for_one, name: Astarte.Events.Supervisor]

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/amqp_events_producer.ex renamed to libs/astarte_events/lib/astarte_events/amqp_events_producer.ex

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,15 @@
1616
# limitations under the License.
1717
#
1818

19-
defmodule Astarte.DataUpdaterPlant.AMQPEventsProducer do
19+
defmodule Astarte.Events.AMQPEventsProducer do
2020
require Logger
2121
use GenServer
2222

23-
alias Astarte.DataUpdaterPlant.Config
23+
alias Astarte.Events.Config
2424
alias AMQP.Channel
2525

26-
@connection_backoff if Mix.env() == :test, do: 0, else: 10000
27-
@adapter Config.amqp_adapter!()
28-
29-
# API
26+
@connection_backoff Application.compile_env(:astarte_events, :connection_backoff, 10_000)
27+
alias ExRabbitPool.RabbitMQ
3028

3129
def start_link(args \\ []) do
3230
GenServer.start_link(__MODULE__, args, name: __MODULE__)
@@ -57,15 +55,15 @@ defmodule Astarte.DataUpdaterPlant.AMQPEventsProducer do
5755

5856
@impl true
5957
def handle_call({:publish, exchange, routing_key, payload, opts}, _from, chan) do
60-
reply = @adapter.publish(chan, exchange, routing_key, payload, opts)
58+
reply = RabbitMQ.publish(chan, exchange, routing_key, payload, opts)
6159

6260
{:reply, reply, chan}
6361
end
6462

6563
def handle_call({:declare_exchange, exchange}, _from, chan) do
6664
# TODO: we need to decide who is responsible of deleting the exchange once it is
6765
# no longer needed
68-
reply = @adapter.declare_exchange(chan, exchange, type: :direct, durable: true)
66+
reply = RabbitMQ.declare_exchange(chan, exchange, type: :direct, durable: true)
6967

7068
{:reply, reply, chan}
7169
end
@@ -74,7 +72,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPEventsProducer do
7472
def handle_info({:DOWN, _, :process, _pid, reason}, _state) do
7573
# Track channel crash
7674
:telemetry.execute(
77-
[:astarte, :data_updater_plant, :amqp_events_producer, :channel_crash],
75+
[:astarte, :astarte_events, :amqp_events_producer, :channel_crash],
7876
%{},
7977
%{reason: inspect(reason)}
8078
)
@@ -139,7 +137,7 @@ defmodule Astarte.DataUpdaterPlant.AMQPEventsProducer do
139137

140138
defp declare_default_events_exchange(channel, conn) do
141139
with {:error, reason} <-
142-
@adapter.declare_exchange(channel, Config.events_exchange_name!(),
140+
RabbitMQ.declare_exchange(channel, Config.amqp_events_exchange_name!(),
143141
type: :direct,
144142
durable: true
145143
) do

libs/astarte_events/lib/astarte_events/config.ex

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,16 @@ defmodule Astarte.Events.Config do
117117

118118
@envdoc "The total number of data queues in all the Astarte cluster."
119119
app_env :data_queue_total_count, :astarte_events, :amqp_data_queue_total_count,
120-
os_env: "ASTARTE_EVENTS_AMQP_DATA_QUEUE_TOTAL_COUNT",
120+
os_env: "ASTARTE_EVENTS_PRODUCER_AMQP_DATA_QUEUE_TOTAL_COUNT",
121121
type: :integer,
122122
default: 128
123123

124+
@envdoc "The exchange used by the AMQP producer to publish events."
125+
app_env :amqp_events_exchange_name, :astarte_events, :amqp_events_exchange_name,
126+
os_env: "ASTARTE_EVENTS_PRODUCER_AMQP_EVENTS_EXCHANGE_NAME",
127+
type: :binary,
128+
default: "astarte_events"
129+
124130
# Since we have one channel per queue, this is not configurable
125131
def amqp_channels_per_connection_number!() do
126132
ceil(data_queue_total_count!() / amqp_connection_number!())
@@ -169,6 +175,18 @@ defmodule Astarte.Events.Config do
169175
end
170176
end
171177

178+
# Since we have only one producer, this is not configurable
179+
def events_connection_number!(), do: 1
180+
181+
def events_pool_config!() do
182+
[
183+
name: {:local, :events_producer_pool},
184+
worker_module: ExRabbitPool.Worker.RabbitConnection,
185+
size: events_connection_number!(),
186+
max_overflow: 0
187+
]
188+
end
189+
172190
defdelegate astarte_instance_id!, to: DataAccessConfig
173191
defdelegate astarte_instance_id, to: DataAccessConfig
174192
end

0 commit comments

Comments
 (0)