Skip to content

Commit 3bf5a2f

Browse files
Merge pull request #16031 from rabbitmq/mergify/bp/v4.2.x/pr-16030
Follow-up to #16020, #16023: integrate the special value of `0` (backport #16029) (backport #16030)
2 parents 76d9def + ed5abfd commit 3bf5a2f

File tree

5 files changed

+94
-8
lines changed

5 files changed

+94
-8
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@
110110
-ifdef(TEST).
111111
-export([ensure_token_expiry_timer/2,
112112
evaluate_state_after_secret_update/4,
113-
clean_subscriptions/4]).
113+
clean_subscriptions/4,
114+
negotiate_frame_max/2]).
114115
-endif.
115116

116117
callback_mode() ->
@@ -1470,10 +1471,8 @@ handle_frame_pre_auth(_Transport,
14701471
{tune, FrameMax, Heartbeat}) ->
14711472
?LOG_DEBUG("Tuning response ~tp ~tp ",
14721473
[FrameMax, Heartbeat]),
1473-
%% The client must not negotiate a frame_max larger than the
1474-
%% value advertised by the server, so clamp it to the configured
1475-
%% ceiling before pushing the negotiated value into the parser.
1476-
NegotiatedFrameMax = min(FrameMax, ConfiguredFrameMax),
1474+
%% 0 on either side means "no limit" and must not be clamped to 0.
1475+
NegotiatedFrameMax = negotiate_frame_max(FrameMax, ConfiguredFrameMax),
14771476
CoreState = rabbit_stream_core:set_frame_max(NegotiatedFrameMax,
14781477
CoreState0),
14791478
Parent = self(),
@@ -1567,6 +1566,16 @@ handle_frame_pre_auth(_Transport, Connection, State, Command) ->
15671566
[Command]),
15681567
{Connection#stream_connection{connection_step = failure}, State}.
15691568

1569+
%% 0 on either side means "no limit"; fall back to the other value.
1570+
-spec negotiate_frame_max(non_neg_integer(), non_neg_integer()) ->
1571+
non_neg_integer().
1572+
negotiate_frame_max(0, Configured) ->
1573+
Configured;
1574+
negotiate_frame_max(Client, 0) ->
1575+
Client;
1576+
negotiate_frame_max(Client, Configured) ->
1577+
min(Client, Configured).
1578+
15701579
auth_fail(Username, Msg, Args, Connection, ConnectionState) ->
15711580
notify_auth_result(Username,
15721581
user_authentication_failure,

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ groups() ->
7777
oversized_frame_rejected_post_auth,
7878
oversized_frame_rejected_after_tune_negotiation,
7979
frame_max_clamped_when_client_negotiates_higher,
80+
client_tune_response_with_zero_frame_max_is_unlimited,
8081
test_stream_test_utils,
8182
sac_subscription_with_partition_index_conflict_should_return_error,
8283
test_metadata_with_advertised_hints,
@@ -671,6 +672,32 @@ frame_max_clamped_when_client_negotiates_higher(Config) ->
671672
?assertMatch({request, _, {close, ?RESPONSE_CODE_FRAME_TOO_LARGE, _}}, Cmd),
672673
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).
673674

675+
%% The stream protocol TUNE exchange follows the AMQP 0-9-1 convention
676+
%% where 0 means "no limit". A client that echoes FrameMax = 0 back to
677+
%% the server must not cause the server to clamp its parser to a
678+
%% 0-byte ceiling; the configured value must apply instead.
679+
client_tune_response_with_zero_frame_max_is_unlimited(Config) ->
680+
Transport = gen_tcp,
681+
Port = get_stream_port(Config),
682+
Opts = [{active, false}, {mode, binary}],
683+
{ok, S} = Transport:connect("localhost", Port, Opts),
684+
C0 = rabbit_stream_core:init(0),
685+
C1 = test_peer_properties(Transport, S, C0),
686+
C2 = sasl_handshake(Transport, S, C1),
687+
C3 = test_plain_sasl_authenticate(Transport, S, C2, <<"guest">>),
688+
{{tune, _ServerFrameMax, _}, C4} = receive_commands(Transport, S, C3),
689+
TuneResponse = frame({response, 0, {tune, 0, 0}}),
690+
ok = Transport:send(S, TuneResponse),
691+
OpenFrame = request(3, {open, <<"/">>}),
692+
ok = Transport:send(S, OpenFrame),
693+
%% If the server naively used min(0, Configured) = 0 for its
694+
%% parser ceiling, the open frame itself would be rejected and
695+
%% the server would send a close request instead of the expected
696+
%% open response.
697+
{Cmd, _C5} = receive_commands(Transport, S, C4),
698+
?assertMatch({response, 3, {open, ?RESPONSE_CODE_OK, _}}, Cmd),
699+
gen_tcp:close(S).
700+
674701
timeout_tcp_connected(Config) ->
675702
Port = get_stream_port(Config),
676703
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),

deps/rabbitmq_stream/test/rabbit_stream_reader_SUITE.erl

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
-include_lib("rabbitmq_stream/src/rabbit_stream_reader.hrl").
2222
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
2323

24-
-import(rabbit_stream_reader, [ensure_token_expiry_timer/2]).
24+
-import(rabbit_stream_reader, [ensure_token_expiry_timer/2,
25+
negotiate_frame_max/2]).
2526

