Skip to content

Commit bfcb4a1

Browse files
committed
feat(endpoint_slice): refactor and support arbitrary providers
Signed-off-by: SkalaNetworks <contact@skala.network>
1 parent 99acf9f commit bfcb4a1

4 files changed

Lines changed: 362 additions & 57 deletions

File tree

pkg/controller/endpoint_slice.go

Lines changed: 199 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
8585
svc := cachedService.DeepCopy()
8686

8787
var (
88-
pods []*v1.Pod
8988
lbVips []string
9089
vip, vpcName, subnetName string
9190
ok bool
@@ -123,11 +122,10 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
123122
}
124123
}
125124

126-
if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
127-
klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
125+
vpcName, subnetName, err = c.getVpcAndSubnetForEndpoints(endpointSlices, svc)
126+
if err != nil {
128127
return err
129128
}
130-
vpcName, subnetName = c.getVpcSubnetName(pods, endpointSlices, svc)
131129

132130
var (
133131
vpc *kubeovnv1.Vpc
@@ -241,53 +239,129 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
241239
return nil
242240
}
243241

244-
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (string, string) {
245-
var (
246-
vpcName string
247-
subnetName string
248-
)
242+
// serviceHasSelector returns if a service has selectors
243+
func serviceHasSelector(service *v1.Service) bool {
244+
return len(service.Spec.Selector) > 0
245+
}
249246

250-
for _, pod := range pods {
251-
if len(pod.Annotations) == 0 {
252-
continue
247+
// getVpcAndSubnetForEndpoints returns the name of the VPC/Subnet for EndpointSlices
248+
func (c *Controller) getVpcAndSubnetForEndpoints(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (vpcName, subnetName string, err error) {
249+
// Let the user self-determine what VPC and subnet to use if they provided annotations
250+
if service.Annotations != nil {
251+
if vpc := service.Annotations[util.LogicalRouterAnnotation]; vpc != "" {
252+
vpcName = vpc
253253
}
254-
if subnetName == "" {
255-
subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
254+
if subnet := service.Annotations[util.LogicalSwitchAnnotation]; subnet != "" {
255+
subnetName = subnet
256256
}
257257

258-
LOOP:
259-
for _, endpointSlice := range endpointSlices {
260-
for _, endpoint := range endpointSlice.Endpoints {
261-
for _, addr := range endpoint.Addresses {
262-
for _, podIP := range pod.Status.PodIPs {
263-
if addr == podIP.IP {
264-
if vpcName == "" {
265-
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
266-
}
267-
if vpcName != "" {
268-
break LOOP
269-
}
270-
}
271-
}
272-
}
273-
}
258+
if vpcName != "" && subnetName != "" {
259+
return vpcName, subnetName, nil
274260
}
261+
}
275262

276-
if vpcName != "" && subnetName != "" {
277-
break
263+
// Choose the most optimized and straightforward way to retrieve the name of the VPC and subnet
264+
if serviceHasSelector(service) {
265+
// The service has a selector, which means that the EndpointSlices should have targets.
266+
// We can use those targets instead of looking at every pod in the namespace.
267+
vpcName, subnetName = c.findVpcAndSubnetWithTargets(endpointSlices)
268+
} else {
269+
// The service has no selectors, we must find which pods in the namespace of the service
270+
// are targeted by the endpoint by only looking at the IPs.
271+
pods, err := c.podsLister.Pods(service.Namespace).List(labels.Everything())
272+
if err != nil {
273+
err := fmt.Errorf("failed to get pods for service %s in namespace %s: %w", service.Name, service.Namespace, err)
274+
klog.Error(err)
275+
return "", "", err
278276
}
277+
278+
vpcName, subnetName = c.findVpcAndSubnetWithNoTargets(endpointSlices, pods)
279279
}
280280

281-
if vpcName == "" {
281+
if vpcName == "" { // Default to what's on the service or to the default VPC
282282
if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
283283
vpcName = c.config.ClusterRouter
284284
}
285285
}
286286

287-
if subnetName == "" {
287+
if subnetName == "" { // Use the default subnet
288288
subnetName = util.DefaultSubnet
289289
}
290290

291+
return vpcName, subnetName, nil
292+
}
293+
294+
// findVpcAndSubnetWithTargets returns the name of the VPC and Subnet for endpoints with targets
295+
func (c *Controller) findVpcAndSubnetWithTargets(endpointSlices []*discoveryv1.EndpointSlice) (vpcName, subnetName string) {
296+
for _, slice := range endpointSlices {
297+
for _, endpoint := range slice.Endpoints {
298+
if endpoint.TargetRef == nil {
299+
continue
300+
}
301+
302+
namespace, name := endpoint.TargetRef.Namespace, endpoint.TargetRef.Name
303+
if name == "" || namespace == "" {
304+
continue
305+
}
306+
307+
pod, err := c.podsLister.Pods(namespace).Get(name)
308+
if err != nil {
309+
err := fmt.Errorf("couldn't retrieve pod %s/%s: %w", namespace, name, err)
310+
klog.Error(err)
311+
continue
312+
}
313+
314+
vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
315+
if err != nil {
316+
err := fmt.Errorf("couldn't retrieve get subnet/vpc for pod %s/%s: %w", namespace, name, err)
317+
klog.Error(err)
318+
continue
319+
}
320+
321+
if vpcName == "" {
322+
vpcName = vpc
323+
}
324+
325+
if subnetName == "" {
326+
subnetName = subnet
327+
}
328+
329+
if vpcName != "" && subnetName != "" {
330+
return vpcName, subnetName
331+
}
332+
}
333+
}
334+
335+
return vpcName, subnetName
336+
}
337+
338+
// findVpcAndSubnetWithNoTargets returns the name of the VPC and Subnet for endpoints with no targets
339+
func (c *Controller) findVpcAndSubnetWithNoTargets(endpointSlices []*discoveryv1.EndpointSlice, pods []*v1.Pod) (vpcName, subnetName string) {
340+
for _, slice := range endpointSlices {
341+
for _, endpoint := range slice.Endpoints {
342+
for _, pod := range pods {
343+
vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
344+
if err != nil {
345+
err := fmt.Errorf("couldn't retrieve subnet/vpc for pod %s/%s: %w", pod.Namespace, pod.Name, err)
346+
klog.Error(err)
347+
continue
348+
}
349+
350+
if vpcName == "" {
351+
vpcName = vpc
352+
}
353+
354+
if subnetName == "" {
355+
subnetName = subnet
356+
}
357+
358+
if vpcName != "" && subnetName != "" {
359+
return vpcName, subnetName
360+
}
361+
}
362+
}
363+
}
364+
291365
return vpcName, subnetName
292366
}
293367

@@ -375,7 +449,14 @@ func (c *Controller) getIPPortMappingBackend(endpointSlices []*discoveryv1.Endpo
375449

376450
for _, endpoint := range endpointSlice.Endpoints {
377451
if isGenIPPortMapping && endpoint.TargetRef.Name != "" {
378-
lspName, err := c.getEndpointTargetLSP(endpoint.TargetRef.Name, endpoint.TargetRef.Namespace, endpoint.Addresses)
452+
pod, err := c.podsLister.Pods(endpoint.TargetRef.Namespace).Get(endpoint.TargetRef.Name)
453+
if err != nil {
454+
err := fmt.Errorf("failed to get pod %s/%s: %w", endpoint.TargetRef.Namespace, endpoint.TargetRef.Name, err)
455+
klog.Error(err)
456+
continue
457+
}
458+
459+
lspName, err := c.getEndpointTargetLSPName(pod, endpoint.Addresses)
379460
if err != nil {
380461
err := fmt.Errorf("couldn't get LSP for the endpoint's target: %w", err)
381462
klog.Error(err)
@@ -408,6 +489,23 @@ func endpointReady(endpoint discoveryv1.Endpoint) bool {
408489
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
409490
}
410491

492+
// getPodProviders returns all the providers available on a pod
493+
func (c *Controller) getPodProviders(pod *v1.Pod) ([]string, error) {
494+
// Get all the networks to which the pod is attached
495+
podNetworks, err := c.getPodKubeovnNets(pod)
496+
if err != nil {
497+
return nil, fmt.Errorf("failed to get pod networks: %w", err)
498+
}
499+
500+
// Retrieve all the providers
501+
var providers []string
502+
for _, podNetwork := range podNetworks {
503+
providers = append(providers, podNetwork.ProviderName)
504+
}
505+
506+
return providers, nil
507+
}
508+
411509
// getMatchingProviderForAddress returns the provider linked to a subnet in which a particular address is present
412510
func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address string) string {
413511
if pod.Annotations == nil {
@@ -430,10 +528,29 @@ func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address stri
430528
return ""
431529
}
432530

433-
// getEndpointTargetLSPName returns the name of the LSP for a pod targeted by an endpoint.
531+
// getEndpointProvider returns the provider linked to the addresses of an endpoint
532+
func (c *Controller) getEndpointProvider(pod *v1.Pod, addresses []string) (string, error) {
533+
// Retrieve all the providers of the pod
534+
providers, err := c.getPodProviders(pod)
535+
if err != nil {
536+
return "", err
537+
}
538+
539+
// Get the first matching provider for any of the address in the endpoint
540+
var provider string
541+
for _, address := range addresses {
542+
if provider = getMatchingProviderForAddress(pod, providers, address); provider != "" {
543+
return provider, nil
544+
}
545+
}
546+
547+
return "", nil
548+
}
549+
550+
// getEndpointTargetLSPNameFromProvider returns the name of the LSP for a pod targeted by an endpoint.
434551
// A custom provider can be specified if the LSP is within a subnet that doesn't use
435552
// the default "ovn" provider.
436-
func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
553+
func getEndpointTargetLSPNameFromProvider(pod *v1.Pod, provider string) string {
437554
// If no provider is specified, use the default one
438555
if provider == "" {
439556
provider = util.OvnProvider
@@ -451,32 +568,59 @@ func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
451568
}
452569

453570
// getEndpointTargetLSP returns the name of the LSP on which addresses are attached for a specific pod
454-
func (c *Controller) getEndpointTargetLSP(pod, namespace string, addresses []string) (string, error) {
455-
// Retrieve the pod object from its namespace and name
456-
podObj, err := c.podsLister.Pods(namespace).Get(pod)
571+
func (c *Controller) getEndpointTargetLSPName(pod *v1.Pod, addresses []string) (string, error) {
572+
// Retrieve the provider for those addresses
573+
provider, err := c.getEndpointProvider(pod, addresses)
457574
if err != nil {
458-
return "", fmt.Errorf("failed to get pod %s/%s: %w", namespace, pod, err)
575+
return "", err
459576
}
460577

461-
// Get all the networks to which the pod is attached
462-
podNetworks, err := c.getPodKubeovnNets(podObj)
578+
return getEndpointTargetLSPNameFromProvider(pod, provider), nil
579+
}
580+
581+
// getSubnetByProvider returns the subnet linked to a provider on a pod
582+
func (c *Controller) getSubnetByProvider(pod *v1.Pod, provider string) (string, error) {
583+
subnetName, exists := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)]
584+
if !exists {
585+
return "", fmt.Errorf("couldn't find subnet linked to provider %s", provider)
586+
}
587+
588+
return subnetName, nil
589+
}
590+
591+
// getVpcByProvider returns the VPC linked to a provider on a pod
592+
func (c *Controller) getVpcByProvider(pod *v1.Pod, provider string) (string, error) {
593+
vpcName, exists := pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, provider)]
594+
if !exists {
595+
return "", fmt.Errorf("couldn't find vpc linked to provider %s", provider)
596+
}
597+
598+
return vpcName, nil
599+
}
600+
601+
// getEndpointVpcAndSubnet returns the VPC/subnet for a pod and a set of addresses attached to it
602+
func (c *Controller) getEndpointVpcAndSubnet(pod *v1.Pod, addresses []string) (string, string, error) {
603+
// Retrieve the provider for those addresses
604+
provider, err := c.getEndpointProvider(pod, addresses)
463605
if err != nil {
464-
return "", fmt.Errorf("failed to get pod networks: %w", err)
606+
return "", "", err
465607
}
466608

467-
// Retrieve all the providers
468-
var providers []string
469-
for _, podNetwork := range podNetworks {
470-
providers = append(providers, podNetwork.ProviderName)
609+
if provider == "" {
610+
return "", "", nil
471611
}
472612

473-
// Get the first matching provider for any of the address in the endpoint
474-
var provider string
475-
for _, address := range addresses {
476-
if provider = getMatchingProviderForAddress(podObj, providers, address); provider != "" {
477-
break
478-
}
613+
// Retrieve the subnet
614+
subnet, err := c.getSubnetByProvider(pod, provider)
615+
if err != nil {
616+
return "", "", err
617+
}
618+
619+
// Retrieve the VPC
620+
vpc, err := c.getVpcByProvider(pod, provider)
621+
if err != nil {
622+
return "", "", err
479623
}
480624

481-
return getEndpointTargetLSPName(podObj, provider), nil
625+
return vpc, subnet, nil
482626
}

0 commit comments

Comments
 (0)