Skip to content

Commit 772490f

Browse files
authored
fix gabage u2o openflow (#5757)
* fix delete pod not remove u2o flow Signed-off-by: clyi <clyi@alauda.io>
1 parent 112d9ae commit 772490f

File tree

5 files changed

+132
-51
lines changed

5 files changed

+132
-51
lines changed

pkg/daemon/controller.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type Controller struct {
5656
podsLister listerv1.PodLister
5757
podsSynced cache.InformerSynced
5858
updatePodQueue workqueue.TypedRateLimitingInterface[string]
59-
deletePodQueue workqueue.TypedRateLimitingInterface[string]
59+
deletePodQueue workqueue.TypedRateLimitingInterface[*podEvent]
6060

6161
nodesLister listerv1.NodeLister
6262
nodesSynced cache.InformerSynced
@@ -127,7 +127,7 @@ func NewController(config *Configuration,
127127
podsLister: podInformer.Lister(),
128128
podsSynced: podInformer.Informer().HasSynced,
129129
updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
130-
deletePodQueue: newTypedRateLimitingQueue[string]("DeletePod", nil),
130+
deletePodQueue: newTypedRateLimitingQueue[*podEvent]("DeletePod", nil),
131131

132132
nodesLister: nodeInformer.Lister(),
133133
nodesSynced: nodeInformer.Informer().HasSynced,
@@ -505,6 +505,10 @@ type serviceEvent struct {
505505
oldObj, newObj any
506506
}
507507

508+
type podEvent struct {
509+
oldObj any
510+
}
511+
508512
func (c *Controller) enqueueAddSubnet(obj any) {
509513
c.subnetQueue.Add(&subnetEvent{newObj: obj})
510514
}
@@ -637,8 +641,8 @@ func (c *Controller) enqueueDeletePod(obj any) {
637641
return
638642
}
639643

640-
key := cache.MetaObjectToName(pod).String()
641-
c.deletePodQueue.Add(key)
644+
klog.V(3).Infof("enqueue delete pod %s", pod.Name)
645+
c.deletePodQueue.Add(&podEvent{oldObj: pod})
642646
}
643647

644648
func (c *Controller) runUpdatePodWorker() {
@@ -674,20 +678,20 @@ func (c *Controller) processNextUpdatePodWorkItem() bool {
674678
}
675679

676680
func (c *Controller) processNextDeletePodWorkItem() bool {
677-
key, shutdown := c.deletePodQueue.Get()
681+
event, shutdown := c.deletePodQueue.Get()
678682
if shutdown {
679683
return false
680684
}
681685

682-
err := func(key string) error {
683-
defer c.deletePodQueue.Done(key)
684-
if err := c.handleDeletePod(key); err != nil {
685-
c.deletePodQueue.AddRateLimited(key)
686-
return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
686+
err := func(event *podEvent) error {
687+
defer c.deletePodQueue.Done(event)
688+
if err := c.handleDeletePod(event); err != nil {
689+
c.deletePodQueue.AddRateLimited(event)
690+
return fmt.Errorf("error syncing pod event: %w, requeuing", err)
687691
}
688-
c.deletePodQueue.Forget(key)
692+
c.deletePodQueue.Forget(event)
689693
return nil
690-
}(key)
694+
}(event)
691695
if err != nil {
692696
utilruntime.HandleError(err)
693697
return true

pkg/daemon/controller_linux.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -971,27 +971,20 @@ func (c *Controller) handleUpdatePod(key string) error {
971971
return nil
972972
}
973973

974-
func (c *Controller) handleDeletePod(key string) error {
975-
namespace, name, err := cache.SplitMetaNamespaceKey(key)
976-
if err != nil {
977-
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
974+
func (c *Controller) handleDeletePod(event *podEvent) error {
975+
var pod *v1.Pod
976+
if event.oldObj != nil {
977+
pod = event.oldObj.(*v1.Pod)
978+
} else {
978979
return nil
979980
}
980981

981-
pod, err := c.podsLister.Pods(namespace).Get(name)
982-
if err != nil {
983-
if k8serrors.IsNotFound(err) {
984-
return nil
985-
}
986-
klog.Error(err)
987-
return err
988-
}
989-
990-
if _, ok := pod.Annotations[util.LogicalSwitchAnnotation]; !ok {
982+
logicalSwitch, ok := pod.Annotations[util.LogicalSwitchAnnotation]
983+
if !ok {
991984
return nil
992985
}
993986

994-
subnet, err := c.subnetsLister.Get(pod.Annotations[util.LogicalSwitchAnnotation])
987+
subnet, err := c.subnetsLister.Get(logicalSwitch)
995988
if err != nil {
996989
klog.Error(err)
997990
return err

pkg/daemon/controller_windows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func routeDiff(existingRoutes, v4Cidrs, v6Cidrs []string) (toAddV4, toAddV6, toD
141141
return
142142
}
143143

144-
func (c *Controller) handleDeletePod(key string) error {
144+
func (c *Controller) handleDeletePod(event *podEvent) error {
145145
return nil
146146
}
147147

pkg/ovs/ovs-ofctl.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,18 @@ func AddOrUpdateU2OKeepSrcMac(client *ovs.Client, bridgeName, podIP, podMac, cha
214214
return err
215215
}
216216

217-
for _, existingFlow := range flows {
218-
if existingFlow.Priority == util.U2OKeepSrcMacPriority && existingFlow.InPort == localnetPatchPortID {
219-
klog.V(3).Infof("flow already exists in bridge %s for in_port=%d, ip_src=%s, dl_src=%s",
220-
bridgeName, localnetPatchPortID, podIP, chassisMac)
221-
return nil
217+
expectedAction := fmt.Sprintf("mod_dl_src:%s", podMac)
218+
for _, flow := range flows {
219+
if flow.Priority != util.U2OKeepSrcMacPriority || flow.InPort != localnetPatchPortID {
220+
continue
221+
}
222+
223+
for _, action := range flow.Actions {
224+
if actionText, _ := action.MarshalText(); string(actionText) == expectedAction {
225+
klog.V(3).Infof("flow already exists in bridge %s for in_port=%d, ip_src=%s, dl_src=%s, actions=mod_dl_src:%s",
226+
bridgeName, localnetPatchPortID, podIP, chassisMac, podMac)
227+
return nil
228+
}
222229
}
223230
}
224231

test/e2e/kube-ovn/underlay/underlay.go

Lines changed: 95 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,6 +1323,88 @@ var _ = framework.SerialDescribe("[group:underlay]", func() {
13231323
framework.ExpectNoError(err)
13241324
}
13251325
})
1326+
1327+
framework.ConformanceIt("should create and delete keepSrcMac OpenFlow rules when u2oInterconnection is enabled", func() {
1328+
f.SkipVersionPriorTo(1, 14, "keepSrcMac OpenFlow rules were introduced in v1.14")
1329+
1330+
ginkgo.By("Creating provider network " + providerNetworkName)
1331+
pn := makeProviderNetwork(providerNetworkName, false, linkMap)
1332+
_ = providerNetworkClient.CreateSync(pn)
1333+
1334+
ginkgo.By("Getting docker network " + dockerNetworkName)
1335+
network, err := docker.NetworkInspect(dockerNetworkName)
1336+
framework.ExpectNoError(err, "getting docker network "+dockerNetworkName)
1337+
1338+
ginkgo.By("Creating vlan " + vlanName)
1339+
vlan := framework.MakeVlan(vlanName, providerNetworkName, 0)
1340+
_ = vlanClient.Create(vlan)
1341+
1342+
ginkgo.By("Creating underlay subnet " + subnetName)
1343+
var cidrV4, cidrV6, gatewayV4, gatewayV6 string
1344+
for _, config := range dockerNetwork.IPAM.Config {
1345+
switch util.CheckProtocol(config.Subnet) {
1346+
case apiv1.ProtocolIPv4:
1347+
if f.HasIPv4() {
1348+
cidrV4 = config.Subnet
1349+
gatewayV4 = config.Gateway
1350+
}
1351+
case apiv1.ProtocolIPv6:
1352+
if f.HasIPv6() {
1353+
cidrV6 = config.Subnet
1354+
gatewayV6 = config.Gateway
1355+
}
1356+
}
1357+
}
1358+
underlayCidr := make([]string, 0, 2)
1359+
gateway := make([]string, 0, 2)
1360+
if f.HasIPv4() {
1361+
underlayCidr = append(underlayCidr, cidrV4)
1362+
gateway = append(gateway, gatewayV4)
1363+
}
1364+
if f.HasIPv6() {
1365+
underlayCidr = append(underlayCidr, cidrV6)
1366+
gateway = append(gateway, gatewayV6)
1367+
}
1368+
1369+
excludeIPs := make([]string, 0, len(network.Containers)*2)
1370+
for _, container := range network.Containers {
1371+
if container.IPv4Address != "" && f.HasIPv4() {
1372+
excludeIPs = append(excludeIPs, strings.Split(container.IPv4Address, "/")[0])
1373+
}
1374+
if container.IPv6Address != "" && f.HasIPv6() {
1375+
excludeIPs = append(excludeIPs, strings.Split(container.IPv6Address, "/")[0])
1376+
}
1377+
}
1378+
1379+
ginkgo.By("Creating underlay subnet with u2oInterconnection enabled " + subnetName)
1380+
subnet := framework.MakeSubnet(subnetName, vlanName, strings.Join(underlayCidr, ","), strings.Join(gateway, ","), "", "", excludeIPs, nil, []string{namespaceName})
1381+
subnet.Spec.U2OInterconnection = true
1382+
_ = subnetClient.CreateSync(subnet)
1383+
1384+
ginkgo.By("Waiting for U2OInterconnection status to be ready")
1385+
waitSubnetU2OStatus(f, subnetName, subnetClient, true)
1386+
1387+
ginkgo.By("Creating underlay pod " + u2oPodNameUnderlay)
1388+
annotations := map[string]string{
1389+
util.LogicalSwitchAnnotation: subnetName,
1390+
}
1391+
args := []string{"netexec", "--http-port", strconv.Itoa(curlListenPort)}
1392+
underlayPod := framework.MakePod(namespaceName, u2oPodNameUnderlay, nil, annotations, framework.AgnhostImage, nil, args)
1393+
underlayPod = podClient.CreateSync(underlayPod)
1394+
waitSubnetStatusUpdate(subnetName, subnetClient, 2)
1395+
1396+
ginkgo.By("Verifying keepSrcMac OpenFlow rules exist after pod creation")
1397+
checkKeepSrcMacFlow(underlayPod, providerNetworkName, true)
1398+
1399+
ginkgo.By("Deleting underlay pod " + u2oPodNameUnderlay)
1400+
podClient.DeleteSync(u2oPodNameUnderlay)
1401+
waitSubnetStatusUpdate(subnetName, subnetClient, 1)
1402+
1403+
ginkgo.By("Verifying keepSrcMac OpenFlow rules are deleted after pod deletion")
1404+
// Wait a bit for the flow rules to be cleaned up
1405+
time.Sleep(2 * time.Second)
1406+
checkKeepSrcMacFlow(underlayPod, providerNetworkName, false)
1407+
})
13261408
})
13271409

13281410
func checkU2OItems(f *framework.Framework, subnet *apiv1.Subnet, underlayPod, overlayPod *corev1.Pod, isU2OCustomVpc bool, pnName string) {
@@ -1503,25 +1585,26 @@ func checkReachable(podName, podNamespace, sourceIP, targetIP, targetPort string
15031585
func checkKeepSrcMacFlow(pod *corev1.Pod, providerNetworkName string, expectRules bool) {
15041586
ginkgo.GinkgoHelper()
15051587

1506-
cmd := fmt.Sprintf("kubectl exec -n %s %s -- ip -o link show eth0 | awk '{print $16}'", pod.Namespace, pod.Name)
1507-
output, err := exec.Command("bash", "-c", cmd).CombinedOutput()
1508-
if err != nil {
1509-
framework.Logf("Error getting MAC address: %v, %s", err, string(output))
1510-
return
1511-
}
1512-
podMac := strings.TrimSpace(string(output))
1513-
15141588
podNodeName := pod.Spec.NodeName
1515-
ginkgo.By(fmt.Sprintf("Checking keepSrcMac OpenFlow rule on node %s for Pod %s with MAC %s (expect rules: %v)",
1516-
podNodeName, pod.Name, podMac, expectRules))
1589+
framework.Logf("Checking keepSrcMac OpenFlow rule on node %s for Pod %s", podNodeName, pod.Name)
1590+
1591+
podMac := pod.Annotations[util.MacAddressAnnotation]
1592+
if podMac == "" {
1593+
if !expectRules {
1594+
return
1595+
}
1596+
}
15171597

15181598
var ruleFound bool
15191599
framework.WaitUntil(1*time.Second, 5*time.Second, func(_ context.Context) (bool, error) {
15201600
nodeCmd := fmt.Sprintf("kubectl ko ofctl %s dump-flows br-%s | grep actions=mod_dl_src:%s | wc -l",
15211601
podNodeName, providerNetworkName, podMac)
1522-
output, _ := exec.Command("bash", "-c", nodeCmd).CombinedOutput()
1523-
outputStr := string(output)
1602+
output, err := exec.Command("bash", "-c", nodeCmd).CombinedOutput()
1603+
if err != nil {
1604+
return false, nil
1605+
}
15241606

1607+
outputStr := string(output)
15251608
lines := strings.Split(outputStr, "\n")
15261609
var countStr string
15271610
for i := len(lines) - 1; i >= 0; i-- {
@@ -1538,18 +1621,12 @@ func checkKeepSrcMacFlow(pod *corev1.Pod, providerNetworkName string, expectRule
15381621
countNum, _ = strconv.Atoi(matches[0])
15391622
}
15401623

1541-
framework.Logf("Raw output: '%s', extracted count: %d", outputStr, countNum)
15421624
ruleFound = countNum > 0
15431625

15441626
if (expectRules && ruleFound) || (!expectRules && !ruleFound) {
15451627
return true, nil
15461628
}
15471629

1548-
if expectRules {
1549-
framework.Logf("keepSrcMac flow rule not found but expected, retrying...")
1550-
} else {
1551-
framework.Logf("keepSrcMac flow rule found but not expected, retrying...")
1552-
}
15531630
return false, nil
15541631
}, "")
15551632

0 commit comments

Comments
 (0)