Skip to content

Commit d7877d4

Browse files
committed
Remove behaviour & use context map
1 parent bc566d3 commit d7877d4

13 files changed

+147
-158
lines changed

deps/rabbit/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_on
273273
PARALLEL_CT_SET_3_C = cli_forget_cluster_node feature_flags_v2 mc_unit message_containers_deaths_v2 message_size_limit metadata_store_migration
274274
PARALLEL_CT_SET_3_D = metadata_store_phase1 metrics mirrored_supervisor peer_discovery_classic_config proxy_protocol runtime_parameters unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
275275

276-
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_incoming_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
276+
PARALLEL_CT_SET_4_A = clustering_events rabbit_local_random_exchange rabbit_message_interceptor rabbitmq_4_0_deprecations unit_pg_local unit_plugin_directories unit_plugin_versioning unit_policy_validators unit_priority_queue
277277
PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit rabbit_fifo_dlx_integration rabbit_fifo_int
278278
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
279279
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue

deps/rabbit/ct.test.spec

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
, rabbit_fifo_prop_SUITE
116116
, rabbit_fifo_v0_SUITE
117117
, rabbit_local_random_exchange_SUITE
118-
, rabbit_incoming_message_interceptor_SUITE
118+
, rabbit_message_interceptor_SUITE
119119
, rabbit_stream_coordinator_SUITE
120120
, rabbit_stream_sac_coordinator_SUITE
121121
, rabbitmq_4_0_deprecations_SUITE

deps/rabbit/priv/schema/rabbit.schema

+51-14
Original file line numberDiff line numberDiff line change
@@ -2658,14 +2658,17 @@ end}.
26582658
{mapping, "message_interceptors.incoming.$interceptor.overwrite", "rabbit.incoming_message_interceptors", [
26592659
{datatype, {enum, [true, false]}}]}.
26602660

2661-
% Pseudo-key to include the interceptor in the list of interceptors.
2661+
% Pseudo-key to include the interceptor in the list of interceptors.
26622662
% - If any other configuration is provided for the interceptor this
2663-
% configuration is not required.
2663+
% configuration is not required.
26642664
% - If no other configuration is provided, this one is required so that the
2665-
% interceptor gets invoked.
2665+
% interceptor gets invoked.
26662666
{mapping, "message_interceptors.incoming.$interceptor.enabled", "rabbit.incoming_message_interceptors", [
26672667
{datatype, {enum, [true]}}]}.
26682668

2669+
{mapping, "message_interceptors.outgoing.$interceptor.enabled", "rabbit.outgoing_message_interceptors", [
2670+
{datatype, {enum, [true]}}]}.
2671+
26692672
{mapping, "message_interceptors.incoming.set_header_timestamp.overwrite", "rabbit.incoming_message_interceptors", [
26702673
{datatype, {enum, [true, false]}}]}.
26712674

@@ -2674,19 +2677,21 @@ end}.
26742677

26752678
{translation, "rabbit.incoming_message_interceptors",
26762679
fun(Conf) ->
2677-
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
2678-
[] ->
2679-
cuttlefish:unset();
2680-
L ->
2681-
InterceptorsConfig = [
2680+
case cuttlefish_variable:filter_by_prefix("message_interceptors.incoming", Conf) of
2681+
[] ->
2682+
cuttlefish:unset();
2683+
L ->
2684+
InterceptorsConfig = [
26822685
{Module0, Config, Value}
26832686
|| {["message_interceptors", "incoming", Module0, Config], Value} <- L
26842687
],
26852688
{Result, Order0} = lists:foldl(
2686-
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2689+
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
26872690
Interceptor = list_to_atom(Interceptor0),
26882691
Key = list_to_atom(Key0),
2689-
MapPutFun = fun(Key, Value) -> fun(Old) -> maps:put(Key, Value, Old) end end,
2692+
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2693+
% This Interceptor -> Module alias exists for
2694+
% compatibility reasons
26902695
Module = case Interceptor of
26912696
set_header_timestamp ->
26922697
rabbit_header_timestamp_interceptor;
@@ -2695,10 +2700,10 @@ end}.
26952700
_ ->
26962701
Interceptor
26972702
end,
2698-
NewAcc =
2703+
NewAcc =
26992704
maps:update_with(
2700-
Module,
2701-
MapPutFun(Key, Value),
2705+
Module,
2706+
MapPutFun,
27022707
#{Key => Value},
27032708
Acc),
27042709
{NewAcc, [Module| Order]}
@@ -2708,7 +2713,39 @@ end}.
27082713
),
27092714
Order = lists:uniq(Order0),
27102715
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2711-
end
2716+
end
2717+
end
2718+
}.
2719+
2720+
{translation, "rabbit.outgoing_message_interceptors",
2721+
fun(Conf) ->
2722+
case cuttlefish_variable:filter_by_prefix("message_interceptors.outgoing", Conf) of
2723+
[] ->
2724+
cuttlefish:unset();
2725+
L ->
2726+
InterceptorsConfig = [
2727+
{Module0, Config, Value}
2728+
|| {["message_interceptors", "outgoing", Module0, Config], Value} <- L
2729+
],
2730+
{Result, Order0} = lists:foldl(
2731+
fun({Interceptor0, Key0, Value}, {Acc, Order}) ->
2732+
Module = list_to_atom(Interceptor0),
2733+
Key = list_to_atom(Key0),
2734+
MapPutFun = fun(Old) -> maps:put(Key, Value, Old) end,
2735+
NewAcc =
2736+
maps:update_with(
2737+
Module,
2738+
MapPutFun,
2739+
#{Key => Value},
2740+
Acc),
2741+
{NewAcc, [Module| Order]}
2742+
end,
2743+
{#{}, []},
2744+
InterceptorsConfig
2745+
),
2746+
Order = lists:uniq(Order0),
2747+
[{O, maps:without([enabled], maps:get(O, Result))} || O <- Order]
2748+
end
27122749
end
27132750
}.
27142751

deps/rabbit/src/rabbit_amqp_session.erl

+11-17
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
-compile({inline, [maps_update_with/4]}).
1111

1212
-behaviour(gen_server).
13-
-behaviour(rabbit_protocol_accessor).
1413

1514
-include_lib("kernel/include/logger.hrl").
1615
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -106,9 +105,6 @@
106105
handle_info/2,
107106
format_status/1]).
108107

