Skip to content

Commit a91f51e

Browse files
authored
Move methods not related to Vert.x out of VertxUtil class (#11768)
Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent de96c6e commit a91f51e

File tree

7 files changed

+486
-522
lines changed

7 files changed

+486
-522
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/VertxUtil.java

Lines changed: 0 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -4,40 +4,20 @@
44
*/
55
package io.strimzi.operator.cluster.operator;
66

7-
import io.fabric8.kubernetes.api.model.Secret;
8-
import io.strimzi.api.kafka.model.common.CertSecretSource;
9-
import io.strimzi.api.kafka.model.common.GenericSecretSource;
10-
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
11-
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth;
12-
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain;
13-
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScram;
14-
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls;
15-
import io.strimzi.certs.CertAndKey;
16-
import io.strimzi.operator.cluster.operator.resource.kubernetes.SecretOperator;
17-
import io.strimzi.operator.common.InvalidConfigurationException;
187
import io.strimzi.operator.common.Reconciliation;
198
import io.strimzi.operator.common.ReconciliationLogger;
209
import io.strimzi.operator.common.TimeoutException;
21-
import io.strimzi.operator.common.model.InvalidResourceException;
2210
import io.vertx.core.Future;
2311
import io.vertx.core.Handler;
2412
import io.vertx.core.Promise;
2513
import io.vertx.core.Vertx;
2614
import org.apache.kafka.common.KafkaFuture;
2715

28-
import java.nio.charset.StandardCharsets;
29-
import java.nio.file.FileSystems;
30-
import java.nio.file.PathMatcher;
31-
import java.nio.file.Paths;
32-
import java.util.ArrayList;
33-
import java.util.List;
34-
import java.util.Map;
3516
import java.util.concurrent.CompletableFuture;
3617
import java.util.concurrent.CompletionException;
3718
import java.util.function.BooleanSupplier;
3819
import java.util.function.Predicate;
3920
import java.util.function.Supplier;
40-
import java.util.stream.Collectors;
4121

4222
/**
4323
* Class with various utility methods that use or depend on Vert.x core.
@@ -214,159 +194,4 @@ public static <T> Future<T> completableFutureToVertxFuture(CompletableFuture<T>
214194
});
215195
return promise.future();
216196
}
217-
218-
/**
219-
* When TLS certificate or Auth certificate (or password) is changed, the hash is computed.
220-
* It is used for rolling updates.
221-
* @param secretOperations Secret operator
222-
* @param namespace namespace to get Secrets in
223-
* @param auth Authentication object to compute hash from
224-
* @param certSecretSources TLS trusted certificates whose hashes are joined to result
225-
* @return Future computing hash from TLS + Auth
226-
*/
227-
public static Future<Integer> authTlsHash(SecretOperator secretOperations, String namespace, KafkaClientAuthentication auth, List<CertSecretSource> certSecretSources) {
228-
Future<Integer> tlsFuture;
229-
if (certSecretSources == null || certSecretSources.isEmpty()) {
230-
tlsFuture = Future.succeededFuture(0);
231-
} else {
232-
// get all TLS trusted certs, compute hash from each of them, sum hashes
233-
tlsFuture = Future.join(certSecretSources.stream().map(certSecretSource ->
234-
getCertificateAsync(secretOperations, namespace, certSecretSource)
235-
.compose(cert -> Future.succeededFuture(cert.hashCode()))).collect(Collectors.toList()))
236-
.compose(hashes -> Future.succeededFuture(hashes.list().stream().mapToInt(e -> (int) e).sum()));
237-
}
238-
239-
if (auth == null) {
240-
return tlsFuture;
241-
} else {
242-
// compute hash from Auth
243-
if (auth instanceof KafkaClientAuthenticationScram) {
244-
// only passwordSecret can be changed
245-
return tlsFuture.compose(tlsHash -> getPasswordAsync(secretOperations, namespace, auth)
246-
.compose(password -> Future.succeededFuture(password.hashCode() + tlsHash)));
247-
} else if (auth instanceof KafkaClientAuthenticationPlain) {
248-
// only passwordSecret can be changed
249-
return tlsFuture.compose(tlsHash -> getPasswordAsync(secretOperations, namespace, auth)
250-
.compose(password -> Future.succeededFuture(password.hashCode() + tlsHash)));
251-
} else if (auth instanceof KafkaClientAuthenticationTls) {
252-
// custom cert can be used (and changed)
253-
return ((KafkaClientAuthenticationTls) auth).getCertificateAndKey() == null ? tlsFuture :
254-
tlsFuture.compose(tlsHash -> getCertificateAndKeyAsync(secretOperations, namespace, (KafkaClientAuthenticationTls) auth)
255-
.compose(crtAndKey -> Future.succeededFuture(crtAndKey.certAsBase64String().hashCode() + crtAndKey.keyAsBase64String().hashCode() + tlsHash)));
256-
} else if (auth instanceof KafkaClientAuthenticationOAuth) {
257-
List<Future<Integer>> futureList = ((KafkaClientAuthenticationOAuth) auth).getTlsTrustedCertificates() == null ?
258-
new ArrayList<>() : ((KafkaClientAuthenticationOAuth) auth).getTlsTrustedCertificates().stream().map(certSecretSource ->
259-
getCertificateAsync(secretOperations, namespace, certSecretSource)
260-
.compose(cert -> Future.succeededFuture(cert.hashCode()))).collect(Collectors.toList());
261-
futureList.add(tlsFuture);
262-
futureList.add(addSecretHash(secretOperations, namespace, ((KafkaClientAuthenticationOAuth) auth).getAccessToken()));
263-
futureList.add(addSecretHash(secretOperations, namespace, ((KafkaClientAuthenticationOAuth) auth).getClientSecret()));
264-
futureList.add(addSecretHash(secretOperations, namespace, ((KafkaClientAuthenticationOAuth) auth).getRefreshToken()));
265-
return Future.join(futureList)
266-
.compose(hashes -> Future.succeededFuture(hashes.list().stream().mapToInt(e -> (int) e).sum()));
267-
} else {
268-
// unknown Auth type
269-
return tlsFuture;
270-
}
271-
}
272-
}
273-
274-
private static Future<Integer> addSecretHash(SecretOperator secretOperations, String namespace, GenericSecretSource genericSecretSource) {
275-
if (genericSecretSource != null) {
276-
return secretOperations.getAsync(namespace, genericSecretSource.getSecretName())
277-
.compose(secret -> {
278-
if (secret == null) {
279-
return Future.failedFuture("Secret " + genericSecretSource.getSecretName() + " not found");
280-
} else {
281-
return Future.succeededFuture(secret.getData().get(genericSecretSource.getKey()).hashCode());
282-
}
283-
});
284-
}
285-
return Future.succeededFuture(0);
286-
}
287-
288-
/**
289-
* Utility method which gets the secret and validates that the required fields are present in it
290-
*
291-
* @param secretOperator Secret operator to get the secret from the Kubernetes API
292-
* @param namespace Namespace where the Secret exist
293-
* @param name Name of the Secret
294-
* @param items List of items which should be present in the Secret
295-
*
296-
* @return Future with the Secret if is exits and has the required items. Failed future with an error message otherwise.
297-
*/
298-
/* test */ static Future<Secret> getValidatedSecret(SecretOperator secretOperator, String namespace, String name, String... items) {
299-
return secretOperator.getAsync(namespace, name)
300-
.compose(secret -> validatedSecret(namespace, name, secret, items));
301-
}
302-
303-
/**
304-
* Utility method which validates that the required fields are present in a Secret passed to it
305-
*
306-
* @param namespace Namespace of the Secret
307-
* @param name Name of the Secret (used for error message in case the Secret is null)
308-
* @param secret Secret that should be validated or null if the Secret does not exist
309-
* @param items List of items which should be present in the Secret
310-
*
311-
* @return Future with the Secret if is exits and has the required items. Failed future with an error message otherwise.
312-
*/
313-
/* test */ static Future<Secret> validatedSecret(String namespace, String name, Secret secret, String... items) {
314-
if (secret == null) {
315-
return Future.failedFuture(new InvalidConfigurationException("Secret " + name + " not found in namespace " + namespace));
316-
} else {
317-
List<String> errors = new ArrayList<>(0);
318-
319-
if (items != null) {
320-
for (String item : items) {
321-
if (!secret.getData().containsKey(item)) {
322-
// Item not found => error will be raised
323-
errors.add(item);
324-
}
325-
}
326-
}
327-
328-
if (errors.isEmpty()) {
329-
return Future.succeededFuture(secret);
330-
} else {
331-
return Future.failedFuture(new InvalidConfigurationException(String.format("Items with key(s) %s are missing in Secret %s", errors, name)));
332-
}
333-
}
334-
}
335-
336-
private static Future<String> getCertificateAsync(SecretOperator secretOperator, String namespace, CertSecretSource certSecretSource) {
337-
return secretOperator.getAsync(namespace, certSecretSource.getSecretName())
338-
.compose(secret -> {
339-
if (certSecretSource.getCertificate() != null) {
340-
return validatedSecret(namespace, certSecretSource.getSecretName(), secret, certSecretSource.getCertificate())
341-
.compose(validatedSecret -> Future.succeededFuture(validatedSecret.getData().get(certSecretSource.getCertificate())));
342-
} else if (certSecretSource.getPattern() != null) {
343-
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + certSecretSource.getPattern());
344-
345-
return validatedSecret(namespace, certSecretSource.getSecretName(), secret)
346-
.compose(validatedSecret -> Future.succeededFuture(validatedSecret.getData().entrySet().stream().filter(e -> matcher.matches(Paths.get(e.getKey()))).map(Map.Entry::getValue).sorted().collect(Collectors.joining())));
347-
} else {
348-
throw new InvalidResourceException("Certificate source does not contain the certificate or the pattern.");
349-
}
350-
});
351-
}
352-
353-
private static Future<CertAndKey> getCertificateAndKeyAsync(SecretOperator secretOperator, String namespace, KafkaClientAuthenticationTls auth) {
354-
return getValidatedSecret(secretOperator, namespace, auth.getCertificateAndKey().getSecretName(), auth.getCertificateAndKey().getCertificate(), auth.getCertificateAndKey().getKey())
355-
.compose(secret -> Future.succeededFuture(new CertAndKey(secret.getData().get(auth.getCertificateAndKey().getKey()).getBytes(StandardCharsets.UTF_8), secret.getData().get(auth.getCertificateAndKey().getCertificate()).getBytes(StandardCharsets.UTF_8))));
356-
}
357-
358-
private static Future<String> getPasswordAsync(SecretOperator secretOperator, String namespace, KafkaClientAuthentication auth) {
359-
if (auth instanceof KafkaClientAuthenticationPlain plainAuth) {
360-
361-
return getValidatedSecret(secretOperator, namespace, plainAuth.getPasswordSecret().getSecretName(), plainAuth.getPasswordSecret().getPassword())
362-
.compose(secret -> Future.succeededFuture(secret.getData().get(plainAuth.getPasswordSecret().getPassword())));
363-
} else if (auth instanceof KafkaClientAuthenticationScram scramAuth) {
364-
365-
return getValidatedSecret(secretOperator, namespace, scramAuth.getPasswordSecret().getSecretName(), scramAuth.getPasswordSecret().getPassword())
366-
.compose(secret -> Future.succeededFuture(secret.getData().get(scramAuth.getPasswordSecret().getPassword())));
367-
} else {
368-
return Future.failedFuture("Auth type " + auth.getType() + " does not have a password property");
369-
}
370-
}
371-
372197
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
2222
import io.strimzi.operator.cluster.model.KafkaBridgeCluster;
2323
import io.strimzi.operator.cluster.model.SharedEnvironmentProvider;
24-
import io.strimzi.operator.cluster.operator.VertxUtil;
2524
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
2625
import io.strimzi.operator.cluster.operator.resource.kubernetes.DeploymentOperator;
2726
import io.strimzi.operator.common.Annotations;
@@ -110,7 +109,7 @@ protected Future<KafkaBridgeStatus> createOrUpdate(Reconciliation reconciliation
110109
return configMapOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.configMapName(reconciliation.name()), configMap);
111110
})
112111
.compose(i -> isPodDisruptionBudgetGeneration ? podDisruptionBudgetOperator.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generatePodDisruptionBudget()) : Future.succeededFuture())
113-
.compose(i -> VertxUtil.authTlsHash(secretOperations, namespace, auth, trustedCertificates))
112+
.compose(i -> ReconcilerUtils.authTlsHash(secretOperations, namespace, auth, trustedCertificates))
114113
.compose(authTlsHash -> {
115114
podAnnotations.put(Annotations.ANNO_STRIMZI_AUTH_HASH, Integer.toString(authTlsHash));
116115
return deploymentOperations.reconcile(reconciliation, namespace, bridge.getComponentName(), bridge.generateDeployment(podAnnotations, pfa.isOpenshift(), imagePullPolicy, imagePullSecrets));

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectAssemblyOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ private Future<Void> updateConnectorsThatConnectClusterWasDeleted(Reconciliation
311311
private Future<Integer> generateAuthHash(String namespace, KafkaConnectSpec kafkaConnectSpec) {
312312
KafkaClientAuthentication auth = kafkaConnectSpec.getAuthentication();
313313
List<CertSecretSource> trustedCertificates = kafkaConnectSpec.getTls() == null ? Collections.emptyList() : kafkaConnectSpec.getTls().getTrustedCertificates();
314-
return VertxUtil.authTlsHash(secretOperations, namespace, auth, trustedCertificates);
314+
return ReconcilerUtils.authTlsHash(secretOperations, namespace, auth, trustedCertificates);
315315
}
316316

317317
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ private Future<Integer> generateAuthHash(String namespace, KafkaMirrorMaker2Spec
203203
.stream()
204204
.map(cluster -> {
205205
List<CertSecretSource> trustedCertificates = cluster.getTls() == null ? Collections.emptyList() : cluster.getTls().getTrustedCertificates();
206-
return VertxUtil.authTlsHash(secretOperations, namespace, cluster.getAuthentication(), trustedCertificates);
206+
return ReconcilerUtils.authTlsHash(secretOperations, namespace, cluster.getAuthentication(), trustedCertificates);
207207
}).collect(Collectors.toList())
208208
)
209209
.onSuccess(hashes -> {

0 commit comments

Comments
 (0)