Skip to content

Commit 2aeea86

Browse files
authored
u2o keep src mac (#5192)
Signed-off-by: clyi <clyi@alauda.io>
1 parent 9d398fa commit 2aeea86

File tree

7 files changed

+440
-34
lines changed

7 files changed

+440
-34
lines changed

pkg/daemon/controller.go

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@ type Controller struct {
5151
ovnEipsLister kubeovnlister.OvnEipLister
5252
ovnEipsSynced cache.InformerSynced
5353

54-
podsLister listerv1.PodLister
55-
podsSynced cache.InformerSynced
56-
podQueue workqueue.TypedRateLimitingInterface[string]
54+
podsLister listerv1.PodLister
55+
podsSynced cache.InformerSynced
56+
updatePodQueue workqueue.TypedRateLimitingInterface[string]
57+
deletePodQueue workqueue.TypedRateLimitingInterface[string]
5758

5859
nodesLister listerv1.NodeLister
5960
nodesSynced cache.InformerSynced
@@ -112,9 +113,10 @@ func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFac
112113
ovnEipsLister: ovnEipInformer.Lister(),
113114
ovnEipsSynced: ovnEipInformer.Informer().HasSynced,
114115

115-
podsLister: podInformer.Lister(),
116-
podsSynced: podInformer.Informer().HasSynced,
117-
podQueue: newTypedRateLimitingQueue[string]("Pod", nil),
116+
podsLister: podInformer.Lister(),
117+
podsSynced: podInformer.Informer().HasSynced,
118+
updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
119+
deletePodQueue: newTypedRateLimitingQueue[string]("DeletePod", nil),
118120

119121
nodesLister: nodeInformer.Lister(),
120122
nodesSynced: nodeInformer.Informer().HasSynced,
@@ -175,7 +177,8 @@ func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFac
175177
}
176178

177179
if _, err = podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
178-
UpdateFunc: controller.enqueuePod,
180+
UpdateFunc: controller.enqueueUpdatePod,
181+
DeleteFunc: controller.enqueueDeletePod,
179182
}); err != nil {
180183
return nil, err
181184
}
@@ -518,7 +521,7 @@ func (c *Controller) processNextServiceWorkItem() bool {
518521
return true
519522
}
520523

521-
func (c *Controller) enqueuePod(oldObj, newObj any) {
524+
func (c *Controller) enqueueUpdatePod(oldObj, newObj any) {
522525
oldPod := oldObj.(*v1.Pod)
523526
newPod := newObj.(*v1.Pod)
524527
key := cache.MetaObjectToName(newPod).String()
@@ -529,8 +532,9 @@ func (c *Controller) enqueuePod(oldObj, newObj any) {
529532
oldPod.Annotations[util.NetemQosJitterAnnotation] != newPod.Annotations[util.NetemQosJitterAnnotation] ||
530533
oldPod.Annotations[util.NetemQosLimitAnnotation] != newPod.Annotations[util.NetemQosLimitAnnotation] ||
531534
oldPod.Annotations[util.NetemQosLossAnnotation] != newPod.Annotations[util.NetemQosLossAnnotation] ||
532-
oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] {
533-
c.podQueue.Add(key)
535+
oldPod.Annotations[util.MirrorControlAnnotation] != newPod.Annotations[util.MirrorControlAnnotation] ||
536+
oldPod.Annotations[util.IPAddressAnnotation] != newPod.Annotations[util.IPAddressAnnotation] {
537+
c.updatePodQueue.Add(key)
534538
return
535539
}
536540

@@ -548,30 +552,63 @@ func (c *Controller) enqueuePod(oldObj, newObj any) {
548552
oldPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLimitAnnotationTemplate, provider)] ||
549553
oldPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.NetemQosLossAnnotationTemplate, provider)] ||
550554
oldPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] != newPod.Annotations[fmt.Sprintf(util.MirrorControlAnnotationTemplate, provider)] {
551-
c.podQueue.Add(key)
555+
c.updatePodQueue.Add(key)
552556
}
553557
}
554558
}
555559
}
556560

557-
func (c *Controller) runPodWorker() {
558-
for c.processNextPodWorkItem() {
561+
func (c *Controller) enqueueDeletePod(obj any) {
562+
pod := obj.(*v1.Pod)
563+
key := cache.MetaObjectToName(pod).String()
564+
c.deletePodQueue.Add(key)
565+
}
566+
567+
func (c *Controller) runUpdatePodWorker() {
568+
for c.processNextUpdatePodWorkItem() {
569+
}
570+
}
571+
572+
func (c *Controller) runDeletePodWorker() {
573+
for c.processNextDeletePodWorkItem() {
559574
}
560575
}
561576

562-
func (c *Controller) processNextPodWorkItem() bool {
563-
key, shutdown := c.podQueue.Get()
577+
func (c *Controller) processNextUpdatePodWorkItem() bool {
578+
key, shutdown := c.updatePodQueue.Get()
579+
if shutdown {
580+
return false
581+
}
582+
583+
err := func(key string) error {
584+
defer c.updatePodQueue.Done(key)
585+
if err := c.handleUpdatePod(key); err != nil {
586+
c.updatePodQueue.AddRateLimited(key)
587+
return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
588+
}
589+
c.updatePodQueue.Forget(key)
590+
return nil
591+
}(key)
592+
if err != nil {
593+
utilruntime.HandleError(err)
594+
return true
595+
}
596+
return true
597+
}
598+
599+
func (c *Controller) processNextDeletePodWorkItem() bool {
600+
key, shutdown := c.deletePodQueue.Get()
564601
if shutdown {
565602
return false
566603
}
567604

568605
err := func(key string) error {
569-
defer c.podQueue.Done(key)
570-
if err := c.handlePod(key); err != nil {
571-
c.podQueue.AddRateLimited(key)
606+
defer c.deletePodQueue.Done(key)
607+
if err := c.handleDeletePod(key); err != nil {
608+
c.deletePodQueue.AddRateLimited(key)
572609
return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
573610
}
574-
c.podQueue.Forget(key)
611+
c.deletePodQueue.Forget(key)
575612
return nil
576613
}(key)
577614
if err != nil {
@@ -615,7 +652,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
615652
defer c.deleteProviderNetworkQueue.ShutDown()
616653
defer c.subnetQueue.ShutDown()
617654
defer c.serviceQueue.ShutDown()
618-
defer c.podQueue.ShutDown()
655+
defer c.updatePodQueue.ShutDown()
656+
defer c.deletePodQueue.ShutDown()
619657

620658
go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh)
621659
go wait.Until(recompute, 10*time.Minute, stopCh)
@@ -633,7 +671,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
633671
go wait.Until(c.runAddOrUpdateServicekWorker, time.Second, stopCh)
634672
go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh)
635673
go wait.Until(c.runSubnetWorker, time.Second, stopCh)
636-
go wait.Until(c.runPodWorker, time.Second, stopCh)
674+
go wait.Until(c.runUpdatePodWorker, time.Second, stopCh)
675+
go wait.Until(c.runDeletePodWorker, time.Second, stopCh)
637676
go wait.Until(c.runGateway, 3*time.Second, stopCh)
638677
go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh)
639678
go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh)

pkg/daemon/controller_linux.go

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"syscall"
1414

1515
ovsutil "github.com/digitalocean/go-openvswitch/ovs"
16+
nadv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
1617
nadutils "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/utils"
1718
"github.com/kubeovn/felix/ipsets"
1819
"github.com/kubeovn/go-iptables/iptables"
@@ -229,6 +230,13 @@ func (c *Controller) reconcileRouters(event *subnetEvent) error {
229230
}
230231
}
231232

233+
isAdd, needAction := c.CheckSubnetU2OChangeAction(oldSubnet, newSubnet)
234+
if needAction {
235+
if err := c.HandleU2OForSubnet(newSubnet, isAdd); err != nil {
236+
return err
237+
}
238+
}
239+
232240
if err = c.handleEnableExternalLBAddressChange(oldSubnet, newSubnet); err != nil {
233241
klog.Errorf("failed to handle enable external lb address change: %v", err)
234242
return err
@@ -762,7 +770,100 @@ func (c *Controller) getPolicyRouting(subnet *kubeovnv1.Subnet) ([]netlink.Rule,
762770
return rules, routes, nil
763771
}
764772

765-
func (c *Controller) handlePod(key string) error {
773+
func (c *Controller) GetProviderInfoFromSubnet(subnet *kubeovnv1.Subnet) (bridgeName, chassisMac string, err error) {
774+
if subnet == nil {
775+
return "", "", nil
776+
}
777+
if subnet.Spec.Vlan == "" {
778+
return "", "", nil
779+
}
780+
781+
vlan, err := c.vlansLister.Get(subnet.Spec.Vlan)
782+
if err != nil {
783+
return "", "", fmt.Errorf("failed to get vlan %s: %w", subnet.Spec.Vlan, err)
784+
}
785+
providerName := vlan.Spec.Provider
786+
chassisMac, err = GetProviderChassisMac(providerName)
787+
if err != nil {
788+
return "", "", fmt.Errorf("failed to get chassis mac for provider %s: %w", providerName, err)
789+
}
790+
791+
bridgeName = util.ExternalBridgeName(providerName)
792+
return bridgeName, chassisMac, nil
793+
}
794+
795+
func HandleU2OForPod(ovsClient *ovsutil.Client, pod *v1.Pod, bridgeName, chassisMac string, isAdd bool) error {
796+
if pod == nil {
797+
return errors.New("pod is nil")
798+
}
799+
800+
podMac := pod.Annotations[util.MacAddressAnnotation]
801+
802+
podIPs := []string{}
803+
if pod.Annotations != nil && pod.Annotations[util.IPAddressAnnotation] != "" {
804+
podIPs = append(podIPs, strings.Split(pod.Annotations[util.IPAddressAnnotation], ",")...)
805+
806+
for _, podIP := range podIPs {
807+
var err error
808+
if isAdd {
809+
err = ovs.AddOrUpdateU2OKeepSrcMac(ovsClient, bridgeName, podIP, podMac, chassisMac)
810+
} else {
811+
err = ovs.DeleteU2OKeepSrcMac(ovsClient, bridgeName, podIP, chassisMac)
812+
}
813+
814+
if err != nil {
815+
action := "add"
816+
if !isAdd {
817+
action = "delete"
818+
}
819+
return fmt.Errorf("failed to %s U2O rule for pod %s/%s: %w", action, pod.Namespace, pod.Name, err)
820+
}
821+
}
822+
}
823+
824+
return nil
825+
}
826+
827+
func (c *Controller) HandleU2OForSubnet(subnet *kubeovnv1.Subnet, isAdd bool) error {
828+
klog.Infof("U2O processing for subnet %s, action: %v", subnet.Name, isAdd)
829+
830+
bridgeName, chassisMac, err := c.GetProviderInfoFromSubnet(subnet)
831+
if err != nil {
832+
return fmt.Errorf("failed to get provider info: %w", err)
833+
}
834+
835+
pods, err := c.podsLister.List(labels.Everything())
836+
if err != nil {
837+
return fmt.Errorf("failed to list pods: %w", err)
838+
}
839+
840+
for _, pod := range pods {
841+
if pod.Annotations[util.LogicalSwitchAnnotation] != subnet.Name {
842+
continue
843+
}
844+
if err := HandleU2OForPod(c.ovsClient, pod, bridgeName, chassisMac, isAdd); err != nil {
845+
klog.Error(err)
846+
return err
847+
}
848+
}
849+
850+
return nil
851+
}
852+
853+
func (c *Controller) CheckSubnetU2OChangeAction(oldSubnet, newSubnet *kubeovnv1.Subnet) (bool, bool) {
854+
if newSubnet == nil ||
855+
(oldSubnet != nil && newSubnet != nil && oldSubnet.Spec.U2OInterconnection == newSubnet.Spec.U2OInterconnection) {
856+
return false, false
857+
}
858+
859+
if newSubnet.Spec.Vlan == "" || newSubnet.Spec.LogicalGateway {
860+
return false, false
861+
}
862+
863+
return newSubnet.Spec.U2OInterconnection, true
864+
}
865+
866+
func (c *Controller) handleUpdatePod(key string) error {
766867
namespace, name, err := cache.SplitMetaNamespaceKey(key)
767868
if err != nil {
768869
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
@@ -785,6 +886,26 @@ func (c *Controller) handlePod(key string) error {
785886
return err
786887
}
787888

889+
if _, ok := pod.Annotations[util.LogicalSwitchAnnotation]; ok {
890+
subnet, err := c.subnetsLister.Get(pod.Annotations[util.LogicalSwitchAnnotation])
891+
if err != nil {
892+
klog.Error(err)
893+
return err
894+
}
895+
896+
if subnet.Spec.U2OInterconnection {
897+
bridgeName, chassisMac, err := c.GetProviderInfoFromSubnet(subnet)
898+
if err != nil {
899+
klog.Error(err)
900+
return err
901+
}
902+
if err := HandleU2OForPod(c.ovsClient, pod, bridgeName, chassisMac, true); err != nil {
903+
klog.Error(err)
904+
return err
905+
}
906+
}
907+
}
908+
788909
podName := pod.Name
789910
if pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, util.OvnProvider)] != "" {
790911
podName = pod.Annotations[fmt.Sprintf(util.VMAnnotationTemplate, util.OvnProvider)]
@@ -815,6 +936,9 @@ func (c *Controller) handlePod(key string) error {
815936
// set multus-nic bandwidth
816937
attachNets, err := nadutils.ParsePodNetworkAnnotation(pod)
817938
if err != nil {
939+
if _, ok := err.(*nadv1.NoK8sNetworkError); ok {
940+
return nil
941+
}
818942
klog.Error(err)
819943
return err
820944
}
@@ -843,6 +967,51 @@ func (c *Controller) handlePod(key string) error {
843967
}
844968
}
845969
}
970+
971+
return nil
972+
}
973+
974+
func (c *Controller) handleDeletePod(key string) error {
975+
namespace, name, err := cache.SplitMetaNamespaceKey(key)
976+
if err != nil {
977+
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
978+
return nil
979+
}
980+
981+
pod, err := c.podsLister.Pods(namespace).Get(name)
982+
if err != nil {
983+
if k8serrors.IsNotFound(err) {
984+
return nil
985+
}
986+
klog.Error(err)
987+
return err
988+
}
989+
990+
if _, ok := pod.Annotations[util.LogicalSwitchAnnotation]; !ok {
991+
return nil
992+
}
993+
994+
subnet, err := c.subnetsLister.Get(pod.Annotations[util.LogicalSwitchAnnotation])
995+
if err != nil {
996+
klog.Error(err)
997+
return err
998+
}
999+
1000+
if !subnet.Spec.U2OInterconnection {
1001+
return nil
1002+
}
1003+
1004+
bridgeName, chassisMac, err := c.GetProviderInfoFromSubnet(subnet)
1005+
if err != nil {
1006+
klog.Error(err)
1007+
return err
1008+
}
1009+
1010+
if err := HandleU2OForPod(c.ovsClient, pod, bridgeName, chassisMac, false); err != nil {
1011+
klog.Error(err)
1012+
return err
1013+
}
1014+
8461015
return nil
8471016
}
8481017

pkg/daemon/controller_windows.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,11 @@ func routeDiff(existingRoutes, v4Cidrs, v6Cidrs []string) (toAddV4, toAddV6, toD
141141
return
142142
}
143143

144-
func (c *Controller) handlePod(key string) error {
144+
func (c *Controller) handleDeletePod(key string) error {
145+
return nil
146+
}
147+
148+
func (c *Controller) handleUpdatePod(key string) error {
145149
namespace, name, err := cache.SplitMetaNamespaceKey(key)
146150
if err != nil {
147151
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))

pkg/daemon/ovs.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,3 +286,17 @@ func initProviderChassisMac(provider string) error {
286286
}
287287
return nil
288288
}
289+
290+
func GetProviderChassisMac(provider string) (string, error) {
291+
mappings, err := getOvnMappings("ovn-chassis-mac-mappings")
292+
if err != nil {
293+
return "", fmt.Errorf("failed to get chassis mac for provider %s: %w", provider, err)
294+
}
295+
296+
mac, ok := mappings[provider]
297+
if !ok {
298+
return "", fmt.Errorf("no chassis mac found for provider %s", provider)
299+
}
300+
301+
return mac, nil
302+
}

0 commit comments

Comments
 (0)