diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go
index a658665f..ce75e088 100644
--- a/cmd/testserver/run.go
+++ b/cmd/testserver/run.go
@@ -4,15 +4,21 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"log/slog"
 	"os"
 	"sync"
 	"time"
 
+	"github.com/sirupsen/logrus"
+	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/util/flowcontrol"
 
+	"github.com/castai/cluster-controller/internal/helm"
 	"github.com/castai/cluster-controller/loadtest"
 	"github.com/castai/cluster-controller/loadtest/scenarios"
 )
@@ -27,10 +33,11 @@ func run(ctx context.Context) error {
 		TimeoutWaitingForActions: 60 * time.Second,
 	})
 
-	clientSet, err := createK8SClient(cfg, logger)
+	clientSet, dynamicClient, apiExtClient, helmClient, err := createK8SClients(cfg, logger)
 	if err != nil {
 		return err
 	}
+	logger.Info(fmt.Sprintf("Created %d clients", len([]any{clientSet, dynamicClient, apiExtClient, helmClient})))
 
 	go func() {
 		logger.Info("Starting HTTP server for test")
@@ -41,10 +48,10 @@ func run(ctx context.Context) error {
 		}
 	}()
 
+	// Choose scenarios below by adding or removing instances of scenarios.XXX().
+	// All scenarios in the list run in parallel (though not necessarily simultaneously, since their preparation phases may take different amounts of time).
 	testScenarios := []scenarios.TestScenario{
-		// scenarios.PodEvents(5000, logger),
-		// scenarios.StuckDrain(100, 60, logger),
-		scenarios.StuckDrain(10, 1, logger),
+		scenarios.CheckNodeDeletedStuck(300, logger),
 	}
 
 	var wg sync.WaitGroup
@@ -76,34 +83,50 @@ func run(ctx context.Context) error {
 	return errors.Join(receivedErrors...)
 }
 
-func createK8SClient(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, error) {
-	if cfg.KubeConfig == "" {
-		logger.Info("Using in-cluster configuration")
-		restConfig, err := rest.InClusterConfig()
+func createK8SClients(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, *dynamic.DynamicClient, *apiextensionsclientset.Clientset, helm.Client, error) {
+	rateLimiter := flowcontrol.NewTokenBucketRateLimiter(20, 50)
+
+	var restConfig *rest.Config
+	var err error
+
+	switch {
+	case cfg.KubeConfig != "":
+		logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig))
+		data, err := os.ReadFile(cfg.KubeConfig)
 		if err != nil {
-			return nil, fmt.Errorf("error creating in-cluster config: %w", err)
+			return nil, nil, nil, nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err)
 		}
-		clientSet, err := kubernetes.NewForConfig(restConfig)
+
+		restConfig, err = clientcmd.RESTConfigFromKubeConfig(data)
+		if err != nil {
+			return nil, nil, nil, nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err)
+		}
+	default:
+		logger.Info("Using in-cluster configuration")
+		restConfig, err = rest.InClusterConfig()
 		if err != nil {
-			return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
+			return nil, nil, nil, nil, fmt.Errorf("creating in-cluster config: %w", err)
 		}
-		return clientSet, nil
 	}
 
-	logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig))
-	data, err := os.ReadFile(cfg.KubeConfig)
+	restConfig.RateLimiter = rateLimiter
+
+	clientSet, err := kubernetes.NewForConfig(restConfig)
 	if err != nil {
-		return nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err)
+		return nil, nil, nil, nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
 	}
-
-	restConfig, err := clientcmd.RESTConfigFromKubeConfig(data)
+	dynamicClient, err := dynamic.NewForConfig(restConfig)
 	if err != nil {
-		return nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err)
+		return nil, nil, nil, nil, fmt.Errorf("obtaining dynamic client: %w", err)
 	}
-
-	clientSet, err := kubernetes.NewForConfig(restConfig)
+	apiextensionsClient, err := apiextensionsclientset.NewForConfig(restConfig)
 	if err != nil {
-		return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
+		return nil, nil, nil, nil, fmt.Errorf("obtaining apiextensions client: %w", err)
 	}
-	return clientSet, nil
+
+	discard := logrus.New()
+	discard.Out = io.Discard
+	helmClient := helm.NewClient(discard, helm.NewChartLoader(discard), restConfig)
+
+	return clientSet, dynamicClient, apiextensionsClient, helmClient, nil
 }