109-
% `rabbit_protocol_accessor` behaviour callbacks
110-
-export([get_property/2]).
111-
112108
-import(rabbit_amqp_util,
113109
[protocol_error/3]).
114110
-import(serial_number,
@@ -287,7 +283,8 @@
287283
max_handle :: link_handle(),
288284
max_incoming_window :: pos_integer(),
289285
max_link_credit :: pos_integer(),
290-
max_queue_credit :: pos_integer()
286+
max_queue_credit :: pos_integer(),
287+
msg_interceptor_ctx :: map()
291288
}).
292289

293290
-record(state, {
@@ -478,7 +475,10 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ContainerId,
478475
max_handle = EffectiveHandleMax,
479476
max_incoming_window = MaxIncomingWindow,
480477
max_link_credit = MaxLinkCredit,
481-
max_queue_credit = MaxQueueCredit
478+
max_queue_credit = MaxQueueCredit,
479+
msg_interceptor_ctx = #{user => User,
480+
vhost => Vhost,
481+
conn_name => ConnName}
482482
}}}.
483483

484484
terminate(_Reason, #state{incoming_links = IncomingLinks,
@@ -2415,7 +2415,8 @@ incoming_link_transfer(
24152415
trace_state = Trace,
24162416
conn_name = ConnName,
24172417
channel_num = ChannelNum,
2418-
max_link_credit = MaxLinkCredit}}) ->
2418+
max_link_credit = MaxLinkCredit,
2419+
msg_interceptor_ctx = MsgInterceptorCtx}}) ->
24192420

24202421
{PayloadBin, DeliveryId, Settled} =
24212422
case MultiTransfer of
@@ -2440,7 +2441,9 @@ incoming_link_transfer(
24402441
Mc0 = mc:init(mc_amqp, PayloadBin, #{}),
24412442
case lookup_target(LinkExchange, LinkRKey, Mc0, Vhost, User, PermCache0) of
24422443
{ok, X, RoutingKeys, Mc1, PermCache} ->
2443-
Mc2 = rabbit_incoming_message_interceptor:intercept(Mc1, ?MODULE, State0),
2444+
Mc2 = rabbit_message_interceptor:intercept(Mc1,
2445+
MsgInterceptorCtx,
2446+
incoming),
24442447
check_user_id(Mc2, User),
24452448
TopicPermCache = check_write_permitted_on_topics(
24462449
X, User, RoutingKeys, TopicPermCache0),
@@ -3878,15 +3881,6 @@ format_status(
38783881
topic_permission_cache => TopicPermissionCache},
38793882
maps:update(state, State, Status).
38803883

3881-
get_property(user, #state{cfg = #cfg{user = User}}) ->
3882-
User;
3883-
get_property(vhost, #state{cfg = #cfg{vhost = VHost}}) ->
3884-
VHost;
3885-
get_property(connection_name, #state{cfg = #cfg{conn_name = ConnectionName}}) ->
3886-
ConnectionName;
3887-
get_property(_, _) ->
3888-
undefined.
3889-
38903884
-spec info(pid()) ->
38913885
{ok, rabbit_types:infos()} | {error, term()}.
38923886
info(Pid) ->

deps/rabbit/src/rabbit_channel.erl

+11-18
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
-module(rabbit_channel).
99

10-
-behaviour(rabbit_protocol_accessor).
11-
1210
%% rabbit_channel processes represent an AMQP 0-9-1 channels.
1311
%%
1412
%% Connections parse protocol frames coming from clients and
@@ -62,9 +60,6 @@
6260
prioritise_call/4, prioritise_cast/3, prioritise_info/3,
6361
format_message_queue/2]).
6462

65-
% `rabbit_protocol_accessor` behaviour callbacks
66-
-export([get_property/2]).
67-
6863
%% Internal
6964
-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
7065
-export([get_vhost/1, get_user/1]).
@@ -115,7 +110,8 @@
115110
authz_context,
116111
max_consumers, % taken from rabbit.consumer_max_per_channel
117112
%% defines how ofter gc will be executed
118-
writer_gc_threshold
113+
writer_gc_threshold,
114+
msg_interceptor_ctx
119115
}).
120116

121117
-record(pending_ack, {
@@ -514,7 +510,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
514510
consumer_timeout = ConsumerTimeout,
515511
authz_context = OptionalVariables,
516512
max_consumers = MaxConsumers,
517-
writer_gc_threshold = GCThreshold
513+
writer_gc_threshold = GCThreshold,
514+
msg_interceptor_ctx = #{user => User,
515+
vhost => VHost,
516+
conn_name => ConnName}
518517
},
519518
limiter = Limiter,
520519
tx = none,
@@ -819,15 +818,6 @@ get_consumer_timeout() ->
819818
undefined
820819
end.
821820

822-
get_property(user, #ch{cfg = #conf{user = User}}) ->
823-
User;
824-
get_property(vhost, #ch{cfg = #conf{virtual_host = VHost}}) ->
825-
VHost;
826-
get_property(connection_name, #ch{cfg = #conf{conn_name = ConnectionName}}) ->
827-
ConnectionName;
828-
get_property(_, _) ->
829-
undefined.
830-
831821
%%---------------------------------------------------------------------------
832822

833823
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -1182,7 +1172,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
11821172
user = #user{username = Username} = User,
11831173
trace_state = TraceState,
11841174
authz_context = AuthzContext,
1185-
writer_gc_threshold = GCThreshold
1175+
writer_gc_threshold = GCThreshold,
1176+
msg_interceptor_ctx = MsgInterceptorCtx
11861177
},
11871178
tx = Tx,
11881179
confirm_enabled = ConfirmEnabled,
@@ -1221,7 +1212,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
12211212
rabbit_misc:precondition_failed("invalid message: ~tp", [Reason]);
12221213
{ok, Message0} ->
12231214
check_write_permitted_on_topics(Exchange, User, Message0, AuthzContext),
1224-
Message = rabbit_incoming_message_interceptor:intercept(Message0, ?MODULE, State),
1215+
Message = rabbit_message_interceptor:intercept(Message0,
1216+
MsgInterceptorCtx,
1217+
incoming),
12251218
check_user_id_header(Message, User),
12261219
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
12271220
[deliver_reply(RK, Message) || {virtual_reply_queue, RK} <- QNames],

deps/rabbit/src/rabbit_header_routing_node_interceptor.erl

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
-module(rabbit_header_routing_node_interceptor).
2-
-behaviour(rabbit_incoming_message_interceptor).
2+
-behaviour(rabbit_message_interceptor).
33

44
-define(HEADER_ROUTING_NODE, <<"x-routed-by">>).
55

66
-export([
7-
intercept/4
7+
intercept/3
88
]).
99

10-
intercept(Msg, _ProtoMod, _ProtoState, Config) ->
10+
intercept(Msg, _MsgInterceptorCtx, Config) ->
1111
Node = atom_to_binary(node()),
1212
Overwrite = maps:get(overwrite, Config, false),
13-
rabbit_incoming_message_interceptor:set_msg_annotation(
13+
rabbit_message_interceptor:set_msg_annotation(
1414
Msg,
1515
?HEADER_ROUTING_NODE,
1616
Node,

deps/rabbit/src/rabbit_header_timestamp_interceptor.erl

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
-module(rabbit_header_timestamp_interceptor).
2-
-behaviour(rabbit_incoming_message_interceptor).
2+
-behaviour(rabbit_message_interceptor).
33

44
-include("mc.hrl").
55

66
-define(HEADER_TIMESTAMP, <<"timestamp_in_ms">>).
77

88
-export([
9-
intercept/4
9+
intercept/3
1010
]).
1111

12-
intercept(Msg0, _ProtoMod, _ProtoState, Config) ->
12+
intercept(Msg0, _MsgInterceptorCtx, Config) ->
1313
Ts = mc:get_annotation(?ANN_RECEIVED_AT_TIMESTAMP, Msg0),
1414
Overwrite = maps:get(overwrite, Config, false),
15-
Msg = rabbit_incoming_message_interceptor:set_msg_annotation(
15+
Msg = rabbit_message_interceptor:set_msg_annotation(
1616
Msg0,
1717
?HEADER_TIMESTAMP,
1818
Ts,

deps/rabbit/src/rabbit_incoming_message_interceptor.erl

-38
This file was deleted.

0 commit comments

Comments
 (0)