Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions pkg/daemon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Controller struct {
podsLister listerv1.PodLister
podsSynced cache.InformerSynced
updatePodQueue workqueue.TypedRateLimitingInterface[string]
deletePodQueue workqueue.TypedRateLimitingInterface[string]
deletePodQueue workqueue.TypedRateLimitingInterface[*podEvent]

nodesLister listerv1.NodeLister
nodesSynced cache.InformerSynced
Expand Down Expand Up @@ -127,7 +127,7 @@ func NewController(config *Configuration,
podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
updatePodQueue: newTypedRateLimitingQueue[string]("UpdatePod", nil),
deletePodQueue: newTypedRateLimitingQueue[string]("DeletePod", nil),
deletePodQueue: newTypedRateLimitingQueue[*podEvent]("DeletePod", nil),

nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
Expand Down Expand Up @@ -505,6 +505,10 @@ type serviceEvent struct {
oldObj, newObj any
}

type podEvent struct {
oldObj any
}

func (c *Controller) enqueueAddSubnet(obj any) {
c.subnetQueue.Add(&subnetEvent{newObj: obj})
}
Expand Down Expand Up @@ -637,8 +641,8 @@ func (c *Controller) enqueueDeletePod(obj any) {
return
}

key := cache.MetaObjectToName(pod).String()
c.deletePodQueue.Add(key)
klog.V(3).Infof("enqueue delete pod %s", pod.Name)
c.deletePodQueue.Add(&podEvent{oldObj: pod})
}

func (c *Controller) runUpdatePodWorker() {
Expand Down Expand Up @@ -674,20 +678,20 @@ func (c *Controller) processNextUpdatePodWorkItem() bool {
}

func (c *Controller) processNextDeletePodWorkItem() bool {
key, shutdown := c.deletePodQueue.Get()
event, shutdown := c.deletePodQueue.Get()
if shutdown {
return false
}

err := func(key string) error {
defer c.deletePodQueue.Done(key)
if err := c.handleDeletePod(key); err != nil {
c.deletePodQueue.AddRateLimited(key)
return fmt.Errorf("error syncing %q: %w, requeuing", key, err)
err := func(event *podEvent) error {
defer c.deletePodQueue.Done(event)
if err := c.handleDeletePod(event); err != nil {
c.deletePodQueue.AddRateLimited(event)
return fmt.Errorf("error syncing pod event: %w, requeuing", err)
}
c.deletePodQueue.Forget(key)
c.deletePodQueue.Forget(event)
return nil
}(key)
}(event)
if err != nil {
utilruntime.HandleError(err)
return true
Expand Down
23 changes: 8 additions & 15 deletions pkg/daemon/controller_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,27 +971,20 @@ func (c *Controller) handleUpdatePod(key string) error {
return nil
}

func (c *Controller) handleDeletePod(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
func (c *Controller) handleDeletePod(event *podEvent) error {
var pod *v1.Pod
if event.oldObj != nil {
pod = event.oldObj.(*v1.Pod)
} else {
return nil
}

pod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}

if _, ok := pod.Annotations[util.LogicalSwitchAnnotation]; !ok {
logicalSwitch, ok := pod.Annotations[util.LogicalSwitchAnnotation]
if !ok {
return nil
}

subnet, err := c.subnetsLister.Get(pod.Annotations[util.LogicalSwitchAnnotation])
subnet, err := c.subnetsLister.Get(logicalSwitch)
if err != nil {
klog.Error(err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/daemon/controller_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func routeDiff(existingRoutes, v4Cidrs, v6Cidrs []string) (toAddV4, toAddV6, toD
return
}

func (c *Controller) handleDeletePod(key string) error {
func (c *Controller) handleDeletePod(event *podEvent) error {
return nil
}

Expand Down
17 changes: 12 additions & 5 deletions pkg/ovs/ovs-ofctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,18 @@ func AddOrUpdateU2OKeepSrcMac(client *ovs.Client, bridgeName, podIP, podMac, cha
return err
}

for _, existingFlow := range flows {
if existingFlow.Priority == util.U2OKeepSrcMacPriority && existingFlow.InPort == localnetPatchPortID {
klog.V(3).Infof("flow already exists in bridge %s for in_port=%d, ip_src=%s, dl_src=%s",
bridgeName, localnetPatchPortID, podIP, chassisMac)
return nil
expectedAction := fmt.Sprintf("mod_dl_src:%s", podMac)
for _, flow := range flows {
if flow.Priority != util.U2OKeepSrcMacPriority || flow.InPort != localnetPatchPortID {
continue
}

for _, action := range flow.Actions {
if actionText, _ := action.MarshalText(); string(actionText) == expectedAction {
klog.V(3).Infof("flow already exists in bridge %s for in_port=%d, ip_src=%s, dl_src=%s, actions=mod_dl_src:%s",
bridgeName, localnetPatchPortID, podIP, chassisMac, podMac)
return nil
}
Comment on lines +224 to +228
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error returned by action.MarshalText() is being ignored. While errors might be rare for standard OpenFlow actions, it's safer to handle them to prevent unexpected behavior. Please consider checking the error and logging it if it's not nil.

actionText, err := action.MarshalText()
if err != nil {
	klog.Warningf("failed to marshal action from flow: %v", err)
	continue
}
if string(actionText) == expectedAction {
	klog.V(3).Infof("flow already exists in bridge %s for in_port=%d, ip_src=%s, dl_src=%s, actions=mod_dl_src:%s",
		bridgeName, localnetPatchPortID, podIP, chassisMac, podMac)
	return nil
}

}
}

Expand Down
113 changes: 95 additions & 18 deletions test/e2e/kube-ovn/underlay/underlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,88 @@ var _ = framework.SerialDescribe("[group:underlay]", func() {
framework.ExpectNoError(err)
}
})

framework.ConformanceIt("should create and delete keepSrcMac OpenFlow rules when u2oInterconnection is enabled", func() {
f.SkipVersionPriorTo(1, 14, "keepSrcMac OpenFlow rules were introduced in v1.14")

ginkgo.By("Creating provider network " + providerNetworkName)
pn := makeProviderNetwork(providerNetworkName, false, linkMap)
_ = providerNetworkClient.CreateSync(pn)

ginkgo.By("Getting docker network " + dockerNetworkName)
network, err := docker.NetworkInspect(dockerNetworkName)
framework.ExpectNoError(err, "getting docker network "+dockerNetworkName)

ginkgo.By("Creating vlan " + vlanName)
vlan := framework.MakeVlan(vlanName, providerNetworkName, 0)
_ = vlanClient.Create(vlan)

ginkgo.By("Creating underlay subnet " + subnetName)
var cidrV4, cidrV6, gatewayV4, gatewayV6 string
for _, config := range dockerNetwork.IPAM.Config {
switch util.CheckProtocol(config.Subnet) {
case apiv1.ProtocolIPv4:
if f.HasIPv4() {
cidrV4 = config.Subnet
gatewayV4 = config.Gateway
}
case apiv1.ProtocolIPv6:
if f.HasIPv6() {
cidrV6 = config.Subnet
gatewayV6 = config.Gateway
}
}
}
underlayCidr := make([]string, 0, 2)
gateway := make([]string, 0, 2)
if f.HasIPv4() {
underlayCidr = append(underlayCidr, cidrV4)
gateway = append(gateway, gatewayV4)
}
if f.HasIPv6() {
underlayCidr = append(underlayCidr, cidrV6)
gateway = append(gateway, gatewayV6)
}

excludeIPs := make([]string, 0, len(network.Containers)*2)
for _, container := range network.Containers {
if container.IPv4Address != "" && f.HasIPv4() {
excludeIPs = append(excludeIPs, strings.Split(container.IPv4Address, "/")[0])
}
if container.IPv6Address != "" && f.HasIPv6() {
excludeIPs = append(excludeIPs, strings.Split(container.IPv6Address, "/")[0])
}
}

ginkgo.By("Creating underlay subnet with u2oInterconnection enabled " + subnetName)
subnet := framework.MakeSubnet(subnetName, vlanName, strings.Join(underlayCidr, ","), strings.Join(gateway, ","), "", "", excludeIPs, nil, []string{namespaceName})
subnet.Spec.U2OInterconnection = true
_ = subnetClient.CreateSync(subnet)

ginkgo.By("Waiting for U2OInterconnection status to be ready")
waitSubnetU2OStatus(f, subnetName, subnetClient, true)

ginkgo.By("Creating underlay pod " + u2oPodNameUnderlay)
annotations := map[string]string{
util.LogicalSwitchAnnotation: subnetName,
}
args := []string{"netexec", "--http-port", strconv.Itoa(curlListenPort)}
underlayPod := framework.MakePod(namespaceName, u2oPodNameUnderlay, nil, annotations, framework.AgnhostImage, nil, args)
underlayPod = podClient.CreateSync(underlayPod)
waitSubnetStatusUpdate(subnetName, subnetClient, 2)

ginkgo.By("Verifying keepSrcMac OpenFlow rules exist after pod creation")
checkKeepSrcMacFlow(underlayPod, providerNetworkName, true)

ginkgo.By("Deleting underlay pod " + u2oPodNameUnderlay)
podClient.DeleteSync(u2oPodNameUnderlay)
waitSubnetStatusUpdate(subnetName, subnetClient, 1)

ginkgo.By("Verifying keepSrcMac OpenFlow rules are deleted after pod deletion")
// Wait a bit for the flow rules to be cleaned up
time.Sleep(2 * time.Second)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This time.Sleep is unnecessary. The subsequent call to checkKeepSrcMacFlow already uses framework.WaitUntil to poll for the expected state of the OpenFlow rules, making this explicit sleep redundant. Removing it will make the test slightly faster and more robust against timing variations.

checkKeepSrcMacFlow(underlayPod, providerNetworkName, false)
})
})

func checkU2OItems(f *framework.Framework, subnet *apiv1.Subnet, underlayPod, overlayPod *corev1.Pod, isU2OCustomVpc bool, pnName string) {
Expand Down Expand Up @@ -1503,25 +1585,26 @@ func checkReachable(podName, podNamespace, sourceIP, targetIP, targetPort string
func checkKeepSrcMacFlow(pod *corev1.Pod, providerNetworkName string, expectRules bool) {
ginkgo.GinkgoHelper()

cmd := fmt.Sprintf("kubectl exec -n %s %s -- ip -o link show eth0 | awk '{print $16}'", pod.Namespace, pod.Name)
output, err := exec.Command("bash", "-c", cmd).CombinedOutput()
if err != nil {
framework.Logf("Error getting MAC address: %v, %s", err, string(output))
return
}
podMac := strings.TrimSpace(string(output))

podNodeName := pod.Spec.NodeName
ginkgo.By(fmt.Sprintf("Checking keepSrcMac OpenFlow rule on node %s for Pod %s with MAC %s (expect rules: %v)",
podNodeName, pod.Name, podMac, expectRules))
framework.Logf("Checking keepSrcMac OpenFlow rule on node %s for Pod %s", podNodeName, pod.Name)

podMac := pod.Annotations[util.MacAddressAnnotation]
if podMac == "" {
if !expectRules {
return
}
}

var ruleFound bool
framework.WaitUntil(1*time.Second, 5*time.Second, func(_ context.Context) (bool, error) {
nodeCmd := fmt.Sprintf("kubectl ko ofctl %s dump-flows br-%s | grep actions=mod_dl_src:%s | wc -l",
podNodeName, providerNetworkName, podMac)
output, _ := exec.Command("bash", "-c", nodeCmd).CombinedOutput()
outputStr := string(output)
output, err := exec.Command("bash", "-c", nodeCmd).CombinedOutput()
if err != nil {
return false, nil
}

outputStr := string(output)
lines := strings.Split(outputStr, "\n")
var countStr string
for i := len(lines) - 1; i >= 0; i-- {
Expand All @@ -1538,18 +1621,12 @@ func checkKeepSrcMacFlow(pod *corev1.Pod, providerNetworkName string, expectRule
countNum, _ = strconv.Atoi(matches[0])
}

framework.Logf("Raw output: '%s', extracted count: %d", outputStr, countNum)
ruleFound = countNum > 0

if (expectRules && ruleFound) || (!expectRules && !ruleFound) {
return true, nil
}

if expectRules {
framework.Logf("keepSrcMac flow rule not found but expected, retrying...")
} else {
framework.Logf("keepSrcMac flow rule found but not expected, retrying...")
}
return false, nil
}, "")

Expand Down