diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index 1ccf0c3d5c05a..d6b32498a67d0 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -229,13 +229,34 @@ public BootstrapMetadata bootstrapMetadata() { return bootstrapMetadata; } + /** + * Validates the correctness of the given cluster id. A valid cluster id is a base64, urlencoded, no padding + * representation of a {@link Uuid}. These checks do not validate the absence of - character as + * {@link Uuid#randomUuid()} avoids them only for convenience reasons and such a validation would break + * compatibility when attempting to format using an old cluster id. + */ + private void validateClusterId(String clusterId) { + if (clusterId == null) { + throw new FormatterException("You must specify the cluster id."); + } + if (clusterId.contains("=")) { + throw new FormatterException("The specified cluster id, " + clusterId + " is invalid: contains padding"); + } + try { + Uuid uuid = Uuid.fromString(clusterId); + if (Uuid.RESERVED.contains(uuid)) { + throw new FormatterException("The specified cluster id, " + clusterId + " is reserved"); + } + } catch (IllegalArgumentException e) { + throw new FormatterException("The specified cluster id, " + clusterId + " is invalid", e); + } + } + public void run() throws Exception { if (nodeId < 0) { throw new RuntimeException("You must specify a valid non-negative node ID."); } - if (clusterId == null) { - throw new FormatterException("You must specify the cluster id."); - } + validateClusterId(clusterId); if (directories.isEmpty()) { throw new FormatterException("You must specify at least one directory to format"); } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index dcfba4328aeb7..46d14202b0c4a 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -587,4 +587,28 @@ public void testFormatWithNoInitialControllers() throws Exception { assertNotNull(logDirProps1); } } + + @ParameterizedTest + @ValueSource(strings = {"unrvTtQISjar0JUWGU/8Pg", "igNUVIdeSPO5JCZYFhOh7Q==", "AAAAAAAAAAAAAAAAAAAAAA", "AAAAAAAAAAAAAAAAAAAAAQ"}) + public void testFormatWithInvalidClusterId(String clusterId) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setClusterId(clusterId); + String expectedPrefix = "The specified cluster id, " + clusterId; + assertEquals(expectedPrefix, + assertThrows(FormatterException.class, + formatter1.formatter::run). + getMessage().substring(0, expectedPrefix.length())); + } + } + + @ParameterizedTest + @ValueSource(strings = {"iIygKoNNSpGzNeAEr_QW7w", "-w_KF-snTmuEnKPqZ0RDzA", "dZMgZA7nTLqZTdzb0zbvSQ", "hxqHyN2OSSmPajVhm_DR-Q"}) + public void testFormatWithValidClusterId(String clusterId) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setClusterId(clusterId); + formatter1.formatter.run(); + } + } }