Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit f531cce

Browse files
committed
Add a more sopisticated broker deleted check
1 parent b19ad62 commit f531cce

File tree

1 file changed

+30
-26
lines changed

1 file changed

+30
-26
lines changed

pkg/resources/kafka/kafka.go

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
177177
}
178178
deletedBrokers = append(deletedBrokers, pod)
179179
}
180-
//TODO depend on CC load check instead of kubernetes deletion timestamp
180+
181181
if !arePodsAlreadyDeleted(deletedBrokers, log) {
182182
if r.KafkaCluster.Status.BrokersState[generateBrokerIdsFromPodSlice(deletedBrokers)[0]].GracefulActionState.CruiseControlState != v1beta1.GracefulUpdateRunning &&
183183
r.KafkaCluster.Status.BrokersState[generateBrokerIdsFromPodSlice(deletedBrokers)[0]].GracefulActionState.CruiseControlState != v1beta1.GracefulDownscaleSucceeded {
@@ -201,37 +201,41 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
201201
return err
202202
}
203203
}
204+
}
204205

205-
for _, broker := range deletedBrokers {
206-
err = r.Client.Delete(context.TODO(), &broker)
207-
if err != nil {
208-
return errors.WrapIfWithDetails(err, "could not delete broker", "id", broker.Labels["brokerId"])
209-
}
210-
err = r.Client.Delete(context.TODO(), &corev1.ConfigMap{ObjectMeta: templates.ObjectMeta(fmt.Sprintf(brokerConfigTemplate+"-%s", r.KafkaCluster.Name, broker.Labels["brokerId"]), labelsForKafka(r.KafkaCluster.Name), r.KafkaCluster)})
206+
for _, broker := range deletedBrokers {
207+
if broker.ObjectMeta.DeletionTimestamp != nil {
208+
log.Info(fmt.Sprintf("Broker %s is already on terminating state", broker.Labels["brokerId"]))
209+
continue
210+
}
211+
err = r.Client.Delete(context.TODO(), &broker)
212+
if err != nil {
213+
return errors.WrapIfWithDetails(err, "could not delete broker", "id", broker.Labels["brokerId"])
214+
}
215+
err = r.Client.Delete(context.TODO(), &corev1.ConfigMap{ObjectMeta: templates.ObjectMeta(fmt.Sprintf(brokerConfigTemplate+"-%s", r.KafkaCluster.Name, broker.Labels["brokerId"]), labelsForKafka(r.KafkaCluster.Name), r.KafkaCluster)})
216+
if err != nil {
217+
return errors.WrapIfWithDetails(err, "could not delete configmap for broker", "id", broker.Labels["brokerId"])
218+
}
219+
if !r.KafkaCluster.Spec.HeadlessServiceEnabled {
220+
err = r.Client.Delete(context.TODO(), &corev1.Service{ObjectMeta: templates.ObjectMeta(fmt.Sprintf("%s-%s", r.KafkaCluster.Name, broker.Labels["brokerId"]), labelsForKafka(r.KafkaCluster.Name), r.KafkaCluster)})
211221
if err != nil {
212-
return errors.WrapIfWithDetails(err, "could not delete configmap for broker", "id", broker.Labels["brokerId"])
222+
return errors.WrapIfWithDetails(err, "could not delete service for broker", "id", broker.Labels["brokerId"])
213223
}
214-
if !r.KafkaCluster.Spec.HeadlessServiceEnabled {
215-
err = r.Client.Delete(context.TODO(), &corev1.Service{ObjectMeta: templates.ObjectMeta(fmt.Sprintf("%s-%s", r.KafkaCluster.Name, broker.Labels["brokerId"]), labelsForKafka(r.KafkaCluster.Name), r.KafkaCluster)})
224+
}
225+
for _, volume := range broker.Spec.Volumes {
226+
if strings.HasPrefix(volume.Name, kafkaDataVolumeMount) {
227+
err = r.Client.Delete(context.TODO(), &corev1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{
228+
Name: volume.PersistentVolumeClaim.ClaimName,
229+
Namespace: r.KafkaCluster.Namespace,
230+
}})
216231
if err != nil {
217-
return errors.WrapIfWithDetails(err, "could not delete service for broker", "id", broker.Labels["brokerId"])
218-
}
219-
}
220-
for _, volume := range broker.Spec.Volumes {
221-
if strings.HasPrefix(volume.Name, kafkaDataVolumeMount) {
222-
err = r.Client.Delete(context.TODO(), &corev1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{
223-
Name: volume.PersistentVolumeClaim.ClaimName,
224-
Namespace: r.KafkaCluster.Namespace,
225-
}})
226-
if err != nil {
227-
return errors.WrapIfWithDetails(err, "could not delete pvc for broker", "id", broker.Labels["brokerId"])
228-
}
232+
return errors.WrapIfWithDetails(err, "could not delete pvc for broker", "id", broker.Labels["brokerId"])
229233
}
230234
}
231-
err = k8sutil.DeleteStatus(r.Client, broker.Labels["brokerId"], r.KafkaCluster, log)
232-
if err != nil {
233-
return errors.WrapIfWithDetails(err, "could not delete status for broker", "id", broker.Labels["brokerId"])
234-
}
235+
}
236+
err = k8sutil.DeleteStatus(r.Client, broker.Labels["brokerId"], r.KafkaCluster, log)
237+
if err != nil {
238+
return errors.WrapIfWithDetails(err, "could not delete status for broker", "id", broker.Labels["brokerId"])
235239
}
236240
}
237241
}

0 commit comments

Comments
 (0)