diff --git a/loadtest/scenarios/check_node_deleted_stuck.go b/loadtest/scenarios/check_node_deleted_stuck.go
new file mode 100644
index 00000000..5b50e075
--- /dev/null
+++ b/loadtest/scenarios/check_node_deleted_stuck.go
@@ -0,0 +1,121 @@
+package scenarios
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/sync/errgroup"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+// CheckNodeDeletedStuck simulates the case where a node is never deleted, so the checker gets stuck until the action times out.
+func CheckNodeDeletedStuck(nodeCount int, log *slog.Logger) TestScenario {
+	return &checkNodeDeletedStuckScenario{
+		nodeCount: nodeCount,
+		log:       log,
+	}
+}
+
+type checkNodeDeletedStuckScenario struct {
+	nodeCount int
+	log       *slog.Logger
+
+	nodes []*corev1.Node
+}
+
+func (s *checkNodeDeletedStuckScenario) Name() string {
+	return "check node deleted"
+}
+
+func (s *checkNodeDeletedStuckScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	s.nodes = make([]*corev1.Node, 0, s.nodeCount)
+
+	var lock sync.Mutex
+	errGroup, ctx := errgroup.WithContext(ctx)
+
+	for i := range s.nodeCount {
+		errGroup.Go(func() error {
+			nodeName := fmt.Sprintf("kwok-check-deleted-%d", i)
+			s.log.Info(fmt.Sprintf("Creating node %s", nodeName))
+			node := NewKwokNode(KwokConfig{}, nodeName)
+
+			_, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
+			if err != nil && !apierrors.IsAlreadyExists(err) {
+				return fmt.Errorf("failed to create fake node: %w", err)
+			}
+			if err != nil && apierrors.IsAlreadyExists(err) {
+				s.log.Warn("node already exists; reusing it, but this may conflict with other test runs", "nodeName", nodeName)
+			}
+			lock.Lock()
+			s.nodes = append(s.nodes, node)
+			lock.Unlock()
+
+			return nil
+		})
+	}
+
+	return errGroup.Wait()
+}
+
+func (s *checkNodeDeletedStuckScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	var lock sync.Mutex
+	var errs []error
+	var wg sync.WaitGroup
+
+	wg.Add(len(s.nodes))
+	// Nodes are not deleted with the namespace and can leak, so we iterate over all of them and delete as many as possible.
+	for _, n := range s.nodes {
+		go func() {
+			defer wg.Done()
+
+			s.log.Info(fmt.Sprintf("Deleting node %s", n.Name))
+			err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{})
+			if err != nil && !apierrors.IsNotFound(err) {
+				s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name)
+				lock.Lock()
+				errs = append(errs, err)
+				lock.Unlock()
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
+
+	s.log.Info("Finished cleaning up nodes for the deleted-node check.")
+	return nil
+}
+
+func (s *checkNodeDeletedStuckScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error {
+	s.log.Info(fmt.Sprintf("Starting check node deleted action with %d nodes", len(s.nodes)))
+
+	actions := make([]castai.ClusterAction, 0, len(s.nodes))
+	// Note: nothing ever deletes these nodes, so each action is expected to fail with a timeout.
+	// That generates more load than a "normal" run, which is exactly the edge case we want to simulate.
+	for _, node := range s.nodes {
+		actions = append(actions, castai.ClusterAction{
+			ID:        uuid.NewString(),
+			CreatedAt: time.Now().UTC(),
+			ActionCheckNodeDeleted: &castai.ActionCheckNodeDeleted{
+				NodeName: node.Name,
+			},
+		})
+	}
+
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/check_node_status.go b/loadtest/scenarios/check_node_status.go
new file mode 100644
index 00000000..189eb6ee
--- /dev/null
+++ b/loadtest/scenarios/check_node_status.go
@@ -0,0 +1,119 @@
+package scenarios
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"golang.org/x/sync/errgroup"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+func CheckNodeStatus(nodeCount int, log *slog.Logger) TestScenario {
+	return &checkNodeStatusScenario{
+		nodeCount: nodeCount,
+		log:       log,
+	}
+}
+
+type checkNodeStatusScenario struct {
+	nodeCount int
+	log       *slog.Logger
+
+	nodes []*corev1.Node
+}
+
+func (s *checkNodeStatusScenario) Name() string {
+	return "check node status"
+}
+
+func (s *checkNodeStatusScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	s.nodes = make([]*corev1.Node, 0, s.nodeCount)
+
+	var lock sync.Mutex
+	errGroup, ctx := errgroup.WithContext(ctx)
+
+	for i := range s.nodeCount {
+		errGroup.Go(func() error {
+			nodeName := fmt.Sprintf("kwok-check-status-%d", i)
+			s.log.Info(fmt.Sprintf("Creating node %s", nodeName))
+			node := NewKwokNode(KwokConfig{}, nodeName)
+
+			_, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
+			if err != nil && !apierrors.IsAlreadyExists(err) {
+				return fmt.Errorf("failed to create fake node: %w", err)
+			}
+			if err != nil && apierrors.IsAlreadyExists(err) {
+				s.log.Warn("node already exists; reusing it, but this may conflict with other test runs", "nodeName", nodeName)
+			}
+			lock.Lock()
+			s.nodes = append(s.nodes, node)
+			lock.Unlock()
+
+			return nil
+		})
+	}
+
+	return errGroup.Wait()
+}
+
+func (s *checkNodeStatusScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	var lock sync.Mutex
+	var errs []error
+	var wg sync.WaitGroup
+
+	wg.Add(len(s.nodes))
+	// Nodes are not deleted with the namespace and can leak, so we iterate over all of them and delete as many as possible.
+	for _, n := range s.nodes {
+		go func() {
+			defer wg.Done()
+
+			s.log.Info(fmt.Sprintf("Deleting node %s", n.Name))
+			err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{})
+			if err != nil && !apierrors.IsNotFound(err) {
+				s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name)
+				lock.Lock()
+				errs = append(errs, err)
+				lock.Unlock()
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
+
+	s.log.Info("Finished cleaning up nodes for status check.")
+	return nil
+}
+
+func (s *checkNodeStatusScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error {
+	s.log.Info(fmt.Sprintf("Starting check node status action with %d nodes", len(s.nodes)))
+
+	actions := make([]castai.ClusterAction, 0, len(s.nodes))
+	for _, node := range s.nodes {
+		actions = append(actions, castai.ClusterAction{
+			ID:        uuid.NewString(),
+			CreatedAt: time.Now().UTC(),
+			ActionCheckNodeStatus: &castai.ActionCheckNodeStatus{
+				NodeName:   node.Name,
+				NodeStatus: castai.ActionCheckNodeStatus_READY,
+			},
+		})
+	}
+
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/create_resource.go b/loadtest/scenarios/create_resource.go
new file mode 100644
index 00000000..a614cfab
--- /dev/null
+++ b/loadtest/scenarios/create_resource.go
@@ -0,0 +1,110 @@
+package scenarios
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/google/uuid"
+	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+// CreateResource simulates creating/patching N custom resources (à la the workload autoscaler flow).
+func CreateResource(count int, dynamicClient dynamic.Interface, apiextensions apiextensionsclientset.Interface, log *slog.Logger) TestScenario {
+	return &createResourceScenario{
+		resourceCount:       count,
+		apiextensionsClient: apiextensions,
+		dynamicClient:       dynamicClient,
+		log:                 log,
+	}
+}
+
+type createResourceScenario struct {
+	resourceCount       int
+	apiextensionsClient apiextensionsclientset.Interface
+	dynamicClient       dynamic.Interface
+	log                 *slog.Logger
+}
+
+func (c *createResourceScenario) Name() string {
+	return "create resource"
+}
+
+func (c *createResourceScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	crd := WoopCRD()
+
+	c.log.Info("Creating CRD")
+	_, err := c.apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crd, v1.CreateOptions{})
+	if err != nil && !apierrors.IsAlreadyExists(err) {
+		return fmt.Errorf("failed to create CRD: %w", err)
+	}
+
+	// Sometimes it takes a few seconds for the CRD to become fully consistent, depending on the provider.
+	time.Sleep(5 * time.Second)
+
+	c.log.Info("Pre-creating half of the resources to test Patch path")
+	// CreateResource also exercises the Patch path; half of the resources are pre-created to cover it.
+	resourceGVR := schema.GroupVersionResource{
+		Group:    woopStubCRDGroup,
+		Version:  "v1",
+		Resource: woopStubCRDPlural,
+	}
+	for i := range c.resourceCount / 2 {
+		instance := WoopCR(namespace, fmt.Sprintf("create-resource-%d", i))
+
+		_, err = c.dynamicClient.Resource(resourceGVR).Namespace(namespace).Create(ctx, instance, v1.CreateOptions{})
+		if err != nil {
+			c.log.Warn(fmt.Sprintf("Error creating instance %d: %v", i, err))
+		} else {
+			c.log.Info(fmt.Sprintf("Created instance: create-resource-%d", i))
+		}
+	}
+
+	return nil
+}
+
+func (c *createResourceScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	// Note: we don't delete the CRs, as namespace deletion cleans them up and they are much faster to delete than deployments/pods.
+
+	c.log.Info("Deleting custom resource definition")
+	err := c.apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Delete(ctx, woopStubCRDName, v1.DeleteOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
+		return fmt.Errorf("failed to delete CRD: %w", err)
+	}
+
+	return nil
+}
+
+func (c *createResourceScenario) Run(ctx context.Context, namespace string, clientset kubernetes.Interface, executor ActionExecutor) error {
+	actions := make([]castai.ClusterAction, 0, c.resourceCount)
+	woopGVR := WoopGVR()
+	for i := range c.resourceCount {
+		obj := WoopCR(namespace, fmt.Sprintf("create-resource-%d", i))
+		content := obj.UnstructuredContent()
+		spec := content["spec"].(map[string]any)
+		spec["replicas"] = 100
+
+		actions = append(actions, castai.ClusterAction{
+			ID: uuid.NewString(),
+			ActionCreate: &castai.ActionCreate{
+				GroupVersionResource: castai.GroupVersionResource{
+					Group:    woopGVR.Group,
+					Version:  woopGVR.Version,
+					Resource: woopGVR.Resource,
+				},
+				Object: content,
+			},
+		})
+	}
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/delete_node.go b/loadtest/scenarios/delete_node.go
new file mode 100644
index 00000000..676e243a
--- /dev/null
+++ b/loadtest/scenarios/delete_node.go
@@ -0,0 +1,170 @@
+package scenarios
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/samber/lo"
+	"golang.org/x/sync/errgroup"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+func DeleteNode(nodeCount, deploymentReplicas int, log *slog.Logger) TestScenario {
+	return &deleteNodeScenario{
+		nodeCount:          nodeCount,
+		deploymentReplicas: deploymentReplicas,
+		log:                log,
+	}
+}
+
+type deleteNodeScenario struct {
+	nodeCount          int
+	deploymentReplicas int
+	log                *slog.Logger
+
+	nodesToDelete []*corev1.Node
+}
+
+func (s *deleteNodeScenario) Name() string {
+	return "delete node"
+}
+
+func (s *deleteNodeScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	s.nodesToDelete = make([]*corev1.Node, 0, s.nodeCount)
+
+	var lock sync.Mutex
+	errGroup, ctx := errgroup.WithContext(ctx)
+
+	for i := range s.nodeCount {
+		errGroup.Go(func() error {
+			nodeName := fmt.Sprintf("kwok-delete-%d", i)
+			s.log.Info(fmt.Sprintf("Creating node %s", nodeName))
+			node := NewKwokNode(KwokConfig{}, nodeName)
+
+			_, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
+			if err != nil && !apierrors.IsAlreadyExists(err) {
+				return fmt.Errorf("failed to create fake node: %w", err)
+			}
+			if err != nil && apierrors.IsAlreadyExists(err) {
+				s.log.Warn("node already exists; reusing it, but this may conflict with other test runs", "nodeName", nodeName)
+			}
+			lock.Lock()
+			s.nodesToDelete = append(s.nodesToDelete, node)
+			lock.Unlock()
+
+			s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName))
+			deployment := Deployment(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i))
+			deployment.ObjectMeta.Namespace = namespace
+			//nolint:gosec // Not afraid of overflow here.
+			deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas))
+			deployment.Spec.Template.Spec.NodeName = nodeName
+
+			_, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
+			if err != nil {
+				return fmt.Errorf("failed to create fake deployment: %w", err)
+			}
+
+			// Wait for the deployment to become ready; otherwise we might start draining before the pods are up.
+			progressed := WaitUntil(ctx, 60*time.Second, func(ctx context.Context) bool {
+				d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
+				if err != nil {
+					s.log.Warn("failed to get deployment after creating", "err", err)
+					return false
+				}
+				return d.Status.ReadyReplicas == *d.Spec.Replicas
+			})
+			if !progressed {
+				return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name)
+			}
+
+			return nil
+		})
+	}
+
+	return errGroup.Wait()
+}
+
+func (s *deleteNodeScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	var lock sync.Mutex
+	var errs []error
+	var wg sync.WaitGroup
+
+	wg.Add(len(s.nodesToDelete))
+	// Nodes are not deleted with the namespace and can leak, so we iterate over all of them and delete as many as possible.
+	for _, n := range s.nodesToDelete {
+		go func() {
+			defer wg.Done()
+
+			s.log.Info(fmt.Sprintf("Deleting node %s", n.Name))
+			err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{})
+			if err != nil && !apierrors.IsNotFound(err) {
+				s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name)
+				lock.Lock()
+				errs = append(errs, err)
+				lock.Unlock()
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
+
+	// We assume no other tests share this namespace, so just delete all deployments.
+	deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to list deployments: %w", err)
+	}
+
+	wg.Add(len(deploymentsInNS.Items))
+	for _, deployment := range deploymentsInNS.Items {
+		go func() {
+			defer wg.Done()
+			s.log.Info(fmt.Sprintf("Deleting deployment %s", deployment.Name))
+			err = clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
+			if err != nil && !apierrors.IsNotFound(err) {
+				s.log.Warn(
+					"failed to delete fake deployment, will continue with other deployments and rely on namespace cleanup",
+					"deploymentName",
+					deployment.Name,
+				)
+			}
+		}()
+	}
+	wg.Wait()
+
+	s.log.Info("Finished cleaning up nodes and deployments.")
+	return nil
+}
+
+func (s *deleteNodeScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error {
+	s.log.Info(fmt.Sprintf("Starting delete node action with %d nodes", len(s.nodesToDelete)))
+
+	actions := make([]castai.ClusterAction, 0, len(s.nodesToDelete))
+	for _, node := range s.nodesToDelete {
+		actions = append(actions, castai.ClusterAction{
+			ID:        uuid.NewString(),
+			CreatedAt: time.Now().UTC(),
+			ActionDeleteNode: &castai.ActionDeleteNode{
+				NodeName: node.Name,
+				NodeID:   "", // This must be empty OR match a label on the node; otherwise the delete does nothing.
+			},
+		})
+	}
+
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/delete_resource.go b/loadtest/scenarios/delete_resource.go
new file mode 100644
index 00000000..098700fe
--- /dev/null
+++ b/loadtest/scenarios/delete_resource.go
@@ -0,0 +1,108 @@
+package scenarios
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/samber/lo"
+	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+// DeleteResource simulates deleting N custom resources (à la the workload autoscaler flow).
+func DeleteResource(count int, dynamicClient dynamic.Interface, apiextensions apiextensionsclientset.Interface, log *slog.Logger) TestScenario {
+	return &deleteResourceScenario{
+		resourceCount:       count,
+		apiextensionsClient: apiextensions,
+		dynamicClient:       dynamicClient,
+		log:                 log,
+	}
+}
+
+type deleteResourceScenario struct {
+	resourceCount       int
+	apiextensionsClient apiextensionsclientset.Interface
+	dynamicClient       dynamic.Interface
+	log                 *slog.Logger
+}
+
+func (c *deleteResourceScenario) Name() string {
+	return "delete resource"
+}
+
+func (c *deleteResourceScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	crd := WoopCRD()
+
+	c.log.Info("Creating CRD")
+	_, err := c.apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crd, v1.CreateOptions{})
+	if err != nil && !apierrors.IsAlreadyExists(err) {
+		return fmt.Errorf("failed to create CRD: %w", err)
+	}
+
+	// Sometimes it takes a few seconds for the CRD to become fully consistent, depending on the provider.
+	time.Sleep(5 * time.Second)
+
+	c.log.Info("Pre-creating resources")
+	resourceGVR := schema.GroupVersionResource{
+		Group:    woopStubCRDGroup,
+		Version:  "v1",
+		Resource: woopStubCRDPlural,
+	}
+	for i := range c.resourceCount {
+		instance := WoopCR(namespace, fmt.Sprintf("delete-resource-%d", i))
+
+		_, err = c.dynamicClient.Resource(resourceGVR).Namespace(namespace).Create(ctx, instance, v1.CreateOptions{})
+		if err != nil {
+			c.log.Warn(fmt.Sprintf("Error creating instance %d: %v", i, err))
+		} else {
+			c.log.Info(fmt.Sprintf("Created instance: delete-resource-%d", i))
+		}
+	}
+
+	return nil
+}
+
+func (c *deleteResourceScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	// Note: we don't delete the CRs, as namespace deletion cleans them up and they are much faster to delete than deployments/pods.
+
+	c.log.Info("Deleting custom resource definition")
+	err := c.apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Delete(ctx, woopStubCRDName, v1.DeleteOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
+		return fmt.Errorf("failed to delete CRD: %w", err)
+	}
+
+	return nil
+}
+
+func (c *deleteResourceScenario) Run(ctx context.Context, namespace string, clientset kubernetes.Interface, executor ActionExecutor) error {
+	actions := make([]castai.ClusterAction, 0, c.resourceCount)
+	woopGVR := WoopGVR()
+	for i := range c.resourceCount {
+		actions = append(actions, castai.ClusterAction{
+			ID: uuid.NewString(),
+			ActionDelete: &castai.ActionDelete{
+				ID: castai.ObjectID{
+					GroupVersionResource: castai.GroupVersionResource{
+						Group:    woopGVR.Group,
+						Version:  woopGVR.Version,
+						Resource: woopGVR.Resource,
+					},
+					Name:      fmt.Sprintf("delete-resource-%d", i),
+					Namespace: lo.ToPtr(namespace),
+				},
+			},
+		})
+	}
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/drain_node.go b/loadtest/scenarios/drain_node.go
new file mode 100644
index 00000000..c9853784
--- /dev/null
+++ b/loadtest/scenarios/drain_node.go
@@ -0,0 +1,173 @@
+package scenarios
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/samber/lo"
+	"golang.org/x/sync/errgroup"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+// DrainNode simulates node drains that complete successfully (as opposed to StuckDrain).
+func DrainNode(nodeCount, deploymentReplicas int, log *slog.Logger) TestScenario {
+	return &drainNodeScenario{
+		nodeCount:          nodeCount,
+		deploymentReplicas: deploymentReplicas,
+		log:                log,
+	}
+}
+
+type drainNodeScenario struct {
+	nodeCount          int
+	deploymentReplicas int
+	log                *slog.Logger
+
+	nodesToDrain []*corev1.Node
+}
+
+func (s *drainNodeScenario) Name() string {
+	return "drain node"
+}
+
+func (s *drainNodeScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	s.nodesToDrain = make([]*corev1.Node, 0, s.nodeCount)
+
+	var lock sync.Mutex
+	errGroup, ctx := errgroup.WithContext(ctx)
+
+	for i := range s.nodeCount {
+		errGroup.Go(func() error {
+			nodeName := fmt.Sprintf("kwok-drain-%d", i)
+			s.log.Info(fmt.Sprintf("Creating node %s", nodeName))
+			node := NewKwokNode(KwokConfig{}, nodeName)
+
+			_, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
+			if err != nil && !apierrors.IsAlreadyExists(err) {
+				return fmt.Errorf("failed to create fake node: %w", err)
+			}
+			if err != nil && apierrors.IsAlreadyExists(err) {
+				s.log.Warn("node already exists; reusing it, but this may conflict with other test runs", "nodeName", nodeName)
+			}
+			lock.Lock()
+			s.nodesToDrain = append(s.nodesToDrain, node)
+			lock.Unlock()
+
+			s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName))
+			deployment := Deployment(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i))
+			deployment.ObjectMeta.Namespace = namespace
+			//nolint:gosec // Not afraid of overflow here.
+			deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas))
+			deployment.Spec.Template.Spec.NodeName = nodeName
+
+			_, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
+			if err != nil {
+				return fmt.Errorf("failed to create fake deployment: %w", err)
+			}
+
+			// Wait for the deployment to become ready; otherwise we might start draining before the pods are up.
+			progressed := WaitUntil(ctx, 60*time.Second, func(ctx context.Context) bool {
+				d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
+				if err != nil {
+					s.log.Warn("failed to get deployment after creating", "err", err)
+					return false
+				}
+				return d.Status.ReadyReplicas == *d.Spec.Replicas
+			})
+			if !progressed {
+				return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name)
+			}
+
+			return nil
+		})
+	}
+
+	return errGroup.Wait()
+}
+
+func (s *drainNodeScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	var lock sync.Mutex
+	var errs []error
+	var wg sync.WaitGroup
+
+	wg.Add(len(s.nodesToDrain))
+	// Nodes are not deleted with the namespace and can leak, so we iterate over all of them and delete as many as possible.
+	for _, n := range s.nodesToDrain {
+		go func() {
+			defer wg.Done()
+
+			s.log.Info(fmt.Sprintf("Deleting node %s", n.Name))
+			err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{})
+			if err != nil && !apierrors.IsNotFound(err) {
+				s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name)
+				lock.Lock()
+				errs = append(errs, err)
+				lock.Unlock()
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
+
+	// We assume no other tests share this namespace, so just delete all deployments.
+	deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to list deployments: %w", err)
+	}
+
+	wg.Add(len(deploymentsInNS.Items))
+	for _, deployment := range deploymentsInNS.Items {
+		go func() {
+			defer wg.Done()
+			s.log.Info(fmt.Sprintf("Deleting deployment %s", deployment.Name))
+			err = clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
+			if err != nil && !apierrors.IsNotFound(err) {
+				s.log.Warn(
+					"failed to delete fake deployment, will continue with other deployments and rely on namespace cleanup",
+					"deploymentName",
+					deployment.Name,
+				)
+			}
+		}()
+	}
+	wg.Wait()
+
+	s.log.Info("Finished cleaning up nodes and deployments for drain.")
+	return nil
+}
+
+func (s *drainNodeScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error {
+	s.log.Info(fmt.Sprintf("Starting drain action creation with %d nodes", len(s.nodesToDrain)))
+
+	actions := make([]castai.ClusterAction, 0, len(s.nodesToDrain))
+	for _, node := range s.nodesToDrain {
+		actions = append(actions, castai.ClusterAction{
+			ID:        uuid.NewString(),
+			CreatedAt: time.Now().UTC(),
+			ActionDrainNode: &castai.ActionDrainNode{
+				NodeName:            node.Name,
+				NodeID:              "",
+				DrainTimeoutSeconds: 60,
+				Force:               true,
+			},
+		})
+	}
+
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/evict_pod.go b/loadtest/scenarios/evict_pod.go
new file mode 100644
index 00000000..394657c5
--- /dev/null
+++ b/loadtest/scenarios/evict_pod.go
@@ -0,0 +1,90 @@
+package scenarios
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+
+	"github.com/google/uuid"
+	"github.com/samber/lo"
+	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+func EvictPod(count int, log *slog.Logger) TestScenario {
+	return &evictPodScenario{
+		totalPods: count,
+		log:       log,
+	}
+}
+
+type evictPodScenario struct {
+	totalPods int
+	log       *slog.Logger
+
+	podsToEvict []*v1.Pod
+}
+
+func (e *evictPodScenario) Name() string {
+	return "evict pod"
+}
+
+func (e *evictPodScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	// Create N pods and remember them so Run can evict them later.
+	for i := range e.totalPods {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("context done: %w", ctx.Err())
+		default:
+		}
+
+		pod := Pod(fmt.Sprintf("evict-pod-%d", i))
+		pod.ObjectMeta.Namespace = namespace
+
+		e.log.Info(fmt.Sprintf("Creating pod %s", pod.Name))
+		_, err := clientset.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
+		if err != nil {
+			return fmt.Errorf("creating pod: %w", err)
+		}
+
+		e.podsToEvict = append(e.podsToEvict, pod)
+	}
+
+	return nil
+}
+
+func (e *evictPodScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	var errs []error
+
+	for _, pod := range e.podsToEvict {
+		e.log.Info(fmt.Sprintf("Deleting pod %s", pod.Name))
+		err := clientset.CoreV1().Pods(namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: lo.ToPtr(int64(0))})
+		if err != nil && !apierrors.IsNotFound(err) {
+			e.log.Warn(fmt.Sprintf("failed to delete pod: %v", err))
+			errs = append(errs, err)
+		}
+	}
+	return errors.Join(errs...)
+}
+
+func (e *evictPodScenario) Run(ctx context.Context, namespace string, clientset kubernetes.Interface, executor ActionExecutor) error {
+	e.log.Info(fmt.Sprintf("Creating %d actions to evict pods", len(e.podsToEvict)))
+	actions := make([]castai.ClusterAction, 0, len(e.podsToEvict))
+	for _, pod := range e.podsToEvict {
+		actions = append(actions, castai.ClusterAction{
+			ID: uuid.NewString(),
+			ActionEvictPod: &castai.ActionEvictPod{
+				Namespace: pod.Namespace,
+				PodName:   pod.Name,
+			},
+		})
+	}
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/k8s_objects.go b/loadtest/scenarios/k8s_objects.go
index 111ff62c..8d8dd618 100644
--- a/loadtest/scenarios/k8s_objects.go
+++ b/loadtest/scenarios/k8s_objects.go
@@ -7,14 +7,22 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	policyv1 "k8s.io/api/policy/v1"
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/intstr"
 )
 
 const (
 	DefaultKwokMarker = "kwok.x-k8s.io/node"
 	KwokMarkerValue   = "fake"
+
+	woopStubCRDName   = "recommendations.autoscaling.cast.ai"
+	woopStubCRDGroup  = "autoscaling.cast.ai"
+	woopStubCRDPlural = "recommendations"
+	woopStubCRDKind   = "Recommendation"
 )
 
 type KwokConfig struct {
@@ -94,12 +102,31 @@ func NewKwokNode(cfg KwokConfig, nodeName string) *corev1.Node {
 // Deployment cannot run in reality, it uses fake container.
 // Deployment deploys on kwok fake nodes by default.
 func DeploymentWithStuckPDB(deploymentName string) (*appsv1.Deployment, *policyv1.PodDisruptionBudget) {
+	deployment := Deployment(deploymentName)
+
+	pdb := &policyv1.PodDisruptionBudget{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: fmt.Sprintf("%s-pdb", deploymentName),
+		},
+		Spec: policyv1.PodDisruptionBudgetSpec{
+			Selector:       deployment.Spec.Selector,
+			MaxUnavailable: lo.ToPtr(intstr.FromInt32(0)),
+		},
+	}
+
+	return deployment, pdb
+}
+
+// Deployment creates a deployment that can run on kwok nodes in the default namespace.
+func Deployment(name string) *appsv1.Deployment {
 	labelApp := "appname"
-	labelValue := fmt.Sprintf("%s-stuck-pdb", deploymentName)
+	labelValue := fmt.Sprintf("%s-test-pod", name)
 
-	deployment := &appsv1.Deployment{
+	kwokAffinity, kwokToleration := kwokNodeAffinityAndToleration()
+
+	return &appsv1.Deployment{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:      deploymentName,
+			Name:      name,
 			Namespace: metav1.NamespaceDefault,
 		},
 		Spec: appsv1.DeploymentSpec{
@@ -123,15 +150,71 @@ func DeploymentWithStuckPDB(deploymentName string) (*appsv1.Deployment, *policyv
 					},
 				},
 				Affinity: &corev1.Affinity{
-					NodeAffinity: &corev1.NodeAffinity{
-						RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
-							NodeSelectorTerms: []corev1.NodeSelectorTerm{
-								{
-									MatchExpressions: []corev1.NodeSelectorRequirement{
-										{
-											Key:      DefaultKwokMarker,
-											Operator: corev1.NodeSelectorOpIn,
-											Values:   []string{KwokMarkerValue},
+					NodeAffinity: kwokAffinity,
+				},
+				Tolerations: []corev1.Toleration{kwokToleration},
+			},
+		},
+	}
+}
+
+// Pod returns a pod that can run on kwok nodes in the default namespace.
+func Pod(name string) *corev1.Pod {
+	kwokAffinity, kwokToleration := kwokNodeAffinityAndToleration()
+
+	return &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: metav1.NamespaceDefault,
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Name:  "fake-container",
+					Image: "does-not-exist",
+				},
+			},
+			Affinity: &corev1.Affinity{
+				NodeAffinity: kwokAffinity,
+			},
+			Tolerations: []corev1.Toleration{
+				kwokToleration,
+			},
+		},
+	}
+}
+
+// WoopCRD is a stub CRD similar to the workload autoscaler's.
+func WoopCRD() *apiextensionsv1.CustomResourceDefinition {
+	return &apiextensionsv1.CustomResourceDefinition{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: woopStubCRDName,
+		},
+		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
+			Group: woopStubCRDGroup,
+			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
+				{
+					Name:    "v1",
+					Served:  true,
+					Storage: true,
+					Schema: &apiextensionsv1.CustomResourceValidation{
+						OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
+							Type: "object",
+							Properties: map[string]apiextensionsv1.JSONSchemaProps{
+								"spec": {
+									Type: "object",
+									Properties: map[string]apiextensionsv1.JSONSchemaProps{
+										"replicas": {Type: "integer"},
+										"recommendation": {
+											Type: "array",
+											Items: &apiextensionsv1.JSONSchemaPropsOrArray{
+												Schema: &apiextensionsv1.JSONSchemaProps{
+													Type: "object",
+													Properties: map[string]apiextensionsv1.JSONSchemaProps{
+														"containerName": {Type: "string"},
+													},
+												},
 											},
 										},
 									},
@@ -139,31 +222,68 @@ func DeploymentWithStuckPDB(deploymentName string) (*appsv1.Deployment, *policyv
 								},
 							},
 						},
-				Tolerations: []corev1.Toleration{
-					{
-						Key:      DefaultKwokMarker,
-						Operator: corev1.TolerationOpExists,
-						Effect:   corev1.TaintEffectNoSchedule,
-					},
-				},
 			},
 		},
+			Scope: apiextensionsv1.NamespaceScoped,
+			Names: apiextensionsv1.CustomResourceDefinitionNames{
+				Plural:   woopStubCRDPlural,
+				Singular: "recommendation",
+				Kind:     woopStubCRDKind,
+				ListKind: "RecommendationList",
+			},
+		},
 	}
