Skip to content

Commit cb7ea5e

Browse files
yash97 and haouc committed
test: improve custom networking integration tests (#3668)
* test: improve custom networking integration tests with cross-node enforcement, ClusterIP connectivity, and ASG-driven instance recycling
* allowing cluster sg ingress rule to allow custom networking sgid
* making set asg more resilient, and adding error for sg creation call

---------

Co-authored-by: Hao Zhou <haouc@users.noreply.github.com>
1 parent bfe48ec commit cb7ea5e

6 files changed

Lines changed: 235 additions & 7 deletions

File tree

test/framework/resources/aws/services/autoscaling.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424

2525
// AutoScaling groups the EC2 Auto Scaling operations used by the test framework.
type AutoScaling interface {
2626
// DescribeAutoScalingGroup returns the Auto Scaling groups reported for the given group name.
DescribeAutoScalingGroup(ctx context.Context, autoScalingGroupName string) ([]types.AutoScalingGroup, error)
27+
// GetASGForInstance returns the name of the Auto Scaling group the instance
// belongs to, or an error when the instance is not part of any group.
GetASGForInstance(ctx context.Context, instanceID string) (string, error)
28+
// SetDesiredCapacity sets the desired capacity of the named Auto Scaling group,
// widening the group's MinSize/MaxSize first when desired lies outside them.
SetDesiredCapacity(ctx context.Context, asgName string, desired int32) error
2729
}
2830

2931
// Directly using the client to interact with the service instead of an interface.
@@ -51,3 +53,43 @@ func (d defaultAutoScaling) DescribeAutoScalingGroup(ctx context.Context, autoSc
5153

5254
return asg.AutoScalingGroups, nil
5355
}
56+
57+
func (d defaultAutoScaling) GetASGForInstance(ctx context.Context, instanceID string) (string, error) {
58+
out, err := d.client.DescribeAutoScalingInstances(ctx, &autoscaling.DescribeAutoScalingInstancesInput{
59+
InstanceIds: []string{instanceID},
60+
})
61+
if err != nil {
62+
return "", err
63+
}
64+
if len(out.AutoScalingInstances) == 0 || out.AutoScalingInstances[0].AutoScalingGroupName == nil {
65+
return "", fmt.Errorf("instance %s is not part of any ASG", instanceID)
66+
}
67+
return *out.AutoScalingInstances[0].AutoScalingGroupName, nil
68+
}
69+
70+
func (d defaultAutoScaling) SetDesiredCapacity(ctx context.Context, asgName string, desired int32) error {
71+
descOut, err := d.client.DescribeAutoScalingGroups(ctx, &autoscaling.DescribeAutoScalingGroupsInput{
72+
AutoScalingGroupNames: []string{asgName},
73+
})
74+
if err != nil {
75+
return err
76+
}
77+
if len(descOut.AutoScalingGroups) == 0 {
78+
return fmt.Errorf("ASG %s not found", asgName)
79+
}
80+
if aws.ToInt32(descOut.AutoScalingGroups[0].MaxSize) < desired || aws.ToInt32(descOut.AutoScalingGroups[0].MinSize) > desired {
81+
if _, err := d.client.UpdateAutoScalingGroup(ctx, &autoscaling.UpdateAutoScalingGroupInput{
82+
AutoScalingGroupName: aws.String(asgName),
83+
MaxSize: aws.Int32(max(desired, aws.ToInt32(descOut.AutoScalingGroups[0].MaxSize))),
84+
MinSize: aws.Int32(min(desired, aws.ToInt32(descOut.AutoScalingGroups[0].MinSize))),
85+
}); err != nil {
86+
return fmt.Errorf("raise MaxSize: %v", err)
87+
}
88+
}
89+
_, err = d.client.SetDesiredCapacity(ctx, &autoscaling.SetDesiredCapacityInput{
90+
AutoScalingGroupName: aws.String(asgName),
91+
DesiredCapacity: aws.Int32(desired),
92+
HonorCooldown: aws.Bool(false),
93+
})
94+
return err
95+
}

test/framework/resources/aws/utils/nodegroup.go

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ import (
2929
"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
3030
"github.com/aws/aws-sdk-go-v2/aws"
3131
cloudformationtypes "github.com/aws/aws-sdk-go-v2/service/cloudformation/types"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/apimachinery/pkg/util/wait"
34+
"sigs.k8s.io/controller-runtime/pkg/client"
3235
)
3336

3437
const (
@@ -366,18 +369,64 @@ func TerminateInstances(f *framework.Framework) error {
366369
if err != nil {
367370
return fmt.Errorf("failed to get list of nodes created: %v", err)
368371
}
372+
if len(nodeList.Items) == 0 {
373+
return nil
374+
}
369375

370376
var instanceIDs []string
371377
for _, node := range nodeList.Items {
372378
instanceIDs = append(instanceIDs, k8sUtils.GetInstanceIDFromNode(node))
373379
}
380+
expected := int32(len(instanceIDs))
374381

375-
err = f.CloudServices.EC2().TerminateInstance(context.TODO(), instanceIDs)
382+
// Find the ASG owning the first instance. Assumes all nodes belong to the same ASG.
383+
asgName, err := f.CloudServices.AutoScaling().GetASGForInstance(context.TODO(), instanceIDs[0])
376384
if err != nil {
377-
return fmt.Errorf("failed to terminate instances: %v", err)
385+
return fmt.Errorf("failed to find ASG for instance %s: %v", instanceIDs[0], err)
378386
}
379387

380-
// Wait for instances to be replaced
381-
time.Sleep(time.Minute * 8)
382-
return nil
388+
// Scale the ASG to 0 so it terminates all instances through its own lifecycle
389+
if err := f.CloudServices.AutoScaling().SetDesiredCapacity(context.TODO(), asgName, 0); err != nil {
390+
return fmt.Errorf("failed to set desired capacity to 0 on %s: %v", asgName, err)
391+
}
392+
393+
// Wait until ASG has actually finished terminating its instances before scaling
394+
if err := wait.PollUntilContextTimeout(context.TODO(), 10*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
395+
asgs, derr := f.CloudServices.AutoScaling().DescribeAutoScalingGroup(ctx, asgName)
396+
if derr != nil || len(asgs) == 0 {
397+
return false, nil
398+
}
399+
return len(asgs[0].Instances) == 0, nil
400+
}); err != nil {
401+
return fmt.Errorf("timed out waiting for ASG %s to scale to 0: %v", asgName, err)
402+
}
403+
404+
// Force-delete stale Node objects so the scheduler/aws-node don't wait on wedged kubelets.
405+
zero := int64(0)
406+
opts := &client.DeleteOptions{GracePeriodSeconds: &zero, PropagationPolicy: func() *metav1.DeletionPropagation {
407+
p := metav1.DeletePropagationBackground
408+
return &p
409+
}()}
410+
for i := range nodeList.Items {
411+
_ = f.K8sResourceManagers.NodeManager().DeleteNode(&nodeList.Items[i], opts)
412+
}
413+
414+
if err := f.CloudServices.AutoScaling().SetDesiredCapacity(context.TODO(), asgName, expected); err != nil {
415+
return fmt.Errorf("failed to set desired capacity back to %d on %s: %v", expected, asgName, err)
416+
}
417+
418+
// Wait until ASG reports `expected` instances InService.
419+
return wait.PollUntilContextTimeout(context.TODO(), 10*time.Second, 8*time.Minute, true, func(ctx context.Context) (bool, error) {
420+
asgs, derr := f.CloudServices.AutoScaling().DescribeAutoScalingGroup(ctx, asgName)
421+
if derr != nil || len(asgs) == 0 {
422+
return false, nil
423+
}
424+
inService := int32(0)
425+
for _, inst := range asgs[0].Instances {
426+
if inst.LifecycleState == "InService" {
427+
inService++
428+
}
429+
}
430+
return inService >= expected, nil
431+
})
383432
}

test/framework/resources/k8s/resources/node.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
2020

2121
v1 "k8s.io/api/core/v1"
22+
"k8s.io/apimachinery/pkg/api/errors"
2223
"k8s.io/apimachinery/pkg/util/wait"
2324
"sigs.k8s.io/controller-runtime/pkg/client"
2425
)
@@ -27,6 +28,7 @@ type NodeManager interface {
2728
GetNodes(nodeLabelKey string, nodeLabelVal string) (v1.NodeList, error)
2829
GetAllNodes() (v1.NodeList, error)
2930
UpdateNode(oldNode *v1.Node, newNode *v1.Node) error
31+
DeleteNode(node *v1.Node, opts ...client.DeleteOption) error
3032
WaitTillNodesReady(nodeLabelKey string, nodeLabelVal string, asgSize int) error
3133
}
3234

