Skip to content

Commit b52ab91

Browse files
authored
Allow KafkaRoller connect to controllers (#11838)
Signed-off-by: Gantigmaa Selenge <tina.selenge@gmail.com>
1 parent fdf136d commit b52ab91

8 files changed

Lines changed: 142 additions & 80 deletions

File tree

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,10 @@ public class KafkaCluster extends AbstractModel implements SupportsMetrics, Supp
155155
protected static final String REPLICATION_PORT_NAME = "tcp-replication";
156156
protected static final int KAFKA_AGENT_PORT = 8443;
157157
protected static final String KAFKA_AGENT_PORT_NAME = "tcp-kafkaagent";
158-
protected static final int CONTROLPLANE_PORT = 9090;
158+
/**
159+
* Port number used for control plane
160+
*/
161+
public static final int CONTROLPLANE_PORT = 9090;
159162
protected static final String CONTROLPLANE_PORT_NAME = "tcp-ctrlplane"; // port name is up to 15 characters
160163

161164
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaRoller.java

Lines changed: 28 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@
3434
import org.apache.kafka.clients.admin.AlterConfigOp;
3535
import org.apache.kafka.clients.admin.AlterConfigsResult;
3636
import org.apache.kafka.clients.admin.Config;
37-
import org.apache.kafka.common.KafkaException;
3837
import org.apache.kafka.common.KafkaFuture;
39-
import org.apache.kafka.common.config.ConfigException;
4038
import org.apache.kafka.common.config.ConfigResource;
4139
import org.apache.kafka.common.errors.SslAuthenticationException;
4240

@@ -190,7 +188,7 @@ public KafkaRoller(Reconciliation reconciliation, Vertx vertx, PodOperator podOp
190188
private boolean maybeInitBrokerAdminClient() {
191189
if (this.brokerAdminClient == null) {
192190
try {
193-
this.brokerAdminClient = adminClient(nodes.stream().filter(NodeRef::broker).collect(Collectors.toSet()), false);
191+
this.brokerAdminClient = adminClient(nodes, false);
194192
} catch (ForceableProblem | FatalProblem e) {
195193
LOGGER.warnCr(reconciliation, "Failed to create brokerAdminClient.", e);
196194
return false;
@@ -206,13 +204,7 @@ private boolean maybeInitBrokerAdminClient() {
206204
private boolean maybeInitControllerAdminClient() {
207205
if (this.controllerAdminClient == null) {
208206
try {
209-
// TODO: Currently, when running in KRaft mode Kafka does not support using Kafka Admin API with controller
210-
// nodes. This is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/9692.
211-
// Therefore use broker nodes of the cluster to initialise adminClient for quorum health check.
212-
// Once Kafka Admin API is supported for controllers, nodes.stream().filter(NodeRef:controller)
213-
// can be used here. Until then pass an empty set of nodes so the client is initialized with
214-
// the brokers service.
215-
this.controllerAdminClient = adminClient(Set.of(), false);
207+
this.controllerAdminClient = adminClient(nodes, true);
216208
} catch (ForceableProblem | FatalProblem e) {
217209
LOGGER.warnCr(reconciliation, "Failed to create controllerAdminClient.", e);
218210
return false;
@@ -400,7 +392,7 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
400392
if (!restartContext.podStuck) {
401393
// We want to give pods a chance to get ready before we try to connect to them or consider them for rolling.
402394
// This is important especially for pods which were just started. But only in case when they are not stuck.
403-
// If the pod is stuck, it suggests that it is running already for some time and it will not become ready.
395+
// If the pod is stuck, it suggests that it has already been running for some time and will not become ready.
404396
// Waiting for it would likely just waste time.
405397
LOGGER.debugCr(reconciliation, "Waiting for pod {} to become ready before checking its state", nodeRef.podName());
406398
try {
@@ -583,32 +575,12 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
583575

584576
if (isController) {
585577
if (maybeInitControllerAdminClient()) {
586-
String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
587-
String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());
588-
589-
if (desiredConfig != null) {
590-
OrderedProperties orderedProperties = new OrderedProperties();
591-
controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT);
592-
}
593-
594-
restartContext.quorumCheck = quorumCheck(controllerAdminClient, Long.parseLong(controllerQuorumFetchTimeout));
578+
restartContext.quorumCheck = quorumCheck(controllerAdminClient, nodeRef);
595579
} else {
596-
//TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is complete
597-
// we should change this logic to immediately restart this pod because we cannot connect to it.
598-
if (isBroker) {
599-
// If it is a combined node (controller and broker) and the admin client cannot be initialised,
600-
// restart this pod. There is no reason to continue as we won't be able to
601-
// connect an admin client to this pod for other checks later.
602-
LOGGER.infoCr(reconciliation, "KafkaQuorumCheck cannot be initialised for {} because none of the brokers do not seem to responding to connection attempts. " +
603-
"Restarting pod because it is a combined node so it is one of the brokers that is not responding.", nodeRef);
604-
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
605-
markRestartContextWithForceRestart(restartContext);
606-
return;
607-
} else {
608-
// If it is a controller only node throw an UnforceableProblem, so we try again until the backOff
609-
// is finished, then it will move on to the next controller and eventually the brokers.
610-
throw new UnforceableProblem("KafkaQuorumCheck cannot be initialised for " + nodeRef + " because none of the brokers do not seem to responding to connection attempts");
611-
}
580+
LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, because it does not seem to responding to connection attempts", nodeRef);
581+
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
582+
markRestartContextWithForceRestart(restartContext);
583+
return;
612584
}
613585
}
614586

@@ -811,7 +783,6 @@ private void awaitReadiness(Pod pod, long timeout, TimeUnit unit) throws FatalPr
811783
* @param <E> The exception type
812784
* @return The result of the future
813785
* @throws E The exception type returned from {@code exceptionMapper}.
814-
* @throws TimeoutException If the given future is not completed before the timeout.
815786
* @throws InterruptedException If the waiting was interrupted.
816787
*/
817788
private static <T, E extends Exception> T await(Future<T> future, long timeout, TimeUnit unit,
@@ -855,36 +826,35 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
855826
* Returns an AdminClient instance bootstrapped from the given nodes. When isController is true, the
856827
* controller nodes and the control plane port are used; otherwise the broker nodes and the replication port.
857828
*/
858-
/* test */ Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
859-
// If no nodes are passed initialize the admin client using the brokers service
860-
// TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/9692 is completed review whether
861-
// this function can be reverted to expect nodes to be non empty
862-
String bootstrapHostnames;
863-
if (nodes.isEmpty()) {
864-
bootstrapHostnames = String.format("%s:%s", DnsNameGenerator.of(namespace, KafkaResources.bootstrapServiceName(cluster)).serviceDnsName(), KafkaCluster.REPLICATION_PORT);
865-
} else {
866-
bootstrapHostnames = nodes.stream().map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));
867-
}
868-
829+
/* test */ Admin adminClient(Set<NodeRef> nodes, boolean isController) throws ForceableProblem, FatalProblem {
830+
String bootstrapHostnames = null;
869831
try {
870-
LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
871-
return adminClientProvider.createAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity());
872-
} catch (KafkaException e) {
873-
if (ceShouldBeFatal && (e instanceof ConfigException
874-
|| e.getCause() instanceof ConfigException)) {
875-
throw new FatalProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
832+
if (isController) {
833+
bootstrapHostnames = nodes.stream().filter(NodeRef::controller).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.CONTROLPLANE_PORT).collect(Collectors.joining(","));
834+
LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
835+
return adminClientProvider.createControllerAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity());
876836
} else {
877-
throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
837+
bootstrapHostnames = nodes.stream().filter(NodeRef::broker).map(node -> DnsNameGenerator.podDnsName(namespace, KafkaResources.brokersServiceName(cluster), node.podName()) + ":" + KafkaCluster.REPLICATION_PORT).collect(Collectors.joining(","));
838+
LOGGER.debugCr(reconciliation, "Creating AdminClient for {}", bootstrapHostnames);
839+
return adminClientProvider.createAdminClient(bootstrapHostnames, coTlsPemIdentity.pemTrustSet(), coTlsPemIdentity.pemAuthIdentity());
878840
}
879841
} catch (RuntimeException e) {
880-
throw new ForceableProblem("An error while try to create an admin client with bootstrap brokers " + bootstrapHostnames, e);
842+
throw new ForceableProblem("An error while try to create an admin client with bootstrap " + bootstrapHostnames, e);
881843
}
882844
}
883845

884-
/* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
885-
return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs);
846+
/* test */ KafkaQuorumCheck quorumCheck(Admin ac, NodeRef nodeRef) {
847+
String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
848+
String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());
849+
850+
if (desiredConfig != null) {
851+
OrderedProperties orderedProperties = new OrderedProperties();
852+
controllerQuorumFetchTimeout = orderedProperties.addStringPairs(desiredConfig).asMap().getOrDefault(CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_NAME, CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT);
853+
}
854+
return new KafkaQuorumCheck(reconciliation, ac, vertx, Long.parseLong(controllerQuorumFetchTimeout));
886855
}
887856

857+
888858
/* test */ KafkaAvailability availability(Admin ac) {
889859
return new KafkaAvailability(reconciliation, ac);
890860
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/ResourceUtils.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,20 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
376376
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
377377
}
378378

379+
@Override
380+
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
381+
return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
382+
}
383+
379384
@Override
380385
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
381386
return mockAdminClient;
382387
}
388+
389+
@Override
390+
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
391+
return mockAdminClient;
392+
}
383393
};
384394
}
385395

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaRollerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -997,7 +997,7 @@ Future<Boolean> canRoll(int podId) {
997997
}
998998

