Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info queue_ty
PARALLEL_CT_SET_5_A = rabbit_direct_reply_to_prop rabbit_quorum_queue_prop direct_reply_to_amqpl direct_reply_to_amqp classic_queue
PARALLEL_CT_SET_5_B = feature_flags_v2 backing_queue transactions
PARALLEL_CT_SET_5_C = cluster_upgrade maintenance_mode
PARALLEL_CT_SET_5_D = rabbit_fifo_dlx_integration publisher_confirms_parallel
PARALLEL_CT_SET_5_D = rabbit_fifo_dlx_integration publisher_confirms_parallel rabbit_exchange_type_modulus_hash

PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))
Expand Down
65 changes: 65 additions & 0 deletions deps/rabbit/src/rabbit_exchange_type_modulus_hash.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_exchange_type_modulus_hash).

-behaviour(rabbit_exchange_type).

-include_lib("rabbit_common/include/rabbit.hrl").

-export([description/0,
route/3,
serialise_events/0,
info/1,
info/2,
validate/1,
validate_binding/2,
create/2,
delete/2,
policy_changed/2,
add_binding/3,
remove_bindings/3,
assert_args_equivalence/2]).

-rabbit_boot_step({?MODULE,
[{description, "exchange type x-modulus-hash"},
{mfa, {rabbit_registry, register,
[exchange, <<"x-modulus-hash">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, kernel_ready}]}).

%% 2^27
-define(PHASH2_RANGE, 134217728).

description() ->
[{description, <<"Modulus Hashing Exchange">>}].

route(#exchange{name = Name}, Msg, _Options) ->
Destinations = rabbit_router:match_routing_key(Name, ['_']),
case length(Destinations) of
0 ->
[];
Len ->
%% We sort to guarantee stable routing after node restarts.
DestinationsSorted = lists:sort(Destinations),
Hash = erlang:phash2(mc:routing_keys(Msg), ?PHASH2_RANGE),
Destination = lists:nth(Hash rem Len + 1, DestinationsSorted),
[Destination]
end.

info(_) -> [].
info(_, _) -> [].
serialise_events() -> false.
validate(_X) -> ok.
validate_binding(_X, _B) -> ok.
create(_Serial, _X) -> ok.
delete(_Serial, _X) -> ok.
policy_changed(_X1, _X2) -> ok.
add_binding(_Serial, _X, _B) -> ok.
remove_bindings(_Serial, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
277 changes: 277 additions & 0 deletions deps/rabbit/test/rabbit_exchange_type_modulus_hash_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_exchange_type_modulus_hash_SUITE).

-compile(export_all).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").

all() ->
[
{group, tests}
].

groups() ->
[
{tests, [],
[
routed_to_zero_queue_test,
routed_to_one_queue_test,
routed_to_many_queue_test,
stable_routing_across_restarts_test,
weighted_routing_test
]}
].

%% -------------------------------------------------------------------
%% Test suite setup/teardown
%% -------------------------------------------------------------------

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).

init_per_group(_, Config) ->
Config.

end_per_group(_, Config) ->
Config.

init_per_testcase(Testcase, Config) ->
TestCaseName = rabbit_ct_helpers:config_to_testcase_name(Config, Testcase),
Config1 = rabbit_ct_helpers:set_config(Config, {test_resource_name,
re:replace(TestCaseName, "/", "-", [global, {return, list}])}),
rabbit_ct_helpers:testcase_started(Config1, Testcase).

end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).

%% -------------------------------------------------------------------
%% Test cases
%% -------------------------------------------------------------------

routed_to_zero_queue_test(Config) ->
ok = route(Config, [], 5, 0).

routed_to_one_queue_test(Config) ->
ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 1, 1).

routed_to_many_queue_test(Config) ->
ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 5, 5).

route(Config, Queues, PublishCount, ExpectedRoutedCount) ->
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
B = rabbit_ct_helpers:get_config(Config, test_resource_name),
XNameBin = erlang:list_to_binary("x-" ++ B),

