Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -425,10 +425,10 @@ public Set<NodeRef> nodes() {
}

/**
* Generates list of Kafka node IDs that are going to be added to the Kafka cluster as brokers.
* Generates list of Kafka nodes that are going to be added to the Kafka cluster as brokers.
* This reports all broker nodes on cluster creation as well as the newly added ones on scaling up.
*
* @return Set of Kafka node IDs which are going to be added as brokers.
* @return Set of Kafka nodes which are going to be added as brokers.
*/
public Set<NodeRef> addedNodes() {
Set<NodeRef> nodes = new LinkedHashSet<>();
Expand All @@ -441,15 +441,15 @@ public Set<NodeRef> addedNodes() {
}

/**
* Generates list of Kafka node IDs that are going to be removed from the Kafka cluster.
* Generates list of Kafka nodes that are going to be removed from the Kafka cluster.
*
* @return Set of Kafka node IDs which are going to be removed
* @return Set of Kafka nodes which are going to be removed
*/
public Set<Integer> removedNodes() {
Set<Integer> nodes = new LinkedHashSet<>();
public Set<NodeRef> removedNodes() {
Set<NodeRef> nodes = new LinkedHashSet<>();

for (KafkaPool pool : nodePools) {
nodes.addAll(pool.scaledDownNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()));
nodes.addAll(pool.scaledDownNodes());
}

return nodes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.strimzi.operator.cluster.model.KafkaPool;
import io.strimzi.operator.cluster.model.KafkaVersion;
import io.strimzi.operator.cluster.model.KafkaVersionChange;
import io.strimzi.operator.cluster.model.NodeRef;
import io.strimzi.operator.cluster.model.SharedEnvironmentProvider;
import io.strimzi.operator.cluster.model.nodepools.NodePoolUtils;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
Expand Down Expand Up @@ -114,9 +115,11 @@ public Future<KafkaCluster> prepareKafkaCluster(
return createKafkaCluster(kafkaCr, nodePools, oldStorage, versionChange)
.compose(kafka -> brokerRemovalCheck(kafkaCr, kafka))
.compose(kafka -> {
Set<Integer> removedNodeIds = kafka.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet());

if (checkFailed() && tryToFixProblems) {
// saving scaling down blocked nodes, before they are reverted back
this.scalingDownBlockedNodes.addAll(kafka.removedNodes());
this.scalingDownBlockedNodes.addAll(removedNodeIds);
// We have a failure, and should try to fix issues
// Once we fix it, we call this method again, but this time with tryToFixProblems set to false
return revertScaleDown(nodePools)
Expand All @@ -127,7 +130,7 @@ public Future<KafkaCluster> prepareKafkaCluster(
List<String> errors = new ArrayList<>();

if (scaleDownCheckFailed) {
errors.add("Cannot scale-down Kafka brokers " + kafka.removedNodes() + " because they have assigned partition-replicas.");
errors.add("Cannot scale-down Kafka brokers " + removedNodeIds + " because they have assigned partition-replicas.");
}

if (usedToBeBrokersCheckFailed) {
Expand Down Expand Up @@ -185,10 +188,12 @@ private Future<KafkaCluster> brokerRemovalCheck(Kafka kafkaCr, KafkaCluster kafk
return ReconcilerUtils.coTlsPemIdentity(reconciliation, secretOperator)
.compose(coTlsPemIdentity -> brokerScaleDownOperations.brokersInUse(reconciliation, vertx, coTlsPemIdentity, adminClientProvider))
.compose(brokersInUse -> {
Set<Integer> removedNodeIds = kafka.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet());

// Check nodes that are being scaled down
Set<Integer> scaledDownBrokersInUse = kafka.removedNodes().stream().filter(brokersInUse::contains).collect(Collectors.toSet());
Set<Integer> scaledDownBrokersInUse = removedNodeIds.stream().filter(brokersInUse::contains).collect(Collectors.toSet());
if (!scaledDownBrokersInUse.isEmpty()) {
LOGGER.warnCr(reconciliation, "Cannot scale down brokers {} because {} have assigned partition-replicas", kafka.removedNodes(), scaledDownBrokersInUse);
LOGGER.warnCr(reconciliation, "Cannot scale down brokers {} because {} have assigned partition-replicas", removedNodeIds, scaledDownBrokersInUse);
scaleDownCheckFailed = true;
} else {
scaleDownCheckFailed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void testNewClusterWithKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2, 3, 4, 5, 6, 7, 8)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));
Comment thread
scholzj marked this conversation as resolved.

// Check the status conditions
assertThat(kafkaStatus.getConditions(), is(nullValue()));
Expand All @@ -240,7 +240,7 @@ public void testNewClusterWithMixedNodesKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(3));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(0, 1, 2)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));

// Check the status conditions
assertThat(kafkaStatus.getConditions(), is(nullValue()));
Expand All @@ -266,7 +266,7 @@ public void testExistingClusterWithKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));

// Check the status conditions
assertThat(kafkaStatus.getConditions(), is(nullValue()));
Expand All @@ -292,7 +292,7 @@ public void testExistingClusterWithMixedNodesKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(3));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));

