Skip to content

Commit 7abc4b7

Browse files
committed
feat(endpoint_slice): refactor and support arbitrary providers
Signed-off-by: SkalaNetworks <contact@skala.network>
1 parent 99acf9f commit 7abc4b7

4 files changed

Lines changed: 250 additions & 61 deletions

File tree

pkg/controller/endpoint_slice.go

Lines changed: 139 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
8585
svc := cachedService.DeepCopy()
8686

8787
var (
88-
pods []*v1.Pod
8988
lbVips []string
9089
vip, vpcName, subnetName string
9190
ok bool
@@ -123,11 +122,7 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
123122
}
124123
}
125124

126-
if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
127-
klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
128-
return err
129-
}
130-
vpcName, subnetName = c.getVpcSubnetName(pods, endpointSlices, svc)
125+
vpcName, subnetName = c.getVpcSubnetName(endpointSlices, svc)
131126

132127
var (
133128
vpc *kubeovnv1.Vpc
@@ -241,51 +236,65 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
241236
return nil
242237
}
243238

244-
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (string, string) {
245-
var (
246-
vpcName string
247-
subnetName string
248-
)
249-
250-
for _, pod := range pods {
251-
if len(pod.Annotations) == 0 {
252-
continue
239+
func (c *Controller) getVpcSubnetName(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (vpcName, subnetName string) {
240+
// Let the user self-determine what VPC and subnet to use
241+
if service.Annotations != nil {
242+
if vpc := service.Annotations[util.LogicalRouterAnnotation]; vpc != "" {
243+
vpcName = vpc
253244
}
254-
if subnetName == "" {
255-
subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
245+
if subnet := service.Annotations[util.LogicalSwitchAnnotation]; subnet != "" {
246+
subnetName = subnet
256247
}
257248

258-
LOOP:
259-
for _, endpointSlice := range endpointSlices {
260-
for _, endpoint := range endpointSlice.Endpoints {
261-
for _, addr := range endpoint.Addresses {
262-
for _, podIP := range pod.Status.PodIPs {
263-
if addr == podIP.IP {
264-
if vpcName == "" {
265-
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
266-
}
267-
if vpcName != "" {
268-
break LOOP
269-
}
270-
}
271-
}
272-
}
273-
}
249+
if vpcName != "" && subnetName != "" {
250+
return vpcName, subnetName
274251
}
252+
}
275253

276-
if vpcName != "" && subnetName != "" {
277-
break
254+
for _, slice := range endpointSlices {
255+
for _, endpoint := range slice.Endpoints {
256+
if endpoint.TargetRef == nil {
257+
continue
258+
}
259+
260+
namespace := endpoint.TargetRef.Namespace
261+
name := endpoint.TargetRef.Name
262+
263+
pod, err := c.podsLister.Pods(namespace).Get(name)
264+
if err != nil {
265+
err := fmt.Errorf("couldn't retrieve pod %s/%s", namespace, name)
266+
klog.Error(err)
267+
continue
268+
}
269+
270+
vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
271+
if err != nil {
272+
err := fmt.Errorf("couldn't retrieve get subnet/vpc for pod %s/%s", namespace, name)
273+
klog.Error(err)
274+
continue
275+
}
276+
277+
if vpcName == "" {
278+
vpcName = vpc
279+
}
280+
281+
if subnetName == "" {
282+
subnetName = subnet
283+
}
284+
285+
if vpcName != "" && subnetName != "" {
286+
return vpcName, subnetName
287+
}
278288
}
279289
}
280290

291+
// Default to this if we can't find anything from the endpoints
281292
if vpcName == "" {
282-
if vpcName = service.Annotations[util.VpcAnnotation]; vpcName == "" {
283-
vpcName = c.config.ClusterRouter
284-
}
293+
vpcName = util.DefaultSubnet
285294
}
286295

287296
if subnetName == "" {
288-
subnetName = util.DefaultSubnet
297+
subnetName = c.config.ClusterRouter
289298
}
290299

291300
return vpcName, subnetName
@@ -375,7 +384,14 @@ func (c *Controller) getIPPortMappingBackend(endpointSlices []*discoveryv1.Endpo
375384

376385
for _, endpoint := range endpointSlice.Endpoints {
377386
if isGenIPPortMapping && endpoint.TargetRef.Name != "" {
378-
lspName, err := c.getEndpointTargetLSP(endpoint.TargetRef.Name, endpoint.TargetRef.Namespace, endpoint.Addresses)
387+
pod, err := c.podsLister.Pods(endpoint.TargetRef.Namespace).Get(endpoint.TargetRef.Name)
388+
if err != nil {
389+
err := fmt.Errorf("failed to get pod %s/%s: %w", endpoint.TargetRef.Namespace, endpoint.TargetRef.Name, err)
390+
klog.Error(err)
391+
continue
392+
}
393+
394+
lspName, err := c.getEndpointTargetLSPName(pod, endpoint.Addresses)
379395
if err != nil {
380396
err := fmt.Errorf("couldn't get LSP for the endpoint's target: %w", err)
381397
klog.Error(err)
@@ -408,6 +424,23 @@ func endpointReady(endpoint discoveryv1.Endpoint) bool {
408424
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
409425
}
410426

427+
// getPodProviders returns all the providers available on a pod
428+
func (c *Controller) getPodProviders(pod *v1.Pod) ([]string, error) {
429+
// Get all the networks to which the pod is attached
430+
podNetworks, err := c.getPodKubeovnNets(pod)
431+
if err != nil {
432+
return nil, fmt.Errorf("failed to get pod networks: %w", err)
433+
}
434+
435+
// Retrieve all the providers
436+
var providers []string
437+
for _, podNetwork := range podNetworks {
438+
providers = append(providers, podNetwork.ProviderName)
439+
}
440+
441+
return providers, nil
442+
}
443+
411444
// getMatchingProviderForAddress returns the provider linked to a subnet in which a particular address is present
412445
func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address string) string {
413446
if pod.Annotations == nil {
@@ -430,10 +463,29 @@ func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address stri
430463
return ""
431464
}
432465

433-
// getEndpointTargetLSPName returns the name of the LSP for a pod targeted by an endpoint.
466+
// getEndpointProvider returns the provider linked to the addresses of an endpoint
467+
func (c *Controller) getEndpointProvider(pod *v1.Pod, addresses []string) (string, error) {
468+
// Retrieve all the providers of the pod
469+
providers, err := c.getPodProviders(pod)
470+
if err != nil {
471+
return "", err
472+
}
473+
474+
// Get the first matching provider for any of the address in the endpoint
475+
var provider string
476+
for _, address := range addresses {
477+
if provider = getMatchingProviderForAddress(pod, providers, address); provider != "" {
478+
return provider, nil
479+
}
480+
}
481+
482+
return "", nil
483+
}
484+
485+
// getEndpointTargetLSPNameFromProvider returns the name of the LSP for a pod targeted by an endpoint.
434486
// A custom provider can be specified if the LSP is within a subnet that doesn't use
435487
// the default "ovn" provider.
436-
func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
488+
func getEndpointTargetLSPNameFromProvider(pod *v1.Pod, provider string) string {
437489
// If no provider is specified, use the default one
438490
if provider == "" {
439491
provider = util.OvnProvider
@@ -451,32 +503,59 @@ func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
451503
}
452504

453505
// getEndpointTargetLSP returns the name of the LSP on which addresses are attached for a specific pod
454-
func (c *Controller) getEndpointTargetLSP(pod, namespace string, addresses []string) (string, error) {
455-
// Retrieve the pod object from its namespace and name
456-
podObj, err := c.podsLister.Pods(namespace).Get(pod)
506+
func (c *Controller) getEndpointTargetLSPName(pod *v1.Pod, addresses []string) (string, error) {
507+
// Retrieve the provider for those addresses
508+
provider, err := c.getEndpointProvider(pod, addresses)
457509
if err != nil {
458-
return "", fmt.Errorf("failed to get pod %s/%s: %w", namespace, pod, err)
510+
return "", err
459511
}
460512

461-
// Get all the networks to which the pod is attached
462-
podNetworks, err := c.getPodKubeovnNets(podObj)
513+
return getEndpointTargetLSPNameFromProvider(pod, provider), nil
514+
}
515+
516+
// getSubnetByProvider returns the subnet linked to a provider on a pod
517+
func (c *Controller) getSubnetByProvider(pod *v1.Pod, provider string) (string, error) {
518+
subnetName, exists := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)]
519+
if !exists {
520+
return "", fmt.Errorf("couldn't find subnet linked to provider %s", provider)
521+
}
522+
523+
return subnetName, nil
524+
}
525+
526+
// getVpcByProvider returns the VPC linked to a provider on a pod
527+
func (c *Controller) getVpcByProvider(pod *v1.Pod, provider string) (string, error) {
528+
vpcName, exists := pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, provider)]
529+
if !exists {
530+
return "", fmt.Errorf("couldn't find vpc linked to provider %s", provider)
531+
}
532+
533+
return vpcName, nil
534+
}
535+
536+
// getEndpointVpcAndSubnet returns the VPC/subnet for a pod and a set of addresses attached to it
537+
func (c *Controller) getEndpointVpcAndSubnet(pod *v1.Pod, addresses []string) (string, string, error) {
538+
// Retrieve the provider for those addresses
539+
provider, err := c.getEndpointProvider(pod, addresses)
463540
if err != nil {
464-
return "", fmt.Errorf("failed to get pod networks: %w", err)
541+
return "", "", err
465542
}
466543

467-
// Retrieve all the providers
468-
var providers []string
469-
for _, podNetwork := range podNetworks {
470-
providers = append(providers, podNetwork.ProviderName)
544+
if provider == "" {
545+
return "", "", nil
471546
}
472547

473-
// Get the first matching provider for any of the address in the endpoint
474-
var provider string
475-
for _, address := range addresses {
476-
if provider = getMatchingProviderForAddress(podObj, providers, address); provider != "" {
477-
break
478-
}
548+
// Retrieve the subnet
549+
subnet, err := c.getSubnetByProvider(pod, provider)
550+
if err != nil {
551+
return "", "", err
552+
}
553+
554+
// Retrieve the VPC
555+
vpc, err := c.getVpcByProvider(pod, provider)
556+
if err != nil {
557+
return "", "", err
479558
}
480559

481-
return getEndpointTargetLSPName(podObj, provider), nil
560+
return vpc, subnet, nil
482561
}

pkg/controller/endpoint_slice_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func TestGetEndpointTargetLSPName(t *testing.T) {
222222

223223
for _, tt := range tests {
224224
t.Run(tt.name, func(t *testing.T) {
225-
result := getEndpointTargetLSPName(tt.pod, tt.provider)
225+
result := getEndpointTargetLSPNameFromProvider(tt.pod, tt.provider)
226226
if result != tt.expected {
227227
t.Errorf("getEndpointTargetLSPName() = %q, want %q", result, tt.expected)
228228
}

pkg/controller/switch_lb_rule.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,32 @@ func generateHeadlessService(slr *kubeovnv1.SwitchLBRule, oldSvc *corev1.Service
358358
},
359359
}
360360
}
361+
362+
// If the user supplies a VPC/subnet for the SLR, propagate it to the service
363+
setUserDefinedNetwork(newSvc, slr)
364+
361365
return newSvc
362366
}
363367

368+
// setUserDefinedNetwork propagates user-defined VPC/subnet from the SLR to the Service
369+
func setUserDefinedNetwork(service *corev1.Service, slr *kubeovnv1.SwitchLBRule) {
370+
if service == nil || slr == nil || slr.Annotations == nil {
371+
return
372+
}
373+
374+
if service.Annotations == nil {
375+
service.Annotations = make(map[string]string)
376+
}
377+
378+
if vpc := slr.Annotations[util.LogicalRouterAnnotation]; vpc != "" {
379+
service.Annotations[util.LogicalRouterAnnotation] = vpc
380+
}
381+
382+
if subnet := slr.Annotations[util.LogicalSwitchAnnotation]; subnet != "" {
383+
service.Annotations[util.LogicalSwitchAnnotation] = subnet
384+
}
385+
}
386+
364387
func generateEndpoints(slr *kubeovnv1.SwitchLBRule, oldEps *corev1.Endpoints) *corev1.Endpoints {
365388
var (
366389
name string

0 commit comments

Comments
 (0)