@@ -16,13 +16,9 @@ package controllers
1616
1717import (
1818 "context"
19+ "errors"
1920 "reflect"
2021
21- "github.com/banzaicloud/kafka-operator/api/v1alpha1"
22- "github.com/banzaicloud/kafka-operator/api/v1beta1"
23- "github.com/banzaicloud/kafka-operator/pkg/k8sutil"
24- "github.com/banzaicloud/kafka-operator/pkg/kafkaclient"
25- "github.com/banzaicloud/kafka-operator/pkg/util"
2622 "github.com/go-logr/logr"
2723 apierrors "k8s.io/apimachinery/pkg/api/errors"
2824 "k8s.io/apimachinery/pkg/runtime"
@@ -32,6 +28,12 @@ import (
3228 "sigs.k8s.io/controller-runtime/pkg/handler"
3329 "sigs.k8s.io/controller-runtime/pkg/reconcile"
3430 "sigs.k8s.io/controller-runtime/pkg/source"
31+
32+ "github.com/banzaicloud/kafka-operator/api/v1alpha1"
33+ "github.com/banzaicloud/kafka-operator/api/v1beta1"
34+ "github.com/banzaicloud/kafka-operator/pkg/k8sutil"
35+ "github.com/banzaicloud/kafka-operator/pkg/kafkaclient"
36+ "github.com/banzaicloud/kafka-operator/pkg/util"
3537)
3638
3739var topicFinalizer = "finalizer.kafkatopics.kafka.banzaicloud.io"
@@ -129,6 +131,12 @@ func (r *KafkaTopicReconciler) Reconcile(request reconcile.Request) (reconcile.R
129131 return requeueWithError (reqLogger , "failure checking for existing topic" , err )
130132 }
131133
134+ // It may take several seconds after topic is created successfully for all the brokers
135+ // to become aware that the topic has been created
136+ if instance .Status .State == v1alpha1 .TopicStateCreated && existing == nil {
137+ return requeueWithError (reqLogger , instance .Spec .Name , errors .New ("topic is still creating" ))
138+ }
139+
132140 // we got a topic back
133141 if existing != nil {
134142 reqLogger .Info ("Topic already exists, verifying configuration" )
0 commit comments