Skip to content

Commit 949f1a7

Browse files
authored
MINOR: Cleanups in Tools module (apache#21823)
The PR does some very minor cleanups in the Tools module predominantly converting some inner classes to records as and when applicable. Additionally some places where we could use smart lambdas. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang <s7133700@gmail.com>, Christo Lolov <lolovc@amazon.com>
1 parent a2c0c8b commit 949f1a7

8 files changed

Lines changed: 29 additions & 68 deletions

File tree

tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -300,15 +300,10 @@ private static List<RaftVoterEndpoint> getEndpoints(QuorumInfo.Node node) {
300300
return node == null ? new ArrayList<>() : node.endpoints();
301301
}
302302

303-
private static class Node {
304-
private final int id;
305-
private final Uuid directoryId;
306-
private final List<RaftVoterEndpoint> endpoints;
307-
308-
private Node(int id, Uuid directoryId, List<RaftVoterEndpoint> endpoints) {
309-
this.id = id;
310-
this.directoryId = Objects.requireNonNull(directoryId);
311-
this.endpoints = Objects.requireNonNull(endpoints);
303+
private record Node(int id, Uuid directoryId, List<RaftVoterEndpoint> endpoints) {
304+
private Node {
305+
Objects.requireNonNull(directoryId);
306+
Objects.requireNonNull(endpoints);
312307
}
313308

314309
@Override

tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,36 +17,12 @@
1717

1818
package org.apache.kafka.tools.reassign;
1919

20-
import java.util.Objects;
2120
import java.util.Set;
2221

2322
/**
2423
* A partition movement. The source and destination brokers may overlap.
24+
* @param sources The source brokers.
25+
* @param destinations The destination brokers.
2526
*/
26-
final class PartitionMove {
27-
public final Set<Integer> sources;
28-
29-
public final Set<Integer> destinations;
30-
31-
/**
32-
* @param sources The source brokers.
33-
* @param destinations The destination brokers.
34-
*/
35-
public PartitionMove(Set<Integer> sources, Set<Integer> destinations) {
36-
this.sources = sources;
37-
this.destinations = destinations;
38-
}
39-
40-
@Override
41-
public boolean equals(Object o) {
42-
if (this == o) return true;
43-
if (o == null || getClass() != o.getClass()) return false;
44-
PartitionMove that = (PartitionMove) o;
45-
return Objects.equals(sources, that.sources) && Objects.equals(destinations, that.destinations);
46-
}
47-
48-
@Override
49-
public int hashCode() {
50-
return Objects.hash(sources, destinations);
51-
}
27+
record PartitionMove(Set<Integer> sources, Set<Integer> destinations) {
5228
}

tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,7 @@ static Map<String, Map<Integer, PartitionMove>> calculateProposedMoveMap(Map<Top
10611061

10621062
if (partMoves.containsKey(part.partition())) {
10631063
PartitionMove move = partMoves.get(part.partition());
1064-
sources.addAll(move.sources);
1064+
sources.addAll(move.sources());
10651065
} else if (currentParts.containsKey(part))
10661066
sources.addAll(currentParts.get(part));
10671067
else
@@ -1086,7 +1086,7 @@ static Map<String, String> calculateLeaderThrottles(Map<String, Map<Integer, Par
10861086
moveMap.forEach((topicName, partMoveMap) -> {
10871087
Set<String> components = new TreeSet<>();
10881088
partMoveMap.forEach((partId, move) ->
1089-
move.sources.forEach(source -> components.add(String.format("%d:%d", partId, source))));
1089+
move.sources().forEach(source -> components.add(String.format("%d:%d", partId, source))));
10901090
results.put(topicName, String.join(",", components));
10911091
});
10921092
return results;
@@ -1103,8 +1103,8 @@ static Map<String, String> calculateFollowerThrottles(Map<String, Map<Integer, P
11031103
moveMap.forEach((topicName, partMoveMap) -> {
11041104
Set<String> components = new TreeSet<>();
11051105
partMoveMap.forEach((partId, move) ->
1106-
move.destinations.forEach(destination -> {
1107-
if (!move.sources.contains(destination)) {
1106+
move.destinations().forEach(destination -> {
1107+
if (!move.sources().contains(destination)) {
11081108
components.add(String.format("%d:%d", partId, destination));
11091109
}
11101110
})
@@ -1124,8 +1124,8 @@ static Map<String, String> calculateFollowerThrottles(Map<String, Map<Integer, P
11241124
static Set<Integer> calculateReassigningBrokers(Map<String, Map<Integer, PartitionMove>> moveMap) {
11251125
Set<Integer> reassigningBrokers = new TreeSet<>();
11261126
moveMap.values().forEach(partMoveMap -> partMoveMap.values().forEach(partMove -> {
1127-
reassigningBrokers.addAll(partMove.sources);
1128-
reassigningBrokers.addAll(partMove.destinations);
1127+
reassigningBrokers.addAll(partMove.sources());
1128+
reassigningBrokers.addAll(partMove.destinations());
11291129
}));
11301130
return reassigningBrokers;
11311131
}

tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,17 +138,11 @@
138138
public class DumpLogSegmentsTest {
139139
private static final Pattern SIZE_PATTERN = Pattern.compile(".+?size:\\s(\\d+).+");
140140

141-
private static final class BatchInfo {
142-
private final List<SimpleRecord> records;
143-
private final boolean hasKeys;
144-
private final boolean hasValues;
145-
146-
private BatchInfo(List<SimpleRecord> records, boolean hasKeys, boolean hasValues) {
147-
this.records = records;
148-
this.hasKeys = hasKeys;
149-
this.hasValues = hasValues;
150-
}
151-
}
141+
private record BatchInfo(
142+
List<SimpleRecord> records,
143+
boolean hasKeys,
144+
boolean hasValues
145+
) { }
152146

153147
private final File tmpDir = TestUtils.tempDir();
154148
private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);

tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ public void testDeprecatedAdminConfig() throws Exception {
111111
Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs, requestTimeoutMs);
112112

113113
try (final MockedStatic<Admin> mockedAdmin = Mockito.mockStatic(Admin.class)) {
114-
String output = ToolsTestUtils.captureStandardOut(() -> {
114+
String output = ToolsTestUtils.captureStandardOut(() ->
115115
LeaderElectionCommand.mainNoExit(
116116
"--bootstrap-server", cluster.bootstrapServers(),
117117
"--election-type", "unclean", "--all-topic-partitions",
118118
"--admin.config", adminConfigPath.toString()
119-
);
120-
});
119+
)
120+
);
121121
assertTrue(output.contains("Option --admin.config has been deprecated and will be removed in a future version. Use --command-config instead."));
122122

123123
ArgumentCaptor<Properties> argumentCaptor = ArgumentCaptor.forClass(Properties.class);
@@ -161,14 +161,14 @@ public void testCommandConfigAndDeprecatedConfigPresent() throws Exception {
161161
// Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
162162
Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
163163

164-
String output = ToolsTestUtils.captureStandardErr(() -> {
164+
String output = ToolsTestUtils.captureStandardErr(() ->
165165
LeaderElectionCommand.mainNoExit(
166166
"--bootstrap-server", "localhost:9092",
167167
"--election-type", "unclean", "--all-topic-partitions",
168168
"--admin.config", adminConfigPath.toString(),
169169
"--command-config", adminConfigPath.toString()
170-
);
171-
});
170+
)
171+
);
172172

173173
assertTrue(output.contains(String.format("Option \"%s\" can't be used with option \"%s\"",
174174
"[admin.config]", "[command-config]")));

tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,7 @@ public void testDeprecatedConfig(final TestInfo testInfo) throws IOException {
257257
};
258258

259259
try (final MockedStatic<Admin> mockedAdmin = Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
260-
String output = ToolsTestUtils.captureStandardOut(() -> {
261-
new StreamsResetter().execute(parameters);
262-
});
260+
String output = ToolsTestUtils.captureStandardOut(() -> new StreamsResetter().execute(parameters));
263261
assertTrue(output.contains("Option --config-file has been deprecated and will be removed in a future version. Use --command-config instead."));
264262

265263
ArgumentCaptor<Properties> argumentCaptor = ArgumentCaptor.forClass(Properties.class);
@@ -308,9 +306,7 @@ public void testCommandConfigAndDeprecatedConfigPresent(final TestInfo testInfo)
308306
// Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
309307
Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
310308

311-
String output = ToolsTestUtils.captureStandardErr(() -> {
312-
new StreamsResetter().execute(parameters);
313-
});
309+
String output = ToolsTestUtils.captureStandardErr(() -> new StreamsResetter().execute(parameters));
314310

315311
assertTrue(output.contains(String.format("Option \"%s\" can't be used with option \"%s\"",
316312
"[config-file]", "[command-config]")));

tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,12 @@ public void testAdminRequestsForDescribeNegativeOffsets() throws Exception {
184184
Map<TopicPartition, Optional<Long>> returnedOffsets = assignments.map(results ->
185185
results.stream().collect(Collectors.toMap(
186186
assignment -> new TopicPartition(assignment.topic().get(), assignment.partition().get()),
187-
assignment -> assignment.offset()))
187+
PartitionAssignmentState::offset))
188188
).orElse(Map.of());
189189
Map<TopicPartition, Optional<Integer>> returnedLeaderEpoch = assignments.map(results ->
190190
results.stream().collect(Collectors.toMap(
191191
assignment -> new TopicPartition(assignment.topic().get(), assignment.partition().get()),
192-
assignment -> assignment.leaderEpoch()))
192+
PartitionAssignmentState::leaderEpoch))
193193
).orElse(Map.of());
194194

195195
Map<TopicPartition, Optional<Long>> expectedOffsets = Map.of(

tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ topic, new TopicDescription(topic, false, List.of(
473473
when(adminClient.listStreamsGroupOffsets(anyMap(), any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
474474
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
475475
StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args.toArray(new String[0]), adminClient);
476-
assertThrows(UnknownTopicOrPartitionException.class, () -> service.resetOffsets());
476+
assertThrows(UnknownTopicOrPartitionException.class, service::resetOffsets);
477477
service.close();
478478
}
479479

0 commit comments

Comments
 (0)