Skip to content

Commit 8413ae0

Browse files
committed
Refactor controller to use EndpointSlices instead of Endpoints
Signed-off-by: Mengxin Liu <liumengxinfly@gmail.com>
1 parent 3fede2a commit 8413ae0

7 files changed

Lines changed: 220 additions & 102 deletions

File tree

charts/kube-ovn-v2/templates/rbac/ovn-CR.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ rules:
135135
- get
136136
- list
137137
- watch
138+
- apiGroups:
139+
- discovery.k8s.io
140+
resources:
141+
- endpointslices
142+
verbs:
143+
- get
144+
- list
145+
- watch
138146
- apiGroups:
139147
- apps
140148
resources:

charts/kube-ovn/templates/ovn-CR.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ rules:
135135
- get
136136
- list
137137
- watch
138+
- apiGroups:
139+
- discovery.k8s.io
140+
resources:
141+
- endpointslices
142+
verbs:
143+
- get
144+
- list
145+
- watch
138146
- apiGroups:
139147
- apps
140148
resources:

dist/images/install.sh

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3480,6 +3480,14 @@ rules:
34803480
- get
34813481
- list
34823482
- watch
3483+
- apiGroups:
3484+
- discovery.k8s.io
3485+
resources:
3486+
- endpointslices
3487+
verbs:
3488+
- get
3489+
- list
3490+
- watch
34833491
- apiGroups:
34843492
- apps
34853493
resources:

pkg/controller/controller.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
appsv1 "k8s.io/client-go/listers/apps/v1"
2121
certListerv1 "k8s.io/client-go/listers/certificates/v1"
2222
v1 "k8s.io/client-go/listers/core/v1"
23+
discoveryv1 "k8s.io/client-go/listers/discovery/v1"
2324
netv1 "k8s.io/client-go/listers/networking/v1"
2425
"k8s.io/client-go/tools/cache"
2526
"k8s.io/client-go/tools/record"
@@ -222,10 +223,10 @@ type Controller struct {
222223
updateServiceQueue workqueue.TypedRateLimitingInterface[*updateSvcObject]
223224
svcKeyMutex keymutex.KeyMutex
224225

225-
endpointsLister v1.EndpointsLister
226-
endpointsSynced cache.InformerSynced
227-
addOrUpdateEndpointQueue workqueue.TypedRateLimitingInterface[string]
228-
epKeyMutex keymutex.KeyMutex
226+
endpointSlicesLister discoveryv1.EndpointSliceLister
227+
endpointSlicesSynced cache.InformerSynced
228+
addOrUpdateEndpointSliceQueue workqueue.TypedRateLimitingInterface[string]
229+
epKeyMutex keymutex.KeyMutex
229230

230231
deploymentsLister appsv1.DeploymentLister
231232
deploymentsSynced cache.InformerSynced
@@ -349,7 +350,7 @@ func Run(ctx context.Context, config *Configuration) {
349350
namespaceInformer := informerFactory.Core().V1().Namespaces()
350351
nodeInformer := informerFactory.Core().V1().Nodes()
351352
serviceInformer := informerFactory.Core().V1().Services()
352-
endpointInformer := informerFactory.Core().V1().Endpoints()
353+
endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices()
353354
deploymentInformer := deployInformerFactory.Apps().V1().Deployments()
354355
qosPolicyInformer := kubeovnInformerFactory.Kubeovn().V1().QoSPolicies()
355356
configMapInformer := cmInformerFactory.Core().V1().ConfigMaps()
@@ -493,10 +494,10 @@ func Run(ctx context.Context, config *Configuration) {
493494
updateServiceQueue: newTypedRateLimitingQueue[*updateSvcObject]("UpdateService", nil),
494495
svcKeyMutex: keymutex.NewHashed(numKeyLocks),
495496

496-
endpointsLister: endpointInformer.Lister(),
497-
endpointsSynced: endpointInformer.Informer().HasSynced,
498-
addOrUpdateEndpointQueue: newTypedRateLimitingQueue[string]("UpdateEndpoint", nil),
499-
epKeyMutex: keymutex.NewHashed(numKeyLocks),
497+
endpointSlicesLister: endpointSliceInformer.Lister(),
498+
endpointSlicesSynced: endpointSliceInformer.Informer().HasSynced,
499+
addOrUpdateEndpointSliceQueue: newTypedRateLimitingQueue[string]("UpdateEndpointSlice", nil),
500+
epKeyMutex: keymutex.NewHashed(numKeyLocks),
500501

501502
deploymentsLister: deploymentInformer.Lister(),
502503
deploymentsSynced: deploymentInformer.Informer().HasSynced,
@@ -642,7 +643,7 @@ func Run(ctx context.Context, config *Configuration) {
642643
controller.ipSynced, controller.virtualIpsSynced, controller.iptablesEipSynced,
643644
controller.iptablesFipSynced, controller.iptablesDnatRuleSynced, controller.iptablesSnatRuleSynced,
644645
controller.vlanSynced, controller.podsSynced, controller.namespacesSynced, controller.nodesSynced,
645-
controller.serviceSynced, controller.endpointsSynced, controller.deploymentsSynced, controller.configMapsSynced,
646+
controller.serviceSynced, controller.endpointSlicesSynced, controller.deploymentsSynced, controller.configMapsSynced,
646647
controller.ovnEipSynced, controller.ovnFipSynced, controller.ovnSnatRuleSynced,
647648
controller.ovnDnatRuleSynced,
648649
}
@@ -692,11 +693,11 @@ func Run(ctx context.Context, config *Configuration) {
692693
util.LogFatalAndExit(err, "failed to add service event handler")
693694
}
694695

695-
if _, err = endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
696-
AddFunc: controller.enqueueAddEndpoint,
697-
UpdateFunc: controller.enqueueUpdateEndpoint,
696+
if _, err = endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
697+
AddFunc: controller.enqueueAddEndpointSlice,
698+
UpdateFunc: controller.enqueueUpdateEndpointSlice,
698699
}); err != nil {
699-
util.LogFatalAndExit(err, "failed to add endpoint event handler")
700+
util.LogFatalAndExit(err, "failed to add endpoint slice event handler")
700701
}
701702

702703
if _, err = deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -1017,7 +1018,7 @@ func (c *Controller) shutdown() {
10171018
c.addServiceQueue.ShutDown()
10181019
c.deleteServiceQueue.ShutDown()
10191020
c.updateServiceQueue.ShutDown()
1020-
c.addOrUpdateEndpointQueue.ShutDown()
1021+
c.addOrUpdateEndpointSliceQueue.ShutDown()
10211022

10221023
c.addVlanQueue.ShutDown()
10231024
c.delVlanQueue.ShutDown()
@@ -1211,7 +1212,7 @@ func (c *Controller) startWorkers(ctx context.Context) {
12111212

12121213
if c.config.EnableLb {
12131214
go wait.Until(runWorker("update service", c.updateServiceQueue, c.handleUpdateService), time.Second, ctx.Done())
1214-
go wait.Until(runWorker("add/update endpoint", c.addOrUpdateEndpointQueue, c.handleUpdateEndpoint), time.Second, ctx.Done())
1215+
go wait.Until(runWorker("add/update endpoint slice", c.addOrUpdateEndpointSliceQueue, c.handleUpdateEndpointSlice), time.Second, ctx.Done())
12151216
}
12161217

12171218
if c.config.EnableNP {
Lines changed: 66 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
v1 "k8s.io/api/core/v1"
9+
discoveryv1 "k8s.io/api/discovery/v1"
910
"k8s.io/apimachinery/pkg/api/errors"
1011
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
"k8s.io/apimachinery/pkg/labels"
@@ -17,29 +18,40 @@ import (
1718
"github.com/kubeovn/kube-ovn/pkg/util"
1819
)
1920

20-
func (c *Controller) enqueueAddEndpoint(obj any) {
21-
key := cache.MetaObjectToName(obj.(*v1.Endpoints)).String()
22-
klog.V(3).Infof("enqueue add endpoint %s", key)
23-
c.addOrUpdateEndpointQueue.Add(key)
21+
func findServiceKey(endpointSlice *discoveryv1.EndpointSlice) string {
22+
if endpointSlice != nil && endpointSlice.Labels != nil && endpointSlice.Labels[discoveryv1.LabelServiceName] != "" {
23+
return endpointSlice.Namespace + "/" + endpointSlice.Labels[discoveryv1.LabelServiceName]
24+
}
25+
return ""
2426
}
2527

26-
func (c *Controller) enqueueUpdateEndpoint(oldObj, newObj any) {
27-
oldEp := oldObj.(*v1.Endpoints)
28-
newEp := newObj.(*v1.Endpoints)
29-
if oldEp.ResourceVersion == newEp.ResourceVersion {
28+
func (c *Controller) enqueueAddEndpointSlice(obj any) {
29+
key := findServiceKey(obj.(*discoveryv1.EndpointSlice))
30+
if key != "" {
31+
klog.V(3).Infof("enqueue add endpointSlice %s", key)
32+
c.addOrUpdateEndpointSliceQueue.Add(key)
33+
}
34+
}
35+
36+
func (c *Controller) enqueueUpdateEndpointSlice(oldObj, newObj any) {
37+
oldEndpointSlice := oldObj.(*discoveryv1.EndpointSlice)
38+
newEndpointSlice := newObj.(*discoveryv1.EndpointSlice)
39+
if oldEndpointSlice.ResourceVersion == newEndpointSlice.ResourceVersion {
3040
return
3141
}
3242

33-
if len(oldEp.Subsets) == 0 && len(newEp.Subsets) == 0 {
43+
if len(oldEndpointSlice.Endpoints) == 0 && len(newEndpointSlice.Endpoints) == 0 {
3444
return
3545
}
3646

37-
key := cache.MetaObjectToName(newEp).String()
38-
klog.V(3).Infof("enqueue update endpoint %s", key)
39-
c.addOrUpdateEndpointQueue.Add(key)
47+
key := findServiceKey(newEndpointSlice)
48+
if key != "" {
49+
klog.V(3).Infof("enqueue update endpointSlice for service %s", key)
50+
c.addOrUpdateEndpointSliceQueue.Add(key)
51+
}
4052
}
4153

42-
func (c *Controller) handleUpdateEndpoint(key string) error {
54+
func (c *Controller) handleUpdateEndpointSlice(key string) error {
4355
namespace, name, err := cache.SplitMetaNamespaceKey(key)
4456
if err != nil {
4557
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
@@ -48,9 +60,9 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
4860

4961
c.epKeyMutex.LockKey(key)
5062
defer func() { _ = c.epKeyMutex.UnlockKey(key) }()
51-
klog.Infof("handle update endpoint %s", key)
63+
klog.Infof("handle update endpointSlice for service %s", key)
5264

53-
ep, err := c.endpointsLister.Endpoints(namespace).Get(name)
65+
endpointSlices, err := c.endpointSlicesLister.EndpointSlices(namespace).List(labels.Set(map[string]string{discoveryv1.LabelServiceName: name}).AsSelector())
5466
if err != nil {
5567
if errors.IsNotFound(err) {
5668
return nil
@@ -81,11 +93,10 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
8193
if vip, ok = svc.Annotations[util.SwitchLBRuleVipsAnnotation]; ok {
8294
lbVips = []string{vip}
8395

84-
for _, subset := range ep.Subsets {
85-
for _, address := range subset.Addresses {
86-
// TODO: IPv6
96+
for _, endpointSlice := range endpointSlices {
97+
for _, endpoint := range endpointSlice.Endpoints {
8798
if util.CheckProtocol(vip) == kubeovnv1.ProtocolIPv4 &&
88-
address.TargetRef.Name != "" {
99+
endpoint.TargetRef.Name != "" {
89100
ignoreHealthCheck = false
90101
}
91102
}
@@ -113,32 +124,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
113124
klog.Errorf("failed to get pods for service %s in namespace %s: %v", name, namespace, err)
114125
return err
115126
}
116-
for i, pod := range pods {
117-
if pod.Status.PodIP != "" || len(pod.Status.PodIPs) != 0 {
118-
continue
119-
}
120-
121-
for _, subset := range ep.Subsets {
122-
for _, addr := range subset.Addresses {
123-
if addr.TargetRef == nil || addr.TargetRef.Kind != "Pod" || addr.TargetRef.Name != pod.Name {
124-
continue
125-
}
126-
127-
p, err := c.config.KubeClient.CoreV1().Pods(pod.Namespace).Get(context.Background(), pod.Name, metav1.GetOptions{})
128-
if err != nil {
129-
klog.Errorf("failed to get pod %s/%s: %v", pod.Namespace, pod.Name, err)
130-
return err
131-
}
132-
pods[i] = p.DeepCopy()
133-
break
134-
}
135-
if pods[i] != pod {
136-
break
137-
}
138-
}
139-
}
140-
141-
vpcName, subnetName = c.getVpcSubnetName(pods, ep, svc)
127+
vpcName, subnetName = c.getVpcSubnetName(pods, endpointSlices, svc)
142128

143129
var (
144130
vpc *kubeovnv1.Vpc
@@ -189,7 +175,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
189175
checkIP = util.MasqueradeCheckIP
190176
}
191177
isGenIPPortMapping := !ignoreHealthCheck || isPreferLocalBackend
192-
ipPortMapping, backends = getIPPortMappingBackend(ep, pods, port, lbVip, checkIP, isGenIPPortMapping)
178+
ipPortMapping, backends = getIPPortMappingBackend(endpointSlices, port, lbVip, checkIP, isGenIPPortMapping)
193179
// for performance reason delete lb with no backends
194180
if len(backends) != 0 {
195181
vip = util.JoinHostPort(lbVip, port.Port)
@@ -252,7 +238,7 @@ func (c *Controller) handleUpdateEndpoint(key string) error {
252238
return nil
253239
}
254240

255-
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, service *v1.Service) (string, string) {
241+
func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpointSlices []*discoveryv1.EndpointSlice, service *v1.Service) (string, string) {
256242
var (
257243
vpcName string
258244
subnetName string
@@ -267,14 +253,16 @@ func (c *Controller) getVpcSubnetName(pods []*v1.Pod, endpoints *v1.Endpoints, s
267253
}
268254

269255
LOOP:
270-
for _, subset := range endpoints.Subsets {
271-
for _, addr := range subset.Addresses {
272-
if addr.IP == pod.Status.PodIP {
273-
if vpcName == "" {
274-
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
275-
}
276-
if vpcName != "" {
277-
break LOOP
256+
for _, endpointSlice := range endpointSlices {
257+
for _, endpoint := range endpointSlice.Endpoints {
258+
for _, addr := range endpoint.Addresses {
259+
if addr == pod.Status.PodIP {
260+
if vpcName == "" {
261+
vpcName = pod.Annotations[util.LogicalRouterAnnotation]
262+
}
263+
if vpcName != "" {
264+
break LOOP
265+
}
278266
}
279267
}
280268
}
@@ -361,56 +349,50 @@ func (c *Controller) getHealthCheckVip(subnetName, lbVip string) (string, error)
361349
return checkIP, nil
362350
}
363351

364-
func getIPPortMappingBackend(endpoints *v1.Endpoints, pods []*v1.Pod, servicePort v1.ServicePort, serviceIP, checkVip string, isGenIPPortMapping bool) (map[string]string, []string) {
352+
func getIPPortMappingBackend(endpointSlices []*discoveryv1.EndpointSlice, servicePort v1.ServicePort, serviceIP, checkVip string, isGenIPPortMapping bool) (map[string]string, []string) {
365353
var (
366354
ipPortMapping = map[string]string{}
367355
backends = []string{}
368356
protocol = util.CheckProtocol(serviceIP)
369357
)
370358

371-
for _, subset := range endpoints.Subsets {
359+
for _, endpointSlice := range endpointSlices {
372360
var targetPort int32
373-
for _, port := range subset.Ports {
374-
if port.Name == servicePort.Name {
375-
targetPort = port.Port
361+
for _, port := range endpointSlice.Ports {
362+
if port.Name != nil && *port.Name == servicePort.Name {
363+
targetPort = *port.Port
376364
break
377365
}
378366
}
379367
if targetPort == 0 {
380368
continue
381369
}
382370

383-
for _, address := range subset.Addresses {
384-
if isGenIPPortMapping && address.TargetRef.Name != "" {
385-
ipName := fmt.Sprintf("%s.%s", address.TargetRef.Name, address.TargetRef.Namespace)
386-
ipPortMapping[address.IP] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, ipName, checkVip)
387-
}
388-
if address.TargetRef == nil || address.TargetRef.Kind != "Pod" {
389-
if util.CheckProtocol(address.IP) == protocol {
390-
backends = append(backends, util.JoinHostPort(address.IP, targetPort))
371+
for _, endpoint := range endpointSlice.Endpoints {
372+
if isGenIPPortMapping && endpoint.TargetRef.Name != "" {
373+
ipName := fmt.Sprintf("%s.%s", endpoint.TargetRef.Name, endpoint.TargetRef.Namespace)
374+
for _, address := range endpoint.Addresses {
375+
ipPortMapping[address] = fmt.Sprintf(util.HealthCheckNamedVipTemplate, ipName, checkVip)
391376
}
377+
}
378+
}
379+
380+
for _, endpoint := range endpointSlice.Endpoints {
381+
if !endpointReady(endpoint) {
392382
continue
393383
}
394-
var ip string
395-
for _, pod := range pods {
396-
if pod.Name == address.TargetRef.Name {
397-
for _, podIP := range util.PodIPs(*pod) {
398-
if util.CheckProtocol(podIP) == protocol {
399-
ip = podIP
400-
break
401-
}
402-
}
403-
break
384+
385+
for _, address := range endpoint.Addresses {
386+
if util.CheckProtocol(address) == protocol {
387+
backends = append(backends, util.JoinHostPort(address, targetPort))
404388
}
405389
}
406-
if ip == "" && util.CheckProtocol(address.IP) == protocol {
407-
ip = address.IP
408-
}
409-
if ip != "" {
410-
backends = append(backends, util.JoinHostPort(ip, targetPort))
411-
}
412390
}
413391
}
414392

415393
return ipPortMapping, backends
416394
}
395+
396+
func endpointReady(endpoint discoveryv1.Endpoint) bool {
397+
return endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
398+
}

0 commit comments

Comments
 (0)