999999
@Override
1000-
protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
1000+
protected KafkaQuorumCheck quorumCheck(Admin ac, NodeRef ref) {
10011001
Admin admin = mock(Admin.class);
10021002
DescribeMetadataQuorumResult qrmResult = mock(DescribeMetadataQuorumResult.class);
10031003
when(admin.describeMetadataQuorum()).thenReturn(qrmResult);

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/events/KubernetesRestartEventsMockTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,10 +588,20 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
588588
return adminClientSupplier.get();
589589
}
590590

591+
@Override
592+
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
593+
return adminClientSupplier.get();
594+
}
595+
591596
@Override
592597
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
593598
return adminClientSupplier.get();
594599
}
600+
601+
@Override
602+
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
603+
return adminClientSupplier.get();
604+
}
595605
};
596606

597607
return new ResourceOperatorSupplier(vertx,

operator-common/src/main/java/io/strimzi/operator/common/AdminClientProvider.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
public interface AdminClientProvider {
1717

1818
/**
19-
* Create a Kafka Admin interface instance
19+
* Create a Kafka Admin interface instance for brokers
2020
*
2121
* @param bootstrapHostnames Kafka hostname to connect to for administration operations
2222
* @param kafkaCaTrustSet Trust set for connecting to Kafka
@@ -26,7 +26,17 @@ public interface AdminClientProvider {
2626
Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity);
2727

2828
/**
29-
* Create a Kafka Admin interface instance
29+
* Create a Kafka Admin interface instance for controllers
30+
*
31+
* @param controllerBootstrapHostnames Kafka controller hostname to connect to for administration operations
32+
* @param kafkaCaTrustSet Trust set for connecting to Kafka
33+
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
34+
* @return Instance of Kafka Admin interface
35+
*/
36+
Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity);
37+
38+
/**
39+
* Create a Kafka Admin interface instance for brokers
3040
*
3141
* @param bootstrapHostnames Kafka hostname to connect to for administration operations
3242
* @param kafkaCaTrustSet Trust set for connecting to Kafka
@@ -36,4 +46,16 @@ public interface AdminClientProvider {
3646
* @return Instance of Kafka Admin interface
3747
*/
3848
Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config);
49+
50+
/**
51+
* Create a Kafka Admin interface instance for controllers
52+
*
53+
* @param controllerBootstrapHostnames Kafka controller hostname to connect to for administration operations
54+
* @param kafkaCaTrustSet Trust set for connecting to Kafka
55+
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
56+
* @param config Additional configuration for the Kafka Admin Client
57+
*
58+
* @return Instance of Kafka Admin interface
59+
*/
60+
Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config);
3961
}