+}
 
-	pdb := &policyv1.PodDisruptionBudget{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: fmt.Sprintf("%s-pdb", deploymentName),
-		},
-		Spec: policyv1.PodDisruptionBudgetSpec{
-			Selector: &metav1.LabelSelector{
-				MatchLabels: map[string]string{
-					labelApp: labelValue,
+// WoopCR creates an instance of the CRD from WoopCRD.
+func WoopCR(namespace, name string) *unstructured.Unstructured {
+	return &unstructured.Unstructured{
+		Object: map[string]any{
+			"apiVersion": fmt.Sprintf("%s/v1", woopStubCRDGroup),
+			"kind":       woopStubCRDKind,
+			"metadata": map[string]any{
+				"name":      name,
+				"namespace": namespace,
+			},
+			"spec": map[string]any{
+				"replicas": 10,
+				"recommendation": []any{
+					map[string]any{
+						"containerName": "test",
+					},
 				},
 			},
-			MaxUnavailable: lo.ToPtr(intstr.FromInt32(0)),
 		},
 	}
+}
 
-	return deployment, pdb
+// WoopGVR returns the GVR for the CRD from WoopCRD.
+func WoopGVR() *schema.GroupVersionResource {
+	return &schema.GroupVersionResource{
+		Group:    woopStubCRDGroup,
+		Version:  "v1",
+		Resource: woopStubCRDPlural,
+	}
+}
+
+func kwokNodeAffinityAndToleration() (*corev1.NodeAffinity, corev1.Toleration) {
+	return &corev1.NodeAffinity{
+			RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
+				NodeSelectorTerms: []corev1.NodeSelectorTerm{
+					{
+						MatchExpressions: []corev1.NodeSelectorRequirement{
+							{
+								Key:      DefaultKwokMarker,
+								Operator: corev1.NodeSelectorOpIn,
+								Values:   []string{KwokMarkerValue},
+							},
+						},
+					},
+				},
+			},
+		}, corev1.Toleration{
+			Key:      DefaultKwokMarker,
+			Operator: corev1.TolerationOpExists,
+			Effect:   corev1.TaintEffectNoSchedule,
+		}
 }
diff --git a/loadtest/scenarios/patch_node.go b/loadtest/scenarios/patch_node.go
new file mode 100644
index 00000000..219ce78f
--- /dev/null
+++ b/loadtest/scenarios/patch_node.go
@@ -0,0 +1,124 @@
+package scenarios
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/samber/lo"
+	"golang.org/x/sync/errgroup"
+	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+func PatchNode(nodeCount int, log *slog.Logger) TestScenario {
+	return &patchNodeScenario{
+		nodeCount: nodeCount,
+		log:       log,
+	}
+}
+
+type patchNodeScenario struct {
+	nodeCount int
+	log       *slog.Logger
+
+	nodesToPatch []*corev1.Node
+}
+
+func (s *patchNodeScenario) Name() string {
+	return "patch node"
+}
+
+func (s *patchNodeScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	s.nodesToPatch = make([]*corev1.Node, 0, s.nodeCount)
+
+	var lock sync.Mutex
+	errGroup, ctx := errgroup.WithContext(ctx)
+
+	for i := range s.nodeCount {
+		errGroup.Go(func() error {
+			nodeName := fmt.Sprintf("kwok-patch-%d", i)
+			s.log.Info(fmt.Sprintf("Creating node %s", nodeName))
+			node := NewKwokNode(KwokConfig{}, nodeName)
+
+			_, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
+			if err != nil && !apierrors.IsAlreadyExists(err) {
+				return fmt.Errorf("failed to create fake node: %w", err)
+			}
+			if err != nil && apierrors.IsAlreadyExists(err) {
+				s.log.Warn("node already exists; reusing it, but this may conflict with other test runs", "nodeName", nodeName)
+			}
+			lock.Lock()
+			s.nodesToPatch = append(s.nodesToPatch, node)
+			lock.Unlock()
+
+			return nil
+		})
+	}
+
+	return errGroup.Wait()
+}
+
+func (s *patchNodeScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	var lock sync.Mutex
+	var errs []error
+	var wg sync.WaitGroup
+
+	wg.Add(len(s.nodesToPatch))
+	// Nodes are not deleted with the namespace and can leak, so we iterate over all of them and delete as many as possible.
+	for _, n := range s.nodesToPatch {
+		go func() {
+			defer wg.Done()
+
+			s.log.Info(fmt.Sprintf("Deleting node %s", n.Name))
+			err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{})
+			if err != nil && !apierrors.IsNotFound(err) {
+				s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name)
+				lock.Lock()
+				errs = append(errs, err)
+				lock.Unlock()
+			}
+		}()
+	}
+
+	wg.Wait()
+
+	if len(errs) > 0 {
+		return errors.Join(errs...)
+	}
+
+	s.log.Info("Finished cleaning up nodes for patching.")
+	return nil
+}
+
+func (s *patchNodeScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error {
+	s.log.Info(fmt.Sprintf("Starting patch node action creation with %d nodes", len(s.nodesToPatch)))
+
+	actions := make([]castai.ClusterAction, 0, len(s.nodesToPatch))
+	for _, node := range s.nodesToPatch {
+		actions = append(actions, castai.ClusterAction{
+			ID:        uuid.NewString(),
+			CreatedAt: time.Now().UTC(),
+			ActionPatchNode: &castai.ActionPatchNode{
+				NodeName:      node.Name,
+				NodeID:        "",
+				Labels:        map[string]string{"Test": "label"},
+				Annotations:   map[string]string{"Test": "annotation"},
+				Unschedulable: lo.ToPtr(true),
+				Capacity:      nil,
+			},
+		})
+	}
+
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/patch_resource.go b/loadtest/scenarios/patch_resource.go
new file mode 100644
index 00000000..7b277ef7
--- /dev/null
+++ b/loadtest/scenarios/patch_resource.go
@@ -0,0 +1,123 @@
+package scenarios
+
+import (
+	"context"
+	"fmt"
+	"log/slog"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/samber/lo"
+	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/kubernetes"
+
+	"github.com/castai/cluster-controller/internal/castai"
+)
+
+// PatchResource simulates patching N custom resources (à la the workload autoscaler flow).
+func PatchResource(count int, dynamicClient dynamic.Interface, apiextensions apiextensionsclientset.Interface, log *slog.Logger) TestScenario {
+	return &patchResourceScenario{
+		resourceCount:       count,
+		apiextensionsClient: apiextensions,
+		dynamicClient:       dynamicClient,
+		log:                 log,
+	}
+}
+
+type patchResourceScenario struct {
+	resourceCount       int
+	apiextensionsClient apiextensionsclientset.Interface
+	dynamicClient       dynamic.Interface
+	log                 *slog.Logger
+}
+
+func (c *patchResourceScenario) Name() string {
+	return "patch resource"
+}
+
+func (c *patchResourceScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	crd := WoopCRD()
+
+	c.log.Info("Creating CRD")
+	_, err := c.apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crd, v1.CreateOptions{})
+	if err != nil && !apierrors.IsAlreadyExists(err) {
+		return fmt.Errorf("failed to create CRD: %w", err)
+	}
+
+	// Sometimes it takes a few seconds for the CRD to become fully consistent, depending on the provider.
+	time.Sleep(5 * time.Second)
+
+	c.log.Info("Pre-creating resources")
+	resourceGVR := schema.GroupVersionResource{
+		Group:    woopStubCRDGroup,
+		Version:  "v1",
+		Resource: woopStubCRDPlural,
+	}
+	for i := range c.resourceCount {
+		instance := WoopCR(namespace, fmt.Sprintf("patch-resource-%d", i))
+
+		_, err = c.dynamicClient.Resource(resourceGVR).Namespace(namespace).Create(ctx, instance, v1.CreateOptions{})
+		if err != nil {
+			c.log.Warn(fmt.Sprintf("Error creating instance %d: %v", i, err))
+		} else {
+			c.log.Info(fmt.Sprintf("Created instance: patch-resource-%d", i))
+		}
+	}
+
+	return nil
+}
+
+func (c *patchResourceScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error {
+	// Note: we don't delete the CRs, as namespace deletion cleans them up and they are much faster to delete than deployments/pods.
+
+	c.log.Info("Deleting custom resource definition")
+	err := c.apiextensionsClient.ApiextensionsV1().CustomResourceDefinitions().Delete(ctx, woopStubCRDName, v1.DeleteOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
+		return fmt.Errorf("failed to delete CRD: %w", err)
+	}
+
+	return nil
+}
+
+func (c *patchResourceScenario) Run(ctx context.Context, namespace string, clientset kubernetes.Interface, executor ActionExecutor) error {
+	actions := make([]castai.ClusterAction, 0, c.resourceCount)
+	woopGVR := WoopGVR()
+	for i := range c.resourceCount {
+		actions = append(actions, castai.ClusterAction{
+			ID: uuid.NewString(),
+			ActionPatch: &castai.ActionPatch{
+				ID: castai.ObjectID{
+					GroupVersionResource: castai.GroupVersionResource{
+						Group:    woopGVR.Group,
+						Version:  woopGVR.Version,
+						Resource: woopGVR.Resource,
+					},
+					Name:      fmt.Sprintf("patch-resource-%d", i),
+					Namespace: lo.ToPtr(namespace),
+				},
+				PatchType: "application/json-patch+json",
+				Patch: `
+	[
+		{
+			"op": "add",
+			"path": "/metadata/annotations",
+			"value": {}
+		},
+		{
+			"op": "add",
+			"path": "/metadata/annotations/annotations-key",
+			"value": "annotation-value"
+		}
+	]
+`,
+			},
+		})
+	}
+	executor.ExecuteActions(ctx, actions)
+
+	return nil
+}
diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go
index c59770a9..92f3863f 100644
--- a/loadtest/scenarios/scenario.go
+++ b/loadtest/scenarios/scenario.go
@@ -101,6 +101,7 @@ func RunScenario(
 
 	err = scenario.Preparation(ctx, namespaceForTest, clientset)
 	if err != nil {
+		logger.Warn("Preparation for scenario failed", "error", err)
 		return fmt.Errorf("failed to run preparation function: %w", err)
 	}
 
diff --git a/loadtest/scenarios/stuck_drain.go b/loadtest/scenarios/stuck_drain.go
index 905d16de..b15e501f 100644
--- a/loadtest/scenarios/stuck_drain.go
+++ b/loadtest/scenarios/stuck_drain.go
@@ -72,7 +72,7 @@ func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string,
 			}
 
 			// Wait for deployment to become ready, otherwise we might start draining before the pod is up.
-			progressed := WaitUntil(ctx, 30*time.Second, func() bool {
+			progressed := WaitUntil(ctx, 30*time.Second, func(ctx context.Context) bool {
 				d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
 				if err != nil {
 					s.log.Warn("failed to get deployment after creating", "err", err)
diff --git a/loadtest/scenarios/util.go b/loadtest/scenarios/util.go
index 8ad3f034..548f8fce 100644
--- a/loadtest/scenarios/util.go
+++ b/loadtest/scenarios/util.go
@@ -5,7 +5,7 @@ import (
 	"time"
 )
 
-func WaitUntil(ctx context.Context, duration time.Duration, condition func() bool) bool {
+func WaitUntil(ctx context.Context, duration time.Duration, condition func(ctx context.Context) bool) bool {
 	start := time.Now()
 	for {
 		select {
@@ -16,7 +16,7 @@ func WaitUntil(ctx context.Context, duration time.Duration, condition func() boo
 		if time.Since(start) > duration {
 			return false
 		}
-		if condition() {
+		if condition(ctx) {
 			return true
 		}
 		time.Sleep(500 * time.Millisecond)