3131/**
3232 * The algorithm:
3333 * 1. Create a map from the supplied desired String
34- * 2. Fill placeholders (e.g. ${BROKER_ID}) in desired map as the broker's {@code kafka_config_generator.sh} would
35- * 3a. Loop over all entries. If the entry is in IGNORABLE_PROPERTIES or entry.value from desired is equal to entry.value from current, do nothing
34+ * 2a. Loop over all entries. If the entry is in IGNORABLE_PROPERTIES or entry.value from desired is equal to entry.value from current, do nothing
3635 * else add it to the diff
37- * 3b . If entry was removed from desired, add it to the diff with null value.
38- * 3c . If custom entry was removed, delete property
36+ * 2b . If entry was removed from desired, add it to the diff with null value.
37+ * 2c . If custom entry was removed, delete property
3938 */
4039public class KafkaConfigurationDiff extends AbstractJsonDiff {
4140 private static final ReconciliationLogger LOGGER = ReconciliationLogger .create (KafkaConfigurationDiff .class );
@@ -46,20 +45,10 @@ public class KafkaConfigurationDiff extends AbstractJsonDiff {
4645 private final boolean isBroker ;
4746
4847 /**
49- * These options are skipped because they contain placeholders
50- * 909[1-4] is for skipping all (internal, plain, secured, external) listeners properties
48+ * These options are skipped from getting updated because they contain placeholders
5149 */
5250 public static final Pattern IGNORABLE_PROPERTIES_PATTERN = Pattern .compile (
5351 "^(broker\\ .id"
54- + "|.*-[0-9]{2,5}\\ .ssl\\ .keystore\\ .location"
55- + "|.*-[0-9]{2,5}\\ .ssl\\ .keystore\\ .password"
56- + "|.*-[0-9]{2,5}\\ .ssl\\ .keystore\\ .type"
57- + "|.*-[0-9]{2,5}\\ .ssl\\ .truststore\\ .location"
58- + "|.*-[0-9]{2,5}\\ .ssl\\ .truststore\\ .password"
59- + "|.*-[0-9]{2,5}\\ .ssl\\ .truststore\\ .type"
60- + "|.*-[0-9]{2,5}\\ .ssl\\ .client\\ .auth"
61- + "|.*-[0-9]{2,5}\\ .scram-sha-512\\ .sasl\\ .jaas\\ .config"
62- + "|.*-[0-9]{2,5}\\ .sasl\\ .enabled\\ .mechanisms"
6352 + "|advertised\\ .listeners"
6453 + "|broker\\ .rack)$" );
6554
@@ -226,7 +215,7 @@ protected boolean canBeUpdatedDynamically() {
226215 for (AlterConfigOp entry : nodeConfigDiff ) {
227216 if (isScope (entry .configEntry (), Scope .READ_ONLY )) {
228217 result = false ;
229- LOGGER .infoCr (reconciliation , "Configuration can't be updated dynamically due to : {}" , entry );
218+ LOGGER .infoCr (reconciliation , "Configuration can't be updated dynamically because its scope is ready only : {}" , entry );
230219 break ;
231220 }
232221 }
@@ -300,28 +289,25 @@ private Collection<AlterConfigOp> diff(NodeRef nodeRef, String desired,
300289 .filter (configEntry -> configEntry .name ().equals (pathValueWithoutSlash ))
301290 .findFirst ();
302291
292+ boolean isConfigUpdated = false ;
303293 String op = d .get ("op" ).asText ();
304294 if (optEntry .isPresent ()) {
305295 ConfigEntry entry = optEntry .get ();
306296 if ("remove" .equals (op )) {
307- removeProperty (configModel , updatedCE , pathValueWithoutSlash , entry );
297+ isConfigUpdated = removeProperty (configModel , updatedCE , pathValueWithoutSlash , entry );
308298 } else if ("replace" .equals (op )) {
309299 // entry is in the current, desired is updated value
310- updateOrAdd (entry .name (), configModel , desiredMap , updatedCE );
300+ isConfigUpdated = updateOrAdd (entry .name (), configModel , desiredMap , updatedCE );
311301 }
312302 } else {
313303 if ("add" .equals (op )) {
314304 // entry is not in the current, it is added
315- updateOrAdd (pathValueWithoutSlash , configModel , desiredMap , updatedCE );
305+ isConfigUpdated = updateOrAdd (pathValueWithoutSlash , configModel , desiredMap , updatedCE );
316306 }
317307 }
318308
319- if ("remove" .equals (op )) {
320- // there is a lot of properties set by default - not having them in desired causes very noisy log output
321- LOGGER .traceCr (reconciliation , "Kafka Broker {} Config Differs : {}" , nodeRef .nodeId (), d );
322- LOGGER .traceCr (reconciliation , "Current Kafka Broker Config path {} has value {}" , pathValueWithoutSlash , lookupPath (source , pathValue ));
323- LOGGER .traceCr (reconciliation , "Desired Kafka Broker Config path {} has value {}" , pathValueWithoutSlash , lookupPath (target , pathValue ));
324- } else {
309+ // Log only if a config is getting updated, otherwise ignorable and custom configs produces very noisy log output as they always have different desired and current values.
310+ if (isConfigUpdated ) {
325311 LOGGER .debugCr (reconciliation , "Kafka Broker {} Config Differs : {}" , nodeRef .nodeId (), d );
326312 LOGGER .debugCr (reconciliation , "Current Kafka Broker Config path {} has value {}" , pathValueWithoutSlash , lookupPath (source , pathValue ));
327313 LOGGER .debugCr (reconciliation , "Desired Kafka Broker Config path {} has value {}" , pathValueWithoutSlash , lookupPath (target , pathValue ));
@@ -331,20 +317,22 @@ private Collection<AlterConfigOp> diff(NodeRef nodeRef, String desired,
331317 return updatedCE ;
332318 }
333319
334- private void updateOrAdd (String propertyName , Map <String , ConfigModel > configModel , Map <String , String > desiredMap , Collection <AlterConfigOp > updatedCE ) {
320+ private boolean updateOrAdd (String propertyName , Map <String , ConfigModel > configModel , Map <String , String > desiredMap , Collection <AlterConfigOp > updatedCE ) {
335321 if (!isIgnorableProperty (propertyName )) {
336322 if (KafkaConfiguration .isCustomConfigurationOption (propertyName , configModel )) {
337323 LOGGER .traceCr (reconciliation , "custom property {} has been updated/added {}" , propertyName , desiredMap .get (propertyName ));
338324 } else {
339325 LOGGER .traceCr (reconciliation , "property {} has been updated/added {}" , propertyName , desiredMap .get (propertyName ));
340326 updatedCE .add (new AlterConfigOp (new ConfigEntry (propertyName , desiredMap .get (propertyName )), AlterConfigOp .OpType .SET ));
327+ return true ;
341328 }
342329 } else {
343- LOGGER .traceCr (reconciliation , "{} is ignorable, not considering" );
330+ LOGGER .traceCr (reconciliation , "{} is ignorable, not considering" , propertyName );
344331 }
332+ return false ;
345333 }
346334
347- private void removeProperty (Map <String , ConfigModel > configModel , Collection <AlterConfigOp > updatedCE , String pathValueWithoutSlash , ConfigEntry entry ) {
335+ private boolean removeProperty (Map <String , ConfigModel > configModel , Collection <AlterConfigOp > updatedCE , String pathValueWithoutSlash , ConfigEntry entry ) {
348336 if (KafkaConfiguration .isCustomConfigurationOption (entry .name (), configModel )) {
349337 // we are deleting custom option
350338 LOGGER .traceCr (reconciliation , "removing custom property {}" , entry .name ());
@@ -360,10 +348,12 @@ private void removeProperty(Map<String, ConfigModel> configModel, Collection<Alt
360348 if (!isIgnorableProperty (pathValueWithoutSlash )) {
361349 updatedCE .add (new AlterConfigOp (new ConfigEntry (pathValueWithoutSlash , null ), AlterConfigOp .OpType .DELETE ));
362350 LOGGER .infoCr (reconciliation , "{} not set in desired, unsetting back to default {}" , entry .name (), "deleted entry" );
351+ return true ;
363352 } else {
364- LOGGER .traceCr (reconciliation , "{} is ignorable, not considering as removed" );
353+ LOGGER .traceCr (reconciliation , "{} is ignorable, not considering as removed" , entry . name () );
365354 }
366355 }
356+ return false ;
367357 }
368358
369359 /**
0 commit comments