Skip to content

Commit 73a458e

Browse files
authored
PVC safeguards for KRaft controllers (#109)
1 parent 3c1f922 commit 73a458e

File tree

2 files changed

+208
-60
lines changed

2 files changed

+208
-60
lines changed

pkg/resources/kafka/kafka.go

Lines changed: 89 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,58 +1164,27 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro
11641164
return errorfactory.New(errorfactory.APIFailure{}, err, "getting resource failed", "kind", desiredType)
11651165
}
11661166

1167-
// Handle disk removal
1168-
if len(pvcList.Items) > len(desiredPvcs) {
1169-
for _, pvc := range pvcList.Items {
1170-
foundInDesired := false
1171-
existingMountPath := pvc.Annotations["mountPath"]
1172-
1173-
for _, desiredPvc := range desiredPvcs {
1174-
desiredMountPath := desiredPvc.Annotations["mountPath"]
1175-
1176-
if existingMountPath == desiredMountPath {
1177-
foundInDesired = true
1178-
break
1179-
}
1180-
}
1181-
1182-
if foundInDesired {
1183-
continue
1184-
}
1167+
isController, err := r.isController(util.ConvertStringToInt32(brokerId))
1168+
if err != nil {
1169+
return errors.WrapIfWithDetails(err, "could not determine if broker is controller", "brokerId", brokerId)
1170+
}
11851171

1186-
mountPathToRemove := existingMountPath
1187-
if brokerState, ok := r.KafkaCluster.Status.BrokersState[brokerId]; ok {
1188-
volumeStateStatus, found := brokerState.GracefulActionState.VolumeStates[mountPathToRemove]
1189-
if !found {
1190-
// If the state is not found, it means that the disk removal was done according to the disk removal succeeded branch
1191-
log.Info("Disk removal was completed, waiting for Rolling Upgrade to remove PVC", "brokerId", brokerId, "mountPath", mountPathToRemove)
1192-
continue
1193-
}
1172+
if isController {
1173+
if len(desiredPvcs) != 1 {
1174+
return errors.New("controller broker can have only one volume")
1175+
}
1176+
// Changing the controller's mount path causes a new disk to be added, which leaves the controller in an unrecoverable state
1177+
// because the new disk is not initialized for Kafka. We want to avoid this situation.
1178+
if len(pvcList.Items) == 1 && pvcList.Items[0].Annotations["mountPath"] != desiredPvcs[0].Annotations["mountPath"] {
1179+
return errors.New("controller broker volume mount path cannot be changed")
1180+
}
1181+
}
11941182

1195-
// Check the volume state
1196-
ccVolumeState := volumeStateStatus.CruiseControlVolumeState
1197-
switch {
1198-
case ccVolumeState.IsDiskRemovalSucceeded():
1199-
if err := r.Client.Delete(ctx, &pvc); err != nil {
1200-
return errorfactory.New(errorfactory.APIFailure{}, err, "deleting resource failed", "kind", desiredType)
1201-
}
1202-
log.Info("resource deleted")
1203-
err = k8sutil.DeleteVolumeStatus(r.Client, brokerId, mountPathToRemove, r.KafkaCluster, log)
1204-
if err != nil {
1205-
return errors.WrapIfWithDetails(err, "could not delete volume status for broker volume", "brokerId", brokerId, "mountPath", mountPathToRemove)
1206-
}
1207-
case ccVolumeState.IsDiskRemoval():
1208-
log.Info("Graceful disk removal is in progress", "brokerId", brokerId, "mountPath", mountPathToRemove)
1209-
waitForDiskRemovalToFinish = true
1210-
case ccVolumeState.IsDiskRebalance():
1211-
log.Info("Graceful disk rebalance is in progress, waiting to mark disk for removal", "brokerId", brokerId, "mountPath", mountPathToRemove)
1212-
waitForDiskRemovalToFinish = true
1213-
default:
1214-
brokerVolumesState[mountPathToRemove] = v1beta1.VolumeState{CruiseControlVolumeState: v1beta1.GracefulDiskRemovalRequired}
1215-
log.Info("Marked the volume for removal", "brokerId", brokerId, "mountPath", mountPathToRemove)
1216-
waitForDiskRemovalToFinish = true
1217-
}
1218-
}
1183+
// Handle disk removal
1184+
if len(pvcList.Items) > len(desiredPvcs) {
1185+
waitForDiskRemovalToFinish, err = handleDiskRemoval(ctx, pvcList, desiredPvcs, r, brokerId, log, desiredType, brokerVolumesState)
1186+
if err != nil {
1187+
return err
12191188
}
12201189
}
12211190

@@ -1314,6 +1283,63 @@ func (r *Reconciler) reconcileKafkaPvc(ctx context.Context, log logr.Logger, bro
13141283
return nil
13151284
}
13161285

1286+
func handleDiskRemoval(ctx context.Context, pvcList *corev1.PersistentVolumeClaimList, desiredPvcs []*corev1.PersistentVolumeClaim,
1287+
r *Reconciler, brokerId string, log logr.Logger, desiredType reflect.Type, brokerVolumesState map[string]v1beta1.VolumeState) (bool, error) {
1288+
waitForDiskRemovalToFinish := false
1289+
for _, pvc := range pvcList.Items {
1290+
foundInDesired := false
1291+
existingMountPath := pvc.Annotations["mountPath"]
1292+
1293+
for _, desiredPvc := range desiredPvcs {
1294+
desiredMountPath := desiredPvc.Annotations["mountPath"]
1295+
1296+
if existingMountPath == desiredMountPath {
1297+
foundInDesired = true
1298+
break
1299+
}
1300+
}
1301+
1302+
if foundInDesired {
1303+
continue
1304+
}
1305+
1306+
mountPathToRemove := existingMountPath
1307+
if brokerState, ok := r.KafkaCluster.Status.BrokersState[brokerId]; ok {
1308+
volumeStateStatus, found := brokerState.GracefulActionState.VolumeStates[mountPathToRemove]
1309+
if !found {
1310+
// If the state is not found, it means that the disk removal was done according to the disk removal succeeded branch
1311+
log.Info("Disk removal was completed, waiting for Rolling Upgrade to remove PVC", "brokerId", brokerId, "mountPath", mountPathToRemove)
1312+
continue
1313+
}
1314+
1315+
// Check the volume state
1316+
ccVolumeState := volumeStateStatus.CruiseControlVolumeState
1317+
switch {
1318+
case ccVolumeState.IsDiskRemovalSucceeded():
1319+
if err := r.Client.Delete(ctx, &pvc); err != nil {
1320+
return false, errorfactory.New(errorfactory.APIFailure{}, err, "deleting resource failed", "kind", desiredType)
1321+
}
1322+
log.Info("resource deleted")
1323+
err := k8sutil.DeleteVolumeStatus(r.Client, brokerId, mountPathToRemove, r.KafkaCluster, log)
1324+
if err != nil {
1325+
return false, errors.WrapIfWithDetails(err, "could not delete volume status for broker volume", "brokerId", brokerId, "mountPath", mountPathToRemove)
1326+
}
1327+
case ccVolumeState.IsDiskRemoval():
1328+
log.Info("Graceful disk removal is in progress", "brokerId", brokerId, "mountPath", mountPathToRemove)
1329+
waitForDiskRemovalToFinish = true
1330+
case ccVolumeState.IsDiskRebalance():
1331+
log.Info("Graceful disk rebalance is in progress, waiting to mark disk for removal", "brokerId", brokerId, "mountPath", mountPathToRemove)
1332+
waitForDiskRemovalToFinish = true
1333+
default:
1334+
brokerVolumesState[mountPathToRemove] = v1beta1.VolumeState{CruiseControlVolumeState: v1beta1.GracefulDiskRemovalRequired}
1335+
log.Info("Marked the volume for removal", "brokerId", brokerId, "mountPath", mountPathToRemove)
1336+
waitForDiskRemovalToFinish = true
1337+
}
1338+
}
1339+
}
1340+
return waitForDiskRemovalToFinish, nil
1341+
}
1342+
13171343
// GetBrokersWithPendingOrRunningCCTask returns list of brokers that are either waiting for CC
13181344
// to start executing a broker task (add broker, remove broker, etc) or CC already running a task for it.
13191345
func GetBrokersWithPendingOrRunningCCTask(kafkaCluster *v1beta1.KafkaCluster) []int32 {
@@ -1732,3 +1758,16 @@ func generateServicePortForAdditionalPorts(containerPorts []corev1.ContainerPort
17321758
}
17331759
return usedPorts
17341760
}
1761+
1762+
func (r *Reconciler) isController(brokerId int32) (bool, error) {
1763+
for _, broker := range r.KafkaCluster.Spec.Brokers {
1764+
if broker.Id == brokerId {
1765+
brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec)
1766+
if err != nil {
1767+
return false, errors.WrapIf(err, "failed to reconcile resource")
1768+
}
1769+
return brokerConfig.IsControllerNode(), nil
1770+
}
1771+
}
1772+
return false, errors.NewWithDetails("could not find broker in the spec", "brokerId", brokerId)
1773+
}

pkg/resources/kafka/kafka_test.go

Lines changed: 119 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,19 @@ import (
4242
"github.com/banzaicloud/koperator/pkg/scale"
4343
)
4444

45+
type PvcTestCase struct {
46+
testName string
47+
brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim
48+
existingPvcs []*corev1.PersistentVolumeClaim
49+
kafkaClusterSpec v1beta1.KafkaClusterSpec
50+
kafkaClusterStatus v1beta1.KafkaClusterStatus
51+
expectedError bool
52+
expectedErrorMsg string
53+
expectedDeletePvc bool
54+
expectedCreatePvc bool
55+
expectedVolumeState map[string]v1beta1.CruiseControlVolumeState
56+
}
57+
4558
func TestGetBrokersWithPendingOrRunningCCTask(t *testing.T) {
4659
testCases := []struct {
4760
testName string
@@ -959,16 +972,7 @@ func TestGetServerPasswordKeysAndUsers(t *testing.T) { //nolint funlen
959972

960973
func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
961974
t.Parallel()
962-
testCases := []struct {
963-
testName string
964-
brokersDesiredPvcs map[string][]*corev1.PersistentVolumeClaim
965-
existingPvcs []*corev1.PersistentVolumeClaim
966-
kafkaClusterSpec v1beta1.KafkaClusterSpec
967-
kafkaClusterStatus v1beta1.KafkaClusterStatus
968-
expectedError bool
969-
expectedDeletePvc bool
970-
expectedVolumeState map[string]v1beta1.CruiseControlVolumeState
971-
}{
975+
testCases := []PvcTestCase{
972976
{
973977
testName: "If no disk removed, do nothing",
974978
brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
@@ -1137,9 +1141,105 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
11371141
},
11381142
}
11391143

1144+
execPvcTest(t, testCases)
1145+
}
1146+
1147+
func TestReconcileKafkaPvcDisk(t *testing.T) {
1148+
t.Parallel()
1149+
testCases := []PvcTestCase{
1150+
{
1151+
testName: "broker with multiple disks is allowed",
1152+
brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
1153+
"0": {
1154+
createPvc("test-pvc-1", "0", "/path/to/mount1"),
1155+
createPvc("test-pvc-2", "0", "/path/to/mount2"),
1156+
},
1157+
},
1158+
existingPvcs: []*corev1.PersistentVolumeClaim{
1159+
createPvc("test-pvc-1", "0", "/path/to/mount1"),
1160+
},
1161+
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
1162+
Brokers: []v1beta1.Broker{
1163+
{
1164+
Id: int32(0),
1165+
BrokerConfig: &v1beta1.BrokerConfig{
1166+
Roles: []string{"broker"},
1167+
},
1168+
},
1169+
},
1170+
},
1171+
expectedCreatePvc: true,
1172+
expectedError: false,
1173+
},
1174+
{
1175+
testName: "when a controller has more than 1 disk, it should fail",
1176+
brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
1177+
"0": {
1178+
createPvc("test-pvc-1", "0", "/path/to/mount1"),
1179+
createPvc("test-pvc-2", "0", "/path/to/mount2"),
1180+
},
1181+
},
1182+
existingPvcs: []*corev1.PersistentVolumeClaim{
1183+
createPvc("test-pvc-1", "0", "/path/to/mount1"),
1184+
},
1185+
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
1186+
Brokers: []v1beta1.Broker{
1187+
{
1188+
Id: int32(0),
1189+
BrokerConfig: &v1beta1.BrokerConfig{
1190+
Roles: []string{"controller"},
1191+
},
1192+
},
1193+
},
1194+
},
1195+
expectedError: true,
1196+
expectedErrorMsg: "controller broker can have only one volume",
1197+
},
1198+
{
1199+
testName: "when a controller changes a mount path, it should fail",
1200+
brokersDesiredPvcs: map[string][]*corev1.PersistentVolumeClaim{
1201+
"0": {
1202+
createPvc("test-pvc-1", "0", "/path/to/mount1"),
1203+
},
1204+
},
1205+
existingPvcs: []*corev1.PersistentVolumeClaim{
1206+
createPvc("test-pvc-1", "0", "/path/to/mount2"),
1207+
},
1208+
kafkaClusterSpec: v1beta1.KafkaClusterSpec{
1209+
Brokers: []v1beta1.Broker{
1210+
{
1211+
Id: int32(0),
1212+
BrokerConfig: &v1beta1.BrokerConfig{
1213+
Roles: []string{"controller"},
1214+
},
1215+
},
1216+
},
1217+
},
1218+
expectedError: true,
1219+
expectedErrorMsg: "controller broker volume mount path cannot be changed",
1220+
},
1221+
}
1222+
execPvcTest(t, testCases)
1223+
}
1224+
1225+
func execPvcTest(t *testing.T, testCases []PvcTestCase) {
1226+
kafkaClusterSpec := v1beta1.KafkaClusterSpec{
1227+
Brokers: []v1beta1.Broker{
1228+
{
1229+
Id: int32(0),
1230+
BrokerConfig: &v1beta1.BrokerConfig{
1231+
Roles: []string{"broker"},
1232+
},
1233+
},
1234+
},
1235+
}
11401236
mockCtrl := gomock.NewController(t)
11411237

11421238
for _, test := range testCases {
1239+
if test.kafkaClusterSpec.Brokers != nil {
1240+
kafkaClusterSpec = test.kafkaClusterSpec
1241+
}
1242+
11431243
mockClient := mocks.NewMockClient(mockCtrl)
11441244
mockSubResourceClient := mocks.NewMockSubResourceClient(mockCtrl)
11451245
t.Run(test.testName, func(t *testing.T) {
@@ -1151,6 +1251,7 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
11511251
Name: "kafka",
11521252
Namespace: "kafka",
11531253
},
1254+
Spec: kafkaClusterSpec,
11541255
},
11551256
},
11561257
}
@@ -1176,6 +1277,10 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
11761277
mockClient.EXPECT().Delete(context.TODO(), gomock.AssignableToTypeOf(&corev1.PersistentVolumeClaim{})).Return(nil)
11771278
}
11781279

