|
8 | 8 | "io" |
9 | 9 | "net/http" |
10 | 10 | neturl "net/url" |
| 11 | + "slices" |
11 | 12 | "strconv" |
12 | 13 | "strings" |
13 | 14 | "sync" |
@@ -335,22 +336,34 @@ func (s *metricsAPIScaler) getEndpointsUrlsFromServiceURL(ctx context.Context, s |
335 | 336 | if err != nil { |
336 | 337 | return nil, err |
337 | 338 | } |
| 339 | + var uniqueAddresses []string |
338 | 340 | for _, endpointSlice := range serviceEndpointsSlices.Items { |
339 | 341 | for _, eps := range endpointSlice.Endpoints { |
340 | | - foundPort := "" |
341 | | - for _, port := range endpointSlice.Ports { |
342 | | - if port.Port != nil && strconv.Itoa(int(*port.Port)) == podPort { |
343 | | - foundPort = fmt.Sprintf(":%d", *port.Port) |
344 | | - break |
345 | | - } |
346 | | - } |
347 | | - if foundPort == "" { |
348 | | - s.logger.V(1).Info(fmt.Sprintf("Warning : could not find port %s in endpoint slice for service %s.%s definition. Will infer port from %s scheme", podPort, serviceName, namespace, url.Scheme)) |
| 342 | + // as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047, make sure we take endpoint into account |
| 343 | + // only when it's ready |
| 344 | + if eps.Conditions.Ready == nil || !*eps.Conditions.Ready { |
| 345 | + continue |
349 | 346 | } |
350 | 347 | for _, address := range eps.Addresses { |
351 | | - if eps.NodeName != nil { |
352 | | - endpointUrls = append(endpointUrls, fmt.Sprintf("%s://%s%s%s", url.Scheme, address, foundPort, url.Path)) |
| 348 | + // deduplicate addresses as suggested in https://github.com/kedacore/keda/pull/6565#discussion_r2395073047 |
| 349 | + // because it's not guaranteed by Kubernetes that an endpoint IP address will have a unique representation |
| 350 | + // see https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#duplicate-endpoints |
| 351 | + if slices.Contains(uniqueAddresses, address) { |
| 352 | + continue |
| 353 | + } |
| 354 | + uniqueAddresses = append(uniqueAddresses, address) |
| 355 | + |
| 356 | + foundPort := "" |
| 357 | + for _, port := range endpointSlice.Ports { |
| 358 | + if port.Port != nil && strconv.Itoa(int(*port.Port)) == podPort { |
| 359 | + foundPort = fmt.Sprintf(":%d", *port.Port) |
| 360 | + break |
| 361 | + } |
| 362 | + } |
| 363 | + if foundPort == "" { |
| 364 | + s.logger.V(1).Info(fmt.Sprintf("Warning : could not find port %s in endpoint slice for service %s.%s definition. Will infer port from %s scheme", podPort, serviceName, namespace, url.Scheme)) |
353 | 365 | } |
| 366 | + endpointUrls = append(endpointUrls, fmt.Sprintf("%s://%s%s%s", url.Scheme, address, foundPort, url.Path)) |
354 | 367 | } |
355 | 368 | } |
356 | 369 | } |
|
0 commit comments