Skip to content

Commit c4bf535

Browse files
committed
Fix amqp091->amqp10 shovel with complex headers
Some AMQP 0.9.1 headers, in particular x-death headers, cannot be set as application properties. Before this change, trying to shovel dead-lettered messages from an AMQP 0.9.1 source to AMQP 1.0 destination would fail with: ``` reason: {badarg, [{unicode,characters_to_binary, [[{table, [{<<"count">>,long,1}, {<<"reason">>,longstr,<<"maxlen">>}, {<<"queue">>,longstr,<<"tmp">>}, {<<"time">>,timestamp,1745575728}, {<<"exchange">>,longstr,<<>>}, {<<"routing-keys">>,array, [{longstr,<<"tmp">>}]}]}]], [{file,"unicode.erl"}, {line,1219}, {error_info,#{module => erl_stdlib_errors}}]}, {amqp10_client_types,utf8,1, [{file,"amqp10_client_types.erl"},{line,99}]}, {amqp10_msg,'-set_application_properties/2-fun-0-',3, [{file,"amqp10_msg.erl"},{line,385}]}, {maps,fold_1,4,[{file,"maps.erl"},{line,860}]}, {amqp10_msg,set_application_properties,2, [{file,"amqp10_msg.erl"},{line,384}]}, {maps,fold_1,4,[{file,"maps.erl"},{line,860}]}, {rabbit_amqp10_shovel,forward,4, [{file,"rabbit_amqp10_shovel.erl"},{line,337}]}, {rabbit_shovel_worker,handle_info,2, [{file,"rabbit_shovel_worker.erl"},{line,104}]}]} ```
1 parent d2b5f51 commit c4bf535

File tree

3 files changed

+35
-8
lines changed

3 files changed

+35
-8
lines changed

deps/rabbitmq_shovel/Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ dep_amqp10_client = git https://github.com/rabbitmq/rabbitmq-amqp1.0-client.git
2323

2424
LOCAL_DEPS = crypto
2525

26-
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_amqp1_0 meck
26+
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers rabbitmq_amqp1_0 rabbitmq_amqp_client meck
2727

2828
PLT_APPS += rabbitmq_cli
2929

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

+9-6
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,12 @@ bin_to_hex(Bin) ->
442442
true -> N + $0 end>>
443443
|| <<N:4>> <= Bin>>.
444444

445-
is_amqp10_compat(T) ->
446-
is_binary(T) orelse
447-
is_number(T) orelse
448-
%% TODO: not all lists are compatible
449-
is_list(T) orelse
450-
is_boolean(T).
445+
-spec is_amqp10_compat(T :: term()) -> boolean().
446+
is_amqp10_compat(T) when is_binary(T) -> true;
447+
is_amqp10_compat(T) when is_number(T) -> true;
448+
is_amqp10_compat(T) when is_boolean(T) -> true;
449+
is_amqp10_compat(T) when is_list(T) ->
450+
%% A list is compatible only if all its elements are also compatible.
451+
lists:all(fun is_amqp10_compat/1, T);
452+
is_amqp10_compat(_) ->
453+
false.

deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl

+25-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ groups() ->
2727
autodelete_amqp091_dest_on_confirm,
2828
autodelete_amqp091_dest_on_publish,
2929
simple_amqp10_dest,
30-
simple_amqp10_src
30+
simple_amqp10_src,
31+
amqp091_to_amqp10_with_dead_lettering
3132
]},
3233
{with_map_config, [], [
3334
simple,
@@ -96,6 +97,29 @@ simple_amqp10_dest(Config) ->
9697
<<"src-queue">>)
9798
end).
9899

100+
amqp091_to_amqp10_with_dead_lettering(Config) ->
101+
Dest = ?config(destq, Config),
102+
Src = ?config(srcq, Config),
103+
TmpQ = <<"tmp">>,
104+
with_session(Config,
105+
fun (Sess) ->
106+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Sess, <<"my link pair">>),
107+
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, TmpQ,
108+
#{arguments =>#{<<"x-max-length">> => {uint, 0},
109+
<<"x-dead-letter-exchange">> => {utf8, <<"">>},
110+
<<"x-dead-letter-routing-key">> => {utf8, Src}}}),
111+
{ok, Sender} = amqp10_client:attach_sender_link(Sess,
112+
<<"sender-tmp">>,
113+
<<"/queues/", TmpQ/binary>>,
114+
unsettled,
115+
unsettled_state),
116+
ok = await_amqp10_event(link, Sender, attached),
117+
expect_empty(Sess, TmpQ),
118+
test_amqp10_destination(Config, Src, Dest, Sess, <<"amqp091">>, <<"src-queue">>),
119+
%% publish to tmp, it should be dead-lettered to src and then shovelled to dest
120+
_ = publish_expect(Sess, TmpQ, Dest, <<"tag1">>, <<"hello">>)
121+
end).
122+
99123
test_amqp10_destination(Config, Src, Dest, Sess, Protocol, ProtocolSrc) ->
100124
MapConfig = ?config(map_config, Config),
101125
shovel_test_utils:set_param(Config, <<"test">>,

0 commit comments

Comments
 (0)