Skip to content

rabbit_stream_coordinator: Make new_stream command idempotent#15706

Draft
the-mikedavis wants to merge 1 commit intomainfrom
md/idempotent-new-stream
Draft

rabbit_stream_coordinator: Make new_stream command idempotent#15706
the-mikedavis wants to merge 1 commit intomainfrom
md/idempotent-new-stream

Conversation

@the-mikedavis
Copy link
Collaborator

This is similar to #14884 but for new_stream.

Although its unlikely from the calling code, the stream coordinator can end up with two {new_stream, StreamId, #{}} commands in its log for the same StreamId. When handling the second new_stream it will then warning-log that the stream can't be updated, like so:

2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> rabbit_stream_coordinator failed to update stream:
2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> function_clause
2026-03-10 17:31:38.897507+00:00 [warning] <0.14468.0> [{rabbit_stream_coordinator,update_stream0,[#{index => 51,system_time => 1773163898892,reply_mode => await_consensus,machine_version => 6,...},{new_stream,[95|...],#{}},{stream,[...],...}],[{file,[114|...]},{line,1397}]},{rabbit_stream_coordinator,update_stream,3,[{file,[...]},{line,...}]},{rabbit_stream_coordinator,apply,3,[{file,...},{...}]},{ra_server,apply_with,2,[{...}|...]},{ra_log,fold,5,[...]},{ra_server,apply_to,5,...},{ra_server,evaluate_commit_index_follower,...},{ra_server,...}]

update_stream0/3 only has a function clause for when there is no stream (i.e. undefined,

update_stream0(#{system_time := _} = Meta,
{new_stream, StreamId, #{leader_node := LeaderNode,
queue := Q}}, undefined) ->
#{nodes := Nodes} = Conf = amqqueue:get_type_state(Q),
%% this jumps straight to the state where all members
%% have been stopped and a new writer has been chosen
E = 1,
QueueRef = amqqueue:get_name(Q),
Members = maps:from_list(
[{N, #member{role = case LeaderNode of
N -> {writer, E};
_ -> {replica, E}
end,
state = {ready, E},
%% no members are running actions
current = undefined}
} || N <- Nodes]),
#stream{id = StreamId,
epoch = E,
nodes = Nodes,
queue_ref = QueueRef,
conf = Conf,
members = Members,
reply_to = maps:get(from, Meta, undefined)};
). I think it's reasonable to have the new_stream command become idempotent and effectively skip it if it is in the log twice.

I haven't been able to reproduce the scenario where this happened. Next time I will dump Khepri and the coordinator's logs to see what sequence of commands lead to this.

@the-mikedavis
Copy link
Collaborator Author

I don't think this needs a new machine version since it only affects the reply effects. We could base it on #15695 to take advantage of version 7 though.

@acogoluegnes
Copy link
Contributor

LGTM. We can indeed wait for #15695 to be merged and use the v7+ macro in a guard on the new filter_command clause.

@the-mikedavis the-mikedavis marked this pull request as draft March 12, 2026 14:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants