-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19244: Add support for kafka-streams-groups.sh options (reset-offsets) [1/N] #19646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @aliehsaeedii I made a pass over the non-testing code, I'll do more in a follow up review
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
throw new IllegalArgumentException("Cannot shift offset for partition " + topicPartition + " since there is no current committed offset"); | ||
} | ||
|
||
return currentOffset.offset() + shiftBy; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to do any validation on what the acceptable values for shiftBy
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume any long value should be accepted. Good point. I'll add some tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems shiftby 150
and -150
are already existing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @aliehsaeedii ! Thanks for the PR. Mostly looking good to me, left a few comments
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! I left some comments. These are still relatively high-level things.
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Outdated
Show resolved
Hide resolved
tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
Outdated
Show resolved
Hide resolved
List<String> internalTopics = retrieveInternalTopics(List.of(groupId)).get(groupId); | ||
if (internalTopics != null && !internalTopics.isEmpty()) { | ||
try { | ||
adminClient.deleteTopics(internalTopics).all().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we delete internal topics here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As of #19646 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One more comment @aliehsaeedii
tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I left some commments on the documentation. Please make sure that --all-topics
and --topics
are not present in more places.
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Outdated
Show resolved
Hide resolved
private static final String INPUT_TOPIC_DOC = "The input topic whose streams group information should be deleted or topic that should be included in the reset offset process. " + | ||
"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " + | ||
"Reset-offsets also supports multiple topic inputs."; | ||
private static final String ALL_INPUT_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete-offsets
as well? Maybe use the format Supported operations: delete-offsets, reset-offsets.
as in the other options. Also, topics aren't really assigned to a group. How about:
Consider all source topics used in the topology of the group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, topics aren't really assigned to a group.
I meant associated:|
tools/src/main/java/org/apache/kafka/tools/streams/OffsetsUtils.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces full support for the --reset-offsets
command in kafka-streams-groups.sh
, refactors offset handling into OffsetsUtils
, and updates related tests and utility classes.
- Add new
reset-offsets
options (e.g.,--to-offset
,--by-duration
,--export
, etc.) inStreamsGroupCommandOptions
. - Refactor
ConsumerGroupCommand
to delegate offset logic toOffsetsUtils
. - Update tests in
StreamsGroupCommandTest
andDescribeStreamsGroupTest
to cover new behavior and require explicit--group
for describe; make CSV and spec methods public.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
tools/src/main/java/.../StreamsGroupCommandOptions.java | Add reset-offsets CLI options, docs, and validation logic. |
tools/src/main/java/.../ConsumerGroupCommand.java | Replace inline offset logic with OffsetsUtils delegation. |
tools/src/test/java/.../StreamsGroupCommandTest.java | Update tests to use shared ADMIN_CLIENT mock and new helpers. |
tools/src/test/java/.../DescribeStreamsGroupTest.java | Require --group for all describe invocations in tests. |
tools/src/main/java/.../consumer/group/CsvUtils.java | Make readerFor/writerFor methods public. |
tools/src/main/java/.../consumer/group/ConsumerGroupCommand.java | (Reviewed above) |
clients/src/main/java/.../ListStreamsGroupOffsetsSpec.java | Make topicPartitions methods public. |
Comments suppressed due to low confidence (2)
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:227
- Add a similar check to ensure
exportOpt
is only allowed whenresetOffsetsOpt
is present (e.g., only--reset-offsets
may be used with--export
).
if ((options.has(dryRunOpt) || options.has(executeOpt)) && !options.has(resetOffsetsOpt))
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java:233
- Include
deleteOffsetsOpt
in the allowed set forinputTopicOpt
, since deleting offsets also uses--input-topic
. E.g.,minus(allStreamsGroupLevelOpts, resetOffsetsOpt, deleteOffsetsOpt)
.
CommandLineUtils.checkInvalidArgs(parser, options, inputTopicOpt, minus(allStreamsGroupLevelOpts, resetOffsetsOpt));
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @aliehsaeedii ! I left some comments
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @aliehsaeedii ! I left some comments
|
||
public static class Unknown implements LogOffsetResult { } | ||
|
||
public static class Ignore implements LogOffsetResult { } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excuse me, what is the purpose of Ignore
? will it be used in next PR?
This PR implements all the options for
--reset-offset
, (supports--execute
,--dry-run
,--export
). Includes unit and integrationtests.
duration
datetime
Reviewers: Lucas Brutschy [email protected], Bill Bejeck
[email protected]