operator-common/src/main/java/io/strimzi/operator/common/DefaultAdminClientProvider.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
2121
return createAdminClient(bootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
2222
}
2323

24+
@Override
25+
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity) {
26+
return createControllerAdminClient(controllerBootstrapHostnames, kafkaCaTrustSet, authIdentity, new Properties());
27+
}
28+
2429
/**
2530
* Create a Kafka Admin interface instance handling the following different scenarios:
2631
*
@@ -44,26 +49,30 @@ public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTru
4449
*/
4550
@Override
4651
public Admin createAdminClient(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
47-
return Admin.create(adminClientConfiguration(bootstrapHostnames, kafkaCaTrustSet, authIdentity, config));
52+
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames);
53+
return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config));
54+
}
55+
56+
@Override
57+
public Admin createControllerAdminClient(String controllerBootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
58+
config.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, controllerBootstrapHostnames);
59+
return Admin.create(adminClientConfiguration(kafkaCaTrustSet, authIdentity, config));
4860
}
4961

5062
/**
5163
* Utility method for preparing the Admin client configuration
5264
*
53-
* @param bootstrapHostnames Kafka bootstrap address
5465
* @param kafkaCaTrustSet Trust set for connecting to Kafka
5566
* @param authIdentity Identity for TLS client authentication for connecting to Kafka
5667
* @param config Custom Admin client configuration or empty properties instance
5768
*
5869
* @return Admin client configuration
5970
*/
60-
/* test */ static Properties adminClientConfiguration(String bootstrapHostnames, PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
71+
/* test */ Properties adminClientConfiguration(PemTrustSet kafkaCaTrustSet, PemAuthIdentity authIdentity, Properties config) {
6172
if (config == null) {
6273
throw new InvalidConfigurationException("The config parameter should not be null");
6374
}
6475

65-
config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapHostnames);
66-
6776
// configuring TLS encryption if requested
6877
if (kafkaCaTrustSet != null) {
6978
config.putIfAbsent(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, "SSL");

0 commit comments

Comments
 (0)