Skip to content

Commit cd317eb

Browse files
committed
Address review comments
1 parent ab0caec commit cd317eb

2 files changed

Lines changed: 26 additions & 7 deletions

File tree

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -229,27 +229,36 @@ public BootstrapMetadata bootstrapMetadata() {
229229
return bootstrapMetadata;
230230
}
231231

232-
public void run() throws Exception {
233-
if (nodeId < 0) {
234-
throw new RuntimeException("You must specify a valid non-negative node ID.");
235-
}
232+
/**
233+
* Validates the correctness of the given cluster id. A valid cluster id is a base64, urlencoded, no padding
234+
* representation of a {@link Uuid}. These checks do not validate the absence of <code>-</code> character as
235+
* {@link Uuid#randomUuid()} avoids them only for convenience reasons.
236+
*/
237+
private void validateClusterId(String clusterId) {
236238
if (clusterId == null) {
237239
throw new FormatterException("You must specify the cluster id.");
238240
}
241+
if (clusterId.contains("=")) {
242+
throw new FormatterException("The specified cluster id, " + clusterId + " is invalid: contains padding");
243+
}
239244
try {
240-
if (clusterId.contains("=")) {
241-
throw new FormatterException("The specified cluster id, " + clusterId + " contains padding and is invalid");
242-
}
243245
Uuid uuid = Uuid.fromString(clusterId);
244246
if (Uuid.RESERVED.contains(uuid)) {
245247
throw new FormatterException("The specified cluster id, " + clusterId + " is reserved");
246248
}
247249
} catch (IllegalArgumentException e) {
248250
throw new FormatterException("The specified cluster id, " + clusterId + " is invalid", e);
249251
}
252+
}
253+
254+
public void run() throws Exception {
255+
if (nodeId < 0) {
256+
throw new RuntimeException("You must specify a valid non-negative node ID.");
257+
}
250258
if (directories.isEmpty()) {
251259
throw new FormatterException("You must specify at least one directory to format");
252260
}
261+
validateClusterId(clusterId);
253262
if (controllerListenerName == null) {
254263
throw new FormatterException("You must specify the name of the initial controller listener.");
255264
}

metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,4 +601,14 @@ public void testFormatWithInvalidClusterId(String clusterId) throws Exception {
601601
getMessage().substring(0, expectedPrefix.length()));
602602
}
603603
}
604+
605+
@ParameterizedTest
606+
@ValueSource(strings = {"iIygKoNNSpGzNeAEr_QW7w", "-w_KF-snTmuEnKPqZ0RDzA", "dZMgZA7nTLqZTdzb0zbvSQ", "hxqHyN2OSSmPajVhm_DR-Q"})
607+
public void testFormatWithValidClusterId(String clusterId) throws Exception {
608+
try (TestEnv testEnv = new TestEnv(2)) {
609+
FormatterContext formatter1 = testEnv.newFormatter();
610+
formatter1.formatter.setClusterId(clusterId);
611+
formatter1.formatter.run();
612+
}
613+
}
604614
}

0 commit comments

Comments
 (0)