Skip to content

Commit 1a5e50b

Browse files
authored
Merge pull request #33 from noaccOS/push-wplsmtxxllzq
feat: Allow customizing process distribution strategy
2 parents a76f9f0 + 1e415b1 commit 1a5e50b

File tree

3 files changed

+51
-6
lines changed

3 files changed

+51
-6
lines changed

lib/consumer.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ defmodule Mississippi.Consumer do
2525

2626
amqp_consumer_options = opts[:amqp_consumer_options]
2727

28-
queue_config = opts[:mississippi_config][:queues]
28+
mississippi_config = opts[:mississippi_config]
2929

30-
message_handler = opts[:mississippi_config][:message_handler]
30+
queue_config = mississippi_config[:queues]
3131

3232
channels_per_connection = amqp_consumer_options[:channels]
3333

@@ -47,7 +47,7 @@ defmodule Mississippi.Consumer do
4747
children = [
4848
{ExRabbitPool.PoolSupervisor,
4949
rabbitmq_config: amqp_consumer_options, connection_pools: [events_consumer_pool_config(connection_number)]},
50-
{ConsumersSupervisor, queues: queue_config, message_handler: message_handler}
50+
{ConsumersSupervisor, mississippi_config}
5151
]
5252

5353
Supervisor.init(children, strategy: :rest_for_one)

lib/consumer/consumers_supervisor.ex

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
2525

2626
queues_config = init_arg[:queues]
2727

28+
distribution_strategy = distribution_strategy!(init_arg[:cluster_distribution_strategy])
29+
2830
children = [
2931
{Registry, [keys: :unique, name: DataUpdater.Registry, members: :auto]},
3032
{Registry, [keys: :unique, name: MessageTracker.Registry, members: :auto]},
@@ -35,19 +37,19 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
3537
members: :auto,
3638
process_redistribution: :active,
3739
extra_arguments: [message_handler: message_handler],
38-
distribution_strategy: Horde.UniformQuorumDistribution},
40+
distribution_strategy: distribution_strategy},
3941
{DynamicSupervisor,
4042
strategy: :one_for_one,
4143
name: MessageTracker.Supervisor,
4244
members: :auto,
4345
process_redistribution: :active,
44-
distribution_strategy: Horde.UniformQuorumDistribution},
46+
distribution_strategy: distribution_strategy},
4547
{DynamicSupervisor,
4648
strategy: :one_for_one,
4749
name: AMQPDataConsumer.Supervisor,
4850
members: :auto,
4951
process_redistribution: :active,
50-
distribution_strategy: Horde.UniformQuorumDistribution},
52+
distribution_strategy: distribution_strategy},
5153
# This will make queue listeners start after re-sharding in a multi-node cluster
5254
{NodeListener, queues_config},
5355
# This will make queue listeners start in a single-node cluster
@@ -64,9 +66,11 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
6466
[
6567
mississippi_config: [
6668
type: :keyword_list,
69+
default: [],
6770
keys: [
6871
queues: [
6972
type: :keyword_list,
73+
default: [],
7074
keys: [
7175
events_exchange_name: [
7276
type: :string,
@@ -115,9 +119,20 @@ defmodule Mississippi.Consumer.ConsumersSupervisor do
115119
The module that will be invoked by Mississippi to process incoming messages.
116120
It must implement the `Mississippi.Consumer.DataUpdater.Handler` behaviour.
117121
"""
122+
],
123+
cluster_distribution_strategy: [
124+
type: {:in, [:uniform_quorum, :uniform_random, :uniform]},
125+
default: :uniform_quorum,
126+
doc: """
127+
The strategy to use for redistributing consumer processes within the cluster.
128+
"""
118129
]
119130
]
120131
]
121132
]
122133
end
134+
135+
defp distribution_strategy!(:uniform_quorum), do: Horde.UniformQuorumDistribution
136+
defp distribution_strategy!(:uniform_random), do: Horde.UniformRandomDistribution
137+
defp distribution_strategy!(:uniform), do: Horde.UniformDistribution
123138
end

test/consumer/options_test.exs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright 2026 SECO Mind Srl
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
defmodule Mississippi.Consumer.Options.Test do
5+
use ExUnit.Case, async: true
6+
7+
alias Mississippi.Consumer.DataUpdater.Handler.Impl
8+
alias Mississippi.Consumer.Options
9+
10+
@schema Options.definition()
11+
12+
test "an empty configuration returns defaults" do
13+
assert {:ok, opts} = NimbleOptions.validate([], @schema)
14+
assert opts[:mississippi_config][:message_handler] == Impl
15+
assert opts[:mississippi_config][:cluster_distribution_strategy] == :uniform_quorum
16+
assert is_integer(opts[:mississippi_config][:queues][:total_count])
17+
end
18+
19+
test "allows setting valid distribution strategies" do
20+
valid_distribution_strategy = :uniform
21+
opts = [mississippi_config: [cluster_distribution_strategy: valid_distribution_strategy]]
22+
assert {:ok, res} = NimbleOptions.validate(opts, @schema)
23+
assert res[:mississippi_config][:cluster_distribution_strategy] == valid_distribution_strategy
24+
end
25+
26+
test "does not allow invalid distribution strategies" do
27+
opts = [mississippi_config: [cluster_distribution_strategy: :not_a_distribution_strategy]]
28+
assert {:error, %{key: :cluster_distribution_strategy}} = NimbleOptions.validate(opts, @schema)
29+
end
30+
end

0 commit comments

Comments
 (0)