#'exchange.declare_ok'{} = amqp_channel:call(Chan,
#'exchange.declare'{
exchange = XNameBin,
type = <<"x-modulus-hash">>,
durable = true,
auto_delete = true}),
[begin
#'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{
queue = Q,
durable = true,
exclusive = true}),
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{
queue = Q,
exchange = XNameBin})
end
|| Q <- Queues],

amqp_channel:call(Chan, #'confirm.select'{}),
[amqp_channel:call(Chan,
#'basic.publish'{exchange = XNameBin,
routing_key = rnd()},
#amqp_msg{props = #'P_basic'{},
payload = <<>>}) ||
_ <- lists:duplicate(PublishCount, const)],
amqp_channel:wait_for_confirms_or_die(Chan),

Count = lists:foldl(
fun(Q, Acc) ->
#'queue.declare_ok'{message_count = M} = amqp_channel:call(
Chan,
#'queue.declare'{
queue = Q,
durable = true,
exclusive = true}),
Acc + M
end, 0, Queues),
?assertEqual(ExpectedRoutedCount, Count),

amqp_channel:call(Chan, #'exchange.delete'{exchange = XNameBin}),
[amqp_channel:call(Chan, #'queue.delete'{queue = Q}) || Q <- Queues],
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan).

stable_routing_across_restarts_test(Config) ->
{Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
XNameBin = atom_to_binary(?FUNCTION_NAME),
NumQs = 40,
NumMsgs = 500,

#'exchange.declare_ok'{} = amqp_channel:call(Chan1,
#'exchange.declare'{
exchange = XNameBin,
type = <<"x-modulus-hash">>,
durable = true}),
Queues = [erlang:list_to_binary("q-" ++ integer_to_list(I)) || I <- lists:seq(1, NumQs)],
[begin
#'queue.declare_ok'{} = amqp_channel:call(Chan1, #'queue.declare'{
queue = Q,
durable = true}),
#'queue.bind_ok'{} = amqp_channel:call(Chan1, #'queue.bind'{
queue = Q,
exchange = XNameBin,
%% The routing key shouldn't matter
routing_key = rnd()})
end
|| Q <- Queues],

RoutingKeys = [rnd() || _ <- lists:seq(1, NumMsgs)],

amqp_channel:call(Chan1, #'confirm.select'{}),
[amqp_channel:call(Chan1,
#'basic.publish'{exchange = XNameBin,
routing_key = RK},
#amqp_msg{payload = RK})
|| RK <- RoutingKeys],
amqp_channel:wait_for_confirms_or_die(Chan1),

Map1 = consume_all(Chan1, Queues),

%% Assert at least two queues got messages
NonEmptyQueues1 = maps:filter(fun(_Q, Msgs) -> length(Msgs) > 0 end, Map1),
?assert(maps:size(NonEmptyQueues1) >= 2),

%% Assert all messages routed
?assertEqual(NumMsgs, lists:sum([length(Msgs) || Msgs <- maps:values(Map1)])),

ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1),
%% Restart node
ok = rabbit_ct_broker_helpers:restart_node(Config, 0),
{Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config),

amqp_channel:call(Chan2, #'confirm.select'{}),
%% Publish the same messages again.
[amqp_channel:call(Chan2,
#'basic.publish'{exchange = XNameBin,
routing_key = RK},
#amqp_msg{payload = RK})
|| RK <- RoutingKeys],
amqp_channel:wait_for_confirms_or_die(Chan2),

Map2 = consume_all(Chan2, Queues),

%% Assert the same messages ended up in the same queues,
%% i.e. that routing was stable.
?assertEqual(Map1, Map2),

amqp_channel:call(Chan2, #'exchange.delete'{exchange = XNameBin}),
[amqp_channel:call(Chan2, #'queue.delete'{queue = Q}) || Q <- Queues],
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2).

weighted_routing_test(Config) ->
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
XNameBin = atom_to_binary(?FUNCTION_NAME),
Queues = [<<"q1">>, <<"q2">>, <<"q3">>],
NumMsgs = 600,

#'exchange.declare_ok'{} = amqp_channel:call(Chan,
#'exchange.declare'{
exchange = XNameBin,
type = <<"x-modulus-hash">>,
durable = true}),

[#'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = Q,
durable = true})
|| Q <- Queues],

