@@ -20,8 +20,15 @@ import (
2020 "github.com/stretchr/testify/require"
2121 corev1 "k8s.io/api/core/v1"
2222 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23+ "k8s.io/apimachinery/pkg/util/intstr"
2324
25+ "go.uber.org/mock/gomock"
26+
27+ apiutil "github.com/banzaicloud/koperator/api/util"
2428 "github.com/banzaicloud/koperator/api/v1beta1"
29+ "github.com/banzaicloud/koperator/pkg/resources"
30+ mocks "github.com/banzaicloud/koperator/pkg/resources/kafka/mocks"
31+ "github.com/banzaicloud/koperator/pkg/util"
2532)
2633
2734func TestIsNodePortAccessMethodInUseAmongExternalListeners (t * testing.T ) {
@@ -296,3 +303,145 @@ func TestNonNodePortServices(t *testing.T) {
296303 })
297304 }
298305}
306+
307+ func TestHeadlessServiceKraft (t * testing.T ) {
308+ testCases := []struct {
309+ testName string
310+ expectedService * corev1.Service
311+ }{
312+ {
313+ testName : "Services with mixed NodePort and non-NodePort services that are evenly distributed" ,
314+ expectedService : & corev1.Service {
315+ ObjectMeta : metav1.ObjectMeta {
316+ Name : "kafka-headless" ,
317+ Namespace : "kafka" ,
318+ Labels : map [string ]string {"isBrokerNode" : "true" , "app" : "kafka" , "kafka_cr" : "kafka" },
319+ Annotations : map [string ]string {},
320+ OwnerReferences : []metav1.OwnerReference {
321+ {
322+ APIVersion : "" ,
323+ Kind : "" ,
324+ Name : "kafka" ,
325+ UID : "" ,
326+ Controller : util .BoolPointer (true ),
327+ BlockOwnerDeletion : util .BoolPointer (true ),
328+ },
329+ },
330+ },
331+ Spec : corev1.ServiceSpec {
332+ Type : corev1 .ServiceTypeClusterIP ,
333+ SessionAffinity : corev1 .ServiceAffinityNone ,
334+ Selector : apiutil .LabelsForBroker ("kafka" ),
335+ Ports : []corev1.ServicePort {
336+ {
337+ Name : "metrics" ,
338+ Protocol : "TCP" ,
339+ Port : 9020 ,
340+ TargetPort : intstr .FromInt (9020 ),
341+ NodePort : 0 ,
342+ },
343+ },
344+ ClusterIP : corev1 .ClusterIPNone ,
345+ PublishNotReadyAddresses : true ,
346+ },
347+ },
348+ },
349+ }
350+ mockCtrl := gomock .NewController (t )
351+
352+ for _ , test := range testCases {
353+ t .Run (test .testName , func (t * testing.T ) {
354+ mockClient := mocks .NewMockClient (mockCtrl )
355+ mockClient .EXPECT ().Get (gomock .Any (), gomock .Any (), gomock .Any ()).Return (nil ).AnyTimes ()
356+ r := Reconciler {
357+ Reconciler : resources.Reconciler {
358+ Client : mockClient ,
359+ KafkaCluster : & v1beta1.KafkaCluster {
360+ ObjectMeta : metav1.ObjectMeta {
361+ Name : "kafka" ,
362+ Namespace : "kafka" ,
363+ },
364+ Spec : v1beta1.KafkaClusterSpec {
365+ KRaftMode : true ,
366+ },
367+ },
368+ },
369+ }
370+
371+ actualService := r .headlessService ()
372+
373+ require .Equal (t , test .expectedService , actualService )
374+ })
375+ }
376+ }
377+
378+ func TestHeadlessControllerServiceKraft (t * testing.T ) {
379+ testCases := []struct {
380+ testName string
381+ expectedService * corev1.Service
382+ }{
383+ {
384+ testName : "Headless Controller Service for Kraft" ,
385+ expectedService : & corev1.Service {
386+ ObjectMeta : metav1.ObjectMeta {
387+ Name : "kafka-controller-headless" ,
388+ Namespace : "kafka" ,
389+ Labels : map [string ]string {"isControllerNode" : "true" , "app" : "kafka" , "kafka_cr" : "kafka" },
390+ Annotations : map [string ]string {},
391+ OwnerReferences : []metav1.OwnerReference {
392+ {
393+ APIVersion : "" ,
394+ Kind : "" ,
395+ Name : "kafka" ,
396+ UID : "" ,
397+ Controller : util .BoolPointer (true ),
398+ BlockOwnerDeletion : util .BoolPointer (true ),
399+ },
400+ },
401+ },
402+ Spec : corev1.ServiceSpec {
403+ Type : corev1 .ServiceTypeClusterIP ,
404+ SessionAffinity : corev1 .ServiceAffinityNone ,
405+ Selector : apiutil .LabelsForController ("kafka" ),
406+ Ports : []corev1.ServicePort {
407+ {
408+ Name : "metrics" ,
409+ Protocol : "TCP" ,
410+ Port : 9020 ,
411+ TargetPort : intstr .FromInt (9020 ),
412+ NodePort : 0 ,
413+ },
414+ },
415+ ClusterIP : corev1 .ClusterIPNone ,
416+ PublishNotReadyAddresses : true ,
417+ },
418+ },
419+ },
420+ }
421+ mockCtrl := gomock .NewController (t )
422+
423+ for _ , test := range testCases {
424+ t .Run (test .testName , func (t * testing.T ) {
425+ mockClient := mocks .NewMockClient (mockCtrl )
426+ mockClient .EXPECT ().Get (gomock .Any (), gomock .Any (), gomock .Any ()).Return (nil ).AnyTimes ()
427+ r := Reconciler {
428+ Reconciler : resources.Reconciler {
429+ Client : mockClient ,
430+ KafkaCluster : & v1beta1.KafkaCluster {
431+ ObjectMeta : metav1.ObjectMeta {
432+ Name : "kafka" ,
433+ Namespace : "kafka" ,
434+ },
435+ Spec : v1beta1.KafkaClusterSpec {
436+ KRaftMode : true ,
437+ },
438+ },
439+ },
440+ }
441+
442+ actualService := r .headlessControllerService ()
443+
444+ require .Equal (t , test .expectedService , actualService )
445+ })
446+ }
447+ }
0 commit comments