> importedOffsets = service.resetOffsets();
assertEquals(exp, toOffsetMap(importedOffsets.get(appId)));
}
+
///////////////////////////////////////// Specific topic and partition (--topic topic1, --topic topic2) /////////////////////////////////////////
resetForNextTest(appId, 10L, topic1);
// reset to specific offset
args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--topic", topic1 + ":1", "--to-offset", "5"};
- resetOffsetsAndAssert(args, appId, topic1, 5L, 10L, 1);
- resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, 5L, 10L, 1);
- resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, 5L, 5L, 1);
+ resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 10L, 1);
resetForNextTest(appId, 10L, topic1);
@@ -236,22 +225,19 @@ public void testResetOffset() throws Exception {
new TopicPartition(topic1, 1), 5L,
new TopicPartition(topic2, 0), 10L,
new TopicPartition(topic2, 1), 5L));
- ///////////////////////////////////////// All topics (--all-topics) /////////////////////////////////////////
+
+ ///////////////////////////////////////// All topics (--all-topics) /////////////////////////////////////////
resetForNextTest(appId, 10L, topic1, topic2);
// reset to specific offset
args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--all-topics", "--to-offset", "5"};
- resetOffsetsAndAssert(args, appId, topic1, topic2, 5L, 10L);
- resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, topic2, 5L, 10L);
- resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, topic2, 5L, 5L);
+ resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, topic2, 5L, 10L);
resetForNextTest(appId, 10L, topic1, topic2);
// reset to specific offset with two --topic options
args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--topic", topic1, "--topic", topic2, "--to-offset", "5"};
- resetOffsetsAndAssert(args, appId, topic1, topic2, 5L, 10L);
- resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, topic2, 5L, 10L);
- resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, topic2, 5L, 5L);
+ resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, topic2, 5L, 10L);
resetForNextTest(appId, 10L, topic1, topic2);
@@ -320,7 +306,25 @@ private void AssertCommittedOffsets(String appId,
tp21, expectedCommittedOffset), committedOffsets);
}
- // Performs resetting offsets and assertion for one topic
+ /**
+ * Resets offsets for a specific topic and partition(s) and verifies the results.
+ *
+ * <p>This method performs the following steps:
+ * <ul>
+ *   <li>Resets offsets for the specified topic and partitions using the provided arguments.</li>
+ *   <li>Asserts that the reset offsets match the expected offsets.</li>
+ *   <li>Asserts that the committed offsets match the expected committed offsets.</li>
+ * </ul>
+ *
+ * @param args The command-line arguments for resetting offsets.
+ * @param appId The application ID for the Kafka Streams application.
+ * @param topic The topic for which offsets will be reset.
+ * @param expectedOffset The expected offset value after the reset.
+ * @param expectedCommittedOffset The expected committed offset value after the reset.
+ * @param partitions The partitions of the topic to reset offsets for.
+ * @throws ExecutionException If an error occurs during the execution of the reset operation.
+ * @throws InterruptedException If the thread is interrupted during the reset operation.
+ */
private void resetOffsetsAndAssert(String[] args,
String appId,
String topic,
@@ -346,7 +350,25 @@ private void resetOffsetsAndAssert(String[] args,
AssertCommittedOffsets(appId, topic, expectedCommittedOffset, partitions);
}
- // Performs resetting offsets and assertion for two topics
+ /**
+ * Resets offsets for two topics and verifies the results.
+ *
+ * <p>This method performs the following steps:
+ * <ul>
+ *   <li>Resets offsets for the specified topics using the provided arguments.</li>
+ *   <li>Asserts that the reset offsets match the expected offsets.</li>
+ *   <li>Asserts that the committed offsets match the expected committed offsets.</li>
+ * </ul>
+ *
+ * @param args The command-line arguments for resetting offsets.
+ * @param appId The application ID for the Kafka Streams application.
+ * @param topic1 The first topic for which offsets will be reset.
+ * @param topic2 The second topic for which offsets will be reset.
+ * @param expectedOffset The expected offset value after the reset.
+ * @param expectedCommittedOffset The expected committed offset value after the reset.
+ * @throws ExecutionException If an error occurs during the execution of the reset operation.
+ * @throws InterruptedException If the thread is interrupted during the reset operation.
+ */
private void resetOffsetsAndAssert(String[] args,
String appId,
String topic1,
@@ -372,7 +394,24 @@ private void resetOffsetsAndAssert(String[] args,
AssertCommittedOffsets(appId, topic1, topic2, expectedCommittedOffset);
}
- // Performs resetting offsets and assertion for given topic partitions
+ /**
+ * Resets offsets for the specified topics and verifies the results.
+ *
+ * <p>This method performs the following steps:
+ * <ul>
+ *   <li>Resets offsets for the given topics using the provided arguments.</li>
+ *   <li>Asserts that the reset offsets match the expected offsets.</li>
+ *   <li>Asserts that the committed offsets match the expected committed offsets.</li>
+ * </ul>
+ *
+ * @param args The command-line arguments for resetting offsets.
+ * @param appId The application ID for the Kafka Streams application.
+ * @param topics The list of topics for which offsets will be reset.
+ * @param expectedOffsets A map of expected offsets for each topic partition after the reset.
+ * @param expectedCommittedOffsets A map of expected committed offsets for each topic partition after the reset.
+ * @throws ExecutionException If an error occurs during the execution of the reset operation.
+ * @throws InterruptedException If the thread is interrupted during the reset operation.
+ */
private void resetOffsetsAndAssert(String[] args,
String appId,
List<String> topics,
@@ -389,6 +428,28 @@ private void resetOffsetsAndAssert(String[] args,
assertEquals(expectedCommittedOffsets, committedOffsets(topics, appId));
}
+ private void resetOffsetsAndAssertForDryRunAndExecute(String[] args,
+ String appId,
+ String topic,
+ long expectedOffset,
+ long expectedCommittedOffset,
+ int... partitions) throws ExecutionException, InterruptedException {
+ resetOffsetsAndAssert(args, appId, topic, expectedOffset, expectedCommittedOffset, partitions);
+ resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic, expectedOffset, expectedCommittedOffset, partitions);
+ resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic, expectedOffset, expectedOffset, partitions);
+ }
+
+ private void resetOffsetsAndAssertForDryRunAndExecute(String[] args,
+ String appId,
+ String topic1,
+ String topic2,
+ long expectedOffset,
+ long expectedCommittedOffset) throws ExecutionException, InterruptedException {
+ resetOffsetsAndAssert(args, appId, topic1, topic2, expectedOffset, expectedCommittedOffset);
+ resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, topic2, expectedOffset, expectedCommittedOffset);
+ resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, topic2, expectedOffset, expectedOffset);
+ }
+
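For reference, a usage sketch of the new helpers (illustrative only, not part of the patch; it mirrors the seeded value of 10 and the reset target of 5 used in the test scenarios above, and assumes both partitions of topic1 are in scope):

    String[] args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--topic", topic1, "--to-offset", "5"};
    // Pass 1 (no flag) and pass 2 (--dry-run) only plan the reset: offsets are reported as 5
    // while the committed offsets remain at 10. Pass 3 (--execute) moves the committed offsets to 5.
    resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 10L, 0, 1);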
private Map<TopicPartition, Long> committedOffsets(List<String> topics,
String group) throws ExecutionException, InterruptedException {
return adminClient.listConsumerGroupOffsets(group)
@@ -407,6 +468,12 @@ private static Map<String, Map<TopicPartition, Long>> convertOffsetsToLong(Map<String, Map<TopicPartition, OffsetAndMetadata>> map) {
.collect(toMap(Map.Entry::getKey, e1 -> e1.getValue().offset()))));
}
+ private Map<TopicPartition, Long> toOffsetMap(Map<TopicPartition, OffsetAndMetadata> map) {
+ return map.entrySet()
+ .stream()
+ .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+ }
+
private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
return new StreamsGroupCommand.StreamsGroupService(
@@ -421,12 +488,6 @@ private void writeContentToFile(File file, String content) throws IOException {
}
}
- private Map<TopicPartition, Long> toOffsetMap(Map<TopicPartition, OffsetAndMetadata> map) {
- return map.entrySet()
- .stream()
- .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()));
- }
-
private String[] addTo(String[] args, String... extra) {
List<String> res = new ArrayList<>(asList(args));
res.addAll(asList(extra));
@@ -441,6 +502,14 @@ private String generateRandomAppId() {
return APP_ID_PREFIX + TestUtils.randomString(10);
}
+ /**
+ * Produces messages to two partitions of each of the specified topics and consumes them.
+ *
+ * @param appId The application ID for the Kafka Streams application.
+ * @param topic1 The first topic to produce and consume messages from.
+ * @param topic2 The second topic to produce and consume messages from.
+ * @param numOfCommittedMessages The number of committed messages to process before shutting down.
+ */
private void produceConsumeShutdown(String appId, String topic1, String topic2, long numOfCommittedMessages) throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
@@ -489,10 +558,15 @@ private void produceConsumeShutdown(String appId, String topic1, String topic2,
} finally {
assertEquals(numOfCommittedMessages, recordCount.get(), "Expected " + numOfCommittedMessages + " records processed but only got " + recordCount.get());
streams.close();
-// IntegrationTestUtils.waitForEmptyConsumerGroup(CLUSTER.createAdminClient(), appId, 60000);
}
}
+ /**
+ * Produces messages to two partitions of the specified topic.
+ *
+ * @param numOfMessages The number of messages to produce for each partition.
+ * @param topic The topic to which the messages will be produced.
+ */
private static void produceMessagesOnTwoPartitions(final int numOfMessages, final String topic) {
// partition 0
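As a rough illustration of what producing to two fixed partitions of a topic looks like with a plain producer (a sketch only, not this test's implementation; producerConfig() and the record values are placeholders, other classes are the ones already imported in this file):

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig())) {
        for (int i = 0; i < numOfMessages; i++) {
            // the second constructor argument pins the record to an explicit partition
            producer.send(new ProducerRecord<>(topic, 0, "key-" + i, "value-" + i));
            producer.send(new ProducerRecord<>(topic, 1, "key-" + i, "value-" + i));
        }
        producer.flush();
    }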
From 204cd032362f96025b1b07870f0f5ca104b3371d Mon Sep 17 00:00:00 2001
From: aliehsaeedii
Date: Mon, 5 May 2025 21:57:06 +0200
Subject: [PATCH 04/26] revert irrelevant changes
---
.../kafka/clients/admin/KafkaAdminClient.java | 20 +------------------
.../consumer/group/ConsumerGroupCommand.java | 8 +++-----
2 files changed, 4 insertions(+), 24 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index f09b8cff4cae7..3206d6f19ed96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -3771,17 +3771,6 @@ public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, ListConsumerGroupOffsetsSpec> groupSpecs,
-// ListStreamsGroupOffsetsOptions options) {
-// SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> future =
-// ListStteamsGroupOffsetsHandler.newFuture(groupSpecs.keySet());
-// ListConsumerGroupOffsetsHandler handler =
-// new ListConsumerGroupOffsetsHandler(groupSpecs, options.requireStable(), logContext);
-// invokeDriver(handler, future, options.timeoutMs);
-// return new ListConsumerGroupOffsetsResult(future.all());
-// }
-
@Override
public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs,
ListStreamsGroupOffsetsOptions options) {
@@ -3790,14 +3779,7 @@ public ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs,
entry -> new ListConsumerGroupOffsetsSpec().topicPartitions(entry.getValue().topicPartitions())
));
-// Map<String, ListConsumerGroupOffsetsSpec> consumerGroupSpecs = new HashMap<>();
-// for (Map.Entry<String, ListStreamsGroupOffsetsSpec> entry : groupSpecs.entrySet()) {
-// ListConsumerGroupOffsetsSpec spec = new ListConsumerGroupOffsetsSpec();
-// spec.topicPartitions(entry.getValue().topicPartitions());
-// consumerGroupSpecs.put(entry.getKey(), spec);
-// }
- ListConsumerGroupOffsetsResult res = listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions());
- return new ListStreamsGroupOffsetsResult(res);
+ return new ListStreamsGroupOffsetsResult(listConsumerGroupOffsets(consumerGroupSpecs, new ListConsumerGroupOffsetsOptions()));
}
@Override
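After this revert, listStreamsGroupOffsets is a thin delegation to listConsumerGroupOffsets. A usage sketch from the caller's side (admin is an Admin client instance, the group id is a placeholder, and error handling is omitted):

    Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listStreamsGroupOffsets(Map.of("my-streams-app", new ListStreamsGroupOffsetsSpec()))
            .partitionsToOffsetAndMetadata("my-streams-app")
            .get();
    committed.forEach((tp, offset) -> System.out.println(tp + " -> " + offset.offset()));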
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 7db6ed5fdfade..b001ae7c6f7fd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -29,7 +29,6 @@
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
@@ -1090,11 +1089,10 @@ private Collection<TopicPartition> getPartitionsToReset(String groupId) throws E
private Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
try {
- ListConsumerGroupOffsetsResult res = adminClient.listConsumerGroupOffsets(
+ return adminClient.listConsumerGroupOffsets(
Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec()),
- withTimeoutMs(new ListConsumerGroupOffsetsOptions()));
- return res.
- partitionsToOffsetAndMetadata(groupId).get();
+ withTimeoutMs(new ListConsumerGroupOffsetsOptions())
+ ).partitionsToOffsetAndMetadata(groupId).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
From 52d709b8739804b44d8c6e1e1a04746c563a8c39 Mon Sep 17 00:00:00 2001
From: aliehsaeedii
Date: Tue, 6 May 2025 01:11:41 +0200
Subject: [PATCH 05/26] clean-up
---
.../tools/streams/ResetStreamsGroupOffsetTest.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
index 2ccf142340576..2120d9e54e6a0 100644
--- a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java
@@ -73,6 +73,8 @@
public class ResetStreamsGroupOffsetTest {
private static final String TOPIC_PREFIX = "foo-";
private static final String APP_ID_PREFIX = "streams-group-command-test";
+ private static final Properties STREAMS_CONFIG = new Properties();
+ private static final int RECORD_TOTAL = 10;
public static EmbeddedKafkaCluster cluster;
private static String bootstrapServers;
private static Admin adminClient;
@@ -87,11 +89,16 @@ public static void startCluster() {
bootstrapServers = cluster.bootstrapServers();
adminClient = cluster.createAdminClient();
+ createStreamsConfig(bootstrapServers);
+ }
+
+ private static void createStreamsConfig(String bootstrapServers) {
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+ STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
}
@AfterAll
@@ -99,9 +106,6 @@ public static void closeCluster() {
cluster.stop();
}
- private static final Properties STREAMS_CONFIG = new Properties();
- private static final int RECORD_TOTAL = 10;
-
@Test
public void testResetWithUnrecognizedOption() {
String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--reset-offsets", "--all-topics", "--to-offset", "5"};
@@ -512,7 +516,6 @@ private String generateRandomAppId() {
*/
private void produceConsumeShutdown(String appId, String topic1, String topic2, long numOfCommittedMessages) throws Exception {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
- STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
cluster.createTopic(topic1, 2);
cluster.createTopic(topic2, 2);
From 557a76c6cd03eccdc8c1fe6b06a1ec5055311817 Mon Sep 17 00:00:00 2001
From: aliehsaeedii
Date: Mon, 26 May 2025 12:48:29 +0200
Subject: [PATCH 06/26] address reviews
---
.../kafka/tools/streams/StreamsGroupCommand.java | 11 +++++++----
.../tools/streams/StreamsGroupCommandOptions.java | 2 +-
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 5cfa386df6127..9c66104bbbe71 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -73,6 +73,8 @@
public class StreamsGroupCommand {
+ private static final String TOPIC_PARTITION_SEPARATOR = ":";
+
public static void main(String[] args) {
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
try {
@@ -118,8 +120,9 @@ public static void run(StreamsGroupCommandOptions opts) {
static void printOffsetsToReset(Map<String, Map<TopicPartition, OffsetAndMetadata>> groupAssignmentsToReset) {
String format = "%n%-30s %-30s %-10s %-15s";
- if (!groupAssignmentsToReset.isEmpty())
+ if (!groupAssignmentsToReset.isEmpty()) {
System.out.printf(format, "GROUP", "TOPIC", "PARTITION", "NEW-OFFSET");
+ }
groupAssignmentsToReset.forEach((groupId, assignment) ->
assignment.forEach((streamsAssignment, offsetAndMetadata) ->
@@ -487,7 +490,7 @@ private List<TopicPartition> parseTopicPartitionsToReset(List<String> topicArgs)
List<String> topics = new ArrayList<>();
topicArgs.forEach(topicArg -> {
- if (topicArg.contains(":"))
+ if (topicArg.contains(TOPIC_PARTITION_SEPARATOR))
topicsWithPartitions.add(topicArg);
else
topics.add(topicArg);
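For clarity, a sketch of how a single --topic argument in topic:partitions form (e.g. "topic1:1" as used in the tests) can be split with the separator introduced above; the helper name and the comma-separated partition list are assumptions, not the patch's exact implementation:

    // hypothetical helper, shown only to illustrate the separator's role
    private static Map.Entry<String, List<Integer>> splitTopicArg(String topicArg) {
        String[] parts = topicArg.split(TOPIC_PARTITION_SEPARATOR, 2);   // "topic1:0,1" -> ["topic1", "0,1"]
        List<Integer> partitions = Arrays.stream(parts[1].split(","))
            .map(Integer::parseInt)
            .collect(Collectors.toList());
        return Map.entry(parts[0], partitions);
    }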
@@ -837,9 +840,9 @@ private static class Ignore implements LogOffsetResult { }
*/
private static void maybePrintEmptyGroupState(String group, GroupState state) {
if (state == GroupState.DEAD) {
- printError("Streams group '" + group + "' does not exist.", Optional.empty());
+ printError("streams group '" + group + "' does not exist.", Optional.empty());
} else if (state == GroupState.EMPTY) {
- printError("Streams group '" + group + "' has no active members.", Optional.empty());
+ printError("streams group '" + group + "' has no active members.", Optional.empty());
}
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index ee70c27a1c3ee..645e13b0bc2d3 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -52,7 +52,7 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only.";
public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information." +
"This is the default sub-action and may be used with the '--describe' option only.";
- private static final String RESET_OFFSETS_DOC = "Reset offsets of Streams group. The instances should be inactive" + NL +
+ private static final String RESET_OFFSETS_DOC = "Reset offsets of streams group. The instances should be inactive" + NL +
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets." + NL +
"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL +
From ab7d858c630699c04ddbb32bff6d11ac5141ce89 Mon Sep 17 00:00:00 2001
From: aliehsaeedii
Date: Mon, 26 May 2025 23:16:24 +0200
Subject: [PATCH 07/26] correct comments/msgs
---
.../kafka/clients/admin/ListStreamsGroupOffsetsSpec.java | 4 ++--
.../org/apache/kafka/tools/streams/StreamsGroupCommand.java | 4 ++--
.../kafka/tools/streams/StreamsGroupCommandOptions.java | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java
index dc49942f5c05b..6daef6b0b0746 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java
@@ -33,7 +33,7 @@ public class ListStreamsGroupOffsetsSpec {
private Collection topicPartitions;
/**
- * Set the topic partitions whose offsets are to be listed for a Streams group.
+ * Set the topic partitions whose offsets are to be listed for a streams group.
*/
public ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
@@ -41,7 +41,7 @@ public ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> to
}
/**
- * Returns the topic partitions whose offsets are to be listed for a Streams group.
+ * Returns the topic partitions whose offsets are to be listed for a streams group.
*/
public Collection<TopicPartition> topicPartitions() {
return topicPartitions;
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 9c66104bbbe71..a350180c0a4dc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -840,9 +840,9 @@ private static class Ignore implements LogOffsetResult { }
*/
private static void maybePrintEmptyGroupState(String group, GroupState state) {
if (state == GroupState.DEAD) {
- printError("streams group '" + group + "' does not exist.", Optional.empty());
+ printError("Streams group '" + group + "' does not exist.", Optional.empty());
} else if (state == GroupState.EMPTY) {
- printError("streams group '" + group + "' has no active members.", Optional.empty());
+ printError("Streams group '" + group + "' has no active members.", Optional.empty());
}
}
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index 645e13b0bc2d3..37f79bf84d2b7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -57,7 +57,7 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL +
"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'.";
- private static final String DRY_RUN_DOC = "Only show results without executing changes on Streams Group. Supported operations: reset-offsets.";
+ private static final String DRY_RUN_DOC = "Only show results without executing changes on streams group. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
private static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets.";
private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset.";
From 78aed575f6909b49301a6d672af89c9d7da75a09 Mon Sep 17 00:00:00 2001
From: aliehsaeedii
Date: Mon, 2 Jun 2025 12:44:00 +0200
Subject: [PATCH 08/26] reset offsets of src topics
---
.../tools/streams/StreamsGroupCommand.java | 81 ++++++++++++++++++-
.../streams/ResetStreamsGroupOffsetTest.java | 12 ++-
2 files changed, 90 insertions(+), 3 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index a350180c0a4dc..82ec993ca0694 100644
--- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -18,6 +18,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
@@ -37,6 +38,7 @@
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -403,8 +405,17 @@ Map getOffsets(StreamsGroupDescription description)
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String groupId) {
try {
- return adminClient.listStreamsGroupOffsets(
- Map.of(groupId, new ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+ var sourceTopics = adminClient.describeStreamsGroups(List.of(groupId))
+ .all().get().get(groupId)
+ .subtopologies().stream()
+ .flatMap(subtopology -> subtopology.sourceTopics().stream())
+ .collect(Collectors.toSet());
+
+ var allTopicPartitions = adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec()))
+ .partitionsToOffsetAndMetadata(groupId).get();
+
+ allTopicPartitions.keySet().removeIf(tp -> !sourceTopics.contains(tp.topic()));
+ return allTopicPartitions;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
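The intent of the filter above is that committed offsets are reported only for the topology's source topics, so partitions of other topics (for example repartition topics) are dropped. A small self-contained sketch of the same removeIf idea, with made-up topic names:

    Set<String> sourceTopics = Set.of("input-topic");
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    offsets.put(new TopicPartition("input-topic", 0), new OffsetAndMetadata(5L));
    offsets.put(new TopicPartition("app-1-agg-repartition", 0), new OffsetAndMetadata(3L));
    offsets.keySet().removeIf(tp -> !sourceTopics.contains(tp.topic()));
    // offsets now only contains the entry for input-topic-0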
@@ -424,7 +435,17 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
switch (state) {
case "Empty":
case "Dead":
+ // reset offsets in source topics
result.put(groupId, resetOffsetsForInactiveGroup(groupId));
+ // delete internal topics
+ List<String> internalTopics = retrieveInternalTopics(List.of(groupId)).get(groupId);
+ if (internalTopics != null && !internalTopics.isEmpty()) {
+ try {
+ adminClient.deleteTopics(internalTopics).all().get();
+ } catch (InterruptedException | ExecutionException e) {
+ printError("Deleting internal topics for group '" + groupId + "' failed due to " + e.getMessage(), Optional.of(e));
+ }
+ }
break;
default:
printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty());
@@ -444,6 +465,50 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
return result;
}
+ // Visibility for testing
+ Map<String, List<String>> retrieveInternalTopics(List<String> groupIds) {
+ Map<String, List<String>> groupToInternalTopics = new HashMap<>();
+ try {
+ Map<String, StreamsGroupDescription> descriptionMap = adminClient.describeStreamsGroups(groupIds).all().get();
+ for (StreamsGroupDescription description : descriptionMap.values()) {
+
+ List<String> nonInternalTopics = description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+ subtopology.sourceTopics().stream(),
+ subtopology.repartitionSinkTopics().stream()))
+ .distinct()
+ .toList();
+
+
+ List<String> internalTopics = description.subtopologies().stream()
+ .flatMap(subtopology -> Stream.concat(
+ subtopology.repartitionSourceTopics().keySet().stream(),
+ subtopology.stateChangelogTopics().keySet().stream()))
+ .filter(topic -> !nonInternalTopics.contains(topic))
+ .collect(Collectors.toList());
+ internalTopics.removeIf(topic -> {
+ if (!isInferredInternalTopic(topic, description.groupId())) {
+ printError("The internal topic '" + topic + "' is not inferred as internal " +
+ "and thus will not be deleted with the group '" + description.groupId() + "'.", Optional.empty());
+ return true;
+ }
+ return false;
+ });
+ if (!internalTopics.isEmpty()) {
+ groupToInternalTopics.put(description.groupId(), internalTopics);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ if (e.getCause() instanceof UnsupportedVersionException) {
+ printError("Retrieving internal topics is not supported by the broker version. " +
+ "Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause()));
+ } else {
+ printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e));
+ }
+ }
+ return groupToInternalTopics;
+ }
+
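isInferredInternalTopic is referenced above but not shown in this hunk. The check presumably leans on the Kafka Streams convention that internal topics are prefixed with the application id (which is also the group id); a purely hypothetical sketch:

    // hypothetical illustration only, not the patch's actual implementation
    private static boolean looksLikeStreamsInternalTopic(String topic, String groupId) {
        return topic.startsWith(groupId + "-")
            && (topic.endsWith("-repartition") || topic.endsWith("-changelog"));
    }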
private Map resetOffsetsForInactiveGroup(String groupId) {
try {
Collection partitionsToReset = getPartitionsToReset(groupId);
@@ -816,6 +881,18 @@ private Map getLogOffsets(Collection
final KStream<String, String> inputStream2 = builder.stream(topic2);
final AtomicInteger recordCount = new AtomicInteger(0);
+//
+// final KTable valueCounts = inputStream1.merge(inputStream2)
+// .groupByKey()
+// .aggregate(
+// () -> "()",
+// (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
+// Materialized.as("aggregated_value"));
final KTable<String, String> valueCounts = inputStream1.merge(inputStream2)
- .groupByKey()
+ // Explicit repartition step with a custom internal topic name
+ .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String()))
.aggregate(
() -> "()",
(key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
From a5fe8c8f3b664ca09448624907b99b65eb94bedd Mon Sep 17 00:00:00 2001
From: aliehsaeedii