Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.SharePartitionOffsetInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
Expand Down Expand Up @@ -117,7 +118,6 @@ private Map<String, Map<TopicPartition, OffsetAndMetadata>> parseResetPlan(Strin
isOldCsvFormat = true;
}
} catch (IOException e) {
e.printStackTrace();
// Ignore.
}

Expand Down Expand Up @@ -159,14 +159,14 @@ private Map<TopicPartition, Long> checkOffsetsRange(Map<TopicPartition, Long> re
if (logEndOffset != null) {
if (logEndOffset instanceof LogOffset && offset > ((LogOffset) logEndOffset).value) {
long endOffset = ((LogOffset) logEndOffset).value;
LOGGER.warn("New offset (" + offset + ") is higher than latest offset for topic partition " + topicPartition + ". Value will be set to " + endOffset);
LOGGER.warn("New offset (" + offset + ") is higher than latest offset for topic partition " + topicPartition + ". Value will be set to " + endOffset + ".");
res.put(topicPartition, endOffset);
} else {
LogOffsetResult logStartOffset = logStartOffsets.get(topicPartition);

if (logStartOffset instanceof LogOffset && offset < ((LogOffset) logStartOffset).value) {
long startOffset = ((LogOffset) logStartOffset).value;
LOGGER.warn("New offset (" + offset + ") is lower than earliest offset for topic partition " + topicPartition + ". Value will be set to " + startOffset);
LOGGER.warn("New offset (" + offset + ") is lower than earliest offset for topic partition " + topicPartition + ". Value will be set to " + startOffset + ".");
res.put(topicPartition, startOffset);
} else
res.put(topicPartition, offset);
Expand Down Expand Up @@ -390,7 +390,7 @@ public Map<TopicPartition, OffsetAndMetadata> resetFromFile(String groupId) {
Map<TopicPartition, OffsetAndMetadata> resetPlanForGroup = resetPlan.get(groupId);

if (resetPlanForGroup == null) {
printError("No reset plan for group " + groupId + " found", Optional.empty());
printError("No reset plan for group " + groupId + " found.", Optional.empty());
return Map.<TopicPartition, OffsetAndMetadata>of();
}

Expand Down Expand Up @@ -439,6 +439,42 @@ public Map<TopicPartition, OffsetAndMetadata> resetToCurrent(Collection<TopicPar
return preparedOffsetsForPartitionsWithCommittedOffset;
}

public Map<TopicPartition, OffsetAndMetadata> resetToCurrentForShareGroup(Collection<TopicPartition> partitionsToReset, Map<TopicPartition, SharePartitionOffsetInfo> currentOffsetInfo) {
Collection<TopicPartition> partitionsToResetWithStartOffset = new ArrayList<>();
Collection<TopicPartition> partitionsToResetWithoutStartOffset = new ArrayList<>();

for (TopicPartition topicPartition : partitionsToReset) {
if (currentOffsetInfo.containsKey(topicPartition))
partitionsToResetWithStartOffset.add(topicPartition);
else
partitionsToResetWithoutStartOffset.add(topicPartition);
}

Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithStartOffset = partitionsToResetWithStartOffset.stream()
.collect(Collectors.toMap(Function.identity(), topicPartition -> {
SharePartitionOffsetInfo offsetInfo = currentOffsetInfo.get(topicPartition);

if (offsetInfo == null) {
throw new IllegalStateException("Expected a valid start offset for topic partition: " + topicPartition);
}

return new OffsetAndMetadata(offsetInfo.startOffset());
}));

Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithoutStartOffset =
getLogEndOffsets(partitionsToResetWithoutStartOffset)
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
if (!(e.getValue() instanceof OffsetsUtils.LogOffset)) {
CommandLineUtils.printUsageAndExit(parser, "Error getting ending offset of topic partition: " + e.getKey());
}
return new OffsetAndMetadata(((OffsetsUtils.LogOffset) e.getValue()).value);
}));

preparedOffsetsForPartitionsWithStartOffset.putAll(preparedOffsetsForPartitionsWithoutStartOffset);

return preparedOffsetsForPartitionsWithStartOffset;
}

public void checkAllTopicPartitionsValid(Collection<TopicPartition> partitionsToReset) {
// check the partitions exist
List<TopicPartition> partitionsNotExistList = filterNonExistentPartitions(partitionsToReset);
Expand Down Expand Up @@ -537,10 +573,14 @@ public OffsetsUtilsOptions(

public OffsetsUtilsOptions(
List<String> groupOpt,
List<Long> resetToOffsetOpt,
List<String> resetFromFileOpt,
List<String> resetToDatetimeOpt,
long timeoutMsOpt) {

this.groupOpt = groupOpt;
this.resetToOffsetOpt = resetToOffsetOpt;
this.resetFromFileOpt = resetFromFileOpt;
this.resetToDatetimeOpt = resetToDatetimeOpt;
this.timeoutMsOpt = timeoutMsOpt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
private static final String COMMAND_CONFIG_DOC = "Property file containing configs to be passed to Admin Client and Consumer.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of consumer group. Supports one consumer group at the time, and instances should be inactive" + NL +
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets. " +
"Additionally, the --export option is used to export the results to a CSV format." + NL +
"Additionally, the --export option is used to export the offsets in CSV format." + NL +
"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL +
"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'.";
private static final String DRY_RUN_DOC = "Only show results without executing changes on Consumer Groups. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
private static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets.";
private static final String EXPORT_DOC = "Export offset information in CSV format. Supported operations: reset-offsets.";
private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset.";
private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file.";
private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.tools.OffsetsUtils;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;

import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
Expand Down Expand Up @@ -98,7 +101,13 @@ public static void run(ShareGroupCommandOptions opts) {
} else if (opts.options.has(opts.deleteOpt)) {
shareGroupService.deleteShareGroups();
} else if (opts.options.has(opts.resetOffsetsOpt)) {
shareGroupService.resetOffsets();
Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToReset = shareGroupService.resetOffsets();
if (opts.options.has(opts.exportOpt)) {
String exported = shareGroupService.exportOffsetsToCsv(offsetsToReset);
System.out.println(exported);
} else {
OffsetsUtils.printOffsetsToReset(offsetsToReset);
}
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
shareGroupService.deleteOffsets();
}
Expand Down Expand Up @@ -150,6 +159,8 @@ public ShareGroupService(ShareGroupCommandOptions opts, Admin adminClient) {
private OffsetsUtils.OffsetsUtilsOptions getOffsetsUtilsOptions(ShareGroupCommandOptions opts) {
return
new OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
opts.options.valuesOf(opts.resetToOffsetOpt),
opts.options.valuesOf(opts.resetFromFileOpt),
opts.options.valuesOf(opts.resetToDatetimeOpt),
opts.options.valueOf(opts.timeoutMsOpt));
}
Expand Down Expand Up @@ -242,6 +253,26 @@ public void describeGroups() throws ExecutionException, InterruptedException {
}
}

String exportOffsetsToCsv(Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsForGroups) {
ObjectWriter csvWriter = CsvUtils.writerFor(CsvUtils.CsvRecordNoGroup.class);

return offsetsForGroups.entrySet().stream().flatMap(e -> {
Map<TopicPartition, OffsetAndMetadata> partitionInfo = e.getValue();

return partitionInfo.entrySet().stream().map(e1 -> {
TopicPartition k = e1.getKey();
OffsetAndMetadata v = e1.getValue();
Object csvRecord = new CsvUtils.CsvRecordNoGroup(k.topic(), k.partition(), v.offset());

try {
return csvWriter.writeValueAsString(csvRecord);
} catch (JsonProcessingException err) {
throw new RuntimeException(err);
}
});
}).collect(Collectors.joining());
}

Map<String, Throwable> deleteShareGroups() {
List<GroupListing> shareGroupIds = listDetailedShareGroups();
List<String> groupIds = opts.options.has(opts.allGroupsOpt)
Expand Down Expand Up @@ -382,32 +413,36 @@ Entry<Throwable, Map<String, Throwable>> sendDeleteShareGroupOffsetsRequest(Stri
return new SimpleImmutableEntry<>(topLevelException, topicLevelResult);
}

void resetOffsets() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();

String groupId = opts.options.valueOf(opts.groupOpt);
try {
ShareGroupDescription shareGroupDescription = describeShareGroups(List.of(groupId)).get(groupId);
if (!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) || GroupState.DEAD.equals(shareGroupDescription.groupState()))) {
CommandLineUtils.printErrorAndExit(String.format("Share group '%s' is not empty.", groupId));
}
resetOffsetsForInactiveGroup(groupId);
result.put(groupId, resetOffsetsForInactiveGroup(groupId));
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof GroupIdNotFoundException) {
resetOffsetsForInactiveGroup(groupId);
result.put(groupId, resetOffsetsForInactiveGroup(groupId));
} else if (cause instanceof KafkaException) {
CommandLineUtils.printErrorAndExit(cause.getMessage());
} else {
throw new RuntimeException(cause);
}
}

return result;
}

