Skip to content

Commit 0ef9f61

Browse files
committed
feat(endpoint_slice): refactor and support arbitrary providers
Signed-off-by: SkalaNetworks <contact@skala.network>
1 parent 99acf9f commit 0ef9f61

4 files changed

Lines changed: 247 additions & 57 deletions

File tree

pkg/controller/endpoint_slice.go

Lines changed: 136 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
8585
svc := cachedService.DeepCopy()
8686

8787
var (
88-
pods []*v1.Pod
8988
lbVips []string
9089
vip, vpcName, subnetName string
9190
ok bool
@@ -123,11 +122,7 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
123122
}
124123
}
125124

126-
if pods, err = c.podsLister.Pods(namespace).List(labels.Set(svc.Spec.Selector).AsSelector()); err != nil {
127-
klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
128-
return err
129-
}
130-
vpcName, subnetName = c.getVpcSubnetName(pods, endpointSlices, svc)
125+
vpcName, subnetName = c.getVpcSubnetName(endpointSlices, svc)
131126

132127
var (
133128
vpc *kubeovnv1.Vpc
@@ -241,40 +236,55 @@ func (c *Controller) handleUpdateEndpointSlice(key string) error {
241236
return nil
242237
}
243238

244-
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (string, string) {
245-
var (
246-
vpcName string
247-
subnetName string
248-
)
249-
250-
for _, pod := range pods {
251-
if len(pod.Annotations) == 0 {
252-
continue
239+
func (c *Controller) getVpcSubnetName(endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (vpcName, subnetName string) {
240+
// Let the user self-determine what VPC and subnet to use
241+
if service.Annotations != nil {
242+
if vpc := service.Annotations[util.LogicalRouterAnnotation]; vpc != "" {
243+
vpcName = vpc
253244
}
254-
if subnetName == "" {
255-
subnetName = pod.Annotations[util.LogicalSwitchAnnotation]
245+
if subnet := service.Annotations[util.LogicalSwitchAnnotation]; subnet != "" {
246+
subnetName = subnet
256247
}
257248

258-
LOOP:
259-
for _, endpointSlice := range endpointSlices {
260-
for _, endpoint := range endpointSlice.Endpoints {
261-
for _, addr := range endpoint.Addresses {
262-
for _, podIP := range pod.Status.PodIPs {
263-
if addr == podIP.IP {
264-
if vpcName == "" {
265-
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
266-
}
267-
if vpcName != "" {
268-
break LOOP
269-
}
270-
}
271-
}
272-
}
273-
}
249+
if vpcName != "" && subnetName != "" {
250+
return vpcName, subnetName
274251
}
252+
}
275253

276-
if vpcName != "" && subnetName != "" {
277-
break
254+
for _, slice := range endpointSlices {
255+
for _, endpoint := range slice.Endpoints {
256+
if endpoint.TargetRef == nil {
257+
continue
258+
}
259+
260+
namespace := endpoint.TargetRef.Namespace
261+
name := endpoint.TargetRef.Name
262+
263+
pod, err := c.podsLister.Pods(namespace).Get(name)
264+
if err != nil {
265+
err := fmt.Errorf("couldn't retrieve pod %s/%s", namespace, name)
266+
klog.Error(err)
267+
continue
268+
}
269+
270+
vpc, subnet, err := c.getEndpointVpcAndSubnet(pod, endpoint.Addresses)
271+
if err != nil {
272+
err := fmt.Errorf("couldn't retrieve get subnet/vpc for pod %s/%s", namespace, name)
273+
klog.Error(err)
274+
continue
275+
}
276+
277+
if vpcName == "" {
278+
vpcName = vpc
279+
}
280+
281+
if subnetName == "" {
282+
subnetName = subnet
283+
}
284+
285+
if vpcName != "" && subnetName != "" {
286+
return vpcName, subnetName
287+
}
278288
}
279289
}
280290

@@ -375,7 +385,14 @@ func (c *Controller) getIPPortMappingBackend(endpointSlices []*discoveryv1.Endpo
375385

376386
for _, endpoint := range endpointSlice.Endpoints {
377387
if isGenIPPortMapping && endpoint.TargetRef.Name != "" {
378-
lspName, err := c.getEndpointTargetLSP(endpoint.TargetRef.Name, endpoint.TargetRef.Namespace, endpoint.Addresses)
388+
pod, err := c.podsLister.Pods(endpoint.TargetRef.Namespace).Get(endpoint.TargetRef.Name)
389+
if err != nil {
390+
err := fmt.Errorf("failed to get pod %s/%s: %w", endpoint.TargetRef.Namespace, endpoint.TargetRef.Name, err)
391+
klog.Error(err)
392+
continue
393+
}
394+
395+
lspName, err := c.getEndpointTargetLSPName(pod, endpoint.Addresses)
379396
if err != nil {
380397
err := fmt.Errorf("couldn't get LSP for the endpoint's target: %w", err)
381398
klog.Error(err)
@@ -408,6 +425,23 @@ func endpointReady(endpoint discoveryv1.Endpoint) bool {
408425
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
409426
}
410427

428+
// getPodProviders returns all the providers available on a pod
429+
func (c *Controller) getPodProviders(pod *v1.Pod) ([]string, error) {
430+
// Get all the networks to which the pod is attached
431+
podNetworks, err := c.getPodKubeovnNets(pod)
432+
if err != nil {
433+
return nil, fmt.Errorf("failed to get pod networks: %w", err)
434+
}
435+
436+
// Retrieve all the providers
437+
var providers []string
438+
for _, podNetwork := range podNetworks {
439+
providers = append(providers, podNetwork.ProviderName)
440+
}
441+
442+
return providers, nil
443+
}
444+
411445
// getMatchingProviderForAddress returns the provider linked to a subnet in which a particular address is present
412446
func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address string) string {
413447
if pod.Annotations == nil {
@@ -430,10 +464,29 @@ func getMatchingProviderForAddress(pod *v1.Pod, providers []string, address stri
430464
return ""
431465
}
432466

433-
// getEndpointTargetLSPName returns the name of the LSP for a pod targeted by an endpoint.
467+
// getEndpointProvider returns the provider linked to the addresses of an endpoint
468+
func (c *Controller) getEndpointProvider(pod *v1.Pod, addresses []string) (string, error) {
469+
// Retrieve all the providers of the pod
470+
providers, err := c.getPodProviders(pod)
471+
if err != nil {
472+
return "", err
473+
}
474+
475+
// Get the first matching provider for any of the address in the endpoint
476+
var provider string
477+
for _, address := range addresses {
478+
if provider = getMatchingProviderForAddress(pod, providers, address); provider != "" {
479+
return provider, nil
480+
}
481+
}
482+
483+
return "", nil
484+
}
485+
486+
// getEndpointTargetLSPNameFromProvider returns the name of the LSP for a pod targeted by an endpoint.
434487
// A custom provider can be specified if the LSP is within a subnet that doesn't use
435488
// the default "ovn" provider.
436-
func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
489+
func getEndpointTargetLSPNameFromProvider(pod *v1.Pod, provider string) string {
437490
// If no provider is specified, use the default one
438491
if provider == "" {
439492
provider = util.OvnProvider
@@ -451,32 +504,59 @@ func getEndpointTargetLSPName(pod *v1.Pod, provider string) string {
451504
}
452505

453506
// getEndpointTargetLSP returns the name of the LSP on which addresses are attached for a specific pod
454-
func (c *Controller) getEndpointTargetLSP(pod, namespace string, addresses []string) (string, error) {
455-
// Retrieve the pod object from its namespace and name
456-
podObj, err := c.podsLister.Pods(namespace).Get(pod)
507+
func (c *Controller) getEndpointTargetLSPName(pod *v1.Pod, addresses []string) (string, error) {
508+
// Retrieve the provider for those addresses
509+
provider, err := c.getEndpointProvider(pod, addresses)
457510
if err != nil {
458-
return "", fmt.Errorf("failed to get pod %s/%s: %w", namespace, pod, err)
511+
return "", err
459512
}
460513

461-
// Get all the networks to which the pod is attached
462-
podNetworks, err := c.getPodKubeovnNets(podObj)
514+
return getEndpointTargetLSPNameFromProvider(pod, provider), nil
515+
}
516+
517+
// getSubnetByProvider returns the subnet linked to a provider on a pod
518+
func (c *Controller) getSubnetByProvider(pod *v1.Pod, provider string) (string, error) {
519+
subnetName, exists := pod.Annotations[fmt.Sprintf(util.LogicalSwitchAnnotationTemplate, provider)]
520+
if !exists {
521+
return "", fmt.Errorf("couldn't find subnet linked to provider %s", provider)
522+
}
523+
524+
return subnetName, nil
525+
}
526+
527+
// getVpcByProvider returns the VPC linked to a provider on a pod
528+
func (c *Controller) getVpcByProvider(pod *v1.Pod, provider string) (string, error) {
529+
vpcName, exists := pod.Annotations[fmt.Sprintf(util.LogicalRouterAnnotationTemplate, provider)]
530+
if !exists {
531+
return "", fmt.Errorf("couldn't find vpc linked to provider %s", provider)
532+
}
533+
534+
return vpcName, nil
535+
}
536+
537+
// getEndpointVpcAndSubnet returns the VPC/subnet for a pod and a set of addresses attached to it
538+
func (c *Controller) getEndpointVpcAndSubnet(pod *v1.Pod, addresses []string) (string, string, error) {
539+
// Retrieve the provider for those addresses
540+
provider, err := c.getEndpointProvider(pod, addresses)
463541
if err != nil {
464-
return "", fmt.Errorf("failed to get pod networks: %w", err)
542+
return "", "", err
465543
}
466544

467-
// Retrieve all the providers
468-
var providers []string
469-
for _, podNetwork := range podNetworks {
470-
providers = append(providers, podNetwork.ProviderName)
545+
if provider == "" {
546+
return "", "", nil
471547
}
472548

473-
// Get the first matching provider for any of the address in the endpoint
474-
var provider string
475-
for _, address := range addresses {
476-
if provider = getMatchingProviderForAddress(podObj, providers, address); provider != "" {
477-
break
478-
}
549+
// Retrieve the subnet
550+
subnet, err := c.getSubnetByProvider(pod, provider)
551+
if err != nil {
552+
return "", "", err
553+
}
554+
555+
// Retrieve the VPC
556+
vpc, err := c.getVpcByProvider(pod, provider)
557+
if err != nil {
558+
return "", "", err
479559
}
480560

481-
return getEndpointTargetLSPName(podObj, provider), nil
561+
return vpc, subnet, nil
482562
}

pkg/controller/endpoint_slice_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func TestGetEndpointTargetLSPName(t *testing.T) {
222222

223223
for _, tt := range tests {
224224
t.Run(tt.name, func(t *testing.T) {
225-
result := getEndpointTargetLSPName(tt.pod, tt.provider)
225+
result := getEndpointTargetLSPNameFromProvider(tt.pod, tt.provider)
226226
if result != tt.expected {
227227
t.Errorf("getEndpointTargetLSPName() = %q, want %q", result, tt.expected)
228228
}

pkg/controller/switch_lb_rule.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,9 +358,32 @@ func generateHeadlessService(slr *kubeovnv1.SwitchLBRule, oldSvc *corev1.Service
358358
},
359359
}
360360
}
361+
362+
// If the user supplies a VPC/subnet for the SLR, propagate it to the service
363+
setUserDefinedNetwork(newSvc, slr)
364+
361365
return newSvc
362366
}
363367

368+
// setUserDefinedNetwork propagates user-defined VPC/subnet from the SLR to the Service
369+
func setUserDefinedNetwork(service *corev1.Service, slr *kubeovnv1.SwitchLBRule) {
370+
if service == nil || slr == nil || slr.Annotations == nil {
371+
return
372+
}
373+
374+
if service.Annotations == nil {
375+
service.Annotations = make(map[string]string)
376+
}
377+
378+
if vpc := slr.Annotations[util.LogicalRouterAnnotation]; vpc != "" {
379+
service.Annotations[util.LogicalRouterAnnotation] = vpc
380+
}
381+
382+
if subnet := slr.Annotations[util.LogicalSwitchAnnotation]; subnet != "" {
383+
service.Annotations[util.LogicalSwitchAnnotation] = subnet
384+
}
385+
}
386+
364387
func generateEndpoints(slr *kubeovnv1.SwitchLBRule, oldEps *corev1.Endpoints) *corev1.Endpoints {
365388
var (
366389
name string

pkg/controller/switch_lb_rule_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ import (
44
"reflect"
55
"testing"
66

7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
9+
kubeovnv1 "github.com/kubeovn/kube-ovn/pkg/apis/kubeovn/v1"
10+
"github.com/kubeovn/kube-ovn/pkg/util"
11+
712
corev1 "k8s.io/api/core/v1"
813
)
914

@@ -72,3 +77,85 @@ func Test_getIPFamilies(t *testing.T) {
7277
})
7378
}
7479
}
80+
81+
func Test_setUserDefinedNetwork(t *testing.T) {
82+
tests := []struct {
83+
name string
84+
service *corev1.Service
85+
slr *kubeovnv1.SwitchLBRule
86+
result *corev1.Service
87+
}{
88+
{
89+
name: "Propagate VPC",
90+
service: &corev1.Service{},
91+
slr: &kubeovnv1.SwitchLBRule{
92+
ObjectMeta: metav1.ObjectMeta{
93+
Annotations: map[string]string{
94+
util.LogicalRouterAnnotation: "test",
95+
},
96+
},
97+
},
98+
result: &corev1.Service{
99+
ObjectMeta: metav1.ObjectMeta{
100+
Annotations: map[string]string{
101+
util.LogicalRouterAnnotation: "test",
102+
},
103+
},
104+
},
105+
},
106+
{
107+
name: "Propagate Subnet",
108+
service: &corev1.Service{},
109+
slr: &kubeovnv1.SwitchLBRule{
110+
ObjectMeta: metav1.ObjectMeta{
111+
Annotations: map[string]string{
112+
util.LogicalSwitchAnnotation: "test",
113+
},
114+
},
115+
},
116+
result: &corev1.Service{
117+
ObjectMeta: metav1.ObjectMeta{
118+
Annotations: map[string]string{
119+
util.LogicalSwitchAnnotation: "test",
120+
},
121+
},
122+
},
123+
},
124+
{
125+
name: "Propagate VPC/Subnet",
126+
service: &corev1.Service{},
127+
slr: &kubeovnv1.SwitchLBRule{
128+
ObjectMeta: metav1.ObjectMeta{
129+
Annotations: map[string]string{
130+
util.LogicalRouterAnnotation: "test1",
131+
util.LogicalSwitchAnnotation: "test2",
132+
},
133+
},
134+
},
135+
result: &corev1.Service{
136+
ObjectMeta: metav1.ObjectMeta{
137+
Annotations: map[string]string{
138+
util.LogicalRouterAnnotation: "test1",
139+
util.LogicalSwitchAnnotation: "test2",
140+
},
141+
},
142+
},
143+
},
144+
{
145+
name: "Propagate nothing",
146+
service: &corev1.Service{},
147+
slr: &kubeovnv1.SwitchLBRule{},
148+
result: &corev1.Service{},
149+
},
150+
}
151+
152+
for _, tt := range tests {
153+
t.Run(tt.name, func(t *testing.T) {
154+
setUserDefinedNetwork(tt.service, tt.slr)
155+
156+
if !reflect.DeepEqual(*tt.service, *tt.result) {
157+
t.Errorf("Expected service %v, but got %v", *tt.service, *tt.result)
158+
}
159+
})
160+
}
161+
}

0 commit comments

Comments
 (0)