Skip to content

Commit 7b7fae7

Browse files
authored
MINOR: Convert GroupOffsetsResetterOptions to record (apache#22251)
Convert `GroupOffsetsResetterOptions` to record. Reviewers: Ken Huang <s7133700@gmail.com>, Maros Orsak <maros.orsak159@gmail.com>, PoAn Yang <payang@apache.org>
1 parent c717e6e commit 7b7fae7

1 file changed

Lines changed: 19 additions & 40 deletions

File tree

tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,9 @@ public static void printOffsetsToReset(Map<String, Map<TopicPartition, OffsetAnd
9494
}
9595

9696
public Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>> resetPlanFromFile() {
97-
if (opts.resetFromFileOpt != null && !opts.resetFromFileOpt.isEmpty()) {
97+
if (opts.resetFromFileOpt() != null && !opts.resetFromFileOpt().isEmpty()) {
9898
try {
99-
String resetPlanPath = opts.resetFromFileOpt.get(0);
99+
String resetPlanPath = opts.resetFromFileOpt().get(0);
100100
String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
101101
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetPlan = parseResetPlan(resetPlanCsv);
102102
return Optional.of(resetPlan);
@@ -109,7 +109,7 @@ public Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>> resetPlanFr
109109
private Map<String, Map<TopicPartition, OffsetAndMetadata>> parseResetPlan(String resetPlanCsv) {
110110
ObjectReader csvReader = CsvUtils.readerFor(CsvUtils.CsvRecordNoGroup.class);
111111
String[] lines = resetPlanCsv.split("\n");
112-
boolean isSingleGroupQuery = opts.groupOpt.size() == 1;
112+
boolean isSingleGroupQuery = opts.groupOpt().size() == 1;
113113
boolean isOldCsvFormat = false;
114114
try {
115115
if (lines.length > 0) {
@@ -126,7 +126,7 @@ private Map<String, Map<TopicPartition, OffsetAndMetadata>> parseResetPlan(Strin
126126
try {
127127
// Single group CSV format: "topic,partition,offset"
128128
if (isSingleGroupQuery && isOldCsvFormat) {
129-
String group = opts.groupOpt.get(0);
129+
String group = opts.groupOpt().get(0);
130130
for (String line : lines) {
131131
CsvUtils.CsvRecordNoGroup rec = csvReader.readValue(line, CsvUtils.CsvRecordNoGroup.class);
132132
dataMap.computeIfAbsent(group, k -> new HashMap<>())
@@ -298,8 +298,8 @@ public Stream<TopicPartition> parseTopicsWithPartitions(String topicArg) {
298298
}
299299

300300
public Map<TopicPartition, OffsetAndMetadata> resetToOffset(Collection<TopicPartition> partitionsToReset) {
301-
long offset = opts.resetToOffsetOpt != null && !opts.resetToOffsetOpt.isEmpty()
302-
? opts.resetToOffsetOpt.get(0)
301+
long offset = opts.resetToOffsetOpt() != null && !opts.resetToOffsetOpt().isEmpty()
302+
? opts.resetToOffsetOpt().get(0)
303303
: 0L;
304304
return checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), tp -> offset)))
305305
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
@@ -336,7 +336,7 @@ public Map<TopicPartition, OffsetAndMetadata> resetByShiftBy(
336336
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets) {
337337

338338
Map<TopicPartition, Long> requestedOffsets = partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
339-
long shiftBy = opts.resetShiftByOpt;
339+
long shiftBy = opts.resetShiftByOpt();
340340
OffsetAndMetadata currentOffset = currentCommittedOffsets.get(topicPartition);
341341

342342
if (currentOffset == null) {
@@ -351,7 +351,7 @@ public Map<TopicPartition, OffsetAndMetadata> resetByShiftBy(
351351

352352
public Map<TopicPartition, OffsetAndMetadata> resetToDateTime(Collection<TopicPartition> partitionsToReset) {
353353
try {
354-
long timestamp = Utils.getDateTime(opts.resetToDatetimeOpt.get(0));
354+
long timestamp = Utils.getDateTime(opts.resetToDatetimeOpt().get(0));
355355
Map<TopicPartition, LogOffsetResult> logTimestampOffsets =
356356
getLogTimestampOffsets(partitionsToReset, timestamp);
357357
return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
@@ -367,7 +367,7 @@ public Map<TopicPartition, OffsetAndMetadata> resetToDateTime(Collection<TopicPa
367367
}
368368

369369
public Map<TopicPartition, OffsetAndMetadata> resetByDuration(Collection<TopicPartition> partitionsToReset) {
370-
String duration = opts.resetByDurationOpt;
370+
String duration = opts.resetByDurationOpt();
371371
Duration durationParsed = Duration.parse(duration);
372372
Instant now = Instant.now();
373373
durationParsed.negated().addTo(now);
@@ -489,7 +489,7 @@ public List<TopicPartition> filterNonExistentPartitions(Collection<TopicPartitio
489489
}
490490

491491
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
492-
int t = (int) opts.timeoutMsOpt;
492+
int t = (int) opts.timeoutMsOpt();
493493
return options.timeoutMs(t);
494494
}
495495

@@ -508,41 +508,20 @@ public static class Unknown implements LogOffsetResult { }
508508
public static class Ignore implements LogOffsetResult { }
509509

510510

511-
public static class GroupOffsetsResetterOptions {
512-
List<String> groupOpt;
513-
List<Long> resetToOffsetOpt;
514-
List<String> resetFromFileOpt;
515-
List<String> resetToDatetimeOpt;
516-
String resetByDurationOpt;
517-
Long resetShiftByOpt;
518-
long timeoutMsOpt;
511+
public record GroupOffsetsResetterOptions(
512+
List<String> groupOpt,
513+
List<Long> resetToOffsetOpt,
514+
List<String> resetFromFileOpt,
515+
List<String> resetToDatetimeOpt,
516+
String resetByDurationOpt,
517+
Long resetShiftByOpt,
518+
long timeoutMsOpt) {
519519

520520
public GroupOffsetsResetterOptions(
521521
List<String> groupOpt,
522-
List<Long> resetToOffsetOpt,
523-
List<String> resetFromFileOpt,
524522
List<String> resetToDatetimeOpt,
525-
String resetByDurationOpt,
526-
Long resetShiftByOpt,
527523
long timeoutMsOpt) {
528-
529-
this.groupOpt = groupOpt;
530-
this.resetToOffsetOpt = resetToOffsetOpt;
531-
this.resetFromFileOpt = resetFromFileOpt;
532-
this.resetToDatetimeOpt = resetToDatetimeOpt;
533-
this.resetByDurationOpt = resetByDurationOpt;
534-
this.resetShiftByOpt = resetShiftByOpt;
535-
this.timeoutMsOpt = timeoutMsOpt;
536-
}
537-
538-
public GroupOffsetsResetterOptions(
539-
List<String> groupOpt,
540-
List<String> resetToDatetimeOpt,
541-
long timeoutMsOpt) {
542-
543-
this.groupOpt = groupOpt;
544-
this.resetToDatetimeOpt = resetToDatetimeOpt;
545-
this.timeoutMsOpt = timeoutMsOpt;
524+
this(groupOpt, null, null, resetToDatetimeOpt, null, null, timeoutMsOpt);
546525
}
547526
}
548527
}

0 commit comments

Comments
 (0)