Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 44 additions & 92 deletions pkg/controller/vpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,21 +468,29 @@ func (c *Controller) handleAddOrUpdateVpc(key string) error {
// Add static routes created by addCustomVPCStaticRouteForSubnet
for _, subnet := range subnets {
if subnet.Spec.Vpc == key {
staticTargetRoutes = append(staticTargetRoutes, &kubeovnv1.StaticRoute{
Policy: kubeovnv1.PolicySrc,
CIDR: subnet.Spec.CIDRBlock,
NextHopIP: subnet.Spec.Gateway,
RouteTable: subnet.Spec.RouteTable,
})
v4Gw, v6Gw := util.SplitStringIP(subnet.Spec.Gateway)
v4Cidr, v6Cidr := util.SplitStringIP(subnet.Spec.CIDRBlock)
if v4Gw != "" && v4Cidr != "" {
staticTargetRoutes = append(staticTargetRoutes, &kubeovnv1.StaticRoute{
Policy: kubeovnv1.PolicySrc,
CIDR: v4Cidr,
NextHopIP: v4Gw,
RouteTable: subnet.Spec.RouteTable,
})
}
if v6Gw != "" && v6Cidr != "" {
staticTargetRoutes = append(staticTargetRoutes, &kubeovnv1.StaticRoute{
Policy: kubeovnv1.PolicySrc,
CIDR: v6Cidr,
NextHopIP: v6Gw,
RouteTable: subnet.Spec.RouteTable,
})
}
}
}
}

routeNeedDel, routeNeedAdd, err := diffStaticRoute(staticExistedRoutes, staticTargetRoutes)
if err != nil {
klog.Errorf("failed to diff vpc %s static route, %v", vpc.Name, err)
return err
}
routeNeedDel, routeNeedAdd := diffStaticRoute(staticExistedRoutes, staticTargetRoutes)

