3434import org .apache .kafka .clients .admin .AlterConfigOp ;
3535import org .apache .kafka .clients .admin .AlterConfigsResult ;
3636import org .apache .kafka .clients .admin .Config ;
37+ import org .apache .kafka .clients .admin .FeatureMetadata ;
3738import org .apache .kafka .common .KafkaException ;
3839import org .apache .kafka .common .KafkaFuture ;
3940import org .apache .kafka .common .config .ConfigException ;
@@ -623,8 +624,12 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
623624 // Always get the broker config. This request gets sent to that specific broker, so it's a proof that we can
624625 // connect to the broker and that it's capable of responding.
625626 Config brokerConfig ;
627+ short elrVersion = 0 ;
626628 try {
627629 brokerConfig = brokerConfig (nodeRef );
630+ FeatureMetadata featureMetadata = featureMetadata ();
631+ elrVersion = featureMetadata .finalizedFeatures ().get ("eligible.leader.replicas.version" ) == null ?
632+ 0 : featureMetadata .finalizedFeatures ().get ("eligible.leader.replicas.version" ).maxVersionLevel ();
628633 } catch (ForceableProblem e ) {
629634 if (restartContext .backOff .done ()) {
630635 needsRestart = true ;
@@ -636,7 +641,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
636641
637642 if (!needsRestart && allowReconfiguration ) {
638643 LOGGER .traceCr (reconciliation , "Pod {}: description {}" , nodeRef , brokerConfig );
639- brokerConfigDiff = new KafkaBrokerConfigurationDiff (reconciliation , brokerConfig , kafkaConfigProvider .apply (nodeRef .nodeId ()), kafkaVersion , nodeRef );
644+ brokerConfigDiff = new KafkaBrokerConfigurationDiff (reconciliation , brokerConfig , kafkaConfigProvider .apply (nodeRef .nodeId ()), kafkaVersion , nodeRef , elrVersion );
640645
641646 if (brokerConfigDiff .getDiffSize () > 0 ) {
642647 if (brokerConfigDiff .canBeUpdatedDynamically ()) {
@@ -657,6 +662,17 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
657662 restartContext .brokerConfigDiff = brokerConfigDiff ;
658663 }
659664
665+ /**
666+ * Returns the information about the features within the Kafka Cluster
667+ * @return information about the features
668+ */
669+ /* test */ FeatureMetadata featureMetadata () throws ForceableProblem , InterruptedException {
670+ return await (VertxUtil .kafkaFutureToVertxFuture (reconciliation , vertx , brokerAdminClient .describeFeatures ().featureMetadata ()),
671+ 30 , TimeUnit .SECONDS ,
672+ error -> new ForceableProblem ("Error getting feature metadata" , error )
673+ );
674+ }
675+
660676 /**
661677 * Returns a config of the given broker.
662678 * @param nodeRef The reference of the broker.
0 commit comments