diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java index b19c3e38e9cf5..795d8523d52cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListGroupsResult.java @@ -37,7 +37,7 @@ public class ListGroupsResult { private final KafkaFutureImpl> valid; private final KafkaFutureImpl> errors; - ListGroupsResult(KafkaFuture> future) { + public ListGroupsResult(KafkaFuture> future) { this.all = new KafkaFutureImpl<>(); this.valid = new KafkaFutureImpl<>(); this.errors = new KafkaFutureImpl<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java index c3fb9babb9a97..6daef6b0b0746 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListStreamsGroupOffsetsSpec.java @@ -33,17 +33,17 @@ public class ListStreamsGroupOffsetsSpec { private Collection topicPartitions; /** - * Set the topic partitions whose offsets are to be listed for a Streams group. + * Set the topic partitions whose offsets are to be listed for a streams group. */ - ListStreamsGroupOffsetsSpec topicPartitions(Collection topicPartitions) { + public ListStreamsGroupOffsetsSpec topicPartitions(Collection topicPartitions) { this.topicPartitions = topicPartitions; return this; } /** - * Returns the topic partitions whose offsets are to be listed for a Streams group. + * Returns the topic partitions whose offsets are to be listed for a streams group. */ - Collection topicPartitions() { + public Collection topicPartitions() { return topicPartitions; } } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java index e2ca0e325a779..233792060a788 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/CsvUtils.java @@ -25,11 +25,11 @@ public class CsvUtils { private static final CsvMapper MAPPER = new CsvMapper(); - static ObjectReader readerFor(Class clazz) { + public static ObjectReader readerFor(Class clazz) { return MAPPER.readerFor(clazz).with(getSchema(clazz)); } - static ObjectWriter writerFor(Class clazz) { + public static ObjectWriter writerFor(Class clazz) { return MAPPER.writerFor(clazz).with(getSchema(clazz)); } diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index 1a8be4104aa90..55266265ba20a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -20,26 +20,42 @@ import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult; import org.apache.kafka.clients.admin.GroupListing; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec; import org.apache.kafka.clients.admin.ListGroupsOptions; import org.apache.kafka.clients.admin.ListGroupsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec; import 
org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.StreamsGroupDescription; import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment; import org.apache.kafka.clients.admin.StreamsGroupMemberDescription; import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.GroupIdNotFoundException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.tools.consumer.group.CsvUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; import java.io.IOException; +import java.text.ParseException; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,22 +64,32 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.function.ToIntFunction; import java.util.stream.Collectors; import java.util.stream.Stream; import joptsimple.OptionException; +import static org.apache.kafka.tools.streams.StreamsGroupCommandOptions.LOGGER; + public class StreamsGroupCommand { + private static final String TOPIC_PARTITION_SEPARATOR = ":"; + public static void main(String[] args) { StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args); try { opts.checkArgs(); // should have exactly one action - long numberOfActions = Stream.of(opts.listOpt, opts.describeOpt).filter(opts.options::has).count(); + long numberOfActions = Stream.of( + opts.listOpt, + opts.describeOpt, + opts.resetOffsetsOpt + ).filter(opts.options::has).count(); if (numberOfActions != 1) - CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, or --describe."); + CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --list, --describe, or --reset-offset."); run(opts); } catch (OptionException e) { @@ -77,6 +103,13 @@ public static void run(StreamsGroupCommandOptions opts) { streamsGroupService.listGroups(); } else if (opts.options.has(opts.describeOpt)) { streamsGroupService.describeGroups(); + } else if (opts.options.has(opts.resetOffsetsOpt)) { + Map> offsetsToReset = streamsGroupService.resetOffsets(); + if (opts.options.has(opts.exportOpt)) { + String exported = streamsGroupService.exportOffsetsToCsv(offsetsToReset); + System.out.println(exported); + } else + printOffsetsToReset(offsetsToReset); } else { throw new IllegalArgumentException("Unknown action!"); } @@ -87,6 +120,22 @@ public static void run(StreamsGroupCommandOptions opts) { } } + static void printOffsetsToReset(Map> groupAssignmentsToReset) { + String format = "%n%-30s %-30s %-10s %-15s"; + if 
(!groupAssignmentsToReset.isEmpty()) { + System.out.printf(format, "GROUP", "TOPIC", "PARTITION", "NEW-OFFSET"); + } + + groupAssignmentsToReset.forEach((groupId, assignment) -> + assignment.forEach((streamsAssignment, offsetAndMetadata) -> + System.out.printf(format, + groupId, + streamsAssignment.topic(), + streamsAssignment.partition(), + offsetAndMetadata.offset()))); + System.out.println(); + } + static Set groupStatesFromString(String input) { Set parsedStates = Arrays.stream(input.split(",")).map(s -> GroupState.parse(s.trim())).collect(Collectors.toSet()); @@ -223,6 +272,32 @@ private void printMembers(StreamsGroupDescription description, boolean verbose) } } + String exportOffsetsToCsv(Map> assignments) { + boolean isSingleGroupQuery = opts.options.valuesOf(opts.groupOpt).size() == 1; + ObjectWriter csvWriter = isSingleGroupQuery + ? CsvUtils.writerFor(CsvUtils.CsvRecordNoGroup.class) + : CsvUtils.writerFor(CsvUtils.CsvRecordWithGroup.class); + + return assignments.entrySet().stream().flatMap(e -> { + String groupId = e.getKey(); + Map partitionInfo = e.getValue(); + + return partitionInfo.entrySet().stream().map(e1 -> { + TopicPartition k = e1.getKey(); + OffsetAndMetadata v = e1.getValue(); + Object csvRecord = isSingleGroupQuery + ? new CsvUtils.CsvRecordNoGroup(k.topic(), k.partition(), v.offset()) + : new CsvUtils.CsvRecordWithGroup(groupId, k.topic(), k.partition(), v.offset()); + + try { + return csvWriter.writeValueAsString(csvRecord); + } catch (JsonProcessingException err) { + throw new RuntimeException(err); + } + }); + }).collect(Collectors.joining()); + } + private String prepareTaskType(List tasks, String taskType) { if (tasks.isEmpty()) { return ""; @@ -330,13 +405,549 @@ Map getOffsets(StreamsGroupDescription description) Map getCommittedOffsets(String groupId) { try { - return adminClient.listConsumerGroupOffsets( - Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get(); + var sourceTopics = adminClient.describeStreamsGroups(List.of(groupId)) + .all().get().get(groupId) + .subtopologies().stream() + .flatMap(subtopology -> subtopology.sourceTopics().stream()) + .collect(Collectors.toSet()); + + var allTopicPartitions = adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec())) + .partitionsToOffsetAndMetadata(groupId).get(); + + allTopicPartitions.keySet().removeIf(tp -> !sourceTopics.contains(tp.topic())); + return allTopicPartitions; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private List filterExistingGroupTopics(String groupId, List topicPartitions) { + try { + var allTopicPartitions = adminClient.listStreamsGroupOffsets(Map.of(groupId, new ListStreamsGroupOffsetsSpec())) + .partitionsToOffsetAndMetadata(groupId).get(); + boolean allPresent = topicPartitions.stream().allMatch(allTopicPartitions::containsKey); + if (!allPresent) { + printError("One or more topics are not part of the group '" + groupId + "'.", Optional.empty()); + return Collections.emptyList(); + } + return topicPartitions; } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } + + Map> resetOffsets() { + // Dry-run is the default behavior if --execute is not specified + boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt); + + Map> result = new HashMap<>(); + List groupIds = opts.options.has(opts.allGroupsOpt) + ? 
listStreamsGroups() + : opts.options.valuesOf(opts.groupOpt); + if (!groupIds.isEmpty()) { + Map> streamsGroups = adminClient.describeStreamsGroups( + groupIds + ).describedGroups(); + + streamsGroups.forEach((groupId, groupDescription) -> { + try { + String state = groupDescription.get().groupState().toString(); + switch (state) { + case "Empty": + case "Dead": + // reset offsets in source topics + result.put(groupId, resetOffsetsForInactiveGroup(groupId, dryRun)); + // delete internal topics + if (!dryRun) { + List internalTopics = retrieveInternalTopics(List.of(groupId)).get(groupId); + if (internalTopics != null && !internalTopics.isEmpty()) { + try { + adminClient.deleteTopics(internalTopics).all().get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof UnknownTopicOrPartitionException) { + printError("Deleting internal topics for group '" + groupId + "' failed because the topics do not exist.", Optional.empty()); + } else if (e.getCause() instanceof UnsupportedVersionException) { + printError("Deleting internal topics is not supported by the broker version. " + + "Use 'kafka-topics.sh' to delete the group's internal topics.", Optional.of(e.getCause())); + } else { + printError("Deleting internal topics for group '" + groupId + "' failed due to " + e.getMessage(), Optional.of(e)); + } + } + } + } + break; + default: + printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty()); + result.put(groupId, Collections.emptyMap()); + } + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } catch (ExecutionException ee) { + if (ee.getCause() instanceof GroupIdNotFoundException) { + result.put(groupId, resetOffsetsForInactiveGroup(groupId, dryRun)); + } else { + throw new RuntimeException(ee); + } + } + }); + } + return result; + } + + // Visibility for testing + Map> retrieveInternalTopics(List groupIds) { + Map> groupToInternalTopics = new HashMap<>(); + try { + Map descriptionMap = adminClient.describeStreamsGroups(groupIds).all().get(); + for (StreamsGroupDescription description : descriptionMap.values()) { + + List sourceTopics = description.subtopologies().stream() + .flatMap(subtopology -> subtopology.sourceTopics().stream()).toList(); + + List internalTopics = description.subtopologies().stream() + .flatMap(subtopology -> Stream.concat( + subtopology.repartitionSourceTopics().keySet().stream(), + subtopology.stateChangelogTopics().keySet().stream())) + .filter(topic -> !sourceTopics.contains(topic)) + .collect(Collectors.toList()); + internalTopics.removeIf(topic -> { + if (!isInferredInternalTopic(topic, description.groupId())) { + printError("The internal topic '" + topic + "' is not inferred as internal " + + "and thus will not be deleted with the group '" + description.groupId() + "'.", Optional.empty()); + return true; + } + return false; + }); + if (!internalTopics.isEmpty()) { + groupToInternalTopics.put(description.groupId(), internalTopics); + } + } + } catch (InterruptedException | ExecutionException e) { + if (e.getCause() instanceof UnsupportedVersionException) { + printError("Retrieving internal topics is not supported by the broker version. 
" + + "Use 'kafka-topics.sh' to list and delete the group's internal topics.", Optional.of(e.getCause())); + } else { + printError("Retrieving internal topics failed due to " + e.getMessage(), Optional.of(e)); + } + } + return groupToInternalTopics; + } + + private Map resetOffsetsForInactiveGroup(String groupId, boolean dryRun) { + try { + Collection partitionsToReset = getPartitionsToReset(groupId); + Map preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset); + if (!dryRun) { + adminClient.alterStreamsGroupOffsets( + groupId, + preparedOffsets + ).all().get(); + } + + return preparedOffsets; + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } catch (ExecutionException ee) { + Throwable cause = ee.getCause(); + if (cause instanceof KafkaException) { + throw (KafkaException) cause; + } else { + throw new RuntimeException(cause); + } + } + } + + private Collection getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException { + if (opts.options.has(opts.allTopicsOpt)) { + return getCommittedOffsets(groupId).keySet(); + } else if (opts.options.has(opts.topicOpt)) { + List topics = opts.options.valuesOf(opts.topicOpt); + + List partitions = parseTopicPartitionsToReset(topics); + // if the user specified topics that do not belong to this group, we filter them out + partitions = filterExistingGroupTopics(groupId, partitions); + return partitions; + } else { + if (!opts.options.has(opts.resetFromFileOpt)) + CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic."); + + return Collections.emptyList(); + } + } + + private List parseTopicPartitionsToReset(List topicArgs) throws ExecutionException, InterruptedException { + List topicsWithPartitions = new ArrayList<>(); + List topics = new ArrayList<>(); + + topicArgs.forEach(topicArg -> { + if (topicArg.contains(TOPIC_PARTITION_SEPARATOR)) + topicsWithPartitions.add(topicArg); + else + topics.add(topicArg); + }); + + List specifiedPartitions = + topicsWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collectors.toList()); + + List unspecifiedPartitions = new ArrayList<>(); + + if (!topics.isEmpty()) { + Map descriptionMap = adminClient.describeTopics( + topics + ).allTopicNames().get(); + + descriptionMap.forEach((topic, description) -> + description.partitions().forEach(tpInfo -> unspecifiedPartitions.add(new TopicPartition(topic, tpInfo.partition()))) + ); + } + + specifiedPartitions.addAll(unspecifiedPartitions); + + return specifiedPartitions; + } + + private Stream parseTopicsWithPartitions(String topicArg) { + ToIntFunction partitionNum = partition -> { + try { + return Integer.parseInt(partition); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid partition '" + partition + "' specified in topic arg '" + topicArg + "''"); + } + }; + + String[] arr = topicArg.split(":"); + + if (arr.length != 2) + throw new IllegalArgumentException("Invalid topic arg '" + topicArg + "', expected topic name and partitions"); + + String topic = arr[0]; + String partitions = arr[1]; + + return Arrays.stream(partitions.split(",")). 
+ map(partition -> new TopicPartition(topic, partitionNum.applyAsInt(partition))); + } + + @SuppressWarnings("CyclomaticComplexity") + private Map prepareOffsetsToReset(String groupId, Collection partitionsToReset) { + if (opts.options.has(opts.resetToOffsetOpt)) { + long offset = opts.options.valueOf(opts.resetToOffsetOpt); + return checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), tp -> offset))) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + } else if (opts.options.has(opts.resetToEarliestOpt)) { + Map logStartOffsets = getLogStartOffsets(partitionsToReset); + return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> { + LogOffsetResult logOffsetResult = logStartOffsets.get(topicPartition); + + if (!(logOffsetResult instanceof LogOffset)) { + CommandLineUtils.printUsageAndExit(opts.parser, "Error getting starting offset of topic partition: " + topicPartition); + } + + return new OffsetAndMetadata(((LogOffset) logOffsetResult).value); + })); + } else if (opts.options.has(opts.resetToLatestOpt)) { + Map logEndOffsets = getLogEndOffsets(partitionsToReset); + return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> { + LogOffsetResult logOffsetResult = logEndOffsets.get(topicPartition); + + if (!(logOffsetResult instanceof LogOffset)) { + CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + topicPartition); + } + + return new OffsetAndMetadata(((LogOffset) logOffsetResult).value); + })); + } else if (opts.options.has(opts.resetShiftByOpt)) { + Map currentCommittedOffsets = getCommittedOffsets(groupId); + Map requestedOffsets = partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> { + long shiftBy = opts.options.valueOf(opts.resetShiftByOpt); + OffsetAndMetadata currentOffset = currentCommittedOffsets.get(topicPartition); + + if (currentOffset == null) { + throw new IllegalArgumentException("Cannot shift offset for partition " + topicPartition + " since there is no current committed offset"); + } + + return currentOffset.offset() + shiftBy; + })); + return checkOffsetsRange(requestedOffsets).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + } else if (opts.options.has(opts.resetToDatetimeOpt)) { + try { + long timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt)); + Map logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp); + return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> { + LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition); + + if (!(logTimestampOffset instanceof LogOffset)) { + CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition); + } + + return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value); + })); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } else if (opts.options.has(opts.resetByDurationOpt)) { + String duration = opts.options.valueOf(opts.resetByDurationOpt); + Duration durationParsed = Duration.parse(duration); + Instant now = Instant.now(); + durationParsed.negated().addTo(now); + long timestamp = now.minus(durationParsed).toEpochMilli(); + Map logTimestampOffsets = getLogTimestampOffsets(partitionsToReset, timestamp); + 
return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> { + LogOffsetResult logTimestampOffset = logTimestampOffsets.get(topicPartition); + + if (!(logTimestampOffset instanceof LogOffset)) { + CommandLineUtils.printUsageAndExit(opts.parser, "Error getting offset by timestamp of topic partition: " + topicPartition); + } + + return new OffsetAndMetadata(((LogOffset) logTimestampOffset).value); + })); + } else if (resetPlanFromFile().isPresent()) { + return resetPlanFromFile().map(resetPlan -> { + Map resetPlanForGroup = resetPlan.get(groupId); + + if (resetPlanForGroup == null) { + printError("No reset plan for group " + groupId + " found", Optional.empty()); + return Collections.emptyMap(); + } + + Map requestedOffsets = resetPlanForGroup.keySet().stream().collect(Collectors.toMap( + Function.identity(), + topicPartition -> resetPlanForGroup.get(topicPartition).offset())); + + return checkOffsetsRange(requestedOffsets).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + }).orElseGet(Collections::emptyMap); + } else if (opts.options.has(opts.resetToCurrentOpt)) { + Map currentCommittedOffsets = getCommittedOffsets(groupId); + Collection partitionsToResetWithCommittedOffset = new ArrayList<>(); + Collection partitionsToResetWithoutCommittedOffset = new ArrayList<>(); + + for (TopicPartition topicPartition : partitionsToReset) { + if (currentCommittedOffsets.containsKey(topicPartition)) + partitionsToResetWithCommittedOffset.add(topicPartition); + else + partitionsToResetWithoutCommittedOffset.add(topicPartition); + } + + Map preparedOffsetsForPartitionsWithCommittedOffset = partitionsToResetWithCommittedOffset.stream() + .collect(Collectors.toMap(Function.identity(), topicPartition -> { + OffsetAndMetadata committedOffset = currentCommittedOffsets.get(topicPartition); + + if (committedOffset == null) { + throw new IllegalStateException("Expected a valid current offset for topic partition: " + topicPartition); + } + + return new OffsetAndMetadata(committedOffset.offset()); + })); + + Map preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> { + if (!(e.getValue() instanceof LogOffset)) { + CommandLineUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + e.getKey()); + } + + return new OffsetAndMetadata(((LogOffset) e.getValue()).value); + })); + + preparedOffsetsForPartitionsWithCommittedOffset.putAll(preparedOffsetsForPartitionsWithoutCommittedOffset); + + return preparedOffsetsForPartitionsWithCommittedOffset; + } + + CommandLineUtils.printUsageAndExit(opts.parser, String.format("Option '%s' requires one of the following scenarios: %s", opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts)); + return null; + } + + Optional>> resetPlanFromFile() { + if (opts.options.has(opts.resetFromFileOpt)) { + try { + String resetPlanPath = opts.options.valueOf(opts.resetFromFileOpt); + String resetPlanCsv = Utils.readFileAsString(resetPlanPath); + Map> resetPlan = parseResetPlan(resetPlanCsv); + return Optional.of(resetPlan); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else return Optional.empty(); + } + + private Map> parseResetPlan(String resetPlanCsv) { + ObjectReader csvReader = CsvUtils.readerFor(CsvUtils.CsvRecordNoGroup.class); + String[] lines = resetPlanCsv.split("\n"); + boolean isSingleGroupQuery = 
opts.options.valuesOf(opts.groupOpt).size() == 1; + boolean isOldCsvFormat = false; + try { + if (lines.length > 0) { + csvReader.readValue(lines[0], CsvUtils.CsvRecordNoGroup.class); + isOldCsvFormat = true; + } + } catch (IOException e) { + throw new RuntimeException("Invalid CSV format in reset plan file: " + e.getMessage()); + } + + Map> dataMap = new HashMap<>(); + + try { + // Single group CSV format: "topic,partition,offset" + if (isSingleGroupQuery && isOldCsvFormat) { + String group = opts.options.valueOf(opts.groupOpt); + for (String line : lines) { + CsvUtils.CsvRecordNoGroup rec = csvReader.readValue(line, CsvUtils.CsvRecordNoGroup.class); + dataMap.computeIfAbsent(group, k -> new HashMap<>()) + .put(new TopicPartition(rec.getTopic(), rec.getPartition()), new OffsetAndMetadata(rec.getOffset())); + } + } else { + csvReader = CsvUtils.readerFor(CsvUtils.CsvRecordWithGroup.class); + for (String line : lines) { + CsvUtils.CsvRecordWithGroup rec = csvReader.readValue(line, CsvUtils.CsvRecordWithGroup.class); + dataMap.computeIfAbsent(rec.getGroup(), k -> new HashMap<>()) + .put(new TopicPartition(rec.getTopic(), rec.getPartition()), new OffsetAndMetadata(rec.getOffset())); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + return dataMap; + } + + private Map checkOffsetsRange(Map requestedOffsets) { + Map logStartOffsets = getLogStartOffsets(requestedOffsets.keySet()); + Map logEndOffsets = getLogEndOffsets(requestedOffsets.keySet()); + + Map res = new HashMap<>(); + + requestedOffsets.forEach((topicPartition, offset) -> { + LogOffsetResult logEndOffset = logEndOffsets.get(topicPartition); + + if (logEndOffset != null) { + if (logEndOffset instanceof LogOffset && offset > ((LogOffset) logEndOffset).value) { + long endOffset = ((LogOffset) logEndOffset).value; + LOGGER.warn("New offset (" + offset + ") is higher than latest offset for topic partition " + topicPartition + ". Value will be set to " + endOffset); + res.put(topicPartition, endOffset); + } else { + LogOffsetResult logStartOffset = logStartOffsets.get(topicPartition); + + if (logStartOffset instanceof LogOffset && offset < ((LogOffset) logStartOffset).value) { + long startOffset = ((LogOffset) logStartOffset).value; + LOGGER.warn("New offset (" + offset + ") is lower than earliest offset for topic partition " + topicPartition + ". 
Value will be set to " + startOffset); + res.put(topicPartition, startOffset); + } else + res.put(topicPartition, offset); + } + } else { + // the control should not reach here + throw new IllegalStateException("Unexpected non-existing offset value for topic partition " + topicPartition); + } + }); + + return res; + } + + private Map getLogTimestampOffsets(Collection topicPartitions, long timestamp) { + try { + Map timestampOffsets = topicPartitions.stream() + .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.forTimestamp(timestamp))); + + Map offsets = adminClient.listOffsets( + timestampOffsets).all().get(); + + Map successfulOffsetsForTimes = new HashMap<>(); + Map unsuccessfulOffsetsForTimes = new HashMap<>(); + + offsets.forEach((tp, offsetsResultInfo) -> { + if (offsetsResultInfo.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) + successfulOffsetsForTimes.put(tp, offsetsResultInfo); + else + unsuccessfulOffsetsForTimes.put(tp, offsetsResultInfo); + }); + + Map successfulLogTimestampOffsets = successfulOffsetsForTimes.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new LogOffset(e.getValue().offset()))); + + unsuccessfulOffsetsForTimes.forEach((tp, offsetResultInfo) -> + System.out.println("\nWarn: Partition " + tp.partition() + " from topic " + tp.topic() + + " is empty. Falling back to latest known offset.")); + + successfulLogTimestampOffsets.putAll(getLogEndOffsets(unsuccessfulOffsetsForTimes.keySet())); + + return successfulLogTimestampOffsets; + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Map getLogStartOffsets(Collection topicPartitions) { + return getLogOffsets(topicPartitions, OffsetSpec.earliest()); + } + + private Map getLogEndOffsets(Collection topicPartitions) { + return getLogOffsets(topicPartitions, OffsetSpec.latest()); + } + + private Map getLogOffsets(Collection topicPartitions, OffsetSpec offsetSpec) { + try { + Map startOffsets = topicPartitions.stream() + .collect(Collectors.toMap(Function.identity(), tp -> offsetSpec)); + + Map offsets = adminClient.listOffsets( + startOffsets + ).all().get(); + + return topicPartitions.stream().collect(Collectors.toMap( + Function.identity(), + tp -> offsets.containsKey(tp) + ? 
new LogOffset(offsets.get(tp).offset()) + : new Unknown() + )); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + private boolean isInferredInternalTopic(final String topicName, final String applicationId) { + return topicName.startsWith(applicationId + "-") && matchesInternalTopicFormat(topicName); + } + + public static boolean matchesInternalTopicFormat(final String topicName) { + return topicName.endsWith("-changelog") || topicName.endsWith("-repartition") + || topicName.endsWith("-subscription-registration-topic") + || topicName.endsWith("-subscription-response-topic") + || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic") + || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic"); + } + + List collectAllTopics(String groupId) { + try { + return adminClient.describeStreamsGroups(List.of(groupId)) + .all().get().get(groupId) + .subtopologies().stream() + .flatMap(subtopology -> Stream.of( + subtopology.sourceTopics().stream(), + subtopology.repartitionSinkTopics().stream(), + subtopology.repartitionSourceTopics().keySet().stream(), + subtopology.stateChangelogTopics().keySet().stream() + ).flatMap(s -> s)).distinct().collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + interface LogOffsetResult { } + + private static class LogOffset implements LogOffsetResult { + final long value; + + LogOffset(long value) { + this.value = value; + } + } + + private static class Unknown implements LogOffsetResult { } + + private static class Ignore implements LogOffsetResult { } + /** * Prints an error message if the group state indicates that the group is either dead or empty. * diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java index 3990a36f7771c..3e1dfd6b429f7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java @@ -23,16 +23,26 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import joptsimple.OptionSpec; +import static org.apache.kafka.tools.ToolsUtils.minus; + public class StreamsGroupCommandOptions extends CommandDefaultOptions { + private static final String NL = System.lineSeparator(); public static final Logger LOGGER = LoggerFactory.getLogger(StreamsGroupCommandOptions.class); public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; public static final String GROUP_DOC = "The streams group we wish to act on."; + private static final String ALL_GROUPS_DOC = "Apply to all streams groups."; + private static final String TOPIC_DOC = "The topic whose streams group information should be deleted or topic whose should be included in the reset offset process. " + + "In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. 
" + + "Reset-offsets also supports multiple topic inputs."; + private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process."; public static final String LIST_DOC = "List all streams groups."; public static final String DESCRIBE_DOC = "Describe streams group and list offset lag related to given group."; public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " + @@ -43,6 +53,22 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only."; public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information." + "This is the default sub-action and may be used with the '--describe' option only."; + private static final String RESET_OFFSETS_DOC = "Reset offsets of streams group. The instances should be inactive" + NL + + "Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets." + NL + + "You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " + + "--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL + + "To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'."; + private static final String DRY_RUN_DOC = "Only show results without executing changes on streams group. Supported operations: reset-offsets."; + private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets."; + private static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets."; + private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset."; + private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file."; + private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'"; + private static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'"; + private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset."; + private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset."; + private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset."; + private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative."; public static final String VERBOSE_DOC = """ Use with --describe --state to show group epoch and target assignment epoch. Use with --describe --members to show for each member the member epoch, target assignment epoch, current assignment, target assignment, and whether member is still using the classic rebalance protocol. 
@@ -50,15 +76,34 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions { public final OptionSpec bootstrapServerOpt; public final OptionSpec groupOpt; + final OptionSpec topicOpt; + final OptionSpec allTopicsOpt; public final OptionSpec listOpt; public final OptionSpec describeOpt; + final OptionSpec allGroupsOpt; public final OptionSpec timeoutMsOpt; public final OptionSpec commandConfigOpt; public final OptionSpec stateOpt; public final OptionSpec membersOpt; public final OptionSpec offsetsOpt; + public final OptionSpec resetOffsetsOpt; + public final OptionSpec resetToOffsetOpt; + public final OptionSpec resetFromFileOpt; + public final OptionSpec resetToDatetimeOpt; + public final OptionSpec resetByDurationOpt; + public final OptionSpec resetToEarliestOpt; + public final OptionSpec resetToLatestOpt; + public final OptionSpec resetToCurrentOpt; + public final OptionSpec resetShiftByOpt; + public final OptionSpec dryRunOpt; + public final OptionSpec executeOpt; + public final OptionSpec exportOpt; public final OptionSpec verboseOpt; + final Set> allResetOffsetScenarioOpts; + final Set> allGroupSelectionScopeOpts; + + public static StreamsGroupCommandOptions fromArgs(String[] args) { StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args); opts.checkArgs(); @@ -76,8 +121,14 @@ public StreamsGroupCommandOptions(String[] args) { .withRequiredArg() .describedAs("streams group") .ofType(String.class); + topicOpt = parser.accepts("topic", TOPIC_DOC) + .withRequiredArg() + .describedAs("topic") + .ofType(String.class); + allTopicsOpt = parser.accepts("all-topics", ALL_TOPICS_DOC); listOpt = parser.accepts("list", LIST_DOC); describeOpt = parser.accepts("describe", DESCRIBE_DOC); + allGroupsOpt = parser.accepts("all-groups", ALL_GROUPS_DOC); timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC) .availableIf(describeOpt) .withRequiredArg() @@ -88,6 +139,7 @@ public StreamsGroupCommandOptions(String[] args) { .withRequiredArg() .describedAs("command config property file") .ofType(String.class); + stateOpt = parser.accepts("state", STATE_DOC) .availableIf(listOpt, describeOpt) .withOptionalArg() @@ -96,10 +148,41 @@ public StreamsGroupCommandOptions(String[] args) { .availableIf(describeOpt); offsetsOpt = parser.accepts("offsets", OFFSETS_DOC) .availableIf(describeOpt); + resetOffsetsOpt = parser.accepts("reset-offsets", RESET_OFFSETS_DOC); + resetToOffsetOpt = parser.accepts("to-offset", RESET_TO_OFFSET_DOC) + .withRequiredArg() + .describedAs("offset") + .ofType(Long.class); + resetFromFileOpt = parser.accepts("from-file", RESET_FROM_FILE_DOC) + .withRequiredArg() + .describedAs("path to CSV file") + .ofType(String.class); + resetToDatetimeOpt = parser.accepts("to-datetime", RESET_TO_DATETIME_DOC) + .withRequiredArg() + .describedAs("datetime") + .ofType(String.class); + resetByDurationOpt = parser.accepts("by-duration", RESET_BY_DURATION_DOC) + .withRequiredArg() + .describedAs("duration") + .ofType(String.class); + resetToEarliestOpt = parser.accepts("to-earliest", RESET_TO_EARLIEST_DOC); + resetToLatestOpt = parser.accepts("to-latest", RESET_TO_LATEST_DOC); + resetToCurrentOpt = parser.accepts("to-current", RESET_TO_CURRENT_DOC); + resetShiftByOpt = parser.accepts("shift-by", RESET_SHIFT_BY_DOC) + .withRequiredArg() + .describedAs("number-of-offsets") + .ofType(Long.class); + verboseOpt = parser.accepts("verbose", VERBOSE_DOC) .availableIf(describeOpt); - + dryRunOpt = parser.accepts("dry-run", DRY_RUN_DOC); + executeOpt = parser.accepts("execute", 
EXECUTE_DOC); + exportOpt = parser.accepts("export", EXPORT_DOC); options = parser.parse(args); + + allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt, + resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)); + allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt)); } public void checkArgs() { @@ -108,6 +191,9 @@ public void checkArgs() { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt); if (options.has(describeOpt)) { + if (!options.has(groupOpt) && !options.has(allGroupsOpt)) + CommandLineUtils.printUsageAndExit(parser, + "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); List> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) { CommandLineUtils.printUsageAndExit(parser, @@ -121,6 +207,35 @@ public void checkArgs() { LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used."); } + if (!options.has(groupOpt) && !options.has(allGroupsOpt)) + CommandLineUtils.printUsageAndExit(parser, + "Option " + resetOffsetsOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); + + checkOffsetResetArgs(); + CommandLineUtils.checkInvalidArgs(parser, options, listOpt, membersOpt, offsetsOpt); } + + private void checkOffsetResetArgs() { + if (options.has(resetOffsetsOpt)) { + if (options.has(dryRunOpt) && options.has(executeOpt)) + CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt); + + if (!options.has(dryRunOpt) && !options.has(executeOpt)) { + System.err.println("WARN: No action will be performed as the --execute option is missing. " + + "In a future major release, the default behavior of this command will be to prompt the user before " + + "executing the reset rather than doing a dry run. 
You should add the --dry-run option explicitly " + + "if you are scripting this command and want to keep the current default behavior without prompting."); + } + + CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, minus(allResetOffsetScenarioOpts, resetToOffsetOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, minus(allResetOffsetScenarioOpts, resetByDurationOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, minus(allResetOffsetScenarioOpts, resetToEarliestOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, minus(allResetOffsetScenarioOpts, resetToCurrentOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, minus(allResetOffsetScenarioOpts, resetShiftByOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, minus(allResetOffsetScenarioOpts, resetFromFileOpt)); + } + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java new file mode 100644 index 0000000000000..3f4246d6e45be --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/streams/ResetStreamsGroupOffsetTest.java @@ -0,0 +1,649 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.streams; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.streams.GroupProtocol; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import joptsimple.OptionException; + +import static java.time.LocalDateTime.now; +import static java.util.Arrays.asList; +import static java.util.stream.Collectors.toMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +@Timeout(600) +@Tag("integration") +public class ResetStreamsGroupOffsetTest { + private static final String TOPIC_PREFIX = "foo-"; + private static final String APP_ID_PREFIX = "streams-group-command-test"; + private static final Properties STREAMS_CONFIG = new Properties(); + private static final int RECORD_TOTAL = 10; + public static EmbeddedKafkaCluster cluster; + private static String bootstrapServers; + private static Admin adminClient; + + @BeforeAll + public static void startCluster() { + final Properties props = new Properties(); + props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams"); + cluster = new EmbeddedKafkaCluster(2, props); + cluster.start(); + + bootstrapServers = cluster.bootstrapServers(); + adminClient = cluster.createAdminClient(); + + createStreamsConfig(bootstrapServers); + } + + private static void createStreamsConfig(String bootstrapServers) { + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault())); + STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2); + } + + @AfterAll + public static void closeCluster() { + cluster.stop(); + } + + @Test + public void testResetWithUnrecognizedOption() { + String[] args = new String[]{"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--reset-offsets", "--all-group", "--all-topics", "--to-offset", "5"}; + assertThrows(OptionException.class, () -> getStreamsGroupService(args)); + } + + @Test + public void testResetOffset() throws Exception { + final String appId = generateRandomAppId(); + final String topic1 = generateRandomTopic(); + final String topic2 = generateRandomTopic(); + final int numOfPartitions = 2; + String[] args; + produceConsumeShutdown(appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); + produceMessagesOnTwoPartitions(RECORD_TOTAL, topic1); + produceMessagesOnTwoPartitions(RECORD_TOTAL, topic2); + /////////////////////////////////////////////// Specific topic (--topic topic1) //////////////////////////////////////////////// + // reset to specific offset, offset already on 10 + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 10L, 0, 1); + + resetForNextTest(appId, 10L, topic1); + + // reset to specific offset when after end offset, offset already on 10 + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-offset", "30"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 10L, 0, 1); + + // reset to specific offset when before begin offset, offset already on 20 + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-offset", "-30"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 20L, 0, 1); + + resetForNextTest(appId, 10L, topic1); + + // reset to specific date time + DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); + LocalDateTime dateTime = now().minusDays(1); + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-datetime", format.format(dateTime)}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 10L, 0, 1); + + resetForNextTest(appId, 10L, topic1); + + // reset by duration to earliest + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--by-duration", "PT5M"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 10L, 0, 1); + + resetForNextTest(appId, 10L, topic1); + + // reset to earliest + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-earliest"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 10L, 0, 1); + + resetForNextTest(appId, 10L, topic1); + + // reset to latest + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-latest"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 10L, 0, 1); + + resetForNextTest(appId, 5L, topic1); + + // reset to current + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-current"}; + 
resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 5L, 0, 1); + + // reset offset shift+. The current offset is 5, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--shift-by", "3"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 8L, 5L, 0, 1); + + // reset offset shift-. The current offset is 8, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--shift-by", "-3"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 8L, 0, 1); + + // reset offset shift by lower than earliest. The current offset is 5, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--shift-by", "-150"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 0L, 5L, 0, 1); + + // reset offset shift by higher than latest. The current offset is 0, as of the prev test is executed (by --execute) + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--shift-by", "150"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 20L, 0L, 0, 1); + + // export to file + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--to-offset", "5", "--export"}; + File file = TestUtils.tempFile("reset", ".csv"); + Map exp = Map.of(new TopicPartition(topic1, 0), 5L, new TopicPartition(topic1, 1), 5L); + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> exportedOffsets = service.resetOffsets(); + writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); + + assertEquals(exp, toOffsetMap(exportedOffsets.get(appId))); + } + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--from-file", file.getCanonicalPath()}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> importedOffsets = service.resetOffsets(); + assertEquals(exp, toOffsetMap(importedOffsets.get(appId))); + } + + ///////////////////////////////////////// Specific topic and partition (--topic topic1, --topic topic2) ///////////////////////////////////////// + resetForNextTest(appId, 10L, topic1); + + // reset to specific offset + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1 + ":1", "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, 5L, 10L, 1); + + resetForNextTest(appId, 10L, topic1); + + // reset both partitions of topic1 and topic2:1 to specific offset + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, + "--topic", topic1, "--topic", topic2 + ":1", "--to-offset", "5"}; + final Map expectedOffsets = Map.of( + new TopicPartition(topic1, 0), 5L, + new TopicPartition(topic1, 1), 5L, + new TopicPartition(topic2, 1), 5L); + + resetOffsetsAndAssert(args, appId, List.of(topic1, topic2), expectedOffsets, + Map.of( + new TopicPartition(topic1, 0), 10L, + new TopicPartition(topic1, 1), 10L, + new TopicPartition(topic2, 0), 10L, + new TopicPartition(topic2, 1), 10L)); + resetOffsetsAndAssert(addTo(args, "--execute"), appId, List.of(topic1, topic2), 
expectedOffsets, + Map.of(new TopicPartition(topic1, 0), 5L, + new TopicPartition(topic1, 1), 5L, + new TopicPartition(topic2, 0), 10L, + new TopicPartition(topic2, 1), 5L)); + + ///////////////////////////////////////// All topics (--all-topics) ///////////////////////////////////////// + resetForNextTest(appId, 10L, topic1, topic2); + + // reset to specific offset + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--all-topics", "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, topic2, 5L, 10L); + + resetForNextTest(appId, 10L, topic1, topic2); + + // reset to specific offset with two --topic options + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--topic", topic2, "--to-offset", "5"}; + resetOffsetsAndAssertForDryRunAndExecute(args, appId, topic1, topic2, 5L, 10L); + + resetForNextTest(appId, 10L, topic1, topic2); + + // export to file + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--all-topics", "--to-offset", "5", "--export"}; + file = TestUtils.tempFile("reset-all", ".csv"); + exp = Map.of(new TopicPartition(topic1, 0), 5L, + new TopicPartition(topic1, 1), 5L, + new TopicPartition(topic2, 0), 5L, + new TopicPartition(topic2, 1), 5L); + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> exportedOffsets = service.resetOffsets(); + writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets)); + + assertEquals(exp, toOffsetMap(exportedOffsets.get(appId))); + } + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--topic", topic1, "--from-file", file.getCanonicalPath()}; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + Map> importedOffsets = service.resetOffsets(); + + assertEquals(exp, toOffsetMap(importedOffsets.get(appId))); + } + + adminClient.deleteTopics(List.of(topic1, topic2)).all().get(); + } + + @Test + public void testTopicsWhenResettingOffset() throws Exception { + final String appId = generateRandomAppId(); + final String topic1 = generateRandomTopic(); + final String topic2 = generateRandomTopic(); + final int numOfPartitions = 2; + String[] args; + produceConsumeShutdown(appId, topic1, topic2, RECORD_TOTAL * numOfPartitions * 2); + produceMessagesOnTwoPartitions(RECORD_TOTAL, topic1); + produceMessagesOnTwoPartitions(RECORD_TOTAL, topic2); + + args = new String[]{"--bootstrap-server", bootstrapServers, "--reset-offsets", "--group", appId, "--all-topics", "--to-offset", "5"}; + resetOffsetsAndAssertInternalTopicDeletionForDryRunAndExecute(args, appId); + + adminClient.deleteTopics(List.of(topic1, topic2)).all().get(); + } + + private void resetForNextTest(String appId, long desiredOffset, String... 
topics) throws ExecutionException, InterruptedException { + Map offsets = new HashMap<>(); + for (String topic : topics) { + offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(desiredOffset)); + offsets.put(new TopicPartition(topic, 1), new OffsetAndMetadata(desiredOffset)); + } + adminClient.alterStreamsGroupOffsets(appId, offsets).all().get(); + Map committedOffsets = committedOffsets(List.of(topics), appId); + for (TopicPartition tp: offsets.keySet()) { + assertEquals(desiredOffset, committedOffsets.get(tp)); + } + } + + private void AssertCommittedOffsets(String appId, + String topic, + long expectedCommittedOffset, + int... partitions) throws ExecutionException, InterruptedException { + List affectedTPs = Arrays.stream(partitions) + .mapToObj(partition -> new TopicPartition(topic, partition)) + .toList(); + Map committedOffsets = committedOffsets(List.of(topic), appId); + for (TopicPartition tp: affectedTPs) { + assertEquals(expectedCommittedOffset, committedOffsets.get(tp)); + } + } + + private void AssertCommittedOffsets(String appId, + String topic1, + String topic2, + long expectedCommittedOffset) throws ExecutionException, InterruptedException { + TopicPartition tp10 = new TopicPartition(topic1, 0); + TopicPartition tp11 = new TopicPartition(topic2, 0); + TopicPartition tp20 = new TopicPartition(topic1, 1); + TopicPartition tp21 = new TopicPartition(topic2, 1); + Map committedOffsets = committedOffsets(List.of(topic1, topic2), appId); + assertEquals(Map.of( + tp10, expectedCommittedOffset, + tp20, expectedCommittedOffset, + tp11, expectedCommittedOffset, + tp21, expectedCommittedOffset), committedOffsets); + } + + /** + * Resets offsets for a specific topic and partition(s) and verifies the results. + * + *
+ * <p>This method performs the following steps:
+ * <ul>
+ *   <li>Resets offsets for the specified topic and partitions using the provided arguments.</li>
+ *   <li>Asserts that the reset offsets match the expected offsets.</li>
+ *   <li>Asserts that the committed offsets match the expected committed offsets.</li>
+ * </ul>
+ * + * @param args The command-line arguments for resetting offsets. + * @param appId The application ID for the Kafka Streams application. + * @param topic The topic for which offsets will be reset. + * @param expectedOffset The expected offset value after the reset. + * @param expectedCommittedOffset The expected committed offset value after the reset. + * @param partitions The partitions of the topic to reset offsets for. + * @throws ExecutionException If an error occurs during the execution of the reset operation. + * @throws InterruptedException If the thread is interrupted during the reset operation. + */ + private void resetOffsetsAndAssert(String[] args, + String appId, + String topic, + long expectedOffset, + long expectedCommittedOffset, + int... partitions) throws ExecutionException, InterruptedException { + Map> resetOffsetsResultByGroup; + Map expectedOffetMap = Arrays.stream(partitions) + .boxed() + .collect(Collectors.toMap( + partition -> new TopicPartition(topic, partition), + partition -> expectedOffset + )); + Map> expectedResetResults = Map.of(appId, expectedOffetMap); + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + resetOffsetsResultByGroup = convertOffsetsToLong(service.resetOffsets()); + } + // assert that the reset offsets are as expected + assertEquals(expectedResetResults, resetOffsetsResultByGroup); + assertEquals(expectedResetResults.values().size(), resetOffsetsResultByGroup.values().size()); + // assert that the committed offsets are as expected + AssertCommittedOffsets(appId, topic, expectedCommittedOffset, partitions); + } + + private void resetOffsetsAndAssertInternalTopicDeletion(String[] args, + String appId) throws InterruptedException { + final boolean executeMode = Arrays.asList(args).contains("--execute"); + List internalTopics; + List allTopics; + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + internalTopics = service.retrieveInternalTopics(List.of(appId)).get(appId); + allTopics = service.collectAllTopics(appId); + service.resetOffsets(); + } + + // assert that the internal topics are deleted in --execute mode and not in --dry-run mode + allTopics.addAll(List.of("__consumer_offsets", "__transaction_state")); + if (executeMode) { + allTopics.removeAll(internalTopics); + } + cluster.waitForRemainingTopics(30000, allTopics.toArray(new String[0])); + } + + /** + * Resets offsets for two topics and verifies the results. + * + *
+ * <p>This method performs the following steps:
+ * <ul>
+ *   <li>Resets offsets for the specified topics using the provided arguments.</li>
+ *   <li>Asserts that the reset offsets match the expected offsets.</li>
+ *   <li>Asserts that the committed offsets match the expected committed offsets.</li>
+ * </ul>
+ * + * @param args The command-line arguments for resetting offsets. + * @param appId The application ID for the Kafka Streams application. + * @param topic1 The first topic for which offsets will be reset. + * @param topic2 The second topic for which offsets will be reset. + * @param expectedOffset The expected offset value after the reset. + * @param expectedCommittedOffset The expected committed offset value after the reset. + * @throws ExecutionException If an error occurs during the execution of the reset operation. + * @throws InterruptedException If the thread is interrupted during the reset operation. + */ + private void resetOffsetsAndAssert(String[] args, + String appId, + String topic1, + String topic2, + long expectedOffset, + long expectedCommittedOffset) throws ExecutionException, InterruptedException { + Map> resetOffsetsResultByGroup; + Map> expectedResetResults = Map.of( + appId, Map.of( + new TopicPartition(topic1, 0), expectedOffset, + new TopicPartition(topic2, 0), expectedOffset, + new TopicPartition(topic1, 1), expectedOffset, + new TopicPartition(topic2, 1), expectedOffset + ) + ); + + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + resetOffsetsResultByGroup = convertOffsetsToLong(service.resetOffsets()); + } + // assert that the reset offsets are as expected + assertEquals(expectedResetResults, resetOffsetsResultByGroup); + assertEquals(expectedResetResults.values().size(), resetOffsetsResultByGroup.values().size()); + // assert that the committed offsets are as expected + AssertCommittedOffsets(appId, topic1, topic2, expectedCommittedOffset); + } + + /** + * Resets offsets for the specified topics and verifies the results. + * + *
+ * <p>This method performs the following steps:
+ * <ul>
+ *   <li>Resets offsets for the given topics using the provided arguments.</li>
+ *   <li>Asserts that the reset offsets match the expected offsets.</li>
+ *   <li>Asserts that the committed offsets match the expected committed offsets.</li>
+ * </ul>
+ * + * @param args The command-line arguments for resetting offsets. + * @param appId The application ID for the Kafka Streams application. + * @param topics The list of topics for which offsets will be reset. + * @param expectedOffsets A map of expected offsets for each topic partition after the reset. + * @param expectedCommittedOffsets A map of expected committed offsets for each topic partition after the reset. + * @throws ExecutionException If an error occurs during the execution of the reset operation. + * @throws InterruptedException If the thread is interrupted during the reset operation. + */ + private void resetOffsetsAndAssert(String[] args, + String appId, + List topics, + Map expectedOffsets, + Map expectedCommittedOffsets) throws ExecutionException, InterruptedException { + Map resetOffsetsResult; + + try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) { + resetOffsetsResult = convertOffsetsToLong(service.resetOffsets()).get(appId); + } + // assert that the reset offsets are as expected + assertEquals(expectedOffsets, resetOffsetsResult); + assertEquals(expectedOffsets.values().size(), resetOffsetsResult.values().size()); + // assert that the committed offsets are as expected + assertEquals(expectedCommittedOffsets, committedOffsets(topics, appId)); + } + + private void resetOffsetsAndAssertForDryRunAndExecute(String[] args, + String appId, + String topic, + long expectedOffset, + long expectedCommittedOffset, + int... partitions) throws ExecutionException, InterruptedException { + resetOffsetsAndAssert(args, appId, topic, expectedOffset, expectedCommittedOffset, partitions); + resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic, expectedOffset, expectedCommittedOffset, partitions); + resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic, expectedOffset, expectedOffset, partitions); + } + + private void resetOffsetsAndAssertInternalTopicDeletionForDryRunAndExecute(String[] args, + String appId) throws InterruptedException { + resetOffsetsAndAssertInternalTopicDeletion(args, appId); + resetOffsetsAndAssertInternalTopicDeletion(addTo(args, "--dry-run"), appId); + resetOffsetsAndAssertInternalTopicDeletion(addTo(args, "--execute"), appId); + } + + private void resetOffsetsAndAssertForDryRunAndExecute(String[] args, + String appId, + String topic1, + String topic2, + long expectedOffset, + long expectedCommittedOffset) throws ExecutionException, InterruptedException { + resetOffsetsAndAssert(args, appId, topic1, topic2, expectedOffset, expectedCommittedOffset); + resetOffsetsAndAssert(addTo(args, "--dry-run"), appId, topic1, topic2, expectedOffset, expectedCommittedOffset); + resetOffsetsAndAssert(addTo(args, "--execute"), appId, topic1, topic2, expectedOffset, expectedOffset); + } + + private Map committedOffsets(List topics, + String group) throws ExecutionException, InterruptedException { + return adminClient.listConsumerGroupOffsets(group) + .all().get() + .get(group).entrySet() + .stream() + .filter(e -> topics.contains(e.getKey().topic())) + .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); + } + + private static Map> convertOffsetsToLong(Map> map) { + return map.entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e1 -> e1.getValue().offset())))); + } + + private Map toOffsetMap(Map map) { + return map.entrySet() + .stream() + .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset())); + } + + private 
StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) { + StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args); + return new StreamsGroupCommand.StreamsGroupService( + opts, + Map.of(AdminClientConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE)) + ); + } + + private void writeContentToFile(File file, String content) throws IOException { + try (BufferedWriter bw = new BufferedWriter(new FileWriter(file))) { + bw.write(content); + } + } + + private String[] addTo(String[] args, String... extra) { + List res = new ArrayList<>(asList(args)); + res.addAll(asList(extra)); + return res.toArray(new String[0]); + } + + private String generateRandomTopic() { + return TOPIC_PREFIX + TestUtils.randomString(10); + } + + private String generateRandomAppId() { + return APP_ID_PREFIX + TestUtils.randomString(10); + } + + /** + * Produces messages to two partitions of the specified topic and consumes them. + * + * @param appId The application ID for the Kafka Streams application. + * @param topic1 The first topic to produce and consume messages from. + * @param topic2 The second topic to produce and consume messages from. + * @param numOfCommittedMessages The number of committed messages to process before shutting down. + */ + private void produceConsumeShutdown(String appId, String topic1, String topic2, long numOfCommittedMessages) throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); + + cluster.createTopic(topic1, 2); + cluster.createTopic(topic2, 2); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream inputStream1 = builder.stream(topic1); + final KStream inputStream2 = builder.stream(topic2); + + final AtomicInteger recordCount = new AtomicInteger(0); + + final KTable valueCounts = inputStream1.merge(inputStream2) + // Explicit repartition step with a custom internal topic name + .groupBy((key, value) -> key, Grouped.with(Serdes.String(), Serdes.String())) + .aggregate( + () -> "()", + (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")", + Materialized.as("aggregated_value")); + + valueCounts.toStream().peek((key, value) -> { + if (recordCount.incrementAndGet() > numOfCommittedMessages) { + throw new IllegalStateException("Crash on the " + numOfCommittedMessages + " record"); + } + }); + + + final KafkaStreams streams = new KafkaStreams(builder.build(), STREAMS_CONFIG); + streams.cleanUp(); + streams.start(); + + produceMessagesOnTwoPartitions(RECORD_TOTAL, topic1); + produceMessagesOnTwoPartitions(RECORD_TOTAL, topic2); + + + TestUtils.waitForCondition(() -> streams.state().equals(KafkaStreams.State.RUNNING), + "Expected RUNNING state but streams is on " + streams.state()); + + + try { + TestUtils.waitForCondition(() -> recordCount.get() == numOfCommittedMessages, + "Expected " + numOfCommittedMessages + " records processed but only got " + recordCount.get()); + } catch (final Exception e) { + e.printStackTrace(); + } finally { + assertEquals(numOfCommittedMessages, recordCount.get(), "Expected " + numOfCommittedMessages + " records processed but only got " + recordCount.get()); + streams.close(); + } + } + + /** + * Produces messages to two partitions of the specified topic. + * + * @param numOfMessages The number of messages to produce for each partition. + * @param topic The topic to which the messages will be produced. 
+ */ + private static void produceMessagesOnTwoPartitions(final int numOfMessages, final String topic) { + + // partition 0 + List> data = new ArrayList<>(numOfMessages); + for (long v = 0; v < numOfMessages; ++v) { + data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", cluster.time.milliseconds())); + } + + IntegrationTestUtils.produceSynchronously( + TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class), + false, + topic, + Optional.of(0), + data + ); + + // partition 1 + data = new ArrayList<>(numOfMessages); + for (long v = 0; v < 10; ++v) { + data.add(new KeyValueTimestamp<>(v + "1" + topic, v + "1", cluster.time.milliseconds())); + } + + IntegrationTestUtils.produceSynchronously( + TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class), + false, + topic, + Optional.of(1), + data + ); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java index 65027ee3b79fb..984bcf423821d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java @@ -17,30 +17,37 @@ package org.apache.kafka.tools.streams; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientTestUtils; import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult; +import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.GroupListing; import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; import org.apache.kafka.clients.admin.ListGroupsOptions; import org.apache.kafka.clients.admin.ListGroupsResult; import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsResult; import org.apache.kafka.clients.admin.StreamsGroupDescription; import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment; import org.apache.kafka.clients.admin.StreamsGroupMemberDescription; import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription; +import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -48,7 +55,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.IntStream; import joptsimple.OptionException; @@ -58,25 +67,28 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; 
import static org.mockito.Mockito.when; public class StreamsGroupCommandTest { + private static final Admin ADMIN_CLIENT = mock(KafkaAdminClient.class); + private static final String BOOTSTRAP_SERVERS = "localhost:9092"; + @Test public void testListStreamsGroups() throws Exception { String firstGroup = "first-group"; String secondGroup = "second-group"; - String bootstrapServer = "localhost:9092"; - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list"}; - Admin adminClient = mock(KafkaAdminClient.class); + String[] cgcArgs = new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--list"}; ListGroupsResult result = mock(ListGroupsResult.class); when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)), new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY)) ))); - when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result); - StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient); + when(ADMIN_CLIENT.listGroups(any(ListGroupsOptions.class))).thenReturn(result); + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs); Set expectedGroups = new HashSet<>(Arrays.asList(firstGroup, secondGroup)); final Set[] foundGroups = new Set[]{Set.of()}; @@ -89,10 +101,9 @@ public void testListStreamsGroups() throws Exception { @Test public void testListWithUnrecognizedOption() { - String bootstrapServer = "localhost:9092"; - String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", bootstrapServer, "--list"}; + String[] cgcArgs = new String[]{"--frivolous-nonsense", "--bootstrap-server", BOOTSTRAP_SERVERS, "--list"}; final Exception exception = assertThrows(OptionException.class, () -> { - getStreamsGroupService(cgcArgs, new MockAdminClient()); + getStreamsGroupService(cgcArgs); }); assertEquals("frivolous-nonsense is not a recognized option", exception.getMessage()); } @@ -101,17 +112,15 @@ public void testListWithUnrecognizedOption() { public void testListStreamsGroupsWithStates() throws Exception { String firstGroup = "first-group"; String secondGroup = "second-group"; - String bootstrapServer = "localhost:9092"; - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--list", "--state"}; - Admin adminClient = mock(KafkaAdminClient.class); + String[] cgcArgs = new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--list", "--state"}; ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class); when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList( new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)), new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.EMPTY)) ))); - when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates); - StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs, adminClient); + when(ADMIN_CLIENT.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates); + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cgcArgs); Set expectedListing = new HashSet<>(Arrays.asList( new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)), new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), "streams", 
Optional.of(GroupState.EMPTY)))); @@ -126,7 +135,7 @@ public void testListStreamsGroupsWithStates() throws Exception { when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(List.of( new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)) ))); - when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState); + when(ADMIN_CLIENT.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState); Set expectedListingStable = Set.of( new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE))); @@ -141,8 +150,7 @@ public void testListStreamsGroupsWithStates() throws Exception { @Test public void testDescribeStreamsGroups() throws Exception { - String firstGroup = "group1"; - Admin adminClient = mock(KafkaAdminClient.class); + String firstGroup = "foo-group"; DescribeStreamsGroupsResult result = mock(DescribeStreamsGroupsResult.class); Map resultMap = new HashMap<>(); StreamsGroupDescription exp = new StreamsGroupDescription( @@ -158,15 +166,15 @@ public void testDescribeStreamsGroups() throws Exception { resultMap.put(firstGroup, exp); when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap)); - when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result); - StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient); + when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result); + StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, ADMIN_CLIENT); assertEquals(exp, service.getDescribeGroup(firstGroup)); service.close(); } @Test public void testDescribeStreamsGroupsGetOffsets() throws Exception { - Admin adminClient = mock(KafkaAdminClient.class); + String groupId = "group1"; ListOffsetsResult startOffset = mock(ListOffsetsResult.class); Map startOffsetResultMap = new HashMap<>(); @@ -179,22 +187,31 @@ public void testDescribeStreamsGroupsGetOffsets() throws Exception { when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap)); when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap)); - when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset); + when(ADMIN_CLIENT.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset, endOffset); - ListConsumerGroupOffsetsResult result = mock(ListConsumerGroupOffsetsResult.class); + ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class); Map committedOffsetsMap = new HashMap<>(); committedOffsetsMap.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(12, Optional.of(0), "")); - when(adminClient.listConsumerGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result); + when(ADMIN_CLIENT.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result); when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap)); + // Java + DescribeStreamsGroupsResult describeResult = mock(DescribeStreamsGroupsResult.class); + StreamsGroupDescription groupDescription = mock(StreamsGroupDescription.class); + StreamsGroupSubtopologyDescription subtopology = mock(StreamsGroupSubtopologyDescription.class); + when(ADMIN_CLIENT.describeStreamsGroups(List.of(groupId))).thenReturn(describeResult); + 
when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId, groupDescription))); + when(groupDescription.subtopologies()).thenReturn(List.of(subtopology)); + when(subtopology.sourceTopics()).thenReturn(List.of("topic1")); + StreamsGroupMemberDescription description = new StreamsGroupMemberDescription("foo", 0, Optional.empty(), Optional.empty(), "bar", "baz", 0, "qux", Optional.empty(), Map.of(), List.of(), List.of(), new StreamsGroupMemberAssignment(List.of(), List.of(), List.of()), new StreamsGroupMemberAssignment(List.of(), List.of(), List.of()), false); StreamsGroupDescription x = new StreamsGroupDescription( - "group1", + groupId, 0, 0, 0, @@ -203,7 +220,7 @@ public void testDescribeStreamsGroupsGetOffsets() throws Exception { GroupState.STABLE, new Node(0, "host", 0), null); - StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient); + StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, ADMIN_CLIENT); Map lags = service.getOffsets(x); assertEquals(1, lags.size()); assertEquals(new StreamsGroupCommand.OffsetsInfo(Optional.of(12L), Optional.of(0), 30L, 18L), lags.get(new TopicPartition("topic1", 0))); @@ -258,9 +275,49 @@ public void testGroupStatesFromString() { assertThrow(" , ,"); } - StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args, Admin adminClient) { + @Test + public void testAdminRequestsForResetOffsets() { + String groupId = "foo-group"; + List args = new ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", groupId, "--reset-offsets", "--topic", "topic1", "--to-latest")); + List topics = List.of("topic1"); + + when(ADMIN_CLIENT.describeStreamsGroups(List.of(groupId))) + .thenReturn(describeStreamsResult(groupId, GroupState.DEAD)); + when(ADMIN_CLIENT.describeTopics(topics)) + .thenReturn(describeTopicsResult(topics, 1)); + when(ADMIN_CLIENT.listOffsets(any())) + .thenReturn(listOffsetsResult()); + when(ADMIN_CLIENT.listGroups(any())).thenReturn(listGroupResult(groupId)); + ListStreamsGroupOffsetsResult result = mock(ListStreamsGroupOffsetsResult.class); + Map committedOffsetsMap = new HashMap<>(); + committedOffsetsMap.put(new TopicPartition("topic1", 0), mock(OffsetAndMetadata.class)); + when(ADMIN_CLIENT.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result); + when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap)); + + StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0])); + Map> resetResult = service.resetOffsets(); + + assertEquals(Collections.singleton(groupId), resetResult.keySet()); + assertEquals(new HashSet<>(List.of(new TopicPartition(topics.get(0), 0))), + resetResult.get(groupId).keySet()); + + verify(ADMIN_CLIENT, times(1)).describeStreamsGroups(List.of(groupId)); + verify(ADMIN_CLIENT, times(1)).describeTopics(topics); + verify(ADMIN_CLIENT, times(1)).listOffsets(any()); + verify(ADMIN_CLIENT, times(1)).listStreamsGroupOffsets(any()); + service.close(); + } + + private ListGroupsResult listGroupResult(String groupId) { + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + GroupListing groupListing = new GroupListing(groupId, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.DEAD)); + future.complete(List.of(groupListing)); + return new ListGroupsResult(future); + } + + StreamsGroupCommand.StreamsGroupService 
getStreamsGroupService(String[] args) { StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args); - return new StreamsGroupCommand.StreamsGroupService(opts, adminClient); + return new StreamsGroupCommand.StreamsGroupService(opts, ADMIN_CLIENT); } private static void assertThrow(final String wrongState) { @@ -276,4 +333,46 @@ private static void assertThrow(final String wrongState) { .map(String::trim) .collect(Collectors.toSet()), validStates); } + + private DescribeStreamsGroupsResult describeStreamsResult(String groupId, GroupState groupState) { + StreamsGroupMemberDescription memberDescription = new StreamsGroupMemberDescription("foo", 0, Optional.empty(), + Optional.empty(), "bar", "baz", 0, "qux", + Optional.empty(), Map.of(), List.of(), List.of(), + new StreamsGroupMemberAssignment(List.of(), List.of(), List.of()), new StreamsGroupMemberAssignment(List.of(), List.of(), List.of()), + false); + StreamsGroupDescription description = new StreamsGroupDescription(groupId, + 0, + 0, + 0, + Collections.singletonList(new StreamsGroupSubtopologyDescription("subtopologyId", Collections.emptyList(), Collections.emptyList(), Map.of(), Map.of())), + List.of(memberDescription), + groupState, + new Node(1, "localhost", 9092), + Set.of()); + KafkaFutureImpl future = new KafkaFutureImpl<>(); + future.complete(description); + return new DescribeStreamsGroupsResult(Collections.singletonMap(groupId, future)); + } + + private DescribeTopicsResult describeTopicsResult(Collection topics, int numOfPartitions) { + Map topicDescriptions = new HashMap<>(); + + topics.forEach(topic -> { + List partitions = IntStream.range(0, numOfPartitions) + .mapToObj(i -> new TopicPartitionInfo(i, null, Collections.emptyList(), Collections.emptyList())) + .collect(Collectors.toList()); + topicDescriptions.put(topic, new TopicDescription(topic, false, partitions)); + }); + return AdminClientTestUtils.describeTopicsResult(topicDescriptions); + } + + private ListOffsetsResult listOffsetsResult() { + List topicPartitions = new ArrayList<>(); + topicPartitions.add(new TopicPartition("topic1", 0)); + ListOffsetsResult.ListOffsetsResultInfo resultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.of(1)); + Map> futures = topicPartitions.stream().collect(Collectors.toMap( + Function.identity(), + __ -> KafkaFuture.completedFuture(resultInfo))); + return new ListOffsetsResult(futures); + } }