|
23 | 23 | import io.strimzi.api.kafka.model.kafka.cruisecontrol.KafkaAutoRebalanceStatusBrokers; |
24 | 24 | import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener; |
25 | 25 | import io.strimzi.api.kafka.model.kafka.listener.ListenerStatus; |
26 | | -import io.strimzi.kafka.config.model.ConfigModel; |
27 | | -import io.strimzi.kafka.config.model.ConfigModels; |
28 | 26 | import io.strimzi.kafka.config.model.Scope; |
29 | 27 | import io.strimzi.operator.common.Util; |
30 | 28 | import io.strimzi.systemtest.TestConstants; |
|
44 | 42 | import org.hamcrest.CoreMatchers; |
45 | 43 |
|
46 | 44 | import java.io.File; |
47 | | -import java.io.FileInputStream; |
48 | 45 | import java.io.IOException; |
49 | | -import java.io.InputStream; |
50 | 46 | import java.nio.charset.Charset; |
51 | 47 | import java.time.Duration; |
52 | | -import java.util.HashMap; |
53 | 48 | import java.util.List; |
54 | 49 | import java.util.Map; |
55 | 50 | import java.util.Random; |
56 | 51 | import java.util.function.Consumer; |
57 | 52 | import java.util.function.Supplier; |
58 | | -import java.util.stream.Collectors; |
59 | 53 |
|
60 | | -import static io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.FORBIDDEN_PREFIXES; |
61 | | -import static io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS; |
62 | 54 | import static io.strimzi.systemtest.enums.CustomResourceStatus.NotReady; |
63 | 55 | import static io.strimzi.systemtest.enums.CustomResourceStatus.Ready; |
64 | 56 | import static io.strimzi.systemtest.resources.types.KafkaType.kafkaClient; |
@@ -263,134 +255,64 @@ public synchronized static boolean verifyCrDynamicConfiguration(final String nam |
263 | 255 |
|
264 | 256 | /** |
265 | 257 | * Verifies that updated configuration was successfully changed inside Kafka pods |
266 | | - * @param namespaceName name of the namespace |
267 | | - * @param kafkaPodNamePrefix prefix of Kafka pods |
268 | | - * @param brokerConfigName key of specific property |
269 | | - * @param value value of specific property |
270 | | - * @param kafkaVersion Kafka version to get the config model |
| 258 | + * |
| 259 | + * @param namespaceName Name of the namespace. |
| 260 | + * @param clusterName Name of the Kafka cluster. |
| 261 | + * @param scraperPodName Name of Scraper Pod. |
| 262 | + * @param configName Name of the configuration. |
| 263 | + * @param value Value of specific property. |
| 264 | + * |
271 | 265 | * @return |
272 | 266 | true = if specific property matches the expected property |
273 | 267 | false = if specific property doesn't match the expected property |
274 | 268 | */ |
275 | | - public synchronized static boolean verifyPodDynamicConfiguration(final String namespaceName, String scraperPodName, String bootstrapServer, String kafkaPodNamePrefix, String brokerConfigName, Object value, String kafkaVersion) { |
276 | | - List<Pod> brokerPods = KubeResourceManager.get().kubeClient().listPodsByPrefixInName(namespaceName, kafkaPodNamePrefix); |
| 269 | + public synchronized static boolean verifyPodDynamicConfiguration( |
| 270 | + final String namespaceName, |
| 271 | + final String clusterName, |
| 272 | + String scraperPodName, |
| 273 | + String scope, |
| 274 | + String configName, |
| 275 | + String value |
| 276 | + ) { |
| 277 | + String bootstrapServer = KafkaResources.plainBootstrapAddress(clusterName); |
| 278 | + String brokerPodSetName = KafkaComponents.getBrokerPodSetName(clusterName); |
| 279 | + |
| 280 | + List<Pod> brokerPods = KubeResourceManager.get().kubeClient().listPodsByPrefixInName(namespaceName, brokerPodSetName); |
277 | 281 | int[] brokerId = {0}; |
278 | 282 |
|
279 | | - Map<String, ConfigModel> configModelMap = readConfigModel(kafkaVersion); |
280 | | - |
281 | 283 | // the check/describe for a dynamic change is different depending on the property being cluster-wide or per-broker |
282 | | - if (configModelMap.get(brokerConfigName).getScope().equals(Scope.CLUSTER_WIDE)) { |
| 284 | + if (Scope.valueOf(scope).equals(Scope.CLUSTER_WIDE)) { |
| 285 | + TestUtils.waitFor("cluster-wide dyn.configuration to change", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.RECONCILIATION_INTERVAL + Duration.ofSeconds(10).toMillis(), () -> { |
| 286 | + String result = KafkaCmdClient.describeKafkaBrokerDefaultsUsingPodCli(namespaceName, scraperPodName, bootstrapServer); |
283 | 287 |
|
284 | | - TestUtils.waitFor("cluster-wide dyn.configuration to change", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.RECONCILIATION_INTERVAL + Duration.ofSeconds(10).toMillis(), |
285 | | - () -> { |
286 | | - String result = KafkaCmdClient.describeKafkaBrokerDefaultsUsingPodCli(namespaceName, scraperPodName, bootstrapServer); |
287 | | - |
288 | | - LOGGER.debug("This cluster-wide dyn.configuration {}", result); |
289 | | - |
290 | | - if (!result.contains(brokerConfigName + "=" + value)) { |
291 | | - LOGGER.error("Cluster-wide configuration doesn't contain {} with value {}", brokerConfigName, value); |
292 | | - LOGGER.error("Kafka configuration {}", result); |
293 | | - return false; |
294 | | - } |
295 | | - return true; |
296 | | - }); |
| 288 | + LOGGER.debug("This is cluster-wide dyn.configuration {}", result); |
297 | 289 |
|
| 290 | + if (!result.contains(configName + "=" + value)) { |
| 291 | + LOGGER.error("Cluster-wide configuration doesn't contain {} with value {}", configName, value); |
| 292 | + LOGGER.error("Kafka configuration {}", result); |
| 293 | + return false; |
| 294 | + } |
| 295 | + return true; |
| 296 | + }); |
298 | 297 | } else { |
299 | | - |
300 | 298 | for (Pod pod : brokerPods) { |
| 299 | + TestUtils.waitFor("dyn.configuration to change", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.RECONCILIATION_INTERVAL + Duration.ofSeconds(10).toMillis(), () -> { |
| 300 | + String result = KafkaCmdClient.describeKafkaBrokerUsingPodCli(namespaceName, scraperPodName, bootstrapServer, brokerId[0]++); |
301 | 301 |
|
302 | | - TestUtils.waitFor("dyn.configuration to change", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.RECONCILIATION_INTERVAL + Duration.ofSeconds(10).toMillis(), |
303 | | - () -> { |
304 | | - String result = KafkaCmdClient.describeKafkaBrokerUsingPodCli(namespaceName, scraperPodName, bootstrapServer, brokerId[0]++); |
305 | | - |
306 | | - LOGGER.debug("This dyn.configuration {} inside the Kafka Pod: {}/{}", result, namespaceName, pod.getMetadata().getName()); |
| 302 | + LOGGER.debug("This is dyn.configuration {} inside the Kafka Pod: {}/{}", result, namespaceName, pod.getMetadata().getName()); |
307 | 303 |
|
308 | | - if (!result.contains(brokerConfigName + "=" + value)) { |
309 | | - LOGGER.error("Kafka Pod: {}/{} doesn't contain {} with value {}", namespaceName, pod.getMetadata().getName(), brokerConfigName, value); |
310 | | - LOGGER.error("Kafka configuration {}", result); |
311 | | - return false; |
312 | | - } |
313 | | - return true; |
314 | | - }); |
| 304 | + if (!result.contains(configName + "=" + value)) { |
| 305 | + LOGGER.error("Kafka Pod: {}/{} doesn't contain {} with value {}", namespaceName, pod.getMetadata().getName(), configName, value); |
| 306 | + LOGGER.error("Kafka configuration {}", result); |
| 307 | + return false; |
| 308 | + } |
| 309 | + return true; |
| 310 | + }); |
315 | 311 | } |
316 | 312 | } |
317 | 313 | return true; |
318 | 314 | } |
319 | 315 |
|
320 | | - /** |
321 | | - * Loads all kafka config parameters supported by the given {@code kafkaVersion}, as generated by #KafkaConfigModelGenerator in config-model-generator. |
322 | | - * @param kafkaVersion specific kafka version |
323 | | - * @return all supported kafka properties |
324 | | - */ |
325 | | - public static Map<String, ConfigModel> readConfigModel(String kafkaVersion) { |
326 | | - String name = TestUtils.USER_PATH + "/../cluster-operator/src/main/resources/kafka-" + kafkaVersion + "-config-model.json"; |
327 | | - try { |
328 | | - try (InputStream in = new FileInputStream(name)) { |
329 | | - ConfigModels configModels = new ObjectMapper().readValue(in, ConfigModels.class); |
330 | | - if (!kafkaVersion.equals(configModels.getVersion())) { |
331 | | - throw new RuntimeException("Incorrect version"); |
332 | | - } |
333 | | - return configModels.getConfigs(); |
334 | | - } |
335 | | - } catch (IOException e) { |
336 | | - throw new RuntimeException("Error reading from classpath resource " + name, e); |
337 | | - } |
338 | | - } |
339 | | - |
340 | | - /** |
341 | | - * Return dynamic Kafka configs supported by the given version of Kafka. |
342 | | - * @param kafkaVersion specific kafka version |
343 | | - * @return all dynamic properties for specific kafka version |
344 | | - */ |
345 | | - @SuppressWarnings({"checkstyle:CyclomaticComplexity", "checkstyle:BooleanExpressionComplexity"}) |
346 | | - public static Map<String, ConfigModel> getDynamicConfigurationProperties(String kafkaVersion) { |
347 | | - |
348 | | - Map<String, ConfigModel> configs = KafkaUtils.readConfigModel(kafkaVersion); |
349 | | - |
350 | | - LOGGER.info("Kafka config {}", configs.toString()); |
351 | | - |
352 | | - LOGGER.info("Number of all Kafka configs {}", configs.size()); |
353 | | - |
354 | | - Map<String, ConfigModel> dynamicConfigs = configs |
355 | | - .entrySet() |
356 | | - .stream() |
357 | | - .filter(a -> { |
358 | | - String[] prefixKey = a.getKey().split("\\."); |
359 | | - |
360 | | - // filter all which is Scope = ClusterWide or PerBroker |
361 | | - boolean isClusterWideOrPerBroker = a.getValue().getScope() == Scope.CLUSTER_WIDE || a.getValue().getScope() == Scope.PER_BROKER; |
362 | | - |
363 | | - if (prefixKey[0].equals("ssl") || prefixKey[0].equals("sasl") || prefixKey[0].equals("advertised") || |
364 | | - prefixKey[0].equals("listeners") || prefixKey[0].equals("listener")) { |
365 | | - return isClusterWideOrPerBroker && !FORBIDDEN_PREFIXES.contains(prefixKey[0]); |
366 | | - } |
367 | | - |
368 | | - return isClusterWideOrPerBroker; |
369 | | - }) |
370 | | - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
371 | | - |
372 | | - LOGGER.info("Number of dynamic-configs {}", dynamicConfigs.size()); |
373 | | - |
374 | | - Map<String, ConfigModel> forbiddenExceptionsConfigs = configs |
375 | | - .entrySet() |
376 | | - .stream() |
377 | | - .filter(a -> FORBIDDEN_PREFIX_EXCEPTIONS.contains(a.getKey())) |
378 | | - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); |
379 | | - |
380 | | - LOGGER.info("Number of forbidden-exception-configs {}", forbiddenExceptionsConfigs.size()); |
381 | | - |
382 | | - Map<String, ConfigModel> dynamicConfigsWithExceptions = new HashMap<>(); |
383 | | - |
384 | | - dynamicConfigsWithExceptions.putAll(dynamicConfigs); |
385 | | - dynamicConfigsWithExceptions.putAll(forbiddenExceptionsConfigs); |
386 | | - |
387 | | - LOGGER.info("Size of dynamic-configs with forbidden-exception-configs {}", dynamicConfigsWithExceptions.size()); |
388 | | - |
389 | | - dynamicConfigsWithExceptions.forEach((key, value) -> LOGGER.info("{} -> {}:{}", key, value.getScope(), value.getType())); |
390 | | - |
391 | | - return dynamicConfigsWithExceptions; |
392 | | - } |
393 | | - |
394 | 316 | /** |
395 | 317 | * Generates a random name for the Kafka resource based on prefix |
396 | 318 | * @param clusterName name prefix |
|
0 commit comments