@@ -73,6 +75,14 @@ func (d *defaultNodeManager) UpdateNode(oldNode *v1.Node, newNode *v1.Node) erro
7375
return d.k8sClient.Patch(context.Background(), newNode, client.MergeFrom(oldNode))
7476
}
7577

78+
func (d *defaultNodeManager) DeleteNode(node *v1.Node, opts ...client.DeleteOption) error {
79+
err := d.k8sClient.Delete(context.Background(), node, opts...)
80+
if errors.IsNotFound(err) {
81+
return nil
82+
}
83+
return err
84+
}
85+
7686
func (d *defaultNodeManager) WaitTillNodesReady(nodeLabelKey string, nodeLabelVal string, asgSize int) error {
7787
return wait.PollImmediateUntil(utils.PollIntervalLong, func() (done bool, err error) {
7888
nodeList, err := d.GetNodes(nodeLabelKey, nodeLabelVal)

test/framework/utils/const.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ const (
3535
PollIntervalLong = time.Second * 20
3636

3737
DefaultDeploymentReadyTimeout = time.Second * 300
38+
ShortDeploymentReadyTimeout = time.Second * 120
3839
)

test/integration/custom-networking/custom_networking_suite_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ var (
5757
corednsSGOpenPort = 53
5858
primaryENISGID string
5959
primaryENISGList []string
60+
clusterSGID string
6061
// List of ENIConfig per Availability Zone
6162
eniConfigList []*v1alpha1.ENIConfig
6263
eniConfigBuilderList []*manifest.ENIConfigBuilder
@@ -87,6 +88,16 @@ var _ = BeforeSuite(func() {
8788
Expect(err).ToNot(HaveOccurred())
8889
customNetworkingSGID = *createSecurityGroupOutput.GroupId
8990

91+
By("Getting Cluster Security Group ID")
92+
clusterRes, err := f.CloudServices.EKS().DescribeCluster(context.TODO(), f.Options.ClusterName)
93+
Expect(err).NotTo(HaveOccurred())
94+
clusterSGID = *(clusterRes.Cluster.ResourcesVpcConfig.ClusterSecurityGroupId)
95+
96+
By("allowing custom networking SG in cluster SG")
97+
err = f.CloudServices.EC2().AuthorizeSecurityGroupIngress(context.TODO(), clusterSGID, "-1",
98+
-1, -1, customNetworkingSGID, true)
99+
Expect(err).ToNot(HaveOccurred())
100+
90101
By("authorizing egress and ingress on security group for single port")
91102
f.CloudServices.EC2().AuthorizeSecurityGroupEgress(context.TODO(), customNetworkingSGID, "TCP",
92103
customNetworkingSGOpenPort, customNetworkingSGOpenPort, "0.0.0.0/0")
@@ -218,6 +229,10 @@ var _ = AfterSuite(func() {
218229
-1, -1, customNetworkingSGID, true)
219230
}
220231

232+
By("removing custom networking SG from cluster SG")
233+
_ = f.CloudServices.EC2().RevokeSecurityGroupIngress(context.TODO(), clusterSGID, "-1",
234+
-1, -1, customNetworkingSGID, true)
235+
221236
By("deleting security group")
222237
errs.Append(f.CloudServices.EC2().DeleteSecurityGroup(context.TODO(), customNetworkingSGID))
223238

test/integration/custom-networking/custom_networking_test.go

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,22 @@
1414
package custom_networking
1515

1616
import (
17+
"context"
1718
"fmt"
1819
"net"
1920
"strconv"
2021
"time"
2122

2223
awsUtils "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/aws/utils"
2324
"github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/manifest"
25+
k8sUtils "github.com/aws/amazon-vpc-cni-k8s/test/framework/resources/k8s/utils"
2426
"github.com/aws/amazon-vpc-cni-k8s/test/framework/utils"
2527

2628
. "github.com/onsi/ginkgo/v2"
2729
. "github.com/onsi/gomega"
2830
v1 "k8s.io/api/apps/v1"
2931
coreV1 "k8s.io/api/core/v1"
32+
"k8s.io/apimachinery/pkg/types"
3033
)
3134

3235
var _ = Describe("Custom Networking Test", func() {
@@ -51,7 +54,6 @@ var _ = Describe("Custom Networking Test", func() {
5154
Command([]string{"nc"}).
5255
Args([]string{"-k", "-l", strconv.Itoa(port)}).
5356
Build()
54-
5557
deploymentBuilder := manifest.NewBusyBoxDeploymentBuilder(f.Options.TestImageRegistry).
5658
Container(container).
5759
Replicas(replicaCount).
@@ -90,7 +92,29 @@ var _ = Describe("Custom Networking Test", func() {
9092
Parallelism(1).
9193
Build()
9294

95+
// Force client Job onto a DIFFERENT node than THIS target pod so that
96+
// traffic actually traverses the ENI and AWS SG enforcement applies
97+
// (same-node pod-to-pod bypasses ENI-level SG evaluation in the Linux bridge).
98+
// We only exclude the one target pod's node, not all target nodes, so the
99+
// Job can still schedule on 2-node clusters when targets span both nodes.
100+
testJob.Spec.Template.Spec.Affinity = &coreV1.Affinity{
101+
NodeAffinity: &coreV1.NodeAffinity{
102+
RequiredDuringSchedulingIgnoredDuringExecution: &coreV1.NodeSelector{
103+
NodeSelectorTerms: []coreV1.NodeSelectorTerm{{
104+
MatchExpressions: []coreV1.NodeSelectorRequirement{{
105+
Key: "kubernetes.io/hostname",
106+
Operator: coreV1.NodeSelectorOpNotIn,
107+
Values: []string{pod.Spec.NodeName},
108+
}},
109+
}},
110+
},
111+
},
112+
}
113+
93114
_, err := f.K8sResourceManagers.JobManager().CreateAndWaitTillJobCompleted(testJob)
115+
logJobPodDiag(testJob.Name)
116+
logTargetENIDiag(pod)
117+
94118
if shouldConnect {
95119
By("verifying connection to pod succeeds on port " + strconv.Itoa(port))
96120
Expect(err).ToNot(HaveOccurred())
@@ -116,6 +140,7 @@ var _ = Describe("Custom Networking Test", func() {
116140
shouldConnect = true
117141
})
118142
It("should connect", func() {})
143+
119144
})
120145

121146
Context("when connecting to unreachable port", func() {
@@ -128,6 +153,49 @@ var _ = Describe("Custom Networking Test", func() {
128153
})
129154
})
130155

156+
Context("when a custom-networking pod connects to an external endpoint on an egress-allowed port", func() {
157+
It("should succeed reaching 1.1.1.1:53", func() {
158+
testJob := manifest.NewDefaultJobBuilder().
159+
Container(manifest.NewNetCatAlpineContainer(f.Options.TestImageRegistry).
160+
Command([]string{"nc"}).
161+
Args([]string{"-z", "-v", "-w5", "1.1.1.1", "53"}).
162+
Build()).
163+
Name("external-reachability").
164+
Parallelism(1).
165+
Build()
166+
_, err := f.K8sResourceManagers.JobManager().CreateAndWaitTillJobCompleted(testJob)
167+
logJobPodDiag(testJob.Name)
168+
Expect(err).ToNot(HaveOccurred())
169+
Expect(f.K8sResourceManagers.JobManager().DeleteAndWaitTillJobIsDeleted(testJob)).ToNot(HaveOccurred())
170+
})
171+
})
172+
173+
Context("when a custom-networking pod connects to the Kubernetes API ClusterIP", func() {
174+
It("should reach the API server via ClusterIP", func() {
175+
// Use the well-known kubernetes.default ClusterIP service which exists on every cluster.
176+
// Resolve the ClusterIP dynamically to avoid hardcoding.
177+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
178+
defer cancel()
179+
180+
kubeSvc, err := f.K8sResourceManagers.ServiceManager().GetService(ctx, "default", "kubernetes")
181+
Expect(err).ToNot(HaveOccurred())
182+
clusterIP := kubeSvc.Spec.ClusterIP
183+
184+
testJob := manifest.NewDefaultJobBuilder().
185+
Container(manifest.NewNetCatAlpineContainer(f.Options.TestImageRegistry).
186+
Command([]string{"nc"}).
187+
Args([]string{"-z", "-v", "-w5", clusterIP, "443"}).
188+
Build()).
189+
Name("k8s-api-clusterip").
190+
Parallelism(1).
191+
Build()
192+
_, err = f.K8sResourceManagers.JobManager().CreateAndWaitTillJobCompleted(testJob)
193+
logJobPodDiag(testJob.Name)
194+
Expect(err).ToNot(HaveOccurred())
195+
Expect(f.K8sResourceManagers.JobManager().DeleteAndWaitTillJobIsDeleted(testJob)).ToNot(HaveOccurred())
196+
})
197+
})
198+
131199
Context("when creating deployment on nodes that do not have ENIConfig", func() {
132200
JustBeforeEach(func() {
133201
By("deleting ENIConfig for all availability zones")
@@ -158,7 +226,7 @@ var _ = Describe("Custom Networking Test", func() {
158226

159227
By("verifying deployment should not succeed")
160228
deployment, err = f.K8sResourceManagers.DeploymentManager().
161-
CreateAndWaitTillDeploymentIsReady(deployment, utils.DefaultDeploymentReadyTimeout)
229+
CreateAndWaitTillDeploymentIsReady(deployment, utils.ShortDeploymentReadyTimeout)
162230
Expect(err).To(HaveOccurred())
163231

164232
By("deleting the failed deployment")
@@ -208,3 +276,46 @@ var _ = Describe("Custom Networking Test", func() {
208276
})
209277
})
210278
})
279+
280+
// logJobPodDiag prints pod name, node, IP, phase, and container logs for every
281+
// pod belonging to the given Job. Useful for post-mortem when a Job fails.
282+
func logJobPodDiag(jobName string) {
283+
pods, err := f.K8sResourceManagers.PodManager().GetPodsWithLabelSelector("job-name", jobName)
284+
if err != nil {
285+
return
286+
}
287+
for _, p := range pods.Items {
288+
logs, _ := f.K8sResourceManagers.PodManager().PodLogs(p.Namespace, p.Name)
289+
fmt.Fprintf(GinkgoWriter, "[DIAG] pod=%s node=%s ip=%s phase=%s\n%s\n",
290+
p.Name, p.Spec.NodeName, p.Status.PodIP, p.Status.Phase, logs)
291+
}
292+
}
293+
294+
// logTargetENIDiag looks up the EC2 ENI that owns the target pod's IP and
295+
// prints its ENI ID, subnet, and security groups.
296+
func logTargetENIDiag(pod coreV1.Pod) {
297+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
298+
defer cancel()
299+
300+
node := &coreV1.Node{}
301+
if err := f.K8sClient.Get(ctx, types.NamespacedName{Name: pod.Spec.NodeName}, node); err != nil {
302+
return
303+
}
304+
instance, err := f.CloudServices.EC2().DescribeInstance(ctx, k8sUtils.GetInstanceIDFromNode(*node))
305+
if err != nil {
306+
return
307+
}
308+
for _, nic := range instance.NetworkInterfaces {
309+
for _, pip := range nic.PrivateIpAddresses {
310+
if pip.PrivateIpAddress != nil && *pip.PrivateIpAddress == pod.Status.PodIP {
311+
var sgs []string
312+
for _, g := range nic.Groups {
313+
sgs = append(sgs, *g.GroupId)
314+
}
315+
fmt.Fprintf(GinkgoWriter, "[DIAG] target ENI=%s subnet=%s SGs=%v\n",
316+
*nic.NetworkInterfaceId, *nic.SubnetId, sgs)
317+
return
318+
}
319+
}
320+
}
321+
}

0 commit comments

Comments
 (0)