@@ -27,7 +27,6 @@ import (
2727 "k8s.io/apimachinery/pkg/api/resource"
2828 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929 "k8s.io/apimachinery/pkg/types"
30- "k8s.io/apimachinery/pkg/util/intstr"
3130 "sigs.k8s.io/controller-runtime/pkg/client"
3231
3332 "github.com/banzaicloud/koperator/api/v1beta1"
5251 }
5352)
5453
55- var allocatedNodePorts []int32
56- var safePort int32
57-
5854var _ = Describe ("KafkaClusterNodeportExternalAccess" , Ordered , Serial , func () {
5955 var (
6056 count uint64 = 0
@@ -89,7 +85,7 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
8985 err = k8sClient .Create (ctx , kafkaCluster )
9086 Expect (err ).NotTo (HaveOccurred ())
9187
92- waitForClusterRunningState (ctx , kafkaCluster , namespace )
88+ waitForClusterRunningStateWithTimeout (ctx , kafkaCluster , namespace , 120 * time . Second )
9389
9490 })
9591
@@ -121,13 +117,6 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
121117
122118 When ("hostnameOverride is configured with externalStartingPort 0" , func () {
123119 BeforeEach (func () {
124- allocatedNodePorts = nil
125- // Pre-allocate ports even when using auto-assignment to prevent conflicts
126- // Allocate 3 consecutive ports for the 3 brokers to avoid race conditions
127- safePort = GetNodePort (3 )
128- for i := int32 (0 ); i < 3 ; i ++ {
129- allocatedNodePorts = append (allocatedNodePorts , safePort + i )
130- }
131120 kafkaCluster .Spec .ListenersConfig .ExternalListeners = []v1beta1.ExternalListenerConfig {
132121 {
133122 CommonListenerSpec : v1beta1.CommonListenerSpec {
@@ -204,31 +193,18 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
204193 },
205194 }))
206195 })
207- AfterEach (func () {
208- for _ , port := range allocatedNodePorts {
209- ReleaseNodePort (port )
210- }
211- allocatedNodePorts = nil
212- })
213196 })
214197
215198 When ("NodePortExternalIP is configured" , func () {
216199 BeforeEach (func () {
217- allocatedNodePorts = nil
218- safePort = GetNodePort (3 )
219- // Allocate all 3 ports for the 3 brokers to avoid conflicts
220- for i := int32 (0 ); i < 3 ; i ++ {
221- allocatedNodePorts = append (allocatedNodePorts , safePort + i )
222- }
223- // update the external listener config with a nodeport listener
224200 kafkaCluster .Spec .ListenersConfig .ExternalListeners = []v1beta1.ExternalListenerConfig {
225201 {
226202 CommonListenerSpec : v1beta1.CommonListenerSpec {
227203 Name : "test" ,
228204 ContainerPort : 9733 ,
229205 Type : "plaintext" ,
230206 },
231- ExternalStartingPort : safePort ,
207+ ExternalStartingPort : 0 ,
232208 AccessMethod : corev1 .ServiceTypeNodePort ,
233209 },
234210 }
@@ -268,13 +244,6 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
268244 }
269245 })
270246
271- AfterEach (func () {
272- for _ , port := range allocatedNodePorts {
273- ReleaseNodePort (port )
274- }
275- allocatedNodePorts = nil
276- })
277-
278247 It ("reconciles the service successfully" , func (ctx SpecContext ) {
279248 var svc corev1.Service
280249 svcName := fmt .Sprintf ("%s-0-test" , kafkaClusterCRName )
@@ -302,13 +271,8 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
302271 Expect (svc .Spec .Ports [0 ].Port ).To (BeEquivalentTo (9733 ))
303272 Expect (svc .Spec .Ports [0 ].TargetPort .IntVal ).To (BeEquivalentTo (9733 ))
304273
305- Expect (svc .Spec .Ports ).To (ConsistOf (corev1.ServicePort {
306- Name : "broker-0" ,
307- Protocol : corev1 .ProtocolTCP ,
308- Port : 9733 ,
309- TargetPort : intstr .FromInt (9733 ),
310- NodePort : safePort ,
311- }))
274+ Expect (svc .Spec .Ports ).NotTo (BeEmpty ())
275+ Expect (svc .Spec .Ports [0 ].NodePort ).To (BeNumerically (">" , 0 ))
312276
313277 // check status
314278 err := k8sClient .Get (ctx , types.NamespacedName {
@@ -360,20 +324,14 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
360324
361325 When ("hostnameOverride is configured" , func () {
362326 BeforeEach (func () {
363- allocatedNodePorts = nil
364- safePort = GetNodePort (3 )
365- // Allocate all 3 ports for the 3 brokers to avoid conflicts
366- for i := int32 (0 ); i < 3 ; i ++ {
367- allocatedNodePorts = append (allocatedNodePorts , safePort + i )
368- }
369327 kafkaCluster .Spec .ListenersConfig .ExternalListeners = []v1beta1.ExternalListenerConfig {
370328 {
371329 CommonListenerSpec : v1beta1.CommonListenerSpec {
372330 Name : "test" ,
373331 ContainerPort : 9733 ,
374332 Type : "plaintext" ,
375333 },
376- ExternalStartingPort : safePort ,
334+ ExternalStartingPort : 0 ,
377335 IngressServiceSettings : v1beta1.IngressServiceSettings {
378336 HostnameOverride : ".external.nodeport.com" ,
379337 },
@@ -389,7 +347,17 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
389347 }, kafkaCluster )
390348 Expect (err ).NotTo (HaveOccurred ())
391349
392- expectedPort := safePort
350+ // Get the actual allocated ports from Services
351+ assignedNodePortPerBroker := make (map [int32 ]int32 , len (kafkaCluster .Spec .Brokers ))
352+ for _ , broker := range kafkaCluster .Spec .Brokers {
353+ var svc corev1.Service
354+ err := k8sClient .Get (ctx , types.NamespacedName {
355+ Name : fmt .Sprintf (kafka .NodePortServiceTemplate , kafkaCluster .GetName (), broker .Id , "test" ),
356+ Namespace : kafkaCluster .GetNamespace (),
357+ }, & svc )
358+ Expect (err ).NotTo (HaveOccurred ())
359+ assignedNodePortPerBroker [broker .Id ] = svc .Spec .Ports [0 ].NodePort
360+ }
393361
394362 Expect (kafkaCluster .Status .ListenerStatuses ).To (Equal (v1beta1.ListenerStatuses {
395363 InternalListeners : map [string ]v1beta1.ListenerStatusList {
@@ -416,26 +384,21 @@ var _ = Describe("KafkaClusterNodeportExternalAccess", Ordered, Serial, func() {
416384 "test" : {
417385 {
418386 Name : "broker-0" ,
419- Address : fmt .Sprintf ("%s-0-test.kafka-nodeport-%d.external.nodeport.com:%d" , kafkaCluster .Name , count , expectedPort ),
387+ Address : fmt .Sprintf ("%s-0-test.kafka-nodeport-%d.external.nodeport.com:%d" , kafkaCluster .Name , count , assignedNodePortPerBroker [ 0 ] ),
420388 },
421389 {
422390 Name : "broker-1" ,
423- Address : fmt .Sprintf ("%s-1-test.kafka-nodeport-%d.external.nodeport.com:%d" , kafkaCluster .Name , count , expectedPort + 1 ),
391+ Address : fmt .Sprintf ("%s-1-test.kafka-nodeport-%d.external.nodeport.com:%d" , kafkaCluster .Name , count , assignedNodePortPerBroker [ 1 ] ),
424392 },
425393 {
426394 Name : "broker-2" ,
427- Address : fmt .Sprintf ("%s-2-test.kafka-nodeport-%d.external.nodeport.com:%d" , kafkaCluster .Name , count , expectedPort + 2 ),
395+ Address : fmt .Sprintf ("%s-2-test.kafka-nodeport-%d.external.nodeport.com:%d" , kafkaCluster .Name , count , assignedNodePortPerBroker [ 2 ] ),
428396 },
429397 },
430398 },
431399 }))
432400 })
433- AfterEach (func () {
434- for _ , port := range allocatedNodePorts {
435- ReleaseNodePort (port )
436- }
437- allocatedNodePorts = nil
438- })
401+
439402 })
440403})
441404
0 commit comments