// Check the status conditions
assertThat(kafkaStatus.getConditions(), is(nullValue()));
Expand Down Expand Up @@ -322,7 +322,7 @@ public void testRevertScaleDownWithKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(13));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 1003, 1004, 2000, 2001, 2002, 2003, 2004, 3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of(3003, 3004))); // Controllers are not affected
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3003, 3004))); // Controllers are not affected

// Check the status conditions
assertThat(kafkaStatus.getConditions().size(), is(2));
Expand Down Expand Up @@ -360,7 +360,7 @@ public void testRevertScaleDownWithKRaftMixedNodes(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(5));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3000, 3001, 3002, 3003, 3004)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));

// Check the status conditions
assertThat(kafkaStatus.getConditions().size(), is(1));
Expand Down Expand Up @@ -394,7 +394,7 @@ public void testCorrectScaleDownWithKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of(1003, 1004, 2003, 2004, 3003, 3004)));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1003, 1004, 2003, 2004, 3003, 3004)));

// Check the status conditions
assertThat(kafkaStatus.getConditions(), is(nullValue()));
Expand Down Expand Up @@ -422,7 +422,7 @@ public void testThrowsRevertScaleDownFailsWithKRaft(VertxTestContext context) {
.onComplete(context.failing(ex -> context.verify(() -> {
// Check exception
assertThat(ex, instanceOf(InvalidResourceException.class));
assertThat(ex.getMessage(), is("Following errors were found when processing the Kafka custom resource: [Cannot scale-down Kafka brokers [3003, 3004, 1003, 1004, 2003, 2004] because they have assigned partition-replicas.]"));
assertThat(ex.getMessage(), is("Following errors were found when processing the Kafka custom resource: [Cannot scale-down Kafka brokers [2003, 2004, 3003, 1003, 3004, 1004] because they have assigned partition-replicas.]"));
Comment thread
ppatierno marked this conversation as resolved.
Outdated

// Check the status conditions
assertThat(kafkaStatus.getConditions(), is(nullValue()));
Expand Down Expand Up @@ -454,7 +454,7 @@ public void testSkipScaleDownCheckWithKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of(1003, 1004, 2003, 2004, 3003, 3004)));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1003, 1004, 2003, 2004, 3003, 3004)));

// Check the status conditions
assertThat(kafkaStatus.getConditions(), is(nullValue()));
Expand Down Expand Up @@ -486,7 +486,7 @@ public void testRevertRoleChangeWithKRaftMixedNodes(VertxTestContext context) {
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.brokerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.controllerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));
assertThat(kc.usedToBeBrokerNodes(), is(Set.of()));

// Check the status conditions
Expand Down Expand Up @@ -529,7 +529,7 @@ public void testRevertRoleChangeWithKRaftDedicatedNodes(VertxTestContext context
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.brokerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.controllerNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));
assertThat(kc.usedToBeBrokerNodes(), is(Set.of()));

// Check the status conditions
Expand Down Expand Up @@ -564,7 +564,7 @@ public void testCorrectRoleChangeWithKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));
assertThat(kc.usedToBeBrokerNodes(), is(Set.of(3000, 3001, 3002)));

// Check the status conditions
Expand Down Expand Up @@ -625,7 +625,7 @@ public void testSkipRoleChangeCheckWithKRaft(VertxTestContext context) {
assertThat(kc, is(notNullValue()));
assertThat(kc.nodes().size(), is(9));
assertThat(kc.nodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of(1000, 1001, 1002, 2000, 2001, 2002, 3000, 3001, 3002)));
assertThat(kc.removedNodes(), is(Set.of()));
assertThat(kc.removedNodes().stream().map(NodeRef::nodeId).collect(Collectors.toSet()), is(Set.of()));
assertThat(kc.usedToBeBrokerNodes(), is(Set.of(3000, 3001, 3002)));

// Check the status conditions
Expand Down
Loading