Commit 22464ce

Replace Khepri topic routing projection with trie + ordered_set (v4)
Resolves #15588.

The previous Khepri topic routing projection (v3) stored topic bindings
as a sets:set(#binding{}) inside trie leaf nodes. This design had a
major performance drawback: on the insertion/deletion path (in the
single Khepri Ra process), every binding change required a
read-modify-write of the entire sets:set(), making it O(N) in the
number of bindings at that leaf. With many MQTT clients connecting
concurrently (each subscribing to the same topic filter), this made the
Ra process a bottleneck.

Another, less severe, performance issue was that the entire binding was
copied, including the binding arguments containing the MQTT 5.0
subscription options such as:

```
[{<<"x-mqtt-subscription-opts">>,table,
  [{<<"id">>,unsignedint,1},
   {<<"no-local">>,bool,false},
   {<<"qos">>,unsignedbyte,0},
   {<<"retain-as-published">>,bool,false},
   {<<"retain-handling">>,unsignedbyte,0}]}]
```

This commit replaces the single ETS projection table with two
purpose-built tables:

1. Trie edges table (ETS set, read_concurrency=true):
   - Row: {{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount}
   - XSrc = {VHost, ExchangeName} (compact 2-tuple of binaries)
   - NodeId = root | reference()
   - ChildCount tracks outgoing edges for garbage collection

2. Leaf bindings table (ETS ordered_set, read_concurrency=true):
   - Key: {NodeId, BindingKey, Dest}
   - Stored as 1-tuples: {{NodeId, BindingKey, Dest}}
   - No value column; all data is in the key to minimize copying

The trie structure preserves O(depth * 3) routing complexity regardless
of the overall number of bindings or wildcard filters. At each trie
level, we probe at most 3 edges (the literal word, <<"*">> and <<"#">>),
each via ets:lookup_element/4, which copies only the ChildNodeId (a
reference).
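The trie walk described above can be modelled outside Erlang. The following is a minimal Python sketch (mine, not the commit's code): a plain dict stands in for the trie-edges ETS table, integer node ids stand in for make_ref(), and `bindings` maps a leaf node id to its destinations.

```python
# Python model of the v4 trie walk (illustrative, not the Erlang code).
# `edges` stands in for the ETS trie-edges table:
#   (xsrc, parent_node_id, word) -> child_node_id
# `bindings` maps a leaf node id to its list of destinations.

def split_topic_key(key):
    return key.split(".") if key else []

def trie_match(edges, xsrc, node, words, acc, bindings):
    # At each level probe at most 3 edges: literal word, "*", "#".
    if not words:
        acc.extend(bindings.get(node, []))
        child = edges.get((xsrc, node, "#"))  # "#" also matches zero words
        if child is not None:
            trie_match_skip_any(edges, xsrc, child, [], acc, bindings)
        return acc
    w, rest = words[0], words[1:]
    for probe, next_words, skip in ((w, rest, False),
                                    ("*", rest, False),
                                    ("#", words, True)):
        child = edges.get((xsrc, node, probe))
        if child is not None:
            fun = trie_match_skip_any if skip else trie_match
            fun(edges, xsrc, child, next_words, acc, bindings)
    return acc

def trie_match_skip_any(edges, xsrc, node, words, acc, bindings):
    # "#" matches zero or more words: match here, then skip one word.
    trie_match(edges, xsrc, node, words, acc, bindings)
    if words:
        trie_match_skip_any(edges, xsrc, node, words[1:], acc, bindings)
```

With edges for the filters `a.b` and `a.#`, routing key `a.b` reaches both leaves, and `a` alone still matches the `a.#` branch because `#` matches zero words.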
The ordered_set for bindings provides:

- O(log N) insert and delete per binding (no read-modify-write).
- The binding key (needed for MQTT subscription identifiers and topic
  aliases) is part of the key, so it is returned directly during
  destination collection without additional lookups.

Collecting destinations at a matched trie leaf uses a hybrid strategy:

- Fanout 0-2 (the common case: unicast, device + stream): up to 3
  ets:next/2 probes. Each ets:next/2 call costs O(log N) because the
  CATree (used with read_concurrency) allocates a fresh tree traversal
  stack on each call.
- Fanout > 2: ets:select/2 with a partially bound key does an O(log N)
  seek followed by an O(F) range scan. The match spec compilation
  overhead amortises over the larger result set.

ets:lookup_element/4 (OTP 26+) returns a default value on miss instead
of throwing badarg, and copies only the requested element on hit. This
avoids both exception overhead (misses are common while traversing the
<<"*">> and <<"#">> branches of the trie) and unnecessary data copying
(we only need the ChildNodeId, not the full row).

Trie node IDs are ephemeral (the tables are rebuilt when the Khepri
projection is re-registered). make_ref() is fast, globally unique
within a node, and has good hash distribution for the ETS set table.

When a binding is deleted, the trie path from root to leaf is collected
in a single downward walk (trie_follow_down_get_path). Empty nodes are
then pruned bottom-up: a node is empty when its ChildCount is 0 and it
has no bindings in the ordered_set table.

Benchmarks below were run with 500K routing operations per scenario, on
the same machine, back-to-back between main (v3) and this commit.

Significant insert/delete improvements:

- Churn insert (8K bindings, 4 filters/client): ~1,120 vs ~810 ops/s
  (+38%). v3 did a read-modify-write of sets:set() per binding; v4 does
  a single ets:insert into the ordered_set plus trie edge updates.
- MQTT device insert (20K bindings): ~650 vs ~420 ops/s (+55%). Same
  mechanism as churn insert. Particularly impactful when many clients
  share the same wildcard filter (e.g. "broadcast.#"), since v3's
  sets:set() grew with each client while v4 inserts are O(log N)
  regardless.
- Same-key fanout insert (10K): ~415 vs ~290 ops/s (+43%). The worst
  case for v3: all 10K bindings share the same key, so each insert
  copies and rebuilds the growing sets:set().

Routing improvements:

- MQTT unicast (10K devices, 20K bindings): ~460K vs ~250K ops/s
  (+80%). Each route matches 1 queue among 10K unique exact keys plus
  10K queues sharing "broadcast.#". v3 stored bindings in the same ETS
  row as the trie edge, so every trie lookup copied the entire
  sets:set(). v4 separates trie edges (small rows, set table) from
  bindings (ordered_set), so the trie walk copies only references.
- Large fanout (10K queues, same key): ~3,100 vs ~1,170 ops/s (+165%).
  v3 copied a 10K-element sets:set() out of ETS in a single ets:lookup,
  then called sets:to_list/1. v4 uses ets:select/2 with a partially
  bound key, which does an O(log N) seek and then an efficient O(F)
  range scan without intermediate set conversion.
- MQTT broadcast (10K fanout): ~0.6 vs ~0.9 ms/route (+50%). Same
  mechanism as above.

Scenarios with no significant change (within benchmark noise): exact
match, wildcard *, wildcard #, mixed wildcards, and many wildcard
filters showed no clear difference. Both v3 and v4 use a trie walk, so
routing speed is comparable when the fanout is small and the bottleneck
is trie traversal rather than destination collection.
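The hybrid destination collection (probe a few entries, then fall back to one range scan) can be sketched as follows. This is a Python model, not the ETS code: a sorted list plus `bisect` stands in for the ordered_set table, and `collect_destinations`/`max_probes` are illustrative names that do not appear in the commit.

```python
import bisect

# Python model of v4 leaf-binding collection (illustrative, not ETS).
# A sorted list of (node_id, binding_key, dest) tuples stands in for
# the ordered_set table of 1-tuples {{NodeId, BindingKey, Dest}}.

def collect_destinations(table, node_id, max_probes=2):
    # O(log N) seek to the first key >= (node_id, ...); a 1-tuple
    # compares as a prefix, so it sorts before every real key.
    i = bisect.bisect_left(table, (node_id,))
    # Fast path: step through up to max_probes + 1 entries, the
    # analogue of the ets:next/2 probes for fanout 0-2.
    probes = []
    j = i
    while j < len(table) and table[j][0] == node_id and len(probes) <= max_probes:
        probes.append(table[j])
        j += 1
    if len(probes) <= max_probes:
        return [dest for (_node, _bkey, dest) in probes]
    # Fanout > max_probes: one O(F) range scan from the seek position,
    # the analogue of ets:select/2 with a partially bound key.
    out = []
    for node, _bkey, dest in table[i:]:
        if node != node_id:
            break
        out.append(dest)
    return out
```

The design point the sketch illustrates: both paths start with one logarithmic seek, and only the rare high-fanout case pays for the full range scan.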
1 parent 2bd6a02 commit 22464ce
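The insert/delete path and bottom-up pruning described in the commit message can also be modelled in Python. A sketch under stated assumptions: dicts for the edge table and child counts, a set for the bindings table, a counter instead of make_ref(); all function names here are hypothetical, and the real code answers the "leaf still has bindings?" question with an O(log N) ets:next rather than the O(N) scan used below.

```python
import itertools

# Python model of the v4 insert/delete path (illustrative, not the
# Khepri projection code).

_next_id = itertools.count(1)  # stand-in for make_ref(); root is 0

def insert_binding(edges, child_count, bindings, xsrc, words, bkey, dest):
    node = 0  # root
    for w in words:
        key = (xsrc, node, w)
        if key not in edges:
            edges[key] = next(_next_id)
            child_count[node] = child_count.get(node, 0) + 1
        node = edges[key]
    # Single insert into the bindings table; no read-modify-write
    # of a per-leaf set as in v3.
    bindings.add((node, bkey, dest))

def delete_binding(edges, child_count, bindings, xsrc, words, bkey, dest):
    # One downward walk collects the root-to-leaf path.
    path, node = [], 0
    for w in words:
        key = (xsrc, node, w)
        if key not in edges:
            return
        path.append((key, node))
        node = edges[key]
    bindings.discard((node, bkey, dest))
    # Prune empty nodes bottom-up: a node is empty when it has no
    # outgoing edges and no bindings at its leaf.
    for key, parent in reversed(path):
        child = edges[key]
        if child_count.get(child, 0) == 0 and \
           not any(b[0] == child for b in bindings):
            del edges[key]
            child_count[parent] -= 1
        else:
            break
```

Deleting the last binding under a leaf removes the leaf's edge, which may in turn empty its parent, and so on up the path; the walk stops at the first node that still has edges or bindings.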

6 files changed: +1112 −495 lines


deps/rabbit/include/rabbit_khepri.hrl

Lines changed: 2 additions & 2 deletions

```diff
@@ -56,9 +56,9 @@
     ?RABBITMQ_KHEPRI_EXCHANGE_PATH(
       VHost, Exchange, [user_permissions, Username])).
 
--define(RABBITMQ_KHEPRI_ROUTE_PATH(VHost, SrcName, Kind, DstName, RoutingKey),
+-define(RABBITMQ_KHEPRI_ROUTE_PATH(VHost, SrcName, Kind, DstName, BindingKey),
         ?RABBITMQ_KHEPRI_EXCHANGE_PATH(
-          VHost, SrcName, [bindings, Kind, DstName, RoutingKey])).
+          VHost, SrcName, [bindings, Kind, DstName, BindingKey])).
 
 -define(RABBITMQ_KHEPRI_QUEUE_PATH(VHost, Name),
         ?RABBITMQ_KHEPRI_VHOST_PATH(VHost, [queues, Name])).
```

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 3 additions & 3 deletions

```diff
@@ -635,11 +635,11 @@ khepri_route_path(
             key = RoutingKey}) ->
     khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey).
 
-khepri_route_path(VHost, SrcName, Kind, DstName, RoutingKey)
+khepri_route_path(VHost, SrcName, Kind, DstName, BindingKey)
   when ?IS_KHEPRI_PATH_CONDITION(Kind) andalso
        ?IS_KHEPRI_PATH_CONDITION(DstName) andalso
-       ?IS_KHEPRI_PATH_CONDITION(RoutingKey) ->
-    ?RABBITMQ_KHEPRI_ROUTE_PATH(VHost, SrcName, Kind, DstName, RoutingKey).
+       ?IS_KHEPRI_PATH_CONDITION(BindingKey) ->
+    ?RABBITMQ_KHEPRI_ROUTE_PATH(VHost, SrcName, Kind, DstName, BindingKey).
 
 khepri_route_path_to_args(Path) ->
     Pattern = khepri_route_path(
```

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 124 additions & 182 deletions

```diff
@@ -9,122 +9,36 @@
 
 -include_lib("rabbit_common/include/rabbit.hrl").
 
--export([set/1, delete_all_for_exchange/1, delete/1, match/3]).
+-export([match/3]).
 
 %% Used by Khepri projections and the Mnesia-to-Khepri migration.
--export([
-    split_topic_key/1,
-    split_topic_key_binary/1
-  ]).
+-export([split_topic_key_binary/1,
+         split_topic_key/1]).
 
-%% For testing
--export([clear/0]).
-
--define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v3).
+-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
+-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).
 
 -type match_result() :: [rabbit_types:binding_destination() |
                          {rabbit_amqqueue:name(), rabbit_types:binding_key()}].
 
 -define(COMPILED_TOPIC_SPLIT_PATTERN, cp_dot).
 
-%% -------------------------------------------------------------------
-%% set().
-%% -------------------------------------------------------------------
-
--spec set(Binding) -> ok when
-      Binding :: rabbit_types:binding().
-%% @doc Sets a topic binding.
-%%
-%% @private
-
-set(#binding{}) ->
-    ok.
-
-%% -------------------------------------------------------------------
-%% delete_all_for_exchange().
-%% -------------------------------------------------------------------
-
--spec delete_all_for_exchange(ExchangeName) -> ok when
-      ExchangeName :: rabbit_exchange:name().
-%% @doc Deletes all topic bindings for the exchange named `ExchangeName'
-%%
-%% @private
-
-delete_all_for_exchange(_XName) ->
-    ok.
-
-%% -------------------------------------------------------------------
-%% delete().
-%% -------------------------------------------------------------------
-
--spec delete([Binding]) -> ok when
-      Binding :: rabbit_types:binding().
-%% @doc Deletes all given topic bindings
-%%
-%% @private
-
-delete(Bs) when is_list(Bs) ->
-    ok.
-
-%% -------------------------------------------------------------------
-%% match().
-%% -------------------------------------------------------------------
-
 -spec match(rabbit_exchange:name(),
             rabbit_types:routing_key(),
             rabbit_exchange:route_opts()) -> match_result().
-%% @doc Finds the topic bindings matching the given exchange and routing key and returns
-%% the destination of the bindings potentially with the matched binding key.
-%%
-%% @returns destinations with matched binding key
-%%
-%% @private
-
-match(XName, RoutingKey, Opts) ->
+match(#resource{virtual_host = VHost, name = XName}, RoutingKey, Opts) ->
     BKeys = maps:get(return_binding_keys, Opts, false),
     Words = split_topic_key_binary(RoutingKey),
-    trie_match_in_khepri(XName, Words, BKeys).
-
-%% -------------------------------------------------------------------
-%% clear().
-%% -------------------------------------------------------------------
-
--spec clear() -> ok.
-%% @doc Deletes all topic bindings
-%%
-%% @private
-
-clear() ->
-    ok.
-
-%% --------------------------------------------------------------
-%% split_topic_key().
-%% --------------------------------------------------------------
-
--spec split_topic_key(RoutingKey) -> Words when
-      RoutingKey :: binary(),
-      Words :: [[byte()]].
-
-split_topic_key(Key) ->
-    split_topic_key(Key, [], []).
-
-split_topic_key(<<>>, [], []) ->
-    [];
-split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
-    lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
-split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
-    split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
-split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
-    split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
-
-%% --------------------------------------------------------------
-%% split_topic_key_binary().
-%% --------------------------------------------------------------
+    try
+        trie_match({VHost, XName}, root, Words, BKeys, [])
+    catch
+        error:badarg ->
+            []
+    end.
 
 -spec split_topic_key_binary(RoutingKey) -> Words when
       RoutingKey :: binary(),
       Words :: [binary()].
-
 split_topic_key_binary(<<>>) ->
     [];
 split_topic_key_binary(RoutingKey) ->
@@ -139,92 +53,120 @@ split_topic_key_binary(RoutingKey) ->
     end,
     binary:split(RoutingKey, Pattern, [global]).
 
-%% --------------------------------------------------------------
-%% Internal
-%% --------------------------------------------------------------
-
--spec add_matched([rabbit_types:binding_destination() |
-                   {rabbit_types:binding_destination(), BindingArgs :: list()}],
-                  ReturnBindingKeys :: boolean(),
-                  match_result()) ->
-    match_result().
-add_matched(Destinations, false, Acc) ->
-    Destinations ++ Acc;
-add_matched(DestinationsArgs, true, Acc) ->
-    lists:foldl(
-      fun({DestQ = #resource{kind = queue}, BindingArgs}, L) ->
-              case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of
-                  {longstr, BKey} ->
-                      [{DestQ, BKey} | L];
-                  _ ->
-                      [DestQ | L]
-              end;
-         ({DestX, _BindingArgs}, L) ->
-              [DestX | L]
-      end, Acc, DestinationsArgs).
-
-%% Khepri topic graph
-
-trie_match_in_khepri(X, Words, BKeys) ->
-    try
-        trie_match_in_khepri(X, root, Words, BKeys, [])
-    catch
-        error:badarg ->
-            []
-    end.
+-spec split_topic_key(RoutingKey) -> Words when
+      RoutingKey :: binary(),
+      Words :: [[byte()]].
+split_topic_key(Key) ->
+    split_topic_key(Key, [], []).
 
-trie_match_in_khepri(X, Node, [], BKeys, ResAcc0) ->
-    Destinations = trie_bindings_in_khepri(X, Node, BKeys),
-    ResAcc = add_matched(Destinations, BKeys, ResAcc0),
-    trie_match_part_in_khepri(
-      X, Node, <<"#">>,
-      fun trie_match_skip_any_in_khepri/5, [], BKeys, ResAcc);
-trie_match_in_khepri(X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
-    lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
-                        trie_match_part_in_khepri(
-                          X, Node, WArg, MatchFun, RestWArg, BKeys, Acc)
-                end, ResAcc, [{W, fun trie_match_in_khepri/5, RestW},
-                              {<<"*">>, fun trie_match_in_khepri/5, RestW},
-                              {<<"#">>,
-                               fun trie_match_skip_any_in_khepri/5, Words}]).
-
-trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
-    case trie_child_in_khepri(X, Node, Search) of
-        {ok, NextNode} -> MatchFun(X, NextNode, RestW, BKeys, ResAcc);
-        error -> ResAcc
-    end.
+split_topic_key(<<>>, [], []) ->
+    [];
+split_topic_key(<<>>, RevWordAcc, RevResAcc) ->
+    lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
+    split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
+split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
+    split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
 
-trie_match_skip_any_in_khepri(X, Node, [], BKeys, ResAcc) ->
-    trie_match_in_khepri(X, Node, [], BKeys, ResAcc);
-trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
-    trie_match_skip_any_in_khepri(
-      X, Node, RestW, BKeys,
-      trie_match_in_khepri(X, Node, Words, BKeys, ResAcc)).
-
-trie_child_in_khepri(X, Node, Word) ->
-    case ets:lookup(
-           ?KHEPRI_PROJECTION,
-           #trie_edge{exchange_name = X,
-                      node_id = Node,
-                      word = Word}) of
-        [#topic_trie_edge_v2{node_id = NextNode}] -> {ok, NextNode};
-        [] -> error
+%% ==============================================================
+%% Trie-based routing
+%%
+%% Uses two ETS tables:
+%%
+%% 1. Trie edges table (set): {Key, ChildNodeId, ChildCount}
+%%    Key = {XSrc, ParentNodeId, Word}
+%%    Navigation: ets:lookup_element/4 for O(1) edge traversal.
+%%
+%% 2. Leaf bindings table (ordered_set): {{NodeId, BindingKey, Dest}}
+%%    Collection: ets:next/2 probes for fanout 0-2 (fast path), then
+%%    ets:select/2 with a partially bound key for fanout > 2 (does an
+%%    O(log N) seek followed by O(F) range scan).
+%%
+%% Routing walks the trie (branching on literal word, <<"*">>, <<"#">>)
+%% then collects destinations from the bindings table at each matching
+%% leaf. This is O(depth * 3) for the trie walk, plus O(log N) per
+%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2.
+%% ==============================================================
+
+trie_match(XSrc, Node, [], BKeys, Acc0) ->
+    Acc1 = trie_bindings(Node, BKeys, Acc0),
+    trie_match_try(XSrc, Node, <<"#">>,
+                   fun trie_match_skip_any/5,
+                   [], BKeys, Acc1);
+trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) ->
+    Acc1 = trie_match_try(XSrc, Node, W,
+                          fun trie_match/5,
+                          RestW, BKeys, Acc0),
+    Acc2 = trie_match_try(XSrc, Node, <<"*">>,
+                          fun trie_match/5,
+                          RestW, BKeys, Acc1),
+    trie_match_try(XSrc, Node, <<"#">>,
+                   fun trie_match_skip_any/5,
+                   Words, BKeys, Acc2).
+
+trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) ->
+    case ets:lookup_element(?TOPIC_TRIE_PROJECTION,
+                            {XSrc, Node, Word}, 2, undefined) of
+        undefined ->
+            Acc;
+        NextNode ->
+            MatchFun(XSrc, NextNode, RestW, BKeys, Acc)
     end.
 
-trie_bindings_in_khepri(X, Node, BKeys) ->
-    case ets:lookup(
-           ?KHEPRI_PROJECTION,
-           #trie_edge{exchange_name = X,
-                      node_id = Node,
-                      word = bindings}) of
-        [#topic_trie_edge_v2{node_id = {bindings, Bindings}}] ->
-            [case BKeys of
-                 true ->
-                     {Dest, Args};
-                 false ->
-                     Dest
-             end || #binding{destination = Dest,
-                             args = Args} <- sets:to_list(Bindings)];
-        [] ->
-            []
+trie_match_skip_any(XSrc, Node, [], BKeys, Acc) ->
+    trie_match(XSrc, Node, [], BKeys, Acc);
+trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
+    trie_match_skip_any(
+      XSrc, Node, RestW, BKeys,
+      trie_match(XSrc, Node, Words, BKeys, Acc)).
+
+%% Collect all destinations bound at the given trie node.
+%%
+%% Uses ets:next/2 for up to two elements (fast path for the common
+%% fanout 0-2 cases), then switches to ets:select/2 when fanout > 2.
+%%
+%% ets:select/2 incurs the expensive match spec compilation overhead.
+%% For larger fanouts, the cost of compiling the match spec amortises.
+%% ets:select/2 performs an O(log N) seek followed by an O(F) range
+%% scan, which is cheaper than F individual ets:next/2 calls
+%% (each O(log N) due to CATree fresh-stack allocation).
+trie_bindings(NodeId, BKeys, Acc) ->
+    StartKey = {NodeId, <<>>, {}},
+    case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of
+        {NodeId, BKey1, Dest1} = Key1 ->
+            case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of
+                {NodeId, BKey2, Dest2} = Key2 ->
+                    case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of
+                        {NodeId, _, _} ->
+                            collect_select(NodeId, BKeys, Acc);
+                        _ ->
+                            Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc),
+                            collect_binding(Dest2, BKey2, BKeys, Acc1)
+                    end;
+                _ ->
+                    collect_binding(Dest1, BKey1, BKeys, Acc)
+            end;
+        _ ->
+            Acc
     end.
+
+collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) ->
+    [{Dest, BindingKey} | Acc];
+collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) ->
+    [Dest | Acc].
+
+collect_select(NodeId, false, Acc) ->
+    Dests = ets:select(?TOPIC_BINDING_PROJECTION,
+                       [{{{NodeId, '_', '$1'}}, [], ['$1']}]),
+    Dests ++ Acc;
+collect_select(NodeId, true, Acc) ->
+    DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION,
+                               [{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]),
+    format_dest_bkeys(DestsAndBKeys, Acc).
+
+format_dest_bkeys([], Acc) ->
+    Acc;
+format_dest_bkeys([{#resource{kind = queue} = Dest, BKey} | Rest], Acc) ->
+    format_dest_bkeys(Rest, [{Dest, BKey} | Acc]);
+format_dest_bkeys([{Dest, _BKey} | Rest], Acc) ->
+    format_dest_bkeys(Rest, [Dest | Acc]).
```