diff --git a/pkg/daemon/controller.go b/pkg/daemon/controller.go index 4afe96620b6..113bbcd5e76 100644 --- a/pkg/daemon/controller.go +++ b/pkg/daemon/controller.go @@ -56,7 +56,7 @@ type Controller struct { podsLister listerv1.PodLister podsSynced cache.InformerSynced updatePodQueue workqueue.TypedRateLimitingInterface[string] - deletePodQueue workqueue.TypedRateLimitingInterface[string] + deletePodQueue workqueue.TypedRateLimitingInterface[*podEvent] nodesLister listerv1.NodeLister nodesSynced cache.InformerSynced @@ -127,7 +127,7 @@ func NewController(config *Configuration, podsLister: podInformer.Lister(), podsSynced: podInformer.Informer().HasSynced, updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil), - deletePodQueue: newTypedRateLimitingQueue[string]("DeletePod", nil), + deletePodQueue: newTypedRateLimitingQueue[*podEvent]("DeletePod", nil), nodesLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, @@ -505,6 +505,10 @@ type serviceEvent struct { oldObj, newObj any } +type podEvent struct { + oldObj any +} + func (c *Controller) enqueueAddSubnet(obj any) { c.subnetQueue.Add(&subnetEvent{newObj: obj}) } @@ -637,8 +641,8 @@ func (c *Controller) enqueueDeletePod(obj any) { return } - key := cache.MetaObjectToName(pod).String() - c.deletePodQueue.Add(key) + klog.V(3).Infof("enqueue delete pod %s", pod.Name) + c.deletePodQueue.Add(&podEvent{oldObj: pod}) } func (c *Controller) runUpdatePodWorker() { @@ -674,20 +678,20 @@ func (c *Controller) processNextUpdatePodWorkItem() bool { } func (c *Controller) processNextDeletePodWorkItem() bool { - key, shutdown := c.deletePodQueue.Get() + event, shutdown := c.deletePodQueue.Get() if shutdown { return false } - err := func(key string) error { - defer c.deletePodQueue.Done(key) - if err := c.handleDeletePod(key); err != nil { - c.deletePodQueue.AddRateLimited(key) - return fmt.Errorf("error syncing %q: %w, requeuing", key, err) + err := func(event *podEvent) error { + defer c.deletePodQueue.Done(event) + if err := c.handleDeletePod(event); err != nil { + c.deletePodQueue.AddRateLimited(event) + return fmt.Errorf("error syncing pod event: %w, requeuing", err) } - c.deletePodQueue.Forget(key) + c.deletePodQueue.Forget(event) return nil - }(key) + }(event) if err != nil { utilruntime.HandleError(err) return true diff --git a/pkg/daemon/controller_linux.go b/pkg/daemon/controller_linux.go index ef7b15cfd8d..c0b581975c6 100644 --- a/pkg/daemon/controller_linux.go +++ b/pkg/daemon/controller_linux.go @@ -971,27 +971,20 @@ func (c *Controller) handleUpdatePod(key string) error { return nil } -func (c *Controller) handleDeletePod(key string) error { - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) +func (c *Controller) handleDeletePod(event *podEvent) error { + var pod *v1.Pod + if event.oldObj != nil { + pod = event.oldObj.(*v1.Pod) + } else { return nil } - pod, err := c.podsLister.Pods(namespace).Get(name) - if err != nil { - if k8serrors.IsNotFound(err) { - return nil - } - klog.Error(err) - return err - } - - if _, ok := pod.Annotations[util.LogicalSwitchAnnotation]; !ok { + logicalSwitch, ok := pod.Annotations[util.LogicalSwitchAnnotation] + if !ok { return nil } - subnet, err := c.subnetsLister.Get(pod.Annotations[util.LogicalSwitchAnnotation]) + subnet, err := c.subnetsLister.Get(logicalSwitch) if err != nil { klog.Error(err) return err diff --git a/pkg/daemon/controller_windows.go b/pkg/daemon/controller_windows.go index 8eb82ae48e1..58da2e5f29f 100644 --- a/pkg/daemon/controller_windows.go +++ b/pkg/daemon/controller_windows.go @@ -141,7 +141,7 @@ func routeDiff(existingRoutes, v4Cidrs, v6Cidrs []string) (toAddV4, toAddV6, toD return } -func (c *Controller) handleDeletePod(key string) error { +func (c *Controller) handleDeletePod(event *podEvent) error { return nil } diff --git a/pkg/ovs/ovs-ofctl.go b/pkg/ovs/ovs-ofctl.go index 448f3ccd8c7..11421850da6 100644 --- a/pkg/ovs/ovs-ofctl.go +++ b/pkg/ovs/ovs-ofctl.go @@ -214,11 +214,18 @@ func AddOrUpdateU2OKeepSrcMac(client *ovs.Client, bridgeName, podIP, podMac, cha return err } - for _, existingFlow := range flows { - if existingFlow.Priority == util.U2OKeepSrcMacPriority && existingFlow.InPort == localnetPatchPortID { - klog.V(3).Infof("flow already exists in bridge %s for in_port=%d, ip_src=%s, dl_src=%s", - bridgeName, localnetPatchPortID, podIP, chassisMac) - return nil + expectedAction := fmt.Sprintf("mod_dl_src:%s", podMac) + for _, flow := range flows { + if flow.Priority != util.U2OKeepSrcMacPriority || flow.InPort != localnetPatchPortID { + continue + } + + for _, action := range flow.Actions { + if actionText, _ := action.MarshalText(); string(actionText) == expectedAction { + klog.V(3).Infof("flow already exists in bridge %s for in_port=%d, ip_src=%s, dl_src=%s, actions=mod_dl_src:%s", + bridgeName, localnetPatchPortID, podIP, chassisMac, podMac) + return nil + } } } diff --git a/test/e2e/kube-ovn/underlay/underlay.go b/test/e2e/kube-ovn/underlay/underlay.go index 64b4dbe65d0..928f628facc 100644 --- a/test/e2e/kube-ovn/underlay/underlay.go +++ b/test/e2e/kube-ovn/underlay/underlay.go @@ -1323,6 +1323,88 @@ var _ = framework.SerialDescribe("[group:underlay]", func() { framework.ExpectNoError(err) } }) + + framework.ConformanceIt("should create and delete keepSrcMac OpenFlow rules when u2oInterconnection is enabled", func() { + f.SkipVersionPriorTo(1, 14, "keepSrcMac OpenFlow rules were introduced in v1.14") + + ginkgo.By("Creating provider network " + providerNetworkName) + pn := makeProviderNetwork(providerNetworkName, false, linkMap) + _ = providerNetworkClient.CreateSync(pn) + + ginkgo.By("Getting docker network " + dockerNetworkName) + network, err := docker.NetworkInspect(dockerNetworkName) + framework.ExpectNoError(err, "getting docker network "+dockerNetworkName) + + ginkgo.By("Creating vlan " + vlanName) + vlan := framework.MakeVlan(vlanName, providerNetworkName, 0) + _ = vlanClient.Create(vlan) + + ginkgo.By("Creating underlay subnet " + subnetName) + var cidrV4, cidrV6, gatewayV4, gatewayV6 string + for _, config := range dockerNetwork.IPAM.Config { + switch util.CheckProtocol(config.Subnet) { + case apiv1.ProtocolIPv4: + if f.HasIPv4() { + cidrV4 = config.Subnet + gatewayV4 = config.Gateway + } + case apiv1.ProtocolIPv6: + if f.HasIPv6() { + cidrV6 = config.Subnet + gatewayV6 = config.Gateway + } + } + } + underlayCidr := make([]string, 0, 2) + gateway := make([]string, 0, 2) + if f.HasIPv4() { + underlayCidr = append(underlayCidr, cidrV4) + gateway = append(gateway, gatewayV4) + } + if f.HasIPv6() { + underlayCidr = append(underlayCidr, cidrV6) + gateway = append(gateway, gatewayV6) + } + + excludeIPs := make([]string, 0, len(network.Containers)*2) + for _, container := range network.Containers { + if container.IPv4Address != "" && f.HasIPv4() { + excludeIPs = append(excludeIPs, strings.Split(container.IPv4Address, "/")[0]) + } + if container.IPv6Address != "" && f.HasIPv6() { + excludeIPs = append(excludeIPs, strings.Split(container.IPv6Address, "/")[0]) + } + } + + ginkgo.By("Creating underlay subnet with u2oInterconnection enabled " + subnetName) + subnet := framework.MakeSubnet(subnetName, vlanName, strings.Join(underlayCidr, ","), strings.Join(gateway, ","), "", "", excludeIPs, nil, []string{namespaceName}) + subnet.Spec.U2OInterconnection = true + _ = subnetClient.CreateSync(subnet) + + ginkgo.By("Waiting for U2OInterconnection status to be ready") + waitSubnetU2OStatus(f, subnetName, subnetClient, true) + + ginkgo.By("Creating underlay pod " + u2oPodNameUnderlay) + annotations := map[string]string{ + util.LogicalSwitchAnnotation: subnetName, + } + args := []string{"netexec", "--http-port", strconv.Itoa(curlListenPort)} + underlayPod := framework.MakePod(namespaceName, u2oPodNameUnderlay, nil, annotations, framework.AgnhostImage, nil, args) + underlayPod = podClient.CreateSync(underlayPod) + waitSubnetStatusUpdate(subnetName, subnetClient, 2) + + ginkgo.By("Verifying keepSrcMac OpenFlow rules exist after pod creation") + checkKeepSrcMacFlow(underlayPod, providerNetworkName, true) + + ginkgo.By("Deleting underlay pod " + u2oPodNameUnderlay) + podClient.DeleteSync(u2oPodNameUnderlay) + waitSubnetStatusUpdate(subnetName, subnetClient, 1) + + ginkgo.By("Verifying keepSrcMac OpenFlow rules are deleted after pod deletion") + // Wait a bit for the flow rules to be cleaned up + time.Sleep(2 * time.Second) + checkKeepSrcMacFlow(underlayPod, providerNetworkName, false) + }) }) func checkU2OItems(f *framework.Framework, subnet *apiv1.Subnet, underlayPod, overlayPod *corev1.Pod, isU2OCustomVpc bool, pnName string) { @@ -1503,25 +1585,26 @@ func checkReachable(podName, podNamespace, sourceIP, targetIP, targetPort string func checkKeepSrcMacFlow(pod *corev1.Pod, providerNetworkName string, expectRules bool) { ginkgo.GinkgoHelper() - cmd := fmt.Sprintf("kubectl exec -n %s %s -- ip -o link show eth0 | awk '{print $16}'", pod.Namespace, pod.Name) - output, err := exec.Command("bash", "-c", cmd).CombinedOutput() - if err != nil { - framework.Logf("Error getting MAC address: %v, %s", err, string(output)) - return - } - podMac := strings.TrimSpace(string(output)) - podNodeName := pod.Spec.NodeName - ginkgo.By(fmt.Sprintf("Checking keepSrcMac OpenFlow rule on node %s for Pod %s with MAC %s (expect rules: %v)", - podNodeName, pod.Name, podMac, expectRules)) + framework.Logf("Checking keepSrcMac OpenFlow rule on node %s for Pod %s", podNodeName, pod.Name) + + podMac := pod.Annotations[util.MacAddressAnnotation] + if podMac == "" { + if !expectRules { + return + } + } var ruleFound bool framework.WaitUntil(1*time.Second, 5*time.Second, func(_ context.Context) (bool, error) { nodeCmd := fmt.Sprintf("kubectl ko ofctl %s dump-flows br-%s | grep actions=mod_dl_src:%s | wc -l", podNodeName, providerNetworkName, podMac) - output, _ := exec.Command("bash", "-c", nodeCmd).CombinedOutput() - outputStr := string(output) + output, err := exec.Command("bash", "-c", nodeCmd).CombinedOutput() + if err != nil { + return false, nil + } + outputStr := string(output) lines := strings.Split(outputStr, "\n") var countStr string for i := len(lines) - 1; i >= 0; i-- { @@ -1538,18 +1621,12 @@ func checkKeepSrcMacFlow(pod *corev1.Pod, providerNetworkName string, expectRule countNum, _ = strconv.Atoi(matches[0]) } - framework.Logf("Raw output: '%s', extracted count: %d", outputStr, countNum) ruleFound = countNum > 0 if (expectRules && ruleFound) || (!expectRules && !ruleFound) { return true, nil } - if expectRules { - framework.Logf("keepSrcMac flow rule not found but expected, retrying...") - } else { - framework.Logf("keepSrcMac flow rule found but not expected, retrying...") - } return false, nil }, "")