@@ -271,11 +271,6 @@ boolean check() {
271271 */
272272 private final TimelineHashMap <Uuid , Integer > directoryToBroker ;
273273
274- /**
275- * Maps broker IDs to their non-default static configs reported during registration.
276- */
277- private final Map <ConfigResource , Map <String , String >> brokerStaticConfigs ;
278-
279274 /**
280275 * Manages the kafka.controller:type=KafkaController,name=TimeSinceLastHeartbeatReceivedMs,broker=<brokerId> metrics.
281276 */
@@ -305,7 +300,6 @@ private ClusterControlManager(
305300 this .featureControl = featureControl ;
306301 this .controllerRegistrations = new TimelineHashMap <>(snapshotRegistry , 0 );
307302 this .directoryToBroker = new TimelineHashMap <>(snapshotRegistry , 0 );
308- this .brokerStaticConfigs = new HashMap <>();
309303 this .brokerShutdownHandler = brokerShutdownHandler ;
310304 this .metrics = metrics ;
311305 }
@@ -348,7 +342,14 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
348342 }
349343
350344 public Map <ConfigResource , Map <String , String >> brokerStaticConfigs () {
351- return Collections .unmodifiableMap (brokerStaticConfigs );
345+ Map <ConfigResource , Map <String , String >> result = new HashMap <>();
346+ for (Map .Entry <Integer , BrokerRegistration > entry : brokerRegistrations .entrySet ()) {
347+ Map <String , String > statics = entry .getValue ().staticConfigs ();
348+ if (!statics .isEmpty ()) {
349+ result .put (new ConfigResource (ConfigResource .Type .BROKER , String .valueOf (entry .getKey ())), statics );
350+ }
351+ }
352+ return Collections .unmodifiableMap (result );
352353 }
353354
354355 Map <Integer , ControllerRegistration > controllerRegistrations () {
@@ -448,16 +449,6 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
448449 record .setCordonedLogDirs (request .cordonedLogDirs ());
449450 }
450451
451- // Store broker static configs in-memory (always available for pre-upgrade validation)
452- // Skip sensitive configs — their values are null
453- Map <String , String > statics = new HashMap <>();
454- for (BrokerRegistrationRequestData .StaticConfig sc : request .staticConfigs ()) {
455- if (!sc .isSensitive ()) {
456- statics .put (sc .name (), sc .value ());
457- }
458- }
459- brokerStaticConfigs .put (new ConfigResource (ConfigResource .Type .BROKER , String .valueOf (brokerId )), Map .copyOf (statics ));
460-
461452 // Write static configs to the record when MV supports it
462453 if (featureControl .metadataVersionOrThrow ().isStaticConfigReportingSupported ()) {
463454 for (BrokerRegistrationRequestData .StaticConfig sc : request .staticConfigs ()) {
@@ -612,6 +603,13 @@ public void replay(RegisterBrokerRecord record, long offset) {
612603 features .put (feature .name (), VersionRange .of (
613604 feature .minSupportedVersion (), feature .maxSupportedVersion ()));
614605 }
606+ // Parse static configs from record, skipping sensitive ones (their values are null)
607+ Map <String , String > staticConfigs = new HashMap <>();
608+ for (RegisterBrokerRecord .BrokerStaticConfig sc : record .staticConfigs ()) {
609+ if (!sc .isSensitive ()) {
610+ staticConfigs .put (sc .name (), sc .value ());
611+ }
612+ }
615613 // Update broker registrations.
616614 BrokerRegistration prevRegistration = brokerRegistrations .put (brokerId ,
617615 new BrokerRegistration .Builder ().
@@ -626,16 +624,8 @@ public void replay(RegisterBrokerRecord record, long offset) {
626624 setIsMigratingZkBroker (record .isMigratingZkBroker ()).
627625 setDirectories (record .logDirs ()).
628626 setCordonedDirectories (record .cordonedLogDirs ()).
627+ setStaticConfigs (staticConfigs ).
629628 build ());
630- // Restore static configs from metadata log (for controller failover)
631- // Skip sensitive configs — their values are null
632- Map <String , String > replayedStatics = new HashMap <>();
633- for (RegisterBrokerRecord .BrokerStaticConfig sc : record .staticConfigs ()) {
634- if (!sc .isSensitive ()) {
635- replayedStatics .put (sc .name (), sc .value ());
636- }
637- }
638- brokerStaticConfigs .put (new ConfigResource (ConfigResource .Type .BROKER , String .valueOf (record .brokerId ())), Map .copyOf (replayedStatics ));
639629
640630 updateDirectories (brokerId , prevRegistration == null ? null : prevRegistration .directories (), record .logDirs ());
641631 if (heartbeatManager != null ) {
0 commit comments