Skip to content

Commit ef60183

Browse files
fvalerischolzj
authored andcommitted
Fix the UTO upgrade issue reported in #9470. (#9474)
Signed-off-by: Federico Valeri <[email protected]>
1 parent ee9b1ce commit ef60183

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

topic-operator/src/main/java/io/strimzi/operator/topic/v2/BatchingTopicController.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.kafka.common.config.ConfigResource;
4040
import org.apache.kafka.common.errors.ApiException;
4141
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
42+
import org.apache.kafka.common.errors.TopicExistsException;
4243
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
4344

4445
import java.io.InterruptedIOException;
@@ -283,7 +284,12 @@ private PartitionedByError<ReconcilableTopic, Void> createTopics(List<Reconcilab
283284
values.get(reconcilableTopic.topicName()).get();
284285
return pair(reconcilableTopic, Either.ofRight((null)));
285286
} catch (ExecutionException e) {
286-
return pair(reconcilableTopic, Either.ofLeft(handleAdminException(e)));
287+
if (e.getCause() != null && e.getCause() instanceof TopicExistsException) {
288+
// we treat this as a success, the next reconciliation checks the configuration
289+
return pair(reconcilableTopic, Either.ofRight((null)));
290+
} else {
291+
return pair(reconcilableTopic, Either.ofLeft(handleAdminException(e)));
292+
}
287293
} catch (InterruptedException e) {
288294
throw new UncheckedInterruptedException(e);
289295
}

topic-operator/src/test/java/io/strimzi/operator/topic/v2/TopicControllerIT.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.kafka.common.config.ConfigResource;
3939
import org.apache.kafka.common.config.TopicConfig;
4040
import org.apache.kafka.common.errors.TopicAuthorizationException;
41+
import org.apache.kafka.common.errors.TopicExistsException;
4142
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
4243
import org.apache.kafka.common.internals.KafkaFutureImpl;
4344
import org.apache.logging.log4j.Level;
@@ -1883,7 +1884,7 @@ public void shouldFailIfNumPartitionsDivergedWithConfigChange(@BrokerConfig(name
18831884
"Decreasing partitions not supported"));
18841885
}
18851886

1886-
private static <T> KafkaFuture<T> failedFuture(Throwable error) throws ExecutionException, InterruptedException {
1887+
private static <T> KafkaFuture<T> failedFuture(Throwable error) {
18871888
var future = new KafkaFutureImpl<T>();
18881889
future.completeExceptionally(error);
18891890
return future;
@@ -2080,4 +2081,22 @@ public void shouldNotReconcileKafkaTopicWithMissingSpec(
20802081

20812082
assertNotExistsInKafka(expectedTopicName(created));
20822083
}
2084+
2085+
@Test
2086+
public void shouldReconcileOnTopicExistsException(
2087+
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
2088+
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
2089+
var config = topicOperatorConfig(NAMESPACE, kafkaCluster);
2090+
var topicName = randomTopicName();
2091+
2092+
var creteTopicResult = mock(CreateTopicsResult.class);
2093+
var existsException = new TopicExistsException(format("Topic '%s' already exists.", topicName));
2094+
Mockito.doReturn(failedFuture(existsException)).when(creteTopicResult).all();
2095+
Mockito.doReturn(Map.of(topicName, failedFuture(existsException))).when(creteTopicResult).values();
2096+
operatorAdmin = new Admin[]{Mockito.spy(Admin.create(config.adminClientConfig()))};
2097+
Mockito.doReturn(creteTopicResult).when(operatorAdmin[0]).createTopics(any());
2098+
2099+
KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1));
2100+
assertTrue(readyIsTrue().test(kafkaTopic));
2101+
}
20832102
}

0 commit comments

Comments
 (0)