Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,14 @@
{rabbit_db_queue,
tie_binding_to_dest_with_keep_while_cond_enable}}
}}).

-rabbit_feature_flag(
{topic_binding_projection_v4,
#{desc => "Enable the topic binding Khepri projection v4",
stability => stable,
depends_on => ['rabbitmq_4.3.0'],
callbacks => #{enable =>
{rabbit_khepri, topic_binding_projection_enable},
post_enable =>
{rabbit_khepri, topic_binding_projection_post_enable}}
}}).
213 changes: 160 additions & 53 deletions deps/rabbit/src/rabbit_db_topic_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
%% Used by Khepri projections and the Mnesia-to-Khepri migration.
-export([split_topic_key_binary/1]).

-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v3).
-define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3).

-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).

-type match_result() :: [rabbit_types:binding_destination() |
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].
Expand All @@ -31,14 +34,20 @@
%%
%% @private

match(XName, RoutingKey, Opts) ->
match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) ->
BKeys = maps:get(return_binding_keys, Opts, false),
Words = split_topic_key_binary(RoutingKey),
trie_match_in_khepri(XName, Words, BKeys).

%% --------------------------------------------------------------
%% split_topic_key_binary().
%% --------------------------------------------------------------
case rabbit_khepri:get_effective_topic_binding_projection_version() of
V when V >= 4 ->
try
trie_match({VHost, XName}, root, Words, BKeys, [])
catch
error:badarg ->
[]
end;
_ ->
trie_match_v3(X, Words, BKeys)
end.

-spec split_topic_key_binary(RoutingKey) -> Words when
RoutingKey :: binary(),
Expand All @@ -58,81 +67,164 @@ split_topic_key_binary(RoutingKey) ->
end,
binary:split(RoutingKey, Pattern, [global]).

%% --------------------------------------------------------------
%% Internal
%% --------------------------------------------------------------
%% ==============================================================
%% Trie-based routing
%%
%% Uses two ETS tables:
%%
%% 1. Trie edges table (set): {Key, ChildNodeId, ChildCount}
%% Key = {XSrc, ParentNodeId, Word}
%% Navigation: ets:lookup_element/4 for O(1) edge traversal.
%%
%% 2. Leaf bindings table (ordered_set): {{NodeId, BindingKey, Dest}}
%% Collection: ets:next/2 probes for fanout 0-2 (fast path), then
%% ets:select/2 with a partially bound key for fanout > 2 (does an
%% O(log N) seek followed by O(F) range scan).
%%
%% Routing walks the trie (branching on literal word, <<"*">>, <<"#">>)
%% then collects destinations from the bindings table at each matching
%% leaf. This is O(depth * 3) for the trie walk, plus O(log N) per
%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2.
%% ==============================================================

-spec add_matched([rabbit_types:binding_destination() |
{rabbit_types:binding_destination(), BindingArgs :: list()}],
ReturnBindingKeys :: boolean(),
match_result()) ->
match_result().
add_matched(Destinations, false, Acc) ->
Destinations ++ Acc;
add_matched(DestinationsArgs, true, Acc) ->
lists:foldl(
fun({DestQ = #resource{kind = queue}, BindingArgs}, L) ->
case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of
{longstr, BKey} ->
[{DestQ, BKey} | L];
_ ->
[DestQ | L]
end;
({DestX, _BindingArgs}, L) ->
[DestX | L]
end, Acc, DestinationsArgs).
trie_match(XSrc, Node, [], BKeys, Acc0) ->
Acc1 = trie_bindings(Node, BKeys, Acc0),
trie_match_try(XSrc, Node, <<"#">>,
fun trie_match_skip_any/5,
[], BKeys, Acc1);
trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) ->
Acc1 = trie_match_try(XSrc, Node, W,
fun trie_match/5,
RestW, BKeys, Acc0),
Acc2 = trie_match_try(XSrc, Node, <<"*">>,
fun trie_match/5,
RestW, BKeys, Acc1),
trie_match_try(XSrc, Node, <<"#">>,
fun trie_match_skip_any/5,
Words, BKeys, Acc2).

trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) ->
case ets:lookup_element(?TOPIC_TRIE_PROJECTION,
{XSrc, Node, Word}, 2, undefined) of
undefined ->
Acc;
NextNode ->
MatchFun(XSrc, NextNode, RestW, BKeys, Acc)
end.

%% Khepri topic graph
trie_match_skip_any(XSrc, Node, [], BKeys, Acc) ->
trie_match(XSrc, Node, [], BKeys, Acc);
trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
trie_match_skip_any(
XSrc, Node, RestW, BKeys,
trie_match(XSrc, Node, Words, BKeys, Acc)).

trie_match_in_khepri(X, Words, BKeys) ->
%% Collect all destinations bound at the given trie node.
%%
%% Uses ets:next/2 for up to two elements (fast path for the common
%% fanout 0-2 cases), then switches to ets:select/2 when fanout > 2.
%%
%% ets:select/2 occurs the expensive match spec compilation overhead.
%% For larger fanouts, the cost for compiling the match spec amortises.
%% ets:select/2 occurs an O(log N) seek followed by an O(F) range scan,
%% which is cheaper than F individual ets:next/2 calls
%% (each O(log N) due to CATree fresh-stack allocation).
trie_bindings(NodeId, BKeys, Acc) ->
StartKey = {NodeId, <<>>, {}},
case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of
{NodeId, BKey1, Dest1} = Key1 ->
case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of
{NodeId, BKey2, Dest2} = Key2 ->
case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of
{NodeId, _, _} ->
collect_select(NodeId, BKeys, Acc);
_ ->
Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc),
collect_binding(Dest2, BKey2, BKeys, Acc1)
end;
_ ->
collect_binding(Dest1, BKey1, BKeys, Acc)
end;
_ ->
Acc
end.

collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) ->
[{Dest, BindingKey} | Acc];
collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) ->
[Dest | Acc].

collect_select(NodeId, false, Acc) ->
Dests = ets:select(?TOPIC_BINDING_PROJECTION,
[{{{NodeId, '_', '$1'}}, [], ['$1']}]),
Dests ++ Acc;
collect_select(NodeId, true, Acc) ->
DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION,
[{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]),
format_dest_bkeys(DestsAndBKeys, Acc).

format_dest_bkeys([], Acc) ->
Acc;
format_dest_bkeys([{#resource{kind = queue} = Dest, BKey} | Rest], Acc) ->
format_dest_bkeys(Rest, [{Dest, BKey} | Acc]);
format_dest_bkeys([{Dest, _BKey} | Rest], Acc) ->
format_dest_bkeys(Rest, [Dest | Acc]).

%% ==============================================================
%% Old v3 Khepri topic graph.
%% Delete these *_v3 functions when feature flag
%% topic_binding_projection_v4 becomes required.
%% ==============================================================

trie_match_v3(X, Words, BKeys) ->
try
trie_match_in_khepri(X, root, Words, BKeys, [])
trie_match_v3(X, root, Words, BKeys, [])
catch
error:badarg ->
[]
end.

trie_match_in_khepri(X, Node, [], BKeys, ResAcc0) ->
Destinations = trie_bindings_in_khepri(X, Node, BKeys),
ResAcc = add_matched(Destinations, BKeys, ResAcc0),
trie_match_part_in_khepri(
trie_match_v3(X, Node, [], BKeys, ResAcc0) ->
Destinations = trie_bindings_v3(X, Node, BKeys),
ResAcc = add_matched_v3(Destinations, BKeys, ResAcc0),
trie_match_part_v3(
X, Node, <<"#">>,
fun trie_match_skip_any_in_khepri/5, [], BKeys, ResAcc);
trie_match_in_khepri(X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
fun trie_match_skip_any_v3/5, [], BKeys, ResAcc);
trie_match_v3(X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
trie_match_part_in_khepri(
trie_match_part_v3(
X, Node, WArg, MatchFun, RestWArg, BKeys, Acc)
end, ResAcc, [{W, fun trie_match_in_khepri/5, RestW},
{<<"*">>, fun trie_match_in_khepri/5, RestW},
end, ResAcc, [{W, fun trie_match_v3/5, RestW},
{<<"*">>, fun trie_match_v3/5, RestW},
{<<"#">>,
fun trie_match_skip_any_in_khepri/5, Words}]).
fun trie_match_skip_any_v3/5, Words}]).

trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
case trie_child_in_khepri(X, Node, Search) of
trie_match_part_v3(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
case trie_child_v3(X, Node, Search) of
{ok, NextNode} -> MatchFun(X, NextNode, RestW, BKeys, ResAcc);
error -> ResAcc
end.

trie_match_skip_any_in_khepri(X, Node, [], BKeys, ResAcc) ->
trie_match_in_khepri(X, Node, [], BKeys, ResAcc);
trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
trie_match_skip_any_in_khepri(
trie_match_skip_any_v3(X, Node, [], BKeys, ResAcc) ->
trie_match_v3(X, Node, [], BKeys, ResAcc);
trie_match_skip_any_v3(X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
trie_match_skip_any_v3(
X, Node, RestW, BKeys,
trie_match_in_khepri(X, Node, Words, BKeys, ResAcc)).
trie_match_v3(X, Node, Words, BKeys, ResAcc)).

trie_child_in_khepri(X, Node, Word) ->
trie_child_v3(X, Node, Word) ->
case ets:lookup(
?KHEPRI_PROJECTION,
?KHEPRI_PROJECTION_V3,
#trie_edge{exchange_name = X,
node_id = Node,
word = Word}) of
[#topic_trie_edge_v2{node_id = NextNode}] -> {ok, NextNode};
[] -> error
end.

trie_bindings_in_khepri(X, Node, BKeys) ->
trie_bindings_v3(X, Node, BKeys) ->
case ets:lookup(
?KHEPRI_PROJECTION,
?KHEPRI_PROJECTION_V3,
#trie_edge{exchange_name = X,
node_id = Node,
word = bindings}) of
Expand All @@ -147,3 +239,18 @@ trie_bindings_in_khepri(X, Node, BKeys) ->
[] ->
[]
end.

add_matched_v3(Destinations, false, Acc) ->
Destinations ++ Acc;
add_matched_v3(DestinationsArgs, true, Acc) ->
lists:foldl(
fun({DestQ = #resource{kind = queue}, BindingArgs}, L) ->
case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of
{longstr, BKey} ->
[{DestQ, BKey} | L];
_ ->
[DestQ | L]
end;
({DestX, _BindingArgs}, L) ->
[DestX | L]
end, Acc, DestinationsArgs).
Loading
Loading