1280+
// Mock the client.Create call
1281+
if test.expectedCreatePvc {
1282+
mockClient.EXPECT().Create(context.TODO(), gomock.AssignableToTypeOf(&corev1.PersistentVolumeClaim{})).Return(nil)
1283+
}
11791284
// Mock the status update call
11801285
mockClient.EXPECT().Status().Return(mockSubResourceClient).AnyTimes()
11811286
mockSubResourceClient.EXPECT().Update(context.Background(), gomock.AssignableToTypeOf(&v1beta1.KafkaCluster{})).Do(func(ctx context.Context, kafkaCluster *v1beta1.KafkaCluster, opts ...client.SubResourceUpdateOption) {
@@ -1191,6 +1296,10 @@ func TestReconcileKafkaPvcDiskRemoval(t *testing.T) {
11911296
// Test that the expected error is returned
11921297
if test.expectedError {
11931298
assert.NotNil(t, err, "Expected an error but got nil")
1299+
1300+
if test.expectedErrorMsg != "" {
1301+
assert.Contains(t, err.Error(), test.expectedErrorMsg, "Error message does not contain expected text.\nActual: %s\nExpected: %s", err.Error(), test.expectedErrorMsg)
1302+
}
11941303
} else {
11951304
assert.Nil(t, err, "Expected no error but got an error")
11961305
}

0 commit comments

Comments
 (0)