Skip to content

Commit 224d30c

Browse files
chore(vpc-nat-gw): refactor before adding HA gw (#5197)
Signed-off-by: SkalaNetworks <contact@skala.network>
1 parent 5645113 commit 224d30c

File tree

6 files changed

+533
-172
lines changed

6 files changed

+533
-172
lines changed

pkg/controller/gc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func (c *Controller) gcVpcNatGateway() error {
102102
return err
103103
}
104104
}
105-
gwStsNames = append(gwStsNames, util.GenNatGwStsName(gw.Name))
105+
gwStsNames = append(gwStsNames, util.GenNatGwName(gw.Name))
106106
}
107107

108108
sel, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{util.VpcNatGatewayLabel: "true"}})

pkg/controller/vpc_nat_gateway.go

Lines changed: 51 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (c *Controller) handleDelVpcNatGw(key string) error {
116116
c.vpcNatGwKeyMutex.LockKey(key)
117117
defer func() { _ = c.vpcNatGwKeyMutex.UnlockKey(key) }()
118118

119-
name := util.GenNatGwStsName(key)
119+
name := util.GenNatGwName(key)
120120
klog.Infof("delete vpc nat gw %s", name)
121121
if err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).Delete(context.Background(),
122122
name, metav1.DeleteOptions{}); err != nil {
@@ -195,7 +195,7 @@ func (c *Controller) handleAddOrUpdateVpcNatGw(key string) error {
195195
needToCreate := false
196196
needToUpdate := false
197197
oldSts, err := c.config.KubeClient.AppsV1().StatefulSets(c.config.PodNamespace).
198-
Get(context.Background(), util.GenNatGwStsName(gw.Name), metav1.GetOptions{})
198+
Get(context.Background(), util.GenNatGwName(gw.Name), metav1.GetOptions{})
199199
if err != nil {
200200
if !k8serrors.IsNotFound(err) {
201201
klog.Error(err)
@@ -680,13 +680,15 @@ func (c *Controller) setNatGwAPIAccess(annotations map[string]string) error {
680680
if externalNetworkAttachment, ok := annotations[nadv1.NetworkAttachmentAnnot]; ok {
681681
networkAttachments = append([]string{externalNetworkAttachment}, networkAttachments...)
682682
}
683+
683684
// Attach the NADs to the Pod by adding them to the special annotation
684685
annotations[nadv1.NetworkAttachmentAnnot] = strings.Join(networkAttachments, ",")
685686

686687
// Set the network route to the API, so we can reach it
687688
return c.setNatGwAPIRoute(annotations, namespace, name)
688689
}
689690

691+
// setNatGwAPIRoute adds routes to a pod to reach the K8S API server
690692
func (c *Controller) setNatGwAPIRoute(annotations map[string]string, nadNamespace, nadName string) error {
691693
dst := os.Getenv("KUBERNETES_SERVICE_HOST")
692694

@@ -734,21 +736,11 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
734736
if oldSts != nil && len(oldSts.Annotations) != 0 {
735737
annotations = maps.Clone(oldSts.Annotations)
736738
}
737-
externalNadNamespace := c.config.PodNamespace
738-
externalNadName := util.GetNatGwExternalNetwork(gw.Spec.ExternalSubnets)
739-
if externalSubnet, err := c.subnetsLister.Get(externalNadName); err == nil {
740-
if name, namespace, ok := util.GetNadBySubnetProvider(externalSubnet.Spec.Provider); ok {
741-
externalNadName = name
742-
externalNadNamespace = namespace
743-
}
744-
}
745-
podAnnotations := map[string]string{
746-
util.VpcNatGatewayAnnotation: gw.Name,
747-
nadv1.NetworkAttachmentAnnot: fmt.Sprintf("%s/%s", externalNadNamespace, externalNadName),
748-
util.LogicalSwitchAnnotation: gw.Spec.Subnet,
749-
util.IPAddressAnnotation: gw.Spec.LanIP,
750-
}
751739

740+
externalNadNamespace, externalNadName := c.getExternalSubnetNad(gw)
741+
podAnnotations := util.GenNatGwPodAnnotations(gw, externalNadNamespace, externalNadName)
742+
743+
// Restart logic to fix #5072
752744
if oldSts != nil && len(oldSts.Spec.Template.Annotations) != 0 {
753745
if _, ok := oldSts.Spec.Template.Annotations[util.VpcNatGatewayContainerRestartAnnotation]; !ok && natGwPodContainerRestartCount > 0 {
754746
podAnnotations[util.VpcNatGatewayContainerRestartAnnotation] = ""
@@ -759,22 +751,28 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
759751
// Add an interface that can reach the API server, we need access to it to probe Kube-OVN resources
760752
if gw.Spec.BgpSpeaker.Enabled {
761753
if err := c.setNatGwAPIAccess(podAnnotations); err != nil {
762-
klog.Error(err)
754+
klog.Errorf("couldn't add an API interface to the NAT gateway: %v", err)
763755
return nil, err
764756
}
765757
}
766758

767759
maps.Copy(annotations, podAnnotations)
768760

761+
// Retrieve all subnets in existence
769762
subnets, err := c.subnetsLister.List(labels.Everything())
770763
if err != nil {
771764
klog.Errorf("failed to list subnets: %v", err)
772765
return nil, err
773766
}
767+
768+
// Retrieve the gateways of the subnet sitting behind the NAT gateway
774769
v4Gateway, v6Gateway, err := c.GetGwBySubnet(gw.Spec.Subnet)
775770
if err != nil {
776771
klog.Errorf("failed to get gateway ips for subnet %s: %v", gw.Spec.Subnet, err)
772+
return nil, err
777773
}
774+
775+
// Add routes to join the services (is this still needed?)
778776
v4ClusterIPRange, v6ClusterIPRange := util.SplitStringIP(c.config.ServiceClusterIPRange)
779777
routes := make([]request.Route, 0, 2)
780778
if v4Gateway != "" && v4ClusterIPRange != "" {
@@ -783,6 +781,11 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
783781
if v6Gateway != "" && v6ClusterIPRange != "" {
784782
routes = append(routes, request.Route{Destination: v6ClusterIPRange, Gateway: v6Gateway})
785783
}
784+
785+
// Add gateway to join every subnet in the same VPC? (is this still needed?)
786+
// Are we trying to give the NAT gateway access to every subnet in the VPC?
787+
// I suspect this is to solve a problem where a static route is inserted to redirect all the traffic
788+
// from a VPC into the NAT GW. When that happens, the GW has no return path to the other subnets.
786789
for _, subnet := range subnets {
787790
if subnet.Spec.Vpc != gw.Spec.Vpc || subnet.Name == gw.Spec.Subnet ||
788791
!isOvnSubnet(subnet) || !subnet.Status.IsValidated() ||
@@ -798,16 +801,19 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
798801
}
799802
}
800803

804+
// Use this function is nat gw set route?
801805
if err = setPodRoutesAnnotation(annotations, util.OvnProvider, routes); err != nil {
802806
klog.Error(err)
803807
return nil, err
804808
}
805809

806-
subnet, err := c.findSubnetByNetworkAttachmentDefinition(externalNadNamespace, externalNadName, subnets)
810+
// Set the default routes to the external network
811+
subnet, err := c.subnetsLister.Get(util.GetNatGwExternalNetwork(gw.Spec.ExternalSubnets))
807812
if err != nil {
808813
klog.Error(err)
809814
return nil, err
810815
}
816+
811817
routes = routes[0:0]
812818
v4Gateway, v6Gateway = util.SplitStringIP(subnet.Spec.Gateway)
813819
if v4Gateway != "" {
@@ -821,25 +827,14 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
821827
return nil, err
822828
}
823829

824-
selectors := make(map[string]string, len(gw.Spec.Selector))
825-
for _, v := range gw.Spec.Selector {
826-
parts := strings.Split(strings.TrimSpace(v), ":")
827-
if len(parts) != 2 {
828-
continue
829-
}
830-
selectors[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
831-
}
830+
selectors := util.GenNatGwSelectors(gw.Spec.Selector)
832831
klog.V(3).Infof("prepare for vpc nat gateway pod, node selector: %v", selectors)
833832

834-
name := util.GenNatGwStsName(gw.Name)
835-
labels := map[string]string{
836-
"app": name,
837-
util.VpcNatGatewayLabel: "true",
838-
}
833+
labels := util.GenNatGwLabels(gw.Name)
839834

840835
sts := &v1.StatefulSet{
841836
ObjectMeta: metav1.ObjectMeta{
842-
Name: name,
837+
Name: util.GenNatGwName(gw.Name),
843838
Labels: labels,
844839
},
845840
Spec: v1.StatefulSetSpec{
@@ -877,108 +872,38 @@ func (c *Controller) genNatGwStatefulSet(gw *kubeovnv1.VpcNatGateway, oldSts *v1
877872
},
878873
}
879874

880-
// BGP speaker for GWs must be enabled globally and for this specific instance
875+
// BGP speaker is enabled on this instance, add a BGP speaker to the statefulset
881876
if gw.Spec.BgpSpeaker.Enabled {
882-
containers := sts.Spec.Template.Spec.Containers
883-
884-
// We need a speaker image configured in the NAT GW ConfigMap
885-
if vpcNatGwBgpSpeakerImage == "" {
886-
return nil, fmt.Errorf("%s should have bgp speaker image field if bgp enabled", util.VpcNatConfig)
887-
}
888-
889-
args := []string{
890-
"--nat-gw-mode", // Force to run in NAT GW mode, we're not announcing Pod IPs or Services, only EIPs
891-
}
892-
893-
speakerParams := gw.Spec.BgpSpeaker
894-
895-
if speakerParams.RouterID != "" { // Override default auto-selected RouterID
896-
args = append(args, "--router-id="+speakerParams.RouterID)
897-
}
898-
899-
if speakerParams.Password != "" { // Password for TCP MD5 BGP
900-
args = append(args, "--auth-password="+speakerParams.Password)
901-
}
902-
903-
if speakerParams.EnableGracefulRestart { // Enable graceful restart
904-
args = append(args, "--graceful-restart")
905-
}
906-
907-
if speakerParams.HoldTime != (metav1.Duration{}) { // Hold time
908-
args = append(args, "--holdtime="+speakerParams.HoldTime.Duration.String())
909-
}
910-
911-
if speakerParams.ASN == 0 { // The ASN we use to speak
912-
return nil, errors.New("ASN not set, but must be non-zero value")
913-
}
914-
915-
if speakerParams.RemoteASN == 0 { // The ASN we speak to
916-
return nil, errors.New("remote ASN not set, but must be non-zero value")
917-
}
918-
919-
args = append(args, fmt.Sprintf("--cluster-as=%d", speakerParams.ASN))
920-
args = append(args, fmt.Sprintf("--neighbor-as=%d", speakerParams.RemoteASN))
921-
922-
if len(speakerParams.Neighbors) == 0 {
923-
return nil, errors.New("no BGP neighbors specified")
924-
}
925-
926-
var neighIPv4 []string
927-
var neighIPv6 []string
928-
for _, neighbor := range speakerParams.Neighbors {
929-
switch util.CheckProtocol(neighbor) {
930-
case kubeovnv1.ProtocolIPv4:
931-
neighIPv4 = append(neighIPv4, neighbor)
932-
case kubeovnv1.ProtocolIPv6:
933-
neighIPv6 = append(neighIPv6, neighbor)
934-
default:
935-
return nil, fmt.Errorf("unsupported protocol for peer %s", neighbor)
936-
}
937-
}
938-
939-
argNeighIPv4 := strings.Join(neighIPv4, ",")
940-
argNeighIPv6 := strings.Join(neighIPv6, ",")
941-
argNeighIPv4 = "--neighbor-address=" + argNeighIPv4
942-
argNeighIPv6 = "--neighbor-ipv6-address=" + argNeighIPv6
877+
// We need to connect to the K8S API to make the BGP speaker work, this implies a ServiceAccount
878+
sts.Spec.Template.Spec.ServiceAccountName = "vpc-nat-gw"
943879

944-
if len(neighIPv4) > 0 {
945-
args = append(args, argNeighIPv4)
880+
// Craft a BGP speaker container to add to our statefulset
881+
bgpSpeakerContainer, err := util.GenNatGwBgpSpeakerContainer(gw.Spec.BgpSpeaker, vpcNatGwBgpSpeakerImage, gw.Name)
882+
if err != nil {
883+
klog.Errorf("failed to create a BGP speaker container for gateway %s: %v", gw.Name, err)
884+
return nil, err
946885
}
947886

948-
if len(neighIPv6) > 0 {
949-
args = append(args, argNeighIPv6)
950-
}
887+
// Add our container to the list of containers in the statefulset
888+
sts.Spec.Template.Spec.Containers = append(sts.Spec.Template.Spec.Containers, *bgpSpeakerContainer)
889+
}
951890

952-
// Extra args to start the speaker with, for example, logging levels...
953-
args = append(args, speakerParams.ExtraArgs...)
891+
return sts, nil
892+
}
954893

955-
sts.Spec.Template.Spec.ServiceAccountName = "vpc-nat-gw"
956-
speakerContainer := corev1.Container{
957-
Name: "vpc-nat-gw-speaker",
958-
Image: vpcNatGwBgpSpeakerImage,
959-
Command: []string{"/kube-ovn/kube-ovn-speaker"},
960-
ImagePullPolicy: corev1.PullIfNotPresent,
961-
Env: []corev1.EnvVar{
962-
{
963-
Name: util.GatewayNameEnv,
964-
Value: gw.Name,
965-
},
966-
{
967-
Name: "POD_IP",
968-
ValueFrom: &corev1.EnvVarSource{
969-
FieldRef: &corev1.ObjectFieldSelector{
970-
FieldPath: "status.podIP",
971-
},
972-
},
973-
},
974-
},
975-
Args: args,
894+
// getExternalSubnetNad returns the namespace and name of the NetworkAttachmentDefinition associated with
895+
// an external network attached to a NAT gateway
896+
func (c *Controller) getExternalSubnetNad(gw *kubeovnv1.VpcNatGateway) (string, string) {
897+
externalNadNamespace := c.config.PodNamespace
898+
externalNadName := util.GetNatGwExternalNetwork(gw.Spec.ExternalSubnets)
899+
if externalSubnet, err := c.subnetsLister.Get(externalNadName); err == nil {
900+
if name, namespace, ok := util.GetNadBySubnetProvider(externalSubnet.Spec.Provider); ok {
901+
externalNadName = name
902+
externalNadNamespace = namespace
976903
}
977-
978-
sts.Spec.Template.Spec.Containers = append(containers, speakerContainer)
979904
}
980905

981-
return sts, nil
906+
return externalNadNamespace, externalNadName
982907
}
983908

984909
func (c *Controller) cleanUpVpcNatGw() error {
@@ -995,7 +920,7 @@ func (c *Controller) cleanUpVpcNatGw() error {
995920

996921
func (c *Controller) getNatGwPod(name string) (*corev1.Pod, error) {
997922
sel, _ := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
998-
MatchLabels: map[string]string{"app": util.GenNatGwStsName(name), util.VpcNatGatewayLabel: "true"},
923+
MatchLabels: map[string]string{"app": util.GenNatGwName(name), util.VpcNatGatewayLabel: "true"},
999924
})
1000925

1001926
pods, err := c.podsLister.Pods(c.config.PodNamespace).List(sel)

pkg/util/net.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -577,13 +577,6 @@ func GetExternalNetwork(externalNet string) string {
577577
return externalNet
578578
}
579579

580-
func GetNatGwExternalNetwork(externalNets []string) string {
581-
if len(externalNets) == 0 {
582-
return vpcExternalNet
583-
}
584-
return externalNets[0]
585-
}
586-
587580
func TCPConnectivityCheck(endpoint string) error {
588581
conn, err := net.DialTimeout("tcp", endpoint, 3*time.Second)
589582
if err != nil {

pkg/util/net_test.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -819,39 +819,6 @@ func TestAppendGwByCidr(t *testing.T) {
819819
}
820820
}
821821

822-
func TestGetNatGwExternalNetwork(t *testing.T) {
823-
tests := []struct {
824-
name string
825-
externalNets []string
826-
expected string
827-
}{
828-
{
829-
name: "External network specified",
830-
externalNets: []string{"custom-external-network"},
831-
expected: "custom-external-network",
832-
},
833-
{
834-
name: "External network not specified",
835-
externalNets: []string{},
836-
expected: "ovn-vpc-external-network",
837-
},
838-
{
839-
name: "Multiple external networks specified",
840-
externalNets: []string{"custom-external-network1", "custom-external-network2"},
841-
expected: "custom-external-network1",
842-
},
843-
}
844-
845-
for _, tt := range tests {
846-
t.Run(tt.name, func(t *testing.T) {
847-
result := GetNatGwExternalNetwork(tt.externalNets)
848-
if result != tt.expected {
849-
t.Errorf("got %v, but want %v", result, tt.expected)
850-
}
851-
})
852-
}
853-
}
854-
855822
func TestSplitIpsByProtocol(t *testing.T) {
856823
tests := []struct {
857824
name string

0 commit comments

Comments
 (0)