Skip to content

Commit 0f512f6

Browse files
committed
Add resolve_stream_offset stream protocol command
This command resolves an offset specification (e.g. 'next' or a timestamp) to a numeric offset.
1 parent dc1214d commit 0f512f6

File tree

7 files changed

+216
-23
lines changed

7 files changed

+216
-23
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2123,6 +2123,49 @@ handle_frame_post_auth(Transport,
21232123
{query_offset, ResponseCode, Offset}}),
21242124
send(Transport, S, Frame),
21252125
{Connection1, State};
2126+
handle_frame_post_auth(Transport,
2127+
#stream_connection{socket = S,
2128+
virtual_host = VirtualHost,
2129+
user = User} =
2130+
Connection0,
2131+
State,
2132+
{request, CorrelationId,
2133+
{resolve_offset_spec, Stream, OffsetSpec, Properties}}) ->
2134+
{ResponseCode, Offset, Connection1} =
2135+
case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
2136+
kind = queue,
2137+
virtual_host =
2138+
VirtualHost},
2139+
User, #{})
2140+
of
2141+
ok ->
2142+
case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of
2143+
{error, not_found} ->
2144+
increase_protocol_counter(?STREAM_DOES_NOT_EXIST),
2145+
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0};
2146+
{error, not_available} ->
2147+
increase_protocol_counter(?STREAM_NOT_AVAILABLE),
2148+
{?RESPONSE_CODE_STREAM_NOT_AVAILABLE, 0, Connection0};
2149+
{ok, MemberPid} ->
2150+
Opts0 = #{chunk_selector => get_chunk_selector(Properties)},
2151+
Opts1 = maps:merge(Opts0,
2152+
rabbit_stream_utils:filter_spec(Properties)),
2153+
case resolve_offset_spec(MemberPid, OffsetSpec, Opts1) of
2154+
{ok, ResolvedOffset} ->
2155+
{?RESPONSE_CODE_OK, ResolvedOffset, Connection0};
2156+
{error, _} ->
2157+
{?RESPONSE_CODE_NO_OFFSET, 0, Connection0}
2158+
end
2159+
end;
2160+
error ->
2161+
increase_protocol_counter(?ACCESS_REFUSED),
2162+
{?RESPONSE_CODE_ACCESS_REFUSED, 0, Connection0}
2163+
end,
2164+
Frame =
2165+
rabbit_stream_core:frame({response, CorrelationId,
2166+
{resolve_offset_spec, ResponseCode, Offset}}),
2167+
send(Transport, S, Frame),
2168+
{Connection1, State};
21262169
handle_frame_post_auth(Transport,
21272170
#stream_connection{stream_subscriptions =
21282171
StreamSubscriptions} =
@@ -2821,7 +2864,7 @@ init_reader(ConnectionTransport,
28212864
{ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec,
28222865
CounterSpec, Options1),
28232866
?LOG_DEBUG("Next offset for subscription ~tp is ~tp",
2824-
[SubscriptionId, osiris_log:next_offset(Segment)]),
2867+
[SubscriptionId, osiris_log:next_offset(Segment)]),
28252868
Segment.
28262869

28272870
single_active_consumer(#consumer{configuration =
@@ -3995,6 +4038,19 @@ i(connected_at, #stream_connection{connected_at = T}, _) ->
39954038
i(_Unknown, _, _) ->
39964039
?UNKNOWN_FIELD.
39974040

4041+
resolve_offset_spec(MemberPid, OffsetSpec, Options) ->
4042+
case node(MemberPid) of
4043+
Node when Node =:= node() ->
4044+
osiris:resolve_offset_spec(MemberPid, OffsetSpec, Options);
4045+
Node ->
4046+
try erpc:call(Node, osiris, resolve_offset_spec,
4047+
[MemberPid, OffsetSpec, Options])
4048+
catch
4049+
error:{erpc, _} ->
4050+
{error, erpc_error}
4051+
end
4052+
end.
4053+
39984054
-spec send(module(), rabbit_net:socket(), iodata()) -> ok.
39994055
send(Transport, Socket, Data) when is_atom(Transport) ->
40004056
Transport:send(Socket, Data).

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,8 @@ command_versions() ->
316316
{partitions, ?VERSION_1, ?VERSION_1},
317317
{stream_stats, ?VERSION_1, ?VERSION_1},
318318
{create_super_stream, ?VERSION_1, ?VERSION_1},
319-
{delete_super_stream, ?VERSION_1, ?VERSION_1}].
319+
{delete_super_stream, ?VERSION_1, ?VERSION_1},
320+
{resolve_offset_spec, ?VERSION_1, ?VERSION_1}].
320321

321322
q(VirtualHost, Name) ->
322323
rabbit_misc:r(VirtualHost, queue, Name).

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,14 @@ groups() ->
7676
test_stream_test_utils,
7777
sac_subscription_with_partition_index_conflict_should_return_error,
7878
test_metadata_with_advertised_hints,
79-
test_connection_properties_with_advertised_hints
79+
test_connection_properties_with_advertised_hints,
80+
test_resolve_offset_spec
8081
]},
8182
%% Run `test_global_counters` on its own so the global metrics are
8283
%% initialised to 0 for each testcase
8384
{single_node_1, [], [test_global_counters]},
84-
{cluster, [], [test_stream, test_stream_tls, test_metadata, java]}].
85+
{cluster, [], [test_stream, test_stream_tls, test_metadata, java,
86+
test_resolve_offset_spec]}].
8587

8688
init_per_suite(Config) ->
8789
case rabbit_ct_helpers:is_mixed_versions() of
@@ -1236,6 +1238,56 @@ test_connection_properties_with_advertised_hints(Config) ->
12361238

12371239
ok.
12381240

1241+
test_resolve_offset_spec(Config) ->
1242+
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
1243+
Transport = gen_tcp,
1244+
Port = get_stream_port(Config),
1245+
Opts = [{active, false}, {mode, binary}],
1246+
{ok, S} = Transport:connect("localhost", Port, Opts),
1247+
C0 = rabbit_stream_core:init(0),
1248+
C1 = test_peer_properties(Transport, S, C0),
1249+
C2 = test_authenticate(Transport, S, C1),
1250+
C3 = test_create_stream(Transport, S, Stream, C2),
1251+
1252+
%% Test resolve_offset_spec on empty stream
1253+
C4 = test_resolve_offset_spec(Transport, S, Stream, first, #{},
1254+
?RESPONSE_CODE_OK, 0, C3),
1255+
C5 = test_resolve_offset_spec(Transport, S, Stream, last, #{},
1256+
?RESPONSE_CODE_OK, 0, C4),
1257+
C6 = test_resolve_offset_spec(Transport, S, Stream, next, #{},
1258+
?RESPONSE_CODE_OK, 0, C5),
1259+
1260+
%% Publish some messages
1261+
PublisherId = 1,
1262+
C7 = test_declare_publisher(Transport, S, PublisherId, Stream, C6),
1263+
Body = <<"hello">>,
1264+
C8 = test_publish_confirm(Transport, S, PublisherId, 1, Body, C7),
1265+
C9 = test_publish_confirm(Transport, S, PublisherId, 2, Body, C8),
1266+
1267+
%% Test resolve_offset_spec after publishing
1268+
C10 = test_resolve_offset_spec(Transport, S, Stream, first, #{},
1269+
?RESPONSE_CODE_OK, 0, C9),
1270+
C11 = test_resolve_offset_spec(Transport, S, Stream, last, #{},
1271+
?RESPONSE_CODE_OK, C10),
1272+
C12 = test_resolve_offset_spec(Transport, S, Stream, next, #{},
1273+
?RESPONSE_CODE_OK, 2, C11),
1274+
C13 = test_resolve_offset_spec(Transport, S, Stream, 0, #{},
1275+
?RESPONSE_CODE_OK, 0, C12),
1276+
1277+
%% Test with timestamp (far future should return next offset)
1278+
FutureTimestamp = os:system_time(millisecond) + 3600000,
1279+
C14 = test_resolve_offset_spec(Transport, S, Stream, {timestamp, FutureTimestamp}, #{},
1280+
?RESPONSE_CODE_OK, 2, C13),
1281+
1282+
%% Test on non-existent stream
1283+
C15 = test_resolve_offset_spec(Transport, S, <<"non_existent_stream">>, first, #{},
1284+
?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, C14),
1285+
1286+
C16 = test_delete_stream(Transport, S, Stream, C15),
1287+
_C17 = test_close(Transport, S, C16),
1288+
closed = wait_for_socket_close(Transport, S, 10),
1289+
ok.
1290+
12391291
filtered_events(Config, EventType) ->
12401292
Events = rabbit_ct_broker_helpers:rpc(Config, 0,
12411293
gen_event,
@@ -1729,6 +1781,24 @@ test_stream_stats(Transport, S, Stream, C0) ->
17291781
Cmd),
17301782
C.
17311783

1784+
test_resolve_offset_spec(Transport, S, Stream, OffsetSpec, Properties,
1785+
ExpectedResponseCode, C0) ->
1786+
Frame = request({resolve_offset_spec, Stream, OffsetSpec, Properties}),
1787+
ok = Transport:send(S, Frame),
1788+
{Cmd, C} = receive_commands(Transport, S, C0),
1789+
?assertMatch({response, 1, {resolve_offset_spec, ExpectedResponseCode, _}},
1790+
Cmd),
1791+
C.
1792+
1793+
test_resolve_offset_spec(Transport, S, Stream, OffsetSpec, Properties,
1794+
ExpectedResponseCode, ExpectedOffset, C0) ->
1795+
Frame = request({resolve_offset_spec, Stream, OffsetSpec, Properties}),
1796+
ok = Transport:send(S, Frame),
1797+
{Cmd, C} = receive_commands(Transport, S, C0),
1798+
?assertMatch({response, 1, {resolve_offset_spec, ExpectedResponseCode, ExpectedOffset}},
1799+
Cmd),
1800+
C.
1801+
17321802
test_close(Transport, S, C0) ->
17331803
CloseReason = <<"OK">>,
17341804
CloseFrame = request({close, ?RESPONSE_CODE_OK, CloseReason}),

deps/rabbitmq_stream_common/include/rabbit_stream.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
-define(COMMAND_STREAM_STATS, 28).
4545
-define(COMMAND_CREATE_SUPER_STREAM, 29).
4646
-define(COMMAND_DELETE_SUPER_STREAM, 30).
47+
-define(COMMAND_RESOLVE_OFFSET_SPEC, 31).
4748

4849
-define(REQUEST, 0).
4950
-define(RESPONSE, 1).

deps/rabbitmq_stream_common/src/rabbit_stream_core.erl

Lines changed: 71 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@
134134
MaxVersion :: command_version()}]} |
135135
{stream_stats, Stream :: binary()} |
136136
{create_super_stream, stream_name(), Partitions :: [binary()], BindingKeys :: [binary()], Args :: #{binary() => binary()}} |
137-
{delete_super_stream, stream_name()}} |
137+
{delete_super_stream, stream_name()} |
138+
{resolve_offset_spec, stream_name(), offset_spec(), Properties :: #{binary() => binary()}}} |
138139
{response, correlation_id(),
139140
{declare_publisher |
140141
delete_publisher |
@@ -167,7 +168,8 @@
167168
{exchange_command_versions, response_code(),
168169
[{Command :: atom(), MinVersion :: command_version(),
169170
MaxVersion :: command_version()}]} |
170-
{stream_stats, response_code(), Stats :: #{binary() => integer()}}} |
171+
{stream_stats, response_code(), Stats :: #{binary() => integer()}} |
172+
{resolve_offset_spec, response_code(), osiris:offset()}} |
171173
{unknown, binary()}.
172174

173175
-spec init(term()) -> state().
@@ -423,6 +425,8 @@ response_body({query_publisher_sequence = Tag, Code, Sequence}) ->
423425
{command_id(Tag), <<Code:16, Sequence:64>>};
424426
response_body({query_offset = Tag, Code, Offset}) ->
425427
{command_id(Tag), <<Code:16, Offset:64>>};
428+
response_body({resolve_offset_spec = Tag, Code, Offset}) ->
429+
{command_id(Tag), <<Code:16, Offset:64>>};
426430
response_body({metadata = Tag, Endpoints, Metadata}) ->
427431
NumEps = length(Endpoints),
428432
{_, EndpointsBin} =
@@ -633,7 +637,30 @@ request_body({create_super_stream = Tag, SuperStream, Partitions, BindingKeys, A
633637
<<(length(BindingKeys)):32>>, BindingKeysBin,
634638
<<(map_size(Args)):32>>, ArgsBin]};
635639
request_body({delete_super_stream = Tag, SuperStream}) ->
636-
{Tag, <<?STRING(SuperStream)>>}.
640+
{Tag, <<?STRING(SuperStream)>>};
641+
request_body({resolve_offset_spec = Tag, Stream, OffsetSpec, Properties}) ->
642+
OffsetSpecBin =
643+
case OffsetSpec of
644+
first ->
645+
<<?OFFSET_TYPE_FIRST:16>>;
646+
last ->
647+
<<?OFFSET_TYPE_LAST:16>>;
648+
next ->
649+
<<?OFFSET_TYPE_NEXT:16>>;
650+
Offset when is_integer(Offset) ->
651+
<<?OFFSET_TYPE_OFFSET:16, Offset:64/unsigned>>;
652+
{timestamp, Timestamp} ->
653+
<<?OFFSET_TYPE_TIMESTAMP:16, Timestamp:64/signed>>
654+
end,
655+
PropertiesBin =
656+
case map_size(Properties) of
657+
0 ->
658+
<<>>;
659+
_ ->
660+
PropsBin = generate_map(Properties),
661+
[<<(map_size(Properties)):32>>, PropsBin]
662+
end,
663+
{Tag, [<<?STRING(Stream), OffsetSpecBin/binary>> | PropertiesBin]}.
637664

638665
append_data(Prev, Data) when is_binary(Prev) ->
639666
[Prev, Data];
@@ -772,24 +799,13 @@ parse_request(<<?REQUEST:1,
772799
OffsetCreditProperties,
773800
{{timestamp, Timestamp}, Crdt, PropertiesBin}
774801
end,
775-
Properties =
776-
case PropsBin of
777-
<<>> ->
778-
#{};
779-
<<_Count:32, Bin/binary>> ->
780-
parse_map(Bin, #{});
781-
_ ->
782-
?LOG_WARNING("Incorrect binary for subscription properties: ~w",
783-
[PropsBin]),
784-
#{}
785-
end,
786802
request(CorrelationId,
787803
{subscribe,
788804
SubscriptionId,
789805
Stream,
790806
OffsetSpec,
791807
Credit,
792-
Properties});
808+
parse_properties(PropsBin)});
793809
parse_request(<<?REQUEST:1,
794810
?COMMAND_QUERY_OFFSET:15,
795811
?VERSION_1:16,
@@ -930,6 +946,29 @@ parse_request(<<?REQUEST:1,
930946
CorrelationId:32,
931947
?STRING(SuperStreamSize, SuperStream)>>) ->
932948
request(CorrelationId, {delete_super_stream, SuperStream});
949+
parse_request(<<?REQUEST:1,
950+
?COMMAND_RESOLVE_OFFSET_SPEC:15,
951+
?VERSION_1:16,
952+
CorrelationId:32,
953+
?STRING(StreamSize, Stream),
954+
OffsetType:16/signed,
955+
OffsetValuePropsBin/binary>>) ->
956+
{OffsetSpec, PropsBin} =
957+
case OffsetType of
958+
?OFFSET_TYPE_FIRST ->
959+
{first, OffsetValuePropsBin};
960+
?OFFSET_TYPE_LAST ->
961+
{last, OffsetValuePropsBin};
962+
?OFFSET_TYPE_NEXT ->
963+
{next, OffsetValuePropsBin};
964+
?OFFSET_TYPE_OFFSET ->
965+
<<Offset:64/unsigned, Rest/binary>> = OffsetValuePropsBin,
966+
{Offset, Rest};
967+
?OFFSET_TYPE_TIMESTAMP ->
968+
<<Timestamp:64/signed, Rest/binary>> = OffsetValuePropsBin,
969+
{{timestamp, Timestamp}, Rest}
970+
end,
971+
request(CorrelationId, {resolve_offset_spec, Stream, OffsetSpec, parse_properties(PropsBin)});
933972
parse_request(Bin) ->
934973
{unknown, Bin}.
935974

@@ -1020,7 +1059,10 @@ parse_response_body(?COMMAND_EXCHANGE_COMMAND_VERSIONS,
10201059
parse_response_body(?COMMAND_STREAM_STATS,
10211060
<<ResponseCode:16, _Count:32, StatsBin/binary>>) ->
10221061
Info = parse_int_map(StatsBin, #{}),
1023-
{stream_stats, ResponseCode, Info}.
1062+
{stream_stats, ResponseCode, Info};
1063+
parse_response_body(?COMMAND_RESOLVE_OFFSET_SPEC,
1064+
<<ResponseCode:16, Offset:64>>) ->
1065+
{resolve_offset_spec, ResponseCode, Offset}.
10241066

10251067
offset_spec(OffsetType, OffsetValueBin) ->
10261068
case OffsetType of
@@ -1135,6 +1177,13 @@ list_of_longcodes(<<>>) ->
11351177
list_of_longcodes(<<I:64, C:16, Rem/binary>>) ->
11361178
[{I, C} | list_of_longcodes(Rem)].
11371179

1180+
parse_properties(<<>>) ->
1181+
#{};
1182+
parse_properties(<<_Count:32, Bin/binary>>) ->
1183+
parse_map(Bin, #{});
1184+
parse_properties(_) ->
1185+
#{}.
1186+
11381187
command_id(declare_publisher) ->
11391188
?COMMAND_DECLARE_PUBLISHER;
11401189
command_id(publish) ->
@@ -1198,7 +1247,9 @@ command_id(stream_stats) ->
11981247
command_id(create_super_stream) ->
11991248
?COMMAND_CREATE_SUPER_STREAM;
12001249
command_id(delete_super_stream) ->
1201-
?COMMAND_DELETE_SUPER_STREAM.
1250+
?COMMAND_DELETE_SUPER_STREAM;
1251+
command_id(resolve_offset_spec) ->
1252+
?COMMAND_RESOLVE_OFFSET_SPEC.
12021253

12031254
parse_command_id(?COMMAND_DECLARE_PUBLISHER) ->
12041255
declare_publisher;
@@ -1259,7 +1310,9 @@ parse_command_id(?COMMAND_STREAM_STATS) ->
12591310
parse_command_id(?COMMAND_CREATE_SUPER_STREAM) ->
12601311
create_super_stream;
12611312
parse_command_id(?COMMAND_DELETE_SUPER_STREAM) ->
1262-
delete_super_stream.
1313+
delete_super_stream;
1314+
parse_command_id(?COMMAND_RESOLVE_OFFSET_SPEC) ->
1315+
resolve_offset_spec.
12631316

12641317
element_index(Element, List) ->
12651318
element_index(Element, List, 0).

deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,17 @@ roundtrip(_Config) ->
119119
[<<"stream1">>, <<"stream2">>, <<"stream3">>], [<<"bk1">>, <<"bk2">>, <<"bk3">>],
120120
#{}}}),
121121
test_roundtrip({request, 99, {delete_super_stream, <<"super_stream_name">>}}),
122+
[test_roundtrip({request, 99,
123+
{resolve_offset_spec, <<"stream_name">>, Spec, #{}}})
124+
|| Spec
125+
<- [last,
126+
next,
127+
first,
128+
65432,
129+
{timestamp, erlang:system_time(millisecond)}]],
130+
test_roundtrip({request, 99,
131+
{resolve_offset_spec, <<"stream_name">>, first,
132+
#{<<"filter.0">> => <<"value0">>}}}),
122133

123134
%% RESPONSES
124135
[test_roundtrip({response, 99, {Tag, 53}})
@@ -136,6 +147,7 @@ roundtrip(_Config) ->
136147

137148
test_roundtrip({response, 99, {query_publisher_sequence, 98, 1234}}),
138149
test_roundtrip({response, 99, {query_offset, 1, 12}}),
150+
test_roundtrip({response, 99, {resolve_offset_spec, 1, 12345}}),
139151

140152
test_roundtrip({response, 99,
141153
{peer_properties, 1, #{<<"k1">> => <<"v1">>}}}),

rabbitmq-components.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ dep_jose = hex 1.11.10
5252
dep_khepri = hex 0.17.5
5353
dep_khepri_mnesia_migration = hex 0.8.1
5454
dep_meck = hex 1.0.0
55-
dep_osiris = git https://github.com/rabbitmq/osiris v1.11.0
55+
dep_osiris = git https://github.com/rabbitmq/osiris v1.12.0
5656
dep_prometheus = hex 6.1.1
5757
dep_ra = hex 2.17.2
5858
dep_ranch = hex 2.2.0

0 commit comments

Comments
 (0)