@@ -1182,56 +1182,9 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro
11821182
11831183 // Handle disk removal
11841184 if len (pvcList .Items ) > len (desiredPvcs ) {
1185- for _ , pvc := range pvcList .Items {
1186- foundInDesired := false
1187- existingMountPath := pvc .Annotations ["mountPath" ]
1188-
1189- for _ , desiredPvc := range desiredPvcs {
1190- desiredMountPath := desiredPvc .Annotations ["mountPath" ]
1191-
1192- if existingMountPath == desiredMountPath {
1193- foundInDesired = true
1194- break
1195- }
1196- }
1197-
1198- if foundInDesired {
1199- continue
1200- }
1201-
1202- mountPathToRemove := existingMountPath
1203- if brokerState , ok := r .KafkaCluster .Status .BrokersState [brokerId ]; ok {
1204- volumeStateStatus , found := brokerState .GracefulActionState .VolumeStates [mountPathToRemove ]
1205- if ! found {
1206- // If the state is not found, it means that the disk removal was done according to the disk removal succeeded branch
1207- log .Info ("Disk removal was completed, waiting for Rolling Upgrade to remove PVC" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1208- continue
1209- }
1210-
1211- // Check the volume state
1212- ccVolumeState := volumeStateStatus .CruiseControlVolumeState
1213- switch {
1214- case ccVolumeState .IsDiskRemovalSucceeded ():
1215- if err := r .Client .Delete (ctx , & pvc ); err != nil {
1216- return errorfactory .New (errorfactory.APIFailure {}, err , "deleting resource failed" , "kind" , desiredType )
1217- }
1218- log .Info ("resource deleted" )
1219- err = k8sutil .DeleteVolumeStatus (r .Client , brokerId , mountPathToRemove , r .KafkaCluster , log )
1220- if err != nil {
1221- return errors .WrapIfWithDetails (err , "could not delete volume status for broker volume" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1222- }
1223- case ccVolumeState .IsDiskRemoval ():
1224- log .Info ("Graceful disk removal is in progress" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1225- waitForDiskRemovalToFinish = true
1226- case ccVolumeState .IsDiskRebalance ():
1227- log .Info ("Graceful disk rebalance is in progress, waiting to mark disk for removal" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1228- waitForDiskRemovalToFinish = true
1229- default :
1230- brokerVolumesState [mountPathToRemove ] = v1beta1.VolumeState {CruiseControlVolumeState : v1beta1 .GracefulDiskRemovalRequired }
1231- log .Info ("Marked the volume for removal" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1232- waitForDiskRemovalToFinish = true
1233- }
1234- }
1185+ waitForDiskRemovalToFinish , err = handleDiskRemoval (ctx , pvcList , desiredPvcs , r , brokerId , log , desiredType , brokerVolumesState )
1186+ if err != nil {
1187+ return err
12351188 }
12361189 }
12371190
@@ -1330,6 +1283,63 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro
13301283 return nil
13311284}
13321285
1286+ func handleDiskRemoval (ctx context.Context , pvcList * corev1.PersistentVolumeClaimList , desiredPvcs []* corev1.PersistentVolumeClaim ,
1287+ r * Reconciler , brokerId string , log logr.Logger , desiredType reflect.Type , brokerVolumesState map [string ]v1beta1.VolumeState ) (bool , error ) {
1288+ waitForDiskRemovalToFinish := false
1289+ for _ , pvc := range pvcList .Items {
1290+ foundInDesired := false
1291+ existingMountPath := pvc .Annotations ["mountPath" ]
1292+
1293+ for _ , desiredPvc := range desiredPvcs {
1294+ desiredMountPath := desiredPvc .Annotations ["mountPath" ]
1295+
1296+ if existingMountPath == desiredMountPath {
1297+ foundInDesired = true
1298+ break
1299+ }
1300+ }
1301+
1302+ if foundInDesired {
1303+ continue
1304+ }
1305+
1306+ mountPathToRemove := existingMountPath
1307+ if brokerState , ok := r .KafkaCluster .Status .BrokersState [brokerId ]; ok {
1308+ volumeStateStatus , found := brokerState .GracefulActionState .VolumeStates [mountPathToRemove ]
1309+ if ! found {
1310+ // If the state is not found, it means that the disk removal was done according to the disk removal succeeded branch
1311+ log .Info ("Disk removal was completed, waiting for Rolling Upgrade to remove PVC" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1312+ continue
1313+ }
1314+
1315+ // Check the volume state
1316+ ccVolumeState := volumeStateStatus .CruiseControlVolumeState
1317+ switch {
1318+ case ccVolumeState .IsDiskRemovalSucceeded ():
1319+ if err := r .Client .Delete (ctx , & pvc ); err != nil {
1320+ return false , errorfactory .New (errorfactory.APIFailure {}, err , "deleting resource failed" , "kind" , desiredType )
1321+ }
1322+ log .Info ("resource deleted" )
1323+ err := k8sutil .DeleteVolumeStatus (r .Client , brokerId , mountPathToRemove , r .KafkaCluster , log )
1324+ if err != nil {
1325+ return false , errors .WrapIfWithDetails (err , "could not delete volume status for broker volume" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1326+ }
1327+ case ccVolumeState .IsDiskRemoval ():
1328+ log .Info ("Graceful disk removal is in progress" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1329+ waitForDiskRemovalToFinish = true
1330+ case ccVolumeState .IsDiskRebalance ():
1331+ log .Info ("Graceful disk rebalance is in progress, waiting to mark disk for removal" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1332+ waitForDiskRemovalToFinish = true
1333+ default :
1334+ brokerVolumesState [mountPathToRemove ] = v1beta1.VolumeState {CruiseControlVolumeState : v1beta1 .GracefulDiskRemovalRequired }
1335+ log .Info ("Marked the volume for removal" , "brokerId" , brokerId , "mountPath" , mountPathToRemove )
1336+ waitForDiskRemovalToFinish = true
1337+ }
1338+ }
1339+ }
1340+ return waitForDiskRemovalToFinish , nil
1341+ }
1342+
13331343// GetBrokersWithPendingOrRunningCCTask returns list of brokers that are either waiting for CC
13341344// to start executing a broker task (add broker, remove broker, etc) or CC already running a task for it.
13351345func GetBrokersWithPendingOrRunningCCTask (kafkaCluster * v1beta1.KafkaCluster ) []int32 {
0 commit comments