Skip to content

Commit fadbd84

Browse files
committed
feat(endpoint_slice): refactor and support arbitrary providers
Signed-off-by: SkalaNetworks <[email protected]>
1 parent 99acf9f commit fadbd84

4 files changed

Lines changed: 246 additions & 63 deletions

File tree

pkg/controller/endpoint_slice.go

Lines changed: 137 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
8585
svc := cachedService.DeepCopy()
8686

8787
var (
88-
pods []*v1.Pod
8988
lbVips []string
9089
vip, vpcName, subnetName string
9190
ok bool
@@ -123,11 +122,12 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
123122
}
124123
}
125124

126-
if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
127-
klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
125+
vpcName, subnetName = c.getVpcSubnetName(endpointSlices, svc)
126+
if vpcName == "" || subnetName == "" {
127+
err := fmt.Errorf("failed to retrieve vpc/subnet in which service %s/%s is", svc.Namespace, svc.Name)
128+
klog.Error(err)
128129
return err
129130
}
130-
vpcName, subnetName = c.getVpcSubnetName(pods, endpointSlices, svc)
131131

132132
var (
133133
vpc *kubeovnv1.Vpc
@@ -241,51 +241,56 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
241241
return nil
242242
}
243243

244-
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (string, string) {
245-
var (
246-
vpcName string
247-
subnetName string
248-
)
249-
250-
for _, pod := range pods {
251-
if len(pod.Annotations) == 0 {
252-
continue
244+
func (c *Controller) getVpcSubnetName(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (vpcName, subnetName string) {
245+
// Let the user self-determine what VPC and subnet to use
246+
if service.Annotations != nil {
247+
if vpc := service.Annotations[util.LogicalRouterAnnotation]; vpc != "" {
248+
vpcName = vpc
253249
}
254-
if subnetName == "" {
255-
subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
256-
}
257-
258-
LOOP:
259-
for _, endpointSlice := range endpointSlices {
260-
for _, endpoint := range endpointSlice.Endpoints {
261-
for _, addr := range endpoint.Addresses {
262-
for _, podIP := range pod.Status.PodIPs {
263-
if addr == podIP.IP {
264-
if vpcName == "" {
265-
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
266-
}
267-
if vpcName != "" {
268-
break LOOP
269-
}
270-
}
271-
}
272-
}
273-
}
250+
if subnet := service.Annotations[util.LogicalSwitchAnnotation]; subnet != "" {
251+
subnetName = subnet
274252
}
275253

276254
if vpcName != "" && subnetName != "" {
277-
break
255+
return vpcName, subnetName
278256
}
279257
}
280258

281-
if vpcName == "" {
282-
if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
283-
vpcName = c.config.ClusterRouter
284-
}
285-
}
259+
for _, slice := range endpointSlices {
260+
for _, endpoint := range slice.Endpoints {
261+
if endpoint.TargetRef == nil {
262+
continue
263+
}
264+
265+
namespace := endpoint.TargetRef.Namespace
266+
name := endpoint.TargetRef.Name
267+
268+
pod, err := c.podsLister.Pods(namespace).Get(name)
269+
if err != nil {
270+
err := fmt.Errorf("couldn't retrieve pod %s/%s", namespace, name)
271+
klog.Error(err)
272+
continue
273+
}
274+
275+
vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
276+
if err != nil {
277+
err := fmt.Errorf("couldn't retrieve get subnet/vpc for pod %s/%s", namespace, name)
278+
klog.Error(err)
279+
continue
280+
}
281+
282+
if vpcName == "" {
283+
vpcName = vpc
284+
}
285+
286+
if subnetName == "" {
287+
subnetName = subnet
288+
}
286289

287-
if subnetName == "" {
288-
subnetName = util.DefaultSubnet
290+
if vpcName != "" && subnetName != "" {
291+
return vpcName, subnetName
292+
}
293+
}
289294
}
290295

291296
return vpcName, subnetName
@@ -375,7 +380,14 @@ func (c *Controller) getIPPortMappingBackend(endpointSlices []*discoveryv1.Endpo
375380

376381
for _, endpoint := range endpointSlice.Endpoints {
377382
if isGenIPPortMapping && endpoint.TargetRef.Name != "" {
378-
lspName, err := c.getEndpointTargetLSP(endpoint.TargetRef.Name, endpoint.TargetRef.Namespace, endpoint.Addresses)
383+
pod, err := c.podsLister.Pods(endpoint.TargetRef.Namespace).Get(endpoint.TargetRef.Name)
384+
if err != nil {
385+
err := fmt.Errorf("failed to get pod %s/%s: %w", endpoint.TargetRef.Namespace, endpoint.TargetRef.Name, err)
386+
klog.Error(err)
387+
continue
388+
}
389+
390+
lspName, err := c.getEndpointTargetLSPName(pod, endpoint.Addresses)
379391
if err != nil {
380392
err := fmt.Errorf("couldn't get LSP for the endpoint's target: %w", err)
381393
klog.Error(err)
@@ -408,6 +420,23 @@ func endpointReady(endpoint discoveryv1.Endpoint) bool {
408420
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
409421
}
410422

423+
// getPodProviders returns all the providers available on a pod
424+
func (c *Controller) getPodProviders(pod *v1.Pod) ([]string, error) {
425+
// Get all the networks to which the pod is attached
426+
podNetworks, err := c.getPodKubeovnNets(pod)
427+
if err != nil {
428+
return nil, fmt.Errorf("failed to get pod networks: %w", err)
429+
}
430+
431+
// Retrieve all the providers
432+
var providers []string
433+
for _, podNetwork := range podNetworks {
434+
providers = append(providers, podNetwork.ProviderName)
435+
}
436+
437+
return providers, nil
438+
}
439+
411440
// getMatchingProviderForAddress returns the provider linked to a subnet in which a particular address is present
412441
func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address string) string {
413442
if pod.Annotations == nil {
@@ -430,10 +459,29 @@ func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address stri
430459
return ""
431460
}
432461

433-
// getEndpointTargetLSPName returns the name of the LSP for a pod targeted by an endpoint.
462+
// getEndpointProvider returns the provider linked to the addresses of an endpoint
463+
func (c *Controller) getEndpointProvider(pod *v1.Pod, addresses []string) (string, error) {
464+
// Retrieve all the providers of the pod
465+
providers, err := c.getPodProviders(pod)
466+
if err != nil {
467+
return "", err
468+
}
469+
470+
// Get the first matching provider for any of the address in the endpoint
471+
var provider string
472+
for _, address := range addresses {
473+
if provider = getMatchingProviderForAddress(pod, providers, address); provider != "" {
474+
return provider, nil
475+
}
476+
}
477+
478+
return "", nil
479+
}
480+
481+
// getEndpointTargetLSPNameFromProvider returns the name of the LSP for a pod targeted by an endpoint.
434482
// A custom provider can be specified if the LSP is within a subnet that doesn't use
435483
// the default "ovn" provider.
436-
func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
484+
func getEndpointTargetLSPNameFromProvider(pod *v1.Pod, provider string) string {
437485
// If no provider is specified, use the default one
438486
if provider == "" {
439487
provider = util.OvnProvider
@@ -451,32 +499,59 @@ func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
451499
}
452500

453501
// getEndpointTargetLSP returns the name of the LSP on which addresses are attached for a specific pod
454-
func (c *Controller) getEndpointTargetLSP(pod, namespace string, addresses []string) (string, error) {
455-
// Retrieve the pod object from its namespace and name
456-
podObj, err := c.podsLister.Pods(namespace).Get(pod)
502+
func (c *Controller) getEndpointTargetLSPName(pod *v1.Pod, addresses []string) (string, error) {
503+
// Retrieve the provider for those addresses
504+
provider, err := c.getEndpointProvider(pod, addresses)
457505
if err != nil {
458-
return "", fmt.Errorf("failed to get pod %s/%s: %w", namespace, pod, err)
506+
return "", err
459507
}
460508

461-
// Get all the networks to which the pod is attached
462-
podNetworks, err := c.getPodKubeovnNets(podObj)
509+
return getEndpointTargetLSPNameFromProvider(pod, provider), nil
510+
}
511+
512+
// getSubnetByProvider returns the subnet linked to a provider on a pod
513+
func (c *Controller) getSubnetByProvider(pod *v1.Pod, provider string) (string, error) {
514+
subnetName, exists := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)]
515+
if !exists {
516+
return "", fmt.Errorf("couldn't find subnet linked to provider %s", provider)
517+
}
518+
519+
return subnetName, nil
520+
}
521+
522+
// getVpcByProvider returns the VPC linked to a provider on a pod
523+
func (c *Controller) getVpcByProvider(pod *v1.Pod, provider string) (string, error) {
524+
vpcName, exists := pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, provider)]
525+
if !exists {
526+
return "", fmt.Errorf("couldn't find vpc linked to provider %s", provider)
527+
}
528+
529+
return vpcName, nil
530+
}
531+
532+
// getEndpointVpcAndSubnet returns the VPC/subnet for a pod and a set of addresses attached to it
533+
func (c *Controller) getEndpointVpcAndSubnet(pod *v1.Pod, addresses []string) (string, string, error) {
534+
// Retrieve the provider for those addresses
535+
provider, err := c.getEndpointProvider(pod, addresses)
463536
if err != nil {
464-
return "", fmt.Errorf("failed to get pod networks: %w", err)
537+
return "", "", err
465538
}
466539

467-
// Retrieve all the providers
468-
var providers []string
469-
for _, podNetwork := range podNetworks {
470-
providers = append(providers, podNetwork.ProviderName)
540+
if provider == "" {
541+
return "", "", nil
471542
}
472543

473-
// Get the first matching provider for any of the address in the endpoint
474-
var provider string
475-
for _, address := range addresses {
476-
if provider = getMatchingProviderForAddress(podObj, providers, address); provider != "" {
477-
break
478-
}
544+
// Retrieve the subnet
545+
subnet, err := c.getSubnetByProvider(pod, provider)
546+
if err != nil {
547+
return "", "", err
548+
}
549+
550+
// Retrieve the VPC
551+
vpc, err := c.getVpcByProvider(pod, provider)
552+
if err != nil {
553+
return "", "", err
479554
}
480555

481-
return getEndpointTargetLSPName(podObj, provider), nil
556+
return vpc, subnet, nil
482557
}

pkg/controller/endpoint_slice_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func TestGetEndpointTargetLSPName(t *testing.T) {
222222

223223
for _, tt := range tests {
224224
t.Run(tt.name, func(t *testing.T) {
225-
result := getEndpointTargetLSPName(tt.pod, tt.provider)
225+
result := getEndpointTargetLSPNameFromProvider(tt.pod, tt.provider)
226226
if result != tt.expected {
227227
t.Errorf("getEndpointTargetLSPName() = %q, want %q", result, tt.expected)
228228
}

pkg/controller/switch_lb_rule.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,32 @@ func generateHeadlessService(slr *kubeovnv1.SwitchLBRule, oldSvc *corev1.Service
358358
},
359359
}
360360
}
361+
362+
// If the user supplies a VPC/subnet for the SLR, propagate it to the service
363+
setUserDefinedNetwork(newSvc, slr)
364+
361365
return newSvc
362366
}
363367

368+
// setUserDefinedNetwork propagates user-defined VPC/subnet from the SLR to the Service
369+
func setUserDefinedNetwork(service *corev1.Service, slr *kubeovnv1.SwitchLBRule) {
370+
if service == nil || slr == nil || slr.Annotations == nil {
371+
return
372+
}
373+
374+
if service.Annotations == nil {
375+
service.Annotations = make(map[string]string)
376+
}
377+
378+
if vpc := slr.Annotations[util.LogicalRouterAnnotation]; vpc != "" {
379+
service.Annotations[util.LogicalRouterAnnotation] = vpc
380+
}
381+
382+
if subnet := slr.Annotations[util.LogicalSwitchAnnotation]; subnet != "" {
383+
service.Annotations[util.LogicalSwitchAnnotation] = subnet
384+
}
385+
}
386+
364387
func generateEndpoints(slr *kubeovnv1.SwitchLBRule, oldEps *corev1.Endpoints) *corev1.Endpoints {
365388
var (
366389
name string

0 commit comments

Comments
 (0)