private void resetOffsetsForInactiveGroup(String groupId) {
private Map<TopicPartition, OffsetAndMetadata> resetOffsetsForInactiveGroup(String groupId) {
try {
Collection<TopicPartition> partitionsToReset = getPartitionsToReset(groupId);
Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(partitionsToReset);
Map<TopicPartition, OffsetAndMetadata> offsetsToReset = prepareOffsetsToReset(groupId, partitionsToReset);
boolean dryRun = opts.options.has(opts.dryRunOpt) || !opts.options.has(opts.executeOpt);
if (!dryRun) {
adminClient.alterShareGroupOffsets(groupId,
Expand All @@ -418,7 +453,7 @@ private void resetOffsetsForInactiveGroup(String groupId) {
withTimeoutMs(new AlterShareGroupOffsetsOptions())
).all().get();
}
OffsetsUtils.printOffsetsToReset(Map.of(groupId, offsetsToReset));
return offsetsToReset;
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} catch (ExecutionException ee) {
Expand Down Expand Up @@ -448,14 +483,32 @@ private Collection<TopicPartition> getPartitionsToReset(String groupId) throws E
return partitionsToReset;
}

private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(Collection<TopicPartition> partitionsToReset) {
private Map<TopicPartition, SharePartitionOffsetInfo> getOffsetInfo(String groupId) {
try {
return adminClient.listShareGroupOffsets(
Map.of(groupId, new ListShareGroupOffsetsSpec()),
withTimeoutMs(new ListShareGroupOffsetsOptions())
).partitionsToOffsetInfo(groupId).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to interrupt the currentThread for RunTimeException?
For ExecutionException, getting the instance type for ex if it's of KafkaException(GroupAuthorizationException etc), it could be more visible to the caller ?

}
}

private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String groupId, Collection<TopicPartition> partitionsToReset) {
offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
if (opts.options.has(opts.resetToEarliestOpt)) {
if (opts.options.has(opts.resetToOffsetOpt)) {
return offsetsUtils.resetToOffset(partitionsToReset);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this pr, but this call sets offset to 0L, if opts.resetToOffsetOpt is empty or null. Is that ok instead of throwing an exception?
This is also the same in CosnumerGroupCommand too I think.

} else if (opts.options.has(opts.resetToEarliestOpt)) {
return offsetsUtils.resetToEarliest(partitionsToReset);
} else if (opts.options.has(opts.resetToLatestOpt)) {
return offsetsUtils.resetToLatest(partitionsToReset);
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
return offsetsUtils.resetToDateTime(partitionsToReset);
} else if (offsetsUtils.resetPlanFromFile().isPresent()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call resetPlanFromFile and next call resetFromFile seem to do same reading and parsing the entire csv. May be use opts to check for presence?

return offsetsUtils.resetFromFile(groupId);
} else if (opts.options.has(opts.resetToCurrentOpt)) {
Map<TopicPartition, SharePartitionOffsetInfo> currentOffsets = getOffsetInfo(groupId);
return offsetsUtils.resetToCurrentForShareGroup(partitionsToReset, currentOffsets);
}
CommandLineUtils
.printUsageAndExit(opts.parser, String.format("Option '%s' requires one of the following scenarios: %s", opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts));
Expand Down
Loading
Loading