diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index b0b43f05a4c75..55f122bce95a3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -525,10 +525,22 @@ BrokerFeature processRegistrationFeature( int defaultVersion = feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default value for MetadataVersion is 1 not 0. short finalized = finalizedFeatures.versionOrDefault(feature.name(), (short) defaultVersion); if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized)) { + boolean isMetadataVersion = feature.name().equals(MetadataVersion.FEATURE_NAME); + String finalizedDisplay = isMetadataVersion + ? finalized + " (" + MetadataVersion.fromFeatureLevel(finalized) + ")" + : String.valueOf(finalized); + String minSupportedDisplay = isMetadataVersion + ? feature.minSupportedVersion() + " (" + + MetadataVersion.fromFeatureLevel(feature.minSupportedVersion()) + ")" + : String.valueOf(feature.minSupportedVersion()); + String maxSupportedDisplay = isMetadataVersion + ? feature.maxSupportedVersion() + " (" + + MetadataVersion.fromFeatureLevel(feature.maxSupportedVersion()) + ")" + : String.valueOf(feature.maxSupportedVersion()); throw new UnsupportedVersionException("Unable to register because the broker " + - "does not support finalized version " + finalized + " of " + feature.name() + - ". The broker wants a version between " + feature.minSupportedVersion() + " and " + - feature.maxSupportedVersion() + ", inclusive."); + "does not support finalized version " + finalizedDisplay + " of " + feature.name() + + ". The broker wants a version between " + minSupportedDisplay + " and " + + maxSupportedDisplay + ", inclusive."); } // A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled). if (!finalizedFeatures.featureNames().contains(feature.name())) diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 786891e695bcd..f8f8c80cc16ec 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -686,8 +686,8 @@ public void testRegistrationWithUnsupportedMetadataVersion() { build(); clusterControl.activate(); - assertEquals("Unable to register because the broker does not support finalized version 4 of " + - "metadata.version. The broker wants a version between 1 and 1, inclusive.", + assertEquals("Unable to register because the broker does not support finalized version 4 (3.3-IV0) of " + + "metadata.version. The broker wants a version between 1 (3.0-IV1) and 1 (3.0-IV1), inclusive.", assertThrows(UnsupportedVersionException.class, () -> clusterControl.registerBroker( new BrokerRegistrationRequestData(). @@ -698,8 +698,8 @@ public void testRegistrationWithUnsupportedMetadataVersion() { 123L, featureControl.finalizedFeatures(Long.MAX_VALUE))).getMessage()); - assertEquals("Unable to register because the broker does not support finalized version 4 of " + - "metadata.version. The broker wants a version between 7 and 7, inclusive.", + assertEquals("Unable to register because the broker does not support finalized version 4 (3.3-IV0) of " + + "metadata.version. The broker wants a version between 7 (3.3-IV3) and 7 (3.3-IV3), inclusive.", assertThrows(UnsupportedVersionException.class, () -> clusterControl.registerBroker( new BrokerRegistrationRequestData().