@@ -407,8 +407,8 @@ private void configureTls(String listenerName, CertAndKeySecretSource serverCert
     String certConfigProviderValue;
     String keyConfigProviderValue;
     if (serverCertificate != null) {
-        certConfigProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), serverCertificate.getSecretName(), serverCertificate.getCertificate());
-        keyConfigProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), serverCertificate.getSecretName(), serverCertificate.getKey());
+        certConfigProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), node.podName(), listenerNameInProperty + ".crt");
+        keyConfigProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), node.podName(), listenerNameInProperty + ".key");
     } else {
         certConfigProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), node.podName(), node.podName() + ".crt");
         keyConfigProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), node.podName(), node.podName() + ".key");
@@ -1875,18 +1875,6 @@ public List<ConfigMap> generatePerBrokerConfigurationConfigMaps(MetricsAndLoggin
         return configMaps;
     }

-    /**
-     * Generates a Secret with the given name and data in Kafka Cluster's namespace
-     *
-     * @param secretData    Secret data
-     * @param secretName    Secret name
-     *
-     * @return  Secret that is generated
-     */
-    public Secret generateSecret(Map<String, String> secretData, String secretName) {
-        return ModelUtils.createSecret(secretName, namespace, labels, ownerReference, secretData, Map.of(), Map.of());
-    }
-
     /**
      * @return  Kafka version
      */
@@ -288,7 +288,7 @@ private Future<Void> updateKafkaAutoRebalanceStatus(KafkaStatus kafkaStatus) {
         Set<Integer> addedBrokers = kafka.addedNodes().stream().filter(NodeRef::broker).map(NodeRef::nodeId).collect(Collectors.toSet());

         // if added brokers list contains all desired, it's a newly created cluster so there are no actual scaled up brokers.
-        // when added brokers list has fewer nodes than desired, it actually containes the new ones for scaling up
+        // when added brokers list has fewer nodes than desired, it actually contains the new ones for scaling up
         Set<Integer> scaledUpBrokerNodes = addedBrokers.containsAll(desiredBrokers) ? Set.of() : addedBrokers;

         KafkaRebalanceUtils.updateKafkaAutoRebalanceStatus(kafkaStatus, kafkaAutoRebalanceStatus, scaledUpBrokerNodes);
@@ -757,21 +757,50 @@ protected Future<Void> brokerConfigurationConfigMaps() {
      */
     protected Future<Void> certificateSecrets(Clock clock) {
         return secretOperator.listAsync(reconciliation.namespace(), kafka.getSelectorLabels().withStrimziComponentType(KafkaCluster.COMPONENT_TYPE))
-                .compose(existingSecrets -> {
-                    List<Secret> desiredCertSecrets = kafka.generateCertificatesSecrets(clusterCa, clientsCa, existingSecrets,
-                            listenerReconciliationResults.bootstrapDnsNames, listenerReconciliationResults.brokerDnsNames,
-                            Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));
-
-                    List<String> desiredCertSecretNames = desiredCertSecrets.stream().map(secret -> secret.getMetadata().getName()).toList();
-                    existingSecrets.forEach(secret -> {
-                        String secretName = secret.getMetadata().getName();
-                        // Don't delete desired secrets or jmx secrets
-                        if (!desiredCertSecretNames.contains(secretName) && !KafkaResources.kafkaJmxSecretName(reconciliation.name()).equals(secretName)) {
-                            secretsToDelete.add(secretName);
-                        }
-                    });
-                    return updateCertificateSecrets(desiredCertSecrets);
-                }).mapEmpty();
+                .compose(existingSecrets -> getDesiredCertSecrets(clock, existingSecrets))
+                .compose(this::addCustomCertsToDesiredCertSecrets)
+                .compose(this::updateCertificateSecrets)
+                .mapEmpty();

     }

+    private Future<List<Secret>> getDesiredCertSecrets(Clock clock, List<Secret> existingSecrets) {
+        List<Secret> desiredCertSecrets = kafka.generateCertificatesSecrets(clusterCa, clientsCa, existingSecrets,
+                listenerReconciliationResults.bootstrapDnsNames, listenerReconciliationResults.brokerDnsNames,
+                Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));
+
+        List<String> desiredCertSecretNames = desiredCertSecrets.stream().map(secret -> secret.getMetadata().getName()).toList();
+        existingSecrets.forEach(secret -> {
+            String secretName = secret.getMetadata().getName();
+            // Don't delete desired secrets or jmx secrets
+            if (!desiredCertSecretNames.contains(secretName) && !KafkaResources.kafkaJmxSecretName(reconciliation.name()).equals(secretName)) {
+                secretsToDelete.add(secretName);
+            }
+        });
+        return Future.succeededFuture(desiredCertSecrets);
+    }
+
+    private Future<List<Secret>> addCustomCertsToDesiredCertSecrets(List<Secret> desiredCertSecrets) {
+        return collectCustomCerts()
+                .map(customCertsData -> {
+                    desiredCertSecrets.forEach(s -> s.getData().putAll(customCertsData));
+                    return desiredCertSecrets;
+                });
+    }
+
+    private Future<Map<String, String>> collectCustomCerts() {
+        Map<String, String> customCertsData = new HashMap<>();
+        List<Future<Object>> futures = kafka.getListeners().stream()
+                .filter(l -> l.isTls() && l.getConfiguration() != null)
+                .map(l ->
+                        ReconcilerUtils.getCertificateAndKeyAsync(secretOperator, reconciliation.namespace(), l.getConfiguration().getBrokerCertChainAndKey())
+                                .onSuccess(certAndKey -> {
+                                    customCertsData.putAll(CertUtils.buildSecretData(ListenersUtils.identifier(l), certAndKey));
+                                })
+                                .mapEmpty()
+                ).toList();
+        return Future.all(futures)
+                .map(f -> customCertsData);
+    }

     /**
@@ -10,6 +10,7 @@
 import io.fabric8.kubernetes.api.model.Secret;
 import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
 import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.strimzi.api.kafka.model.common.CertAndKeySecretSource;
 import io.strimzi.api.kafka.model.common.CertSecretSource;
 import io.strimzi.api.kafka.model.common.GenericSecretSource;
 import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
@@ -621,9 +622,25 @@ private static Future<String> getCertificateAsync(SecretOperator secretOperator,

     private static Future<CertAndKey> getCertificateAndKeyAsync(SecretOperator secretOperator, String namespace, KafkaClientAuthenticationTls auth) {
         return getValidatedSecret(secretOperator, namespace, auth.getCertificateAndKey().getSecretName(), auth.getCertificateAndKey().getCertificate(), auth.getCertificateAndKey().getKey())
+                //TODO: shouldn't this be decoded since we encode again? Otherwise encoded value gets encoded again for calculating the hash.
Contributor Author:

@scholzj can I please clarify something here?

I was going to make this method public and then use it for extracting CertAndKey from the Secret, but realised that it returns the secret data values undecoded (they stay Base64-encoded). The calling method, authTlsHash(), then encodes the values again, meaning that certificates and keys end up Base64-encoded twice before the hash is calculated. I'm not sure whether that makes sense or was intentional, so I wonder if we should decode the extracted values before constructing the CertAndKey object to return, just as I did in the new method below. In that case we wouldn't need two separate methods for this purpose.

Member:

I do not think there was any intention of having the hash calculated from data that is twice-encoded to Base64 (that said, doing the Base64 encoding twice is wasteful, but it should still produce a stable hash, so it should not really cause major issues).

But I think that if you can do it easily, you can definitely remove it. I guess this was done either by mistake or because some of the values being fed there are decoded and some are encoded.

I do not remember the details of how CertAndKey is used, so I'm not 100% sure whether it is better to keep the values encoded or decoded inside it. Normally, the Base64 encoding matters to us only when interacting with Kubernetes Secrets. But the certificates themselves should be strings anyway, even without the Base64 encoding.
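For illustration, a minimal self-contained sketch of the double-encoding point discussed in this thread. The PEM value is a hypothetical stand-in for secret.getData().get(...); it uses only JDK classes, not Strimzi's actual hashing code.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

public class DoubleEncodingSketch {
    public static void main(String[] args) throws Exception {
        // Kubernetes stores Secret data Base64-encoded; this stands in for secret.getData().get(...)
        String pem = "-----BEGIN CERTIFICATE-----\nMIIB...\n-----END CERTIFICATE-----";
        String storedInSecret = Base64.getEncoder().encodeToString(pem.getBytes(StandardCharsets.UTF_8));

        // Per the discussion above, the hash input is effectively the stored value encoded a second time
        byte[] doublyEncoded = Base64.getEncoder().encode(storedInSecret.getBytes(StandardCharsets.UTF_8));
        // The alternative: decode back to the raw PEM bytes before hashing
        byte[] decoded = Base64.getDecoder().decode(storedInSecret);

        // Both inputs produce a deterministic digest for the same Secret content, so either
        // choice yields a stable hash; the second encoding is just wasted work.
        MessageDigest sha = MessageDigest.getInstance("SHA-256");
        System.out.println(Base64.getEncoder().encodeToString(sha.digest(doublyEncoded)));
        System.out.println(Base64.getEncoder().encodeToString(sha.digest(decoded)));
    }
}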

                 .compose(secret -> Future.succeededFuture(new CertAndKey(secret.getData().get(auth.getCertificateAndKey().getKey()).getBytes(StandardCharsets.UTF_8), secret.getData().get(auth.getCertificateAndKey().getCertificate()).getBytes(StandardCharsets.UTF_8))));
     }

+    /**
+     * Extracts the certificate and key from the given Secret source.
+     *
+     * @param secretOperator            Secrets operator
+     * @param namespace                 Namespace to get the Secret from
+     * @param certAndKeySecretSource    Secret source for the certificate and key
+     *
+     * @return  Certificate and key extracted from the Secret source and decoded
+     */
+    public static Future<CertAndKey> getCertificateAndKeyAsync(SecretOperator secretOperator, String namespace, CertAndKeySecretSource certAndKeySecretSource) {
+        return getValidatedSecret(secretOperator, namespace, certAndKeySecretSource.getSecretName(), certAndKeySecretSource.getCertificate(), certAndKeySecretSource.getKey())
+                .compose(secret -> Future.succeededFuture(new CertAndKey(
+                        Util.decodeBytesFromBase64(secret.getData().get(certAndKeySecretSource.getKey())),
+                        Util.decodeBytesFromBase64(secret.getData().get(certAndKeySecretSource.getCertificate())))));
+    }
+
     private static Future<String> getPasswordAsync(SecretOperator secretOperator, String namespace, KafkaClientAuthentication auth) {
         if (auth instanceof KafkaClientAuthenticationPlain plainAuth) {

@@ -1406,8 +1406,8 @@ public void testWithTlsListenersWithCustomCerts() {
                 "inter.broker.listener.name=REPLICATION-9091",
                 "sasl.enabled.mechanisms=",
                 "ssl.endpoint.identification.algorithm=HTTPS",
-                "listener.name.tls-9093.ssl.keystore.certificate.chain=${strimzisecrets:namespace/my-secret:my.crt}",
-                "listener.name.tls-9093.ssl.keystore.key=${strimzisecrets:namespace/my-secret:my.key}",
+                "listener.name.tls-9093.ssl.keystore.certificate.chain=${strimzisecrets:namespace/my-cluster-kafka-2:tls-9093.crt}",
+                "listener.name.tls-9093.ssl.keystore.key=${strimzisecrets:namespace/my-cluster-kafka-2:tls-9093.key}",
                 "listener.name.tls-9093.ssl.keystore.type=PEM"));
     }

@@ -1568,8 +1568,8 @@ public void testWithExternalRouteListenersWithCustomCerts() {
                 "inter.broker.listener.name=REPLICATION-9091",
                 "sasl.enabled.mechanisms=",
                 "ssl.endpoint.identification.algorithm=HTTPS",
-                "listener.name.external-9094.ssl.keystore.certificate.chain=${strimzisecrets:namespace/my-secret:my.crt}",
-                "listener.name.external-9094.ssl.keystore.key=${strimzisecrets:namespace/my-secret:my.key}",
+                "listener.name.external-9094.ssl.keystore.certificate.chain=${strimzisecrets:namespace/my-cluster-kafka-2:external-9094.crt}",
+                "listener.name.external-9094.ssl.keystore.key=${strimzisecrets:namespace/my-cluster-kafka-2:external-9094.key}",
                 "listener.name.external-9094.ssl.keystore.type=PEM"));
     }

@@ -2326,8 +2326,8 @@ public void testExternalCertificateIngress() {
         KafkaCluster kc = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaAssembly, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, null, SHARED_ENV_PROVIDER);
         String brokerConfig = kc.generatePerBrokerConfiguration(5, ADVERTISED_HOSTNAMES, ADVERTISED_PORTS);

-        assertThat(brokerConfig, is(containsString("listener.name.external-9094.ssl.keystore.certificate.chain=${strimzisecrets:namespace/my-secret:my-external-cert.crt}")));
-        assertThat(brokerConfig, is(containsString("listener.name.external-9094.ssl.keystore.key=${strimzisecrets:namespace/my-secret:my.key}")));
+        assertThat(brokerConfig, is(containsString("listener.name.external-9094.ssl.keystore.certificate.chain=${strimzisecrets:namespace/foo-brokers-5:external-9094.crt}")));
+        assertThat(brokerConfig, is(containsString("listener.name.external-9094.ssl.keystore.key=${strimzisecrets:namespace/foo-brokers-5:external-9094.key}")));
         assertThat(brokerConfig, is(containsString("listener.name.external-9094.ssl.keystore.type=PEM")));

         String controllerConfig = kc.generatePerBrokerConfiguration(0, Map.of(0, Map.of("CONTROLPLANE_9090", "controller-0")), Map.of(0, Map.of("CONTROLPLANE_9090", "9090")));
@@ -2363,8 +2363,8 @@ public void testCustomCertificateTls() {
         KafkaCluster kc = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaAssembly, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, null, SHARED_ENV_PROVIDER);
         String mixedConfig = kc.generatePerBrokerConfiguration(4, ADVERTISED_HOSTNAMES, ADVERTISED_PORTS);

-        assertThat(mixedConfig, is(containsString("listener.name.tls-9093.ssl.keystore.certificate.chain=${strimzisecrets:namespace/my-secret:my-external-cert.crt}")));
-        assertThat(mixedConfig, is(containsString("listener.name.tls-9093.ssl.keystore.key=${strimzisecrets:namespace/my-secret:my.key}")));
+        assertThat(mixedConfig, is(containsString("listener.name.tls-9093.ssl.keystore.certificate.chain=${strimzisecrets:namespace/foo-mixed-4:tls-9093.crt}")));
+        assertThat(mixedConfig, is(containsString("listener.name.tls-9093.ssl.keystore.key=${strimzisecrets:namespace/foo-mixed-4:tls-9093.key}")));
         assertThat(mixedConfig, is(containsString("listener.name.tls-9093.ssl.keystore.type=PEM")));

        String controllerConfig = kc.generatePerBrokerConfiguration(0, Map.of(0, Map.of("CONTROLPLANE_9090", "controller-0")), Map.of(0, Map.of("CONTROLPLANE_9090", "9090")));