Skip to content

Commit dc5a703

Browse files
authored
Merge pull request #12753 from rabbitmq/md/khepri-0-17
Bump Khepri to 0.17.0
2 parents 9bb5dc2 + 2754fb7 commit dc5a703

22 files changed

+540
-308
lines changed

.github/workflows/test-make-target.yaml

+2-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ jobs:
5757
uses: dsaltares/fetch-gh-release-asset@master
5858
if: inputs.mixed_clusters
5959
with:
60-
version: 'tags/v4.0.5'
60+
repo: 'rabbitmq/server-packages'
61+
version: 'tags/alphas.1744021065493'
6162
regex: true
6263
file: "rabbitmq-server-generic-unix-\\d.+\\.tar\\.xz"
6364
target: ./

deps/rabbit/src/rabbit_db_binding.erl

+32-16
Original file line numberDiff line numberDiff line change
@@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
837837
end,
838838
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.
839839

840-
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
841-
Path = khepri_route_path(
842-
VHost,
843-
Name,
844-
_Kind = ?KHEPRI_WILDCARD_STAR,
845-
_DstName = ?KHEPRI_WILDCARD_STAR,
846-
_RoutingKey = #if_has_data{}),
847-
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
848-
maps:fold(fun(_P, #{data := Set}, Acc) ->
849-
sets:to_list(Set) ++ Acc
850-
end, [], Bindings).
840+
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
841+
Pattern = khepri_route_path(
842+
VHost,
843+
SrcName,
844+
?KHEPRI_WILDCARD_STAR, %% Kind
845+
?KHEPRI_WILDCARD_STAR, %% DstName
846+
#if_has_data{}), %% RoutingKey
847+
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
848+
maps:fold(
849+
fun(Path, Props, Acc) ->
850+
case {Path, Props} of
851+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
852+
VHost, SrcName, _Kind, _Name, _RoutingKey),
853+
#{data := Set}} ->
854+
sets:to_list(Set) ++ Acc;
855+
{_, _} ->
856+
Acc
857+
end
858+
end, [], Bindings).
851859

852860
%% -------------------------------------------------------------------
853861
%% delete_for_destination_in_mnesia().
@@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
892900
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
893901
Pattern = khepri_route_path(
894902
VHost,
895-
_SrcName = ?KHEPRI_WILDCARD_STAR,
903+
?KHEPRI_WILDCARD_STAR, %% SrcName
896904
Kind,
897905
Name,
898-
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
906+
?KHEPRI_WILDCARD_STAR), %% RoutingKey
899907
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
900-
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
901-
sets:to_list(Set) ++ Acc
902-
end, [], BindingsMap),
908+
Bindings = maps:fold(
909+
fun(Path, Props, Acc) ->
910+
case {Path, Props} of
911+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
912+
VHost, _SrcName, Kind, Name, _RoutingKey),
913+
#{data := Set}} ->
914+
sets:to_list(Set) ++ Acc;
915+
{_, _} ->
916+
Acc
917+
end
918+
end, [], BindingsMap),
903919
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
904920
lists:keysort(#binding.source, Bindings), OnlyDurable).
905921

deps/rabbit/src/rabbit_db_exchange.erl

+16-10
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) ->
331331
Path = khepri_exchange_path(XName),
332332
Ret1 = rabbit_khepri:adv_get(Path),
333333
case Ret1 of
334-
{ok, #{data := X, payload_version := Vsn}} ->
334+
{ok, #{Path := #{data := X, payload_version := Vsn}}} ->
335335
X1 = Fun(X),
336336
UpdatePath =
337337
khepri_path:combine_with_conditions(
@@ -534,8 +534,7 @@ next_serial_in_khepri(XName) ->
534534
Path = khepri_exchange_serial_path(XName),
535535
Ret1 = rabbit_khepri:adv_get(Path),
536536
case Ret1 of
537-
{ok, #{data := Serial,
538-
payload_version := Vsn}} ->
537+
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
539538
UpdatePath =
540539
khepri_path:combine_with_conditions(
541540
Path, [#if_payload_version{version = Vsn}]),
@@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) ->
711710
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
712711
Deletions =
713712
maps:fold(
714-
fun(_Path, #{data := X}, Deletions) ->
715-
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
716-
rabbit_db_binding:delete_all_for_exchange_in_khepri(
717-
X, false, true),
718-
Deletions1 = rabbit_binding:add_deletion(
719-
XName, X, deleted, Bindings, XDeletions),
720-
rabbit_binding:combine_deletions(Deletions, Deletions1)
713+
fun(Path, Props, Deletions) ->
714+
case {Path, Props} of
715+
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
716+
#{data := X}} ->
717+
{deleted,
718+
#exchange{name = XName}, Bindings, XDeletions} =
719+
rabbit_db_binding:delete_all_for_exchange_in_khepri(
720+
X, false, true),
721+
Deletions1 = rabbit_binding:add_deletion(
722+
XName, X, deleted, Bindings, XDeletions),
723+
rabbit_binding:combine_deletions(Deletions, Deletions1);
724+
{_, _} ->
725+
Deletions
726+
end
721727
end, rabbit_binding:new_deletions(), NodeProps),
722728
{ok, Deletions}.
723729

deps/rabbit/src/rabbit_db_msup.erl

+3-2
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
135135
mirroring_pid = Overall,
136136
childspec = ChildSpec},
137137
case rabbit_khepri:adv_get(Path) of
138-
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139-
payload_version := Vsn}} ->
138+
{ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
139+
payload_version := Vsn}}} ->
140140
case Overall of
141141
Pid ->
142142
Delegate;
@@ -160,6 +160,7 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
160160
end
161161
end;
162162
_ ->
163+
%% FIXME: Not atomic with the get above.
163164
ok = rabbit_khepri:put(Path, S),
164165
start
165166
end.

deps/rabbit/src/rabbit_db_queue.erl

+45-22
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,18 @@ delete_in_khepri(QueueName, OnlyDurable) ->
411411
rabbit_khepri:transaction(
412412
fun () ->
413413
Path = khepri_queue_path(QueueName),
414+
UsesUniformWriteRet = try
415+
khepri_tx:does_api_comply_with(uniform_write_ret)
416+
catch
417+
error:undef ->
418+
false
419+
end,
414420
case khepri_tx_adv:delete(Path) of
415-
{ok, #{data := _}} ->
421+
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
422+
%% we want to execute some things, as decided by rabbit_exchange,
423+
%% after the transaction.
424+
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
425+
{ok, #{data := _}} when not UsesUniformWriteRet ->
416426
%% we want to execute some things, as decided by rabbit_exchange,
417427
%% after the transaction.
418428
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
@@ -607,7 +617,7 @@ update_in_khepri(QName, Fun) ->
607617
Path = khepri_queue_path(QName),
608618
Ret1 = rabbit_khepri:adv_get(Path),
609619
case Ret1 of
610-
{ok, #{data := Q, payload_version := Vsn}} ->
620+
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
611621
UpdatePath = khepri_path:combine_with_conditions(
612622
Path, [#if_payload_version{version = Vsn}]),
613623
Q1 = Fun(Q),
@@ -658,7 +668,7 @@ update_decorators_in_khepri(QName, Decorators) ->
658668
Path = khepri_queue_path(QName),
659669
Ret1 = rabbit_khepri:adv_get(Path),
660670
case Ret1 of
661-
{ok, #{data := Q1, payload_version := Vsn}} ->
671+
{ok, #{Path := #{data := Q1, payload_version := Vsn}}} ->
662672
Q2 = amqqueue:set_decorators(Q1, Decorators),
663673
UpdatePath = khepri_path:combine_with_conditions(
664674
Path, [#if_payload_version{version = Vsn}]),
@@ -1098,15 +1108,12 @@ delete_transient_in_khepri(FilterFun) ->
10981108
case rabbit_khepri:adv_get_many(PathPattern) of
10991109
{ok, Props} ->
11001110
Qs = maps:fold(
1101-
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
1111+
fun(Path, #{data := Q, payload_version := Vsn}, Acc)
11021112
when ?is_amqqueue(Q) ->
11031113
case FilterFun(Q) of
11041114
true ->
1105-
Path = khepri_path:combine_with_conditions(
1106-
Path0,
1107-
[#if_payload_version{version = Vsn}]),
11081115
QName = amqqueue:get_name(Q),
1109-
[{Path, QName} | Acc];
1116+
[{Path, Vsn, QName} | Acc];
11101117
false ->
11111118
Acc
11121119
end
@@ -1125,20 +1132,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
11251132
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11261133
Res = rabbit_khepri:transaction(
11271134
fun() ->
1128-
rabbit_misc:fold_while_ok(
1129-
fun({Path, QName}, Acc) ->
1130-
%% Also see `delete_in_khepri/2'.
1131-
case khepri_tx_adv:delete(Path) of
1132-
{ok, #{data := _}} ->
1133-
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1134-
QName, false),
1135-
{ok, [{QName, Deletions} | Acc]};
1136-
{ok, _} ->
1137-
{ok, Acc};
1138-
{error, _} = Error ->
1139-
Error
1140-
end
1141-
end, [], Qs)
1135+
do_delete_transient_queues_in_khepri_tx(Qs, [])
11421136
end),
11431137
case Res of
11441138
{ok, Items} ->
@@ -1152,6 +1146,35 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
11521146
Error
11531147
end.
11541148

1149+
do_delete_transient_queues_in_khepri_tx([], Acc) ->
1150+
{ok, Acc};
1151+
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
1152+
%% Also see `delete_in_khepri/2'.
1153+
VersionedPath = khepri_path:combine_with_conditions(
1154+
Path, [#if_payload_version{version = Vsn}]),
1155+
UsesUniformWriteRet = try
1156+
khepri_tx:does_api_comply_with(uniform_write_ret)
1157+
catch
1158+
error:undef ->
1159+
false
1160+
end,
1161+
case khepri_tx_adv:delete(VersionedPath) of
1162+
{ok, #{Path := #{data := _}}} when UsesUniformWriteRet ->
1163+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1164+
QName, false),
1165+
Acc1 = [{QName, Deletions} | Acc],
1166+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1167+
{ok, #{data := _}} when not UsesUniformWriteRet ->
1168+
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
1169+
QName, false),
1170+
Acc1 = [{QName, Deletions} | Acc],
1171+
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
1172+
{ok, _} ->
1173+
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
1174+
{error, _} = Error ->
1175+
Error
1176+
end.
1177+
11551178
%% -------------------------------------------------------------------
11561179
%% foreach_transient().
11571180
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_rtparams.erl

+27-7
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ set_in_khepri(Key, Term) ->
5959
Record = #runtime_parameters{key = Key,
6060
value = Term},
6161
case rabbit_khepri:adv_put(Path, Record) of
62-
{ok, #{data := Params}} ->
62+
{ok, #{Path := #{data := Params}}} ->
6363
{old, Params#runtime_parameters.value};
6464
{ok, _} ->
6565
new
@@ -113,8 +113,16 @@ set_in_khepri_tx(Key, Term) ->
113113
Path = khepri_rp_path(Key),
114114
Record = #runtime_parameters{key = Key,
115115
value = Term},
116+
UsesUniformWriteRet = try
117+
khepri_tx:does_api_comply_with(uniform_write_ret)
118+
catch
119+
error:undef ->
120+
false
121+
end,
116122
case khepri_tx_adv:put(Path, Record) of
117-
{ok, #{data := Params}} ->
123+
{ok, #{Path := #{data := Params}}} when UsesUniformWriteRet ->
124+
{old, Params#runtime_parameters.value};
125+
{ok, #{data := Params}} when not UsesUniformWriteRet ->
118126
{old, Params#runtime_parameters.value};
119127
{ok, _} ->
120128
new
@@ -347,11 +355,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
347355
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].
348356

349357
delete_vhost_in_khepri(VHostName) ->
350-
Path = khepri_vhost_rp_path(
351-
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
352-
case rabbit_khepri:adv_delete_many(Path) of
353-
{ok, Props} ->
354-
{ok, rabbit_khepri:collect_payloads(Props)};
358+
Pattern = khepri_vhost_rp_path(
359+
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
360+
case rabbit_khepri:adv_delete_many(Pattern) of
361+
{ok, NodePropsMap} ->
362+
RTParams =
363+
maps:fold(
364+
fun(Path, Props, Acc) ->
365+
case {Path, Props} of
366+
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
367+
VHostName, _, _),
368+
#{data := RTParam}} ->
369+
[RTParam | Acc];
370+
{_, _} ->
371+
Acc
372+
end
373+
end, [], NodePropsMap),
374+
{ok, RTParams};
355375
{error, _} = Err ->
356376
Err
357377
end.

deps/rabbit/src/rabbit_db_user.erl

+34-12
Original file line numberDiff line numberDiff line change
@@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
628628
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
629629
rabbit_khepri:transaction(
630630
fun() ->
631-
UserPermissionsPath = khepri_user_permission_path(
632-
?KHEPRI_WILDCARD_STAR, VHostName),
633-
TopicPermissionsPath = khepri_topic_permission_path(
634-
?KHEPRI_WILDCARD_STAR, VHostName,
635-
?KHEPRI_WILDCARD_STAR),
636-
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
637-
{ok, TopicProps} = khepri_tx_adv:delete_many(
638-
TopicPermissionsPath),
639-
Deletions = rabbit_khepri:collect_payloads(
640-
TopicProps,
641-
rabbit_khepri:collect_payloads(UserProps)),
642-
{ok, Deletions}
631+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
643632
end, rw, #{timeout => infinity}).
644633

634+
clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
635+
UserPermissionsPattern = khepri_user_permission_path(
636+
?KHEPRI_WILDCARD_STAR, VHostName),
637+
TopicPermissionsPattern = khepri_topic_permission_path(
638+
?KHEPRI_WILDCARD_STAR, VHostName,
639+
?KHEPRI_WILDCARD_STAR),
640+
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
641+
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
642+
TopicPermissionsPattern),
643+
Deletions0 =
644+
maps:fold(
645+
fun(Path, Props, Acc) ->
646+
case {Path, Props} of
647+
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
648+
#{data := Permission}} ->
649+
[Permission | Acc];
650+
{_, _} ->
651+
Acc
652+
end
653+
end, [], UserNodePropsMap),
654+
Deletions1 =
655+
maps:fold(
656+
fun(Path, Props, Acc) ->
657+
case {Path, Props} of
658+
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
659+
#{data := Permission}} ->
660+
[Permission | Acc];
661+
{_, _} ->
662+
Acc
663+
end
664+
end, Deletions0, TopicNodePropsMap),
665+
{ok, Deletions1}.
666+
645667
%% -------------------------------------------------------------------
646668
%% get_topic_permissions().
647669
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_vhost.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
167167
Path = khepri_vhost_path(VHostName),
168168
Ret1 = rabbit_khepri:adv_get(Path),
169169
case Ret1 of
170-
{ok, #{data := VHost0, payload_version := DVersion}} ->
170+
{ok, #{Path := #{data := VHost0, payload_version := DVersion}}} ->
171171
VHost = vhost:merge_metadata(VHost0, Metadata),
172172
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
173173
Path1 = khepri_path:combine_with_conditions(
@@ -443,10 +443,10 @@ update_in_mnesia_tx(VHostName, UpdateFun)
443443
update_in_khepri(VHostName, UpdateFun) ->
444444
Path = khepri_vhost_path(VHostName),
445445
case rabbit_khepri:adv_get(Path) of
446-
{ok, #{data := V, payload_version := DVersion}} ->
446+
{ok, #{Path := #{data := V, payload_version := Vsn}}} ->
447447
V1 = UpdateFun(V),
448448
Path1 = khepri_path:combine_with_conditions(
449-
Path, [#if_payload_version{version = DVersion}]),
449+
Path, [#if_payload_version{version = Vsn}]),
450450
case rabbit_khepri:put(Path1, V1) of
451451
ok ->
452452
V1;

0 commit comments

Comments
 (0)