Skip to content

Commit e6253a0

Browse files
committed
Use Khepri multi-table projection for topic routing
Replace the two separate Khepri projections for topic routing (rabbit_khepri_topic_binding_v4 and rabbit_khepri_topic_trie_v4) with a single multi-table projection that manages both ETS tables. The new multi-table projection API from Khepri (khepri/khepri#366) allows a single projection to declare multiple named ETS tables via the `tables` option.
1 parent 3aefcee commit e6253a0

File tree

2 files changed

+12
-32
lines changed

2 files changed

+12
-32
lines changed

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,7 +1313,6 @@ register_projections() ->
13131313
fun register_rabbit_bindings_projection/0,
13141314
fun register_rabbit_route_by_source_key_projection/0,
13151315
fun register_rabbit_route_by_source_projection/0,
1316-
fun register_rabbit_topic_binding_projection/0,
13171316
fun register_rabbit_topic_trie_projection/0],
13181317
rabbit_misc:for_each_while_ok(
13191318
fun(RegisterFun) ->
@@ -1535,6 +1534,7 @@ projection_fun_for_sets(MapFun) ->
15351534
end.
15361535

15371536
%% Topic routing via trie + ordered_set ETS projections (v4).
1537+
%% Uses a single Khepri projection with two ETS tables:
15381538
%%
15391539
%% Trie edges table (set) for fast trie navigation during routing:
15401540
%% Row: {{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount}
@@ -1547,18 +1547,7 @@ projection_fun_for_sets(MapFun) ->
15471547
%% Word = binary() (a single topic segment, e.g. <<"foo">>, <<"*">>, <<"#">>)
15481548
%% ChildCount = non_neg_integer() (number of outgoing edges)
15491549
%% Dest = #resource{}
1550-
register_rabbit_topic_binding_projection() ->
1551-
PathPattern = topic_binding_path_pattern(),
1552-
BindingPFun = fun(_Table, _Path, _OldProps, _NewProps) -> ok end,
1553-
BindingOpts = #{type => ordered_set,
1554-
keypos => 1,
1555-
read_concurrency => true},
1556-
Projection = khepri_projection:new(
1557-
rabbit_khepri_topic_binding_v4, BindingPFun, BindingOpts),
1558-
khepri:register_projection(?STORE_ID, PathPattern, Projection).
1559-
15601550
register_rabbit_topic_trie_projection() ->
1561-
BindingTabName = rabbit_khepri_topic_binding_v4,
15621551
ShouldProcessFun =
15631552
fun (rabbit_db_topic_exchange, split_topic_key_binary, 1, _From) ->
15641553
%% This function uses `persistent_term' to store a lazily compiled
@@ -1574,12 +1563,16 @@ register_rabbit_topic_trie_projection() ->
15741563
(M, F, A, From) ->
15751564
khepri_tx_adv:should_process_function(M, F, A, From)
15761565
end,
1577-
Opts = #{type => set,
1566+
Opts = #{tables => #{rabbit_khepri_topic_trie_v4 =>
1567+
#{type => set},
1568+
rabbit_khepri_topic_binding_v4 =>
1569+
#{type => ordered_set}},
15781570
keypos => 1,
1579-
standalone_fun_options => #{should_process_function => ShouldProcessFun},
1580-
read_concurrency => true},
1581-
PFun = fun(TrieTab, Path, OldProps, NewProps) ->
1582-
BindingTab = ensure_topic_binding_table(BindingTabName),
1571+
read_concurrency => true,
1572+
standalone_fun_options => #{should_process_function => ShouldProcessFun}},
1573+
PFun = fun(Tables, Path, OldProps, NewProps) ->
1574+
#{rabbit_khepri_topic_trie_v4 := TrieTab,
1575+
rabbit_khepri_topic_binding_v4 := BindingTab} = Tables,
15831576
{VHost, ExchangeName, Kind, DstName, BindingKey} =
15841577
rabbit_db_binding:khepri_route_path_to_args(Path),
15851578
XSrc = {VHost, ExchangeName},
@@ -1615,20 +1608,6 @@ topic_binding_path_pattern() ->
16151608
_DstName = ?KHEPRI_WILDCARD_STAR,
16161609
_BindingKey = ?KHEPRI_WILDCARD_STAR).
16171610

1618-
%% TODO: Khepri should support declaring dependencies between projections
1619-
%% so that restore order is guaranteed. Until then, we lazily create the
1620-
%% binding table here as a safety net: during restore_projections, the trie
1621-
%% projection may be triggered before the binding projection's table is
1622-
%% initialised due to Khepri's prepend-based list ordering.
1623-
ensure_topic_binding_table(Name) ->
1624-
case ets:whereis(Name) of
1625-
undefined ->
1626-
ets:new(Name, [ordered_set, named_table, protected,
1627-
{read_concurrency, true}]);
1628-
Tid ->
1629-
Tid
1630-
end.
1631-
16321611
%% Walk down the trie following the given words, creating edges and
16331612
%% intermediate nodes as needed. Returns the leaf node ID.
16341613
%%

rabbitmq-components.mk

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ dep_credentials_obfuscation = hex 3.5.0
4949
dep_cuttlefish = hex 3.6.0
5050
dep_gen_batch_server = hex 0.8.8
5151
dep_jose = hex 1.11.10
52-
dep_khepri = hex 0.17.5
52+
# TODO point Khepri to hex when https://github.com/rabbitmq/khepri/pull/366 is released
53+
dep_khepri = git https://github.com/rabbitmq/khepri.git support-multi-table-projections
5354
dep_khepri_mnesia_migration = hex 0.8.1
5455
dep_meck = hex 1.0.0
5556
dep_osiris = git https://github.com/rabbitmq/osiris v1.12.0

0 commit comments

Comments
 (0)