%% Bind q1 once
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q1">>,
exchange = XNameBin}),

%% Bind q2 twice
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q2">>,
exchange = XNameBin,
routing_key = <<"a">>}),
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q2">>,
exchange = XNameBin,
routing_key = <<"b">>}),

%% Bind q3 three times
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q3">>,
exchange = XNameBin,
routing_key = <<"a">>}),
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q3">>,
exchange = XNameBin,
routing_key = <<"b">>}),
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q3">>,
exchange = XNameBin,
routing_key = <<"c">>}),

amqp_channel:call(Chan, #'confirm.select'{}),
[amqp_channel:call(Chan,
#'basic.publish'{exchange = XNameBin,
routing_key = integer_to_binary(I)},
#amqp_msg{})
|| I <- lists:seq(1, NumMsgs)],
amqp_channel:wait_for_confirms_or_die(Chan),

Counts = lists:foldl(
fun(Q, Acc) ->
#'queue.declare_ok'{message_count = M} = amqp_channel:call(
Chan,
#'queue.declare'{queue = Q,
durable = true}),
maps:put(Q, M, Acc)
end, #{}, Queues),

C1 = maps:get(<<"q1">>, Counts),
C2 = maps:get(<<"q2">>, Counts),
C3 = maps:get(<<"q3">>, Counts),
ct:pal("q1: ~b, q2: ~b, q3: ~b", [C1, C2, C3]),

?assertEqual(NumMsgs, C1 + C2 + C3),
%% Assert weighted distribution
?assert(C1 < C2),
?assert(C2 < C3),

amqp_channel:call(Chan, #'exchange.delete'{exchange = XNameBin}),
[amqp_channel:call(Chan, #'queue.delete'{queue = Q}) || Q <- Queues],
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan).

consume_all(Chan, Queues) ->
lists:foldl(fun(Q, Map) ->
Msgs = consume_queue(Chan, Q, []),
maps:put(Q, Msgs, Map)
end, #{}, Queues).

consume_queue(Chan, Q, L) ->
case amqp_channel:call(Chan, #'basic.get'{queue = Q,
no_ack = true}) of
#'basic.get_empty'{} ->
L;
{#'basic.get_ok'{}, #amqp_msg{payload = Payload}} ->
consume_queue(Chan, Q, L ++ [Payload])
end.

rnd() ->
integer_to_binary(rand:uniform(10_000_000)).
16 changes: 7 additions & 9 deletions deps/rabbitmq_sharding/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,21 @@ Do not use this plugin with quorum queues. Avoid classic mirrored queues in gene

## Messages Distribution Between Shards (Partitioning)

The exchanges that ship by default with RabbitMQ work in an "all or
nothing" fashion, i.e: if a routing key matches a set of queues bound
to the exchange, then RabbitMQ will route the message to all the
queues in that set. For this plugin to work it is necessary to
route messages to an exchange that would partition messages, so they
are routed to _at most_ one queue (a subset).

The plugin provides a new exchange type, `"x-modulus-hash"`, that will use
RabbitMQ provides a built-in exchange type, `"x-modulus-hash"`, that will use
a hashing function to partition messages routed to a logical queue
across a number of regular queues (shards).
across a number of regular queues (shards). This exchange type is available
in core RabbitMQ and does not require enabling this plugin to be used.

The `"x-modulus-hash"` exchange will hash the routing key used to
publish the message and then it will apply a `Hash mod N` to pick the
queue where to route the message, where N is the number of queues
bound to the exchange. **This exchange will completely ignore the
binding key used to bind the queue to the exchange**.

This exchange guarantees stable routing. As long as the bindings to the exchange remain the same,
messages with the same routing key will always be routed to exactly the same destination queue,
even across node restarts.

There are other exchanges with similar behaviour:
the _Consistent Hash Exchange_ or the _Random Exchange_.
Those were designed with regular queues in mind, not this plugin, so `"x-modulus-hash"`
Expand Down
Loading
Loading