4
4
"context"
5
5
"encoding/binary"
6
6
"encoding/json"
7
- "fmt"
8
7
"hash/fnv"
9
8
10
9
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@@ -230,27 +229,32 @@ func transformK8sEndpoints(ctx context.Context, inputs EndpointsInputs) func(kct
230
229
spec := kubeUpstream .Kube
231
230
kubeSvcPort , singlePortSvc := findPortForService (kctx , svcs , spec )
232
231
if kubeSvcPort == nil {
233
- logger .Debug ("findPortForService - not found. " , zap .Uint32 ("port" , spec .GetServicePort ()), zap .String ("svcName " , spec .GetServiceName ()), zap .String ("svcNamespace " , spec .GetServiceNamespace ()))
232
+ logger .Debug ("port not found for service " , zap .Uint32 ("port" , spec .GetServicePort ()), zap .String ("name " , spec .GetServiceName ()), zap .String ("namespace " , spec .GetServiceNamespace ()))
234
233
return nil
235
234
}
236
235
237
- svcNs := spec .GetServiceNamespace ()
238
- svcName := spec .GetServiceName ()
239
- // Fetch all EndpointSlices for the service
236
+ // Fetch all EndpointSlices for the upstream service
240
237
key := types.NamespacedName {
241
- Namespace : svcNs ,
242
- Name : svcName ,
238
+ Namespace : spec . GetServiceNamespace () ,
239
+ Name : spec . GetServiceName () ,
243
240
}
244
241
245
242
endpointSlices := krt .Fetch (kctx , inputs .EndpointSlices , krt .FilterIndex (inputs .EndpointSlicesByService , key ))
246
243
if len (endpointSlices ) == 0 {
247
- warnsToLog = append ( warnsToLog , fmt . Sprintf ( "EndpointSlices not found for service %v/%v" , svcNs , svcName ))
244
+ logger . Debug ( "no endpointslices found for service" , zap . String ( "name" , key . Name ), zap . String ( "namespace" , key . Namespace ))
248
245
return nil
249
246
}
250
247
251
- if len (endpointSlices ) == 0 {
252
- warnsToLog = append (warnsToLog , fmt .Sprintf ("EndpointSlices not found for service %v/%v" , svcNs , svcName ))
253
- logger .Debug ("EndpointSlices not found for service" )
248
+ // Handle potential eventually consistency of EndpointSlices for the upstream service
249
+ found := false
250
+ for _ , endpointSlice := range endpointSlices {
251
+ if port := findPortInEndpointSlice (endpointSlice , singlePortSvc , kubeSvcPort ); port != 0 {
252
+ found = true
253
+ break
254
+ }
255
+ }
256
+ if ! found {
257
+ logger .Debug ("no ports found in endpointslices for service" , zap .String ("name" , key .Name ), zap .String ("namespace" , key .Namespace ))
254
258
return nil
255
259
}
256
260
@@ -266,8 +270,9 @@ func transformK8sEndpoints(ctx context.Context, inputs EndpointsInputs) func(kct
266
270
for _ , endpointSlice := range endpointSlices {
267
271
port := findPortInEndpointSlice (endpointSlice , singlePortSvc , kubeSvcPort )
268
272
if port == 0 {
269
- warnsToLog = append (warnsToLog , fmt .Sprintf ("port %v not found for service %v/%v in EndpointSlice %v" ,
270
- spec .GetServicePort (), svcNs , svcName , endpointSlice .Name ))
273
+ logger .Debug ("no port found in endpointslice; will try next endpointslice if one exists" ,
274
+ zap .String ("name" , endpointSlice .Name ),
275
+ zap .String ("namespace" , endpointSlice .Namespace ))
271
276
continue
272
277
}
273
278
@@ -400,6 +405,11 @@ func findPortForService(kctx krt.HandlerContext, services krt.Collection[*corev1
400
405
401
406
func findPortInEndpointSlice (endpointSlice * discoveryv1.EndpointSlice , singlePortService bool , kubeServicePort * corev1.ServicePort ) uint32 {
402
407
var port uint32
408
+
409
+ if endpointSlice == nil || kubeServicePort == nil {
410
+ return port
411
+ }
412
+
403
413
for _ , p := range endpointSlice .Ports {
404
414
if p .Port == nil {
405
415
continue
0 commit comments