for _, item := range routeNeedDel {
klog.Infof("vpc %s del static route: %+v", vpc.Name, item)
Expand Down Expand Up @@ -608,7 +616,6 @@ func (c *Controller) handleAddOrUpdateVpc(key string) error {
if subnet.Status.IsNotReady() {
c.addOrUpdateSubnetQueue.Add(subnet.Name)
}
Comment on lines 616 to 618
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This change alters the logic for enqueuing subnets for reconciliation. Previously, all subnets of an updated VPC were enqueued. Now, only subnets not in a 'Ready' state are. This could prevent necessary updates on 'Ready' subnets when their parent VPC's configuration changes (e.g., spec.namespaces), potentially leading to inconsistencies.

The original code had a redundant if block. To correctly remove the redundancy while preserving the intended logic of reconciling all affected subnets, the unconditional enqueue should be kept.

c.addOrUpdateSubnetQueue.Add(subnet.Name)

c.addOrUpdateSubnetQueue.Add(subnet.Name)
if vpc.Name != util.DefaultVpc && vpc.Spec.EnableBfd && subnet.Spec.EnableEcmp {
custVpcEnableExternalEcmp = true
}
Expand Down Expand Up @@ -760,7 +767,7 @@ func (c *Controller) handleUpdateVpcExternal(vpc *kubeovnv1.Vpc, custVpcEnableEx
}
}

if err := c.updateVpcExternalStatus(vpc.Name, vpc.Spec.EnableExternal); err != nil {
if err := c.updateVpcExternalStatus(vpc.Name); err != nil {
klog.Errorf("failed to update vpc external subnets status, %v", err)
return err
}
Expand Down Expand Up @@ -810,15 +817,15 @@ func (c *Controller) reconcileVpcBfdLRP(vpc *kubeovnv1.Vpc) (string, []string, e
nodeNames := make([]string, 0, len(nodes))
chassisCount = min(chassisCount, len(nodes))
chassisNames := make([]string, 0, chassisCount)
for _, nodes := range nodes[:chassisCount] {
chassis, err := c.OVNSbClient.GetChassisByHost(nodes.Name)
for _, node := range nodes[:chassisCount] {
chassis, err := c.OVNSbClient.GetChassisByHost(node.Name)
if err != nil {
err = fmt.Errorf("failed to get chassis of node %s: %w", nodes.Name, err)
err = fmt.Errorf("failed to get chassis of node %s: %w", node.Name, err)
klog.Error(err)
return portName, nil, err
}
chassisNames = append(chassisNames, chassis.Name)
nodeNames = append(nodeNames, nodes.Name)
nodeNames = append(nodeNames, node.Name)
}

networks := strings.Split(vpc.Spec.BFDPort.IP, ",")
Expand Down Expand Up @@ -896,40 +903,14 @@ func (c *Controller) batchAddPolicyRouteToVpc(name string, policies []*kubeovnv1
}

func (c *Controller) deletePolicyRouteFromVpc(vpcName string, priority int, match string) error {
var (
vpc, cachedVpc *kubeovnv1.Vpc
err error
)

if err = c.OVNNbClient.DeleteLogicalRouterPolicy(vpcName, priority, match); err != nil {
klog.Error(err)
return err
}

cachedVpc, err = c.vpcsLister.Get(vpcName)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
vpc = cachedVpc.DeepCopy()
// make sure custom policies not be deleted
_, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Update(context.Background(), vpc, metav1.UpdateOptions{})
if err != nil {
if err := c.OVNNbClient.DeleteLogicalRouterPolicy(vpcName, priority, match); err != nil {
klog.Error(err)
return err
}
return nil
}

func (c *Controller) batchDeletePolicyRouteFromVpc(name string, policies []*kubeovnv1.PolicyRoute) error {
var (
vpc, cachedVpc *kubeovnv1.Vpc
err error
)

start := time.Now()
routerPolicies := make([]*ovnnb.LogicalRouterPolicy, 0, len(policies))
for _, policy := range policies {
Expand All @@ -939,26 +920,10 @@ func (c *Controller) batchDeletePolicyRouteFromVpc(name string, policies []*kube
})
}

if err = c.OVNNbClient.BatchDeleteLogicalRouterPolicy(name, routerPolicies); err != nil {
if err := c.OVNNbClient.BatchDeleteLogicalRouterPolicy(name, routerPolicies); err != nil {
return err
}
klog.V(3).Infof("take to %v batch delete policy route from vpc %s policies %d", time.Since(start), name, len(policies))

cachedVpc, err = c.vpcsLister.Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
vpc = cachedVpc.DeepCopy()
// make sure custom policies not be deleted
_, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Update(context.Background(), vpc, metav1.UpdateOptions{})
if err != nil {
klog.Error(err)
return err
}
return nil
}

Expand Down Expand Up @@ -1000,10 +965,6 @@ func (c *Controller) deleteStaticRouteFromVpc(name, table, cidr, nextHop string,
}

func (c *Controller) batchDeleteStaticRouteFromVpc(name string, staticRoutes []*kubeovnv1.StaticRoute) error {
var (
vpc, cachedVpc *kubeovnv1.Vpc
err error
)
start := time.Now()
routeCount := len(staticRoutes)
delRoutes := make([]*ovnnb.LogicalRouterStaticRoute, 0, routeCount)
Expand All @@ -1017,27 +978,11 @@ func (c *Controller) batchDeleteStaticRouteFromVpc(name string, staticRoutes []*
}
delRoutes = append(delRoutes, newRoute)
}
if err = c.OVNNbClient.BatchDeleteLogicalRouterStaticRoute(name, delRoutes); err != nil {
if err := c.OVNNbClient.BatchDeleteLogicalRouterStaticRoute(name, delRoutes); err != nil {
klog.Errorf("batch del vpc %s static route %d failed, %v", name, routeCount, err)
return err
}
klog.V(3).Infof("take to %v batch delete static route from vpc %s static routes %d", time.Since(start), name, len(delRoutes))

cachedVpc, err = c.vpcsLister.Get(name)
if err != nil {
if k8serrors.IsNotFound(err) {
return nil
}
klog.Error(err)
return err
}
vpc = cachedVpc.DeepCopy()
// make sure custom policies not be deleted
_, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Update(context.Background(), vpc, metav1.UpdateOptions{})
if err != nil {
klog.Error(err)
return err
}
return nil
}

Expand Down Expand Up @@ -1077,11 +1022,13 @@ func diffPolicyRouteWithLogical(exists []*ovnnb.LogicalRouterPolicy, target []*k
key string
ok bool
)
klog.Infof("diffPolicyRouteWithLogical exists: %v, target: %v", exists, target)
existsMap = make(map[string]*kubeovnv1.PolicyRoute, len(exists))

for _, item := range exists {
if item.ExternalIDs["vpc-egress-gateway"] != "" ||
item.ExternalIDs["isU2ORoutePolicy"] != "true" {
klog.Infof("diffPolicyRouteWithLogical item: %+v", item)
if item.ExternalIDs["vpc-egress-gateway"] != "" || item.ExternalIDs["subnet"] != "" ||
item.ExternalIDs["isU2ORoutePolicy"] == "true" {
continue
}
policy := &kubeovnv1.PolicyRoute{
Expand All @@ -1091,6 +1038,7 @@ func diffPolicyRouteWithLogical(exists []*ovnnb.LogicalRouterPolicy, target []*k
}
existsMap[getPolicyRouteItemKey(policy)] = policy
}
klog.Infof("diffPolicyRouteWithLogical existsMap: %v", existsMap)

for _, item := range target {
key = getPolicyRouteItemKey(item)
Expand All @@ -1102,17 +1050,20 @@ func diffPolicyRouteWithLogical(exists []*ovnnb.LogicalRouterPolicy, target []*k
}
}

klog.Infof("diffPolicyRouteWithLogical existsMap after delete: %v", existsMap)

for _, item := range existsMap {
dels = append(dels, item)
}
klog.Infof("diffPolicyRouteWithLogical dels: %v, adds: %v", dels, adds)
return dels, adds
}

func getPolicyRouteItemKey(item *kubeovnv1.PolicyRoute) (key string) {
return fmt.Sprintf("%d:%s:%s:%s", item.Priority, item.Match, item.Action, item.NextHopIP)
}

func diffStaticRoute(exist []*ovnnb.LogicalRouterStaticRoute, target []*kubeovnv1.StaticRoute) (routeNeedDel, routeNeedAdd []*kubeovnv1.StaticRoute, err error) {
func diffStaticRoute(exist []*ovnnb.LogicalRouterStaticRoute, target []*kubeovnv1.StaticRoute) (routeNeedDel, routeNeedAdd []*kubeovnv1.StaticRoute) {
existRouteMap := make(map[string]*kubeovnv1.StaticRoute, len(exist))
for _, item := range exist {
policy := kubeovnv1.PolicyDst
Expand Down Expand Up @@ -1143,7 +1094,7 @@ func diffStaticRoute(exist []*ovnnb.LogicalRouterStaticRoute, target []*kubeovnv
for _, item := range existRouteMap {
routeNeedDel = append(routeNeedDel, item)
}
return routeNeedDel, routeNeedAdd, err
return routeNeedDel, routeNeedAdd
}

func getStaticRouteItemKey(item *kubeovnv1.StaticRoute) string {
Expand Down Expand Up @@ -1423,14 +1374,15 @@ func (c *Controller) handleDeleteVpcStaticRoute(key string) error {
needUpdate := false
newStaticRoutes := make([]*kubeovnv1.StaticRoute, 0, len(vpc.Spec.StaticRoutes))
for _, route := range vpc.Spec.StaticRoutes {
if route.ECMPMode != util.StaticRouteBfdEcmp {
newStaticRoutes = append(newStaticRoutes, route)
if route.ECMPMode == util.StaticRouteBfdEcmp {
needUpdate = true
continue
}
newStaticRoutes = append(newStaticRoutes, route)
}
// keep non ecmp bfd routes
vpc.Spec.StaticRoutes = newStaticRoutes
// keep routes except bfd ecmp routes
if needUpdate {
vpc.Spec.StaticRoutes = newStaticRoutes
if _, err = c.config.KubeOvnClient.KubeovnV1().Vpcs().Update(context.Background(), vpc, metav1.UpdateOptions{}); err != nil {
klog.Errorf("failed to update vpc spec static route %s, %v", vpc.Name, err)
return err
Expand Down Expand Up @@ -1500,7 +1452,7 @@ func (c *Controller) getRouteTablesByVpc(vpc *kubeovnv1.Vpc) map[string][]*kubeo
return rtbs
}

func (c *Controller) updateVpcExternalStatus(key string, enableExternal bool) error {
func (c *Controller) updateVpcExternalStatus(key string) error {
cachedVpc, err := c.vpcsLister.Get(key)
if err != nil {
klog.Errorf("failed to get vpc %s, %v", key, err)
Expand All @@ -1510,7 +1462,7 @@ func (c *Controller) updateVpcExternalStatus(key string, enableExternal bool) er
vpc.Status.EnableExternal = vpc.Spec.EnableExternal
vpc.Status.EnableBfd = vpc.Spec.EnableBfd

if enableExternal {
if vpc.Spec.EnableExternal {
sort.Strings(vpc.Spec.ExtraExternalSubnets)
vpc.Status.ExtraExternalSubnets = vpc.Spec.ExtraExternalSubnets
} else {
Expand Down