Skip to content

Commit c838715

Browse files
authored
Merge pull request #1175 from lusergit/feat/dynamic-dup
chore(DUP): dynamic process spawning
2 parents 99b863f + fad7388 commit c838715

10 files changed

Lines changed: 211 additions & 158 deletions

File tree

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/amqp_data_consumer.ex

Lines changed: 2 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2017 Ispirata Srl
4+
# Copyright 2017 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -51,52 +51,8 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
5151
GenServer.start_link(__MODULE__, args, name: get_queue_via_tuple(index))
5252
end
5353

54-
def ack(pid, delivery_tag) do
55-
Logger.debug("Going to ack #{inspect(delivery_tag)}")
56-
GenServer.call(pid, {:ack, delivery_tag})
57-
end
58-
59-
def discard(pid, delivery_tag) do
60-
Logger.debug("Going to discard #{inspect(delivery_tag)}")
61-
GenServer.call(pid, {:discard, delivery_tag})
62-
end
63-
64-
def requeue(pid, delivery_tag) do
65-
Logger.debug("Going to requeue #{inspect(delivery_tag)}")
66-
GenServer.call(pid, {:requeue, delivery_tag})
67-
end
68-
69-
def start_message_tracker(realm, encoded_device_id) do
70-
with {:ok, via_tuple} <- fetch_queue_via_tuple(realm, encoded_device_id) do
71-
GenServer.call(via_tuple, {:start_message_tracker, realm, encoded_device_id})
72-
end
73-
end
74-
75-
def start_data_updater(realm, encoded_device_id, message_tracker) do
76-
with {:ok, via_tuple} <- fetch_queue_via_tuple(realm, encoded_device_id) do
77-
GenServer.call(via_tuple, {:start_data_updater, realm, encoded_device_id, message_tracker})
78-
end
79-
end
80-
8154
defp get_queue_via_tuple(queue_index) when is_integer(queue_index) do
82-
{:via, Registry, {Registry.AMQPDataConsumer, {:queue_index, queue_index}}}
83-
end
84-
85-
defp fetch_queue_via_tuple(realm, encoded_device_id)
86-
when is_binary(realm) and is_binary(encoded_device_id) do
87-
# This is the same sharding algorithm used in astarte_vmq_plugin
88-
# Make sure they stay in sync
89-
queue_index =
90-
{realm, encoded_device_id}
91-
|> :erlang.phash2(Config.data_queue_total_count!())
92-
93-
if queue_index >= Config.data_queue_range_start!() and
94-
queue_index <= Config.data_queue_range_end!() do
95-
{:ok, get_queue_via_tuple(queue_index)}
96-
else
97-
# This device is handled by a differente DUP instance
98-
{:error, :unhandled_device}
99-
end
55+
{:via, Horde.Registry, {Registry.AMQPDataConsumer, {:queue_index, queue_index}}}
10056
end
10157

10258
# Server callbacks
@@ -126,16 +82,6 @@ defmodule Astarte.DataUpdaterPlant.AMQPDataConsumer do
12682
{:reply, res, state}
12783
end
12884

129-
def handle_call({:start_message_tracker, realm, device_id}, _from, state) do
130-
res = DataUpdater.get_message_tracker(realm, device_id)
131-
{:reply, res, state}
132-
end
133-
134-
def handle_call({:start_data_updater, realm, device_id, message_tracker}, _from, state) do
135-
res = DataUpdater.get_data_updater_process(realm, device_id, message_tracker)
136-
{:reply, res, state}
137-
end
138-
13985
@impl true
14086
def handle_info(:init_consume, state), do: init_consume(state)
14187

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/consumers_supervisor.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ defmodule Astarte.DataUpdaterPlant.ConsumersSupervisor do
3333
Logger.info("AMQPDataConsumer supervisor init.", tag: "data_consumer_sup_init")
3434

3535
children = [
36-
{Registry, [keys: :unique, name: Registry.AMQPDataConsumer]},
3736
{ExRabbitPool.PoolSupervisor,
3837
rabbitmq_config: Config.amqp_consumer_options!(),
3938
connection_pools: [Config.amqp_consumer_pool_config!()]},

apps/astarte_data_updater_plant/lib/astarte_data_updater_plant/data_pipeline_supervisor.ex

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# This file is part of Astarte.
33
#
4-
# Copyright 2020 Ispirata Srl
4+
# Copyright 2020 - 2025 SECO Mind Srl
55
#
66
# Licensed under the Apache License, Version 2.0 (the "License");
77
# you may not use this file except in compliance with the License.
@@ -33,8 +33,25 @@ defmodule Astarte.DataUpdaterPlant.DataPipelineSupervisor do
3333
@impl true
3434
def init(_init_arg) do
3535
children = [
36-
{Registry, [keys: :unique, name: Registry.MessageTracker]},
37-
{Registry, [keys: :unique, name: Registry.DataUpdater]},
36+
{Horde.Registry, [keys: :unique, name: Registry.MessageTracker, members: :auto]},
37+
{Horde.Registry, [keys: :unique, name: Registry.DataUpdater, members: :auto]},
38+
{Horde.Registry, [keys: :unique, name: Registry.AMQPDataConsumer, members: :auto]},
39+
{Horde.DynamicSupervisor,
40+
[
41+
name: Supervisor.MessageTracker,
42+
strategy: :one_for_one,
43+
restart: :transient,
44+
members: :auto,
45+
distribution_strategy: Horde.UniformDistribution
46+
]},
47+
{Horde.DynamicSupervisor,
48+
[
49+
name: Supervisor.DataUpdater,
50+
strategy: :one_for_one,
51+
restart: :transient,
52+
members: :auto,
53+
distribution_strategy: Horde.UniformDistribution
54+
]},
3855
{ExRabbitPool.PoolSupervisor,
3956
rabbitmq_config: Config.amqp_producer_options!(),
4057
connection_pools: [Config.events_producer_pool_config!()]},

0 commit comments

Comments
 (0)