2627
%%%===================================================================
2728
%%% Common Test callbacks
@@ -228,6 +229,21 @@ partial_frame_buffering_test(_) ->
228229
?assertEqual([], Commands2),
229230
ok.
230231

232+
%% Covers every branch of negotiate_frame_max/2, including the cases
233+
%% where either side proposes 0 (protocol convention: "no limit").
234+
negotiate_frame_max_test(_) ->
235+
%% Both sides propose explicit, positive limits: pick the lower.
236+
?assertEqual(1024, negotiate_frame_max(1024, 2048)),
237+
?assertEqual(1024, negotiate_frame_max(2048, 1024)),
238+
?assertEqual(1024, negotiate_frame_max(1024, 1024)),
239+
%% Client proposes 0 (unlimited): the configured ceiling wins.
240+
?assertEqual(2048, negotiate_frame_max(0, 2048)),
241+
%% Server is configured as unlimited: the client proposal wins.
242+
?assertEqual(1024, negotiate_frame_max(1024, 0)),
243+
%% Both unlimited: stays unlimited (0 on the wire).
244+
?assertEqual(0, negotiate_frame_max(0, 0)),
245+
ok.
246+
231247
consumer(S, Pid) ->
232248
#consumer{configuration = #consumer_configuration{stream = S,
233249
member_pid = Pid}}.

deps/rabbitmq_stream_common/src/rabbit_stream_core.erl

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@
174174

175175
-spec init(#{frame_max => non_neg_integer() | unlimited} | term()) -> state().
176176
init(Opts) when is_map(Opts) ->
177-
FrameMax = maps:get(frame_max, Opts, unlimited),
177+
FrameMax = normalise_frame_max(maps:get(frame_max, Opts, unlimited)),
178178
#?MODULE{cfg = #cfg{frame_max = FrameMax}};
179179
init(_) ->
180180
#?MODULE{cfg = #cfg{frame_max = unlimited}}.
@@ -184,7 +184,16 @@ init(_) ->
184184
%% the initial ceiling.
185185
-spec set_frame_max(non_neg_integer() | unlimited, state()) -> state().
186186
set_frame_max(FrameMax, #?MODULE{cfg = Cfg} = State) ->
187-
State#?MODULE{cfg = Cfg#cfg{frame_max = FrameMax}}.
187+
State#?MODULE{cfg = Cfg#cfg{frame_max = normalise_frame_max(FrameMax)}}.
188+
189+
%% `frame_max` = 0 means "no limit" (AMQP 0-9-1 convention);
190+
%% the parser expresses that as the atom `unlimited`.
191+
-spec normalise_frame_max(non_neg_integer() | unlimited) ->
192+
pos_integer() | unlimited.
193+
normalise_frame_max(0) ->
194+
unlimited;
195+
normalise_frame_max(FrameMax) ->
196+
FrameMax.
188197

189198
-spec next_command(state()) -> {command(), state()} | empty.
190199
next_command(#?MODULE{commands = Commands0} = State) ->

deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ groups() ->
3434
set_frame_max_tightens_limit,
3535
set_frame_max_allows_in_flight_frame_to_complete,
3636
set_frame_max_to_unlimited,
37+
init_with_zero_frame_max_means_unlimited,
38+
set_frame_max_to_zero_means_unlimited,
3739
set_frame_max_preserves_pending_commands,
3840
prop_frame_within_limit_accepted,
3941
prop_frame_exceeding_limit_rejected,
@@ -367,6 +369,29 @@ set_frame_max_to_unlimited(_Config) ->
367369
{[{unknown, LargePayload}], _} = rabbit_stream_core:all_commands(State2),
368370
ok.
369371

372+
%% The protocol convention is that frame_max = 0 means "no limit",
373+
%% as historically has been the case in RabbitMQ, AMQP 0-9-1.
374+
%% `init/1` must translate 0 to the parser's 'unlimited' sentinel,
375+
%% otherwise the parser would reject every frame as too large.
376+
init_with_zero_frame_max_means_unlimited(_Config) ->
377+
Init = rabbit_stream_core:init(#{frame_max => 0}),
378+
LargePayload = binary:copy(<<0>>, 5000),
379+
LargeData = <<5000:32, LargePayload/binary>>,
380+
State = rabbit_stream_core:incoming_data(LargeData, Init),
381+
{[{unknown, LargePayload}], _} = rabbit_stream_core:all_commands(State),
382+
ok.
383+
384+
%% Same contract as above for `set_frame_max/2`: a post-TUNE update
385+
%% to 0 must resolve to "unlimited", not a 0-byte ceiling.
386+
set_frame_max_to_zero_means_unlimited(_Config) ->
387+
Init = rabbit_stream_core:init(#{frame_max => 100}),
388+
State1 = rabbit_stream_core:set_frame_max(0, Init),
389+
LargePayload = binary:copy(<<0>>, 5000),
390+
LargeData = <<5000:32, LargePayload/binary>>,
391+
State2 = rabbit_stream_core:incoming_data(LargeData, State1),
392+
{[{unknown, LargePayload}], _} = rabbit_stream_core:all_commands(State2),
393+
ok.
394+
370395
set_frame_max_preserves_pending_commands(_Config) ->
371396
InitialMax = 1000,
372397
TightenedMax = 50,

0 commit comments

Comments
 (0)