diff --git a/Makefile b/Makefile index df427822..19d25a4f 100644 --- a/Makefile +++ b/Makefile @@ -38,3 +38,8 @@ test: generate-e2e-client: go generate ./e2e/client .PHONY: generate-e2e-client + +# TODO: Make this less simplistic +run-loadtest: + # TODO: Required because of reusing config + API_KEY=dummy API_URL=http://example.com CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 go run . test-server \ No newline at end of file diff --git a/README.md b/README.md index 9981360d..657277ab 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,9 @@ Option 1. Deploy controller in Kind cluster. --set clusterID="your-cluster-id" | kubectl apply -f - -n castai-agent ``` +### Load tests +See [docs](loadtest/README.md) + ## Community - [Twitter](https://twitter.com/cast_ai) diff --git a/cmd/root.go b/cmd/root.go index 16410faa..316a049b 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -9,6 +9,7 @@ import ( "github.com/castai/cluster-controller/cmd/controller" "github.com/castai/cluster-controller/cmd/monitor" + "github.com/castai/cluster-controller/cmd/testserver" ) var rootCmd = &cobra.Command{ @@ -40,6 +41,7 @@ func Execute(ctx context.Context) { func init() { rootCmd.AddCommand(controller.NewCmd()) rootCmd.AddCommand(monitor.NewCmd()) + rootCmd.AddCommand(testserver.NewCmd()) } func fatal(err error) { diff --git a/cmd/testserver/command.go b/cmd/testserver/command.go new file mode 100644 index 00000000..1007e0a7 --- /dev/null +++ b/cmd/testserver/command.go @@ -0,0 +1,16 @@ +package testserver + +import "github.com/spf13/cobra" + +const Use = "test-server" + +func NewCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: Use, + RunE: func(cmd *cobra.Command, args []string) error { + return run(cmd.Context()) + }, + } + + return cmd +} diff --git a/cmd/testserver/run.go b/cmd/testserver/run.go new file mode 100644 index 00000000..a658665f --- /dev/null +++ b/cmd/testserver/run.go @@ -0,0 +1,109 @@ +package testserver + +import ( + "context" + 
"errors" + "fmt" + "log/slog" + "os" + "sync" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + + "github.com/castai/cluster-controller/loadtest" + "github.com/castai/cluster-controller/loadtest/scenarios" +) + +func run(ctx context.Context) error { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + cfg := loadtest.GetConfig() + logger.Info("creating test server") + + testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{ + MaxActionsPerCall: 1000, + TimeoutWaitingForActions: 60 * time.Second, + }) + + clientSet, err := createK8SClient(cfg, logger) + if err != nil { + return err + } + + go func() { + logger.Info("Starting HTTP server for test") + err = loadtest.NewHttpServer(ctx, cfg, testServer) + if err != nil { + logger.Error("", "err", err) + panic(err) + } + }() + + testScenarios := []scenarios.TestScenario{ + // scenarios.PodEvents(5000, logger), + // scenarios.StuckDrain(100, 60, logger), + scenarios.StuckDrain(10, 1, logger), + } + + var wg sync.WaitGroup + wg.Add(len(testScenarios)) + errs := make(chan error, len(testScenarios)) + + for i, test := range testScenarios { + go func() { + defer wg.Done() + logger.Info(fmt.Sprintf("Starting test scenario %d", i)) + + err := scenarios.RunScenario(ctx, test, testServer, logger, clientSet) + errs <- err + }() + } + + logger.Info("Waiting for test scenarios to finish") + wg.Wait() + + close(errs) + receivedErrors := make([]error, 0) + for err := range errs { + if err != nil { + receivedErrors = append(receivedErrors, err) + } + } + logger.Info(fmt.Sprintf("All test scenarios are done, received (%d) errors, exiting", len(receivedErrors))) + + return errors.Join(receivedErrors...) 
+} + +func createK8SClient(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, error) { + if cfg.KubeConfig == "" { + logger.Info("Using in-cluster configuration") + restConfig, err := rest.InClusterConfig() + if err != nil { + return nil, fmt.Errorf("error creating in-cluster config: %w", err) + } + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err) + } + return clientSet, nil + } + + logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig)) + data, err := os.ReadFile(cfg.KubeConfig) + if err != nil { + return nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err) + } + + restConfig, err := clientcmd.RESTConfigFromKubeConfig(data) + if err != nil { + return nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err) + } + + clientSet, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err) + } + return clientSet, nil +} diff --git a/loadtest/README.md b/loadtest/README.md new file mode 100644 index 00000000..9628522e --- /dev/null +++ b/loadtest/README.md @@ -0,0 +1,28 @@ +# Load testing Cluster controller + +Load test requires 3 components: +- Test server that simulates cluster-hub and the scenarios. +- Kwok controller to simulate nodes/pods +- Cluster controller itself. + +Right now this is extremely basic and you have to run those manually locally. + +Start kwok: +``` + kwok --kubeconfig=~/.kube/config \ + --manage-all-nodes=false \ + --manage-nodes-with-annotation-selector=kwok.x-k8s.io/node=fake-node \ + --node-lease-duration-seconds=40 \ + --cidr=10.0.0.1/24 \ + --node-ip=10.0.0.1 +``` + +Run the test server on port 8080 against your current kubeconfig context: +``` +KUBECONFIG=~/.kube/config PORT=8080 go run . 
test-server +``` + +After starting, start cluster controller with some dummy values and point it to the test server: +``` +API_KEY=dummy API_URL=http://localhost:8080 CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 LEADER_ELECTION_NAMESPACE=default METRICS_ENABLED=true go run . +``` diff --git a/loadtest/castai.go b/loadtest/castai.go new file mode 100644 index 00000000..71d5e4b6 --- /dev/null +++ b/loadtest/castai.go @@ -0,0 +1,150 @@ +package loadtest + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/google/uuid" + "github.com/samber/lo" + + "github.com/castai/cluster-controller/internal/castai" +) + +// CastAITestServer acts as simple cluster hub mock replacement. +// It exposes a way to "push" actions to the cluster controller via GetActionsPushChannel +// and can be used as an implementation of the server interface that cluster controller expects to call. +type CastAITestServer struct { + log *slog.Logger + actionsPushChannel chan castai.ClusterAction + cfg TestServerConfig + + logMx sync.Mutex + actionsLog map[string]chan string +} + +func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer { + return &CastAITestServer{ + log: logger, + actionsPushChannel: make(chan castai.ClusterAction, 10000), + cfg: cfg, + actionsLog: make(map[string]chan string), + } +} + +// ExecuteActions pushes the list of actions to the queue for cluster controller to process. +// This method returns when all actions are acked or context is cancelled. +func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai.ClusterAction) { + // owner channel has 1:n relationship with the actions. 
It handles the ack + ownerChannel := make(chan string, len(actions)) + + for _, action := range actions { + if action.ID == "" { + action.ID = uuid.NewString() + } + c.addActionToStore(action.ID, ownerChannel) + c.actionsPushChannel <- action + } + + // Read from owner channel until len(actions) times, then close and return. + finished := 0 + for { + select { + case <-ctx.Done(): + c.log.Info(fmt.Sprintf("Received signal to stop finished with cause (%q) and err (%v). Closing executor.", context.Cause(ctx), ctx.Err())) + return + case finishedAction := <-ownerChannel: + c.removeActionFromStore(finishedAction) + finished++ + if finished == len(actions) { + close(ownerChannel) + return + } + } + } +} + +/* Start Cluster-hub mock implementation */ + +func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai.ClusterAction, error) { + c.log.Info(fmt.Sprintf("GetActions called, have %d items in buffer", len(c.actionsPushChannel))) + actionsToReturn := make([]*castai.ClusterAction, 0) + + // Wait for at least one action to arrive from whoever is pushing them. + // If none arrive, we simulate the "empty poll" case of cluster-hub and return empty list. + select { + case x := <-c.actionsPushChannel: + actionsToReturn = append(actionsToReturn, &x) + case <-time.After(c.cfg.TimeoutWaitingForActions): + c.log.Info(fmt.Sprintf("No actions to return in %v", c.cfg.TimeoutWaitingForActions)) + return nil, nil + case <-ctx.Done(): + return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err()) + } + + // Attempt to drain up to max items from the channel. + for len(actionsToReturn) < c.cfg.MaxActionsPerCall { + select { + case x := <-c.actionsPushChannel: + actionsToReturn = append(actionsToReturn, &x) + case <-time.After(50 * time.Millisecond): + c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn))) + // If we haven't received enough items, just flush. 
+ return actionsToReturn, nil + case <-ctx.Done(): + return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err()) + } + } + + c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn))) + return actionsToReturn, nil +} + +func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req *castai.AckClusterActionRequest) error { + errMsg := lo.FromPtr(req.Error) + c.log.DebugContext(ctx, fmt.Sprintf("action %q acknowledged; has error: %v; error: %v", actionID, req.Error != nil, errMsg)) + + receiver := c.getActionReceiver(actionID) + if receiver == nil { + return fmt.Errorf("action %q does not have a receiver", actionID) + } + // Notify owner that this action was done. + receiver <- actionID + + return nil +} + +func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) error { + // No-op for now, maybe track metrics in the future? + return nil +} + +/* End Cluster-hub mock implementation */ + +func (c *CastAITestServer) addActionToStore(actionID string, receiver chan string) { + c.logMx.Lock() + defer c.logMx.Unlock() + + c.actionsLog[actionID] = receiver +} + +func (c *CastAITestServer) removeActionFromStore(actionID string) { + c.logMx.Lock() + defer c.logMx.Unlock() + + delete(c.actionsLog, actionID) +} + +func (c *CastAITestServer) getActionReceiver(actionID string) chan string { + c.logMx.Lock() + defer c.logMx.Unlock() + + receiver, ok := c.actionsLog[actionID] + if !ok { + c.log.Error(fmt.Sprintf("Receiver for action %s is no longer there, possibly shutting down", actionID)) + return nil + } + return receiver +} diff --git a/loadtest/config.go b/loadtest/config.go new file mode 100644 index 00000000..61cfbb5b --- /dev/null +++ b/loadtest/config.go @@ -0,0 +1,49 @@ +package loadtest + +import ( + "fmt" + "time" + + "github.com/spf13/viper" +) + +// Config for the HTTP server. +type Config struct { + // Port where the mock server to listen on. 
+ Port int + + // KubeConfig can point to a kubeconfig file. If empty, InCluster client will be assumed. + KubeConfig string +} + +// TestServerConfig has settings for the mock server instance. +type TestServerConfig struct { + // MaxActionsPerCall is the upper limit of actions to return in one CastAITestServer.GetActions call. + MaxActionsPerCall int + // TimeoutWaitingForActions controls how long to wait for at least 1 action to appear on server side. + // This mimics CH behavior of not returning early if there are no pending actions and keeping the request "running". + TimeoutWaitingForActions time.Duration +} + +var singletonCfg *Config + +func GetConfig() Config { + // not thread safe, but you will not put this under concurrent pressure, right? + if singletonCfg != nil { + return *singletonCfg + } + + _ = viper.BindEnv("port", "PORT") + _ = viper.BindEnv("kubeconfig", "KUBECONFIG") + + singletonCfg = &Config{} + if err := viper.Unmarshal(&singletonCfg); err != nil { + panic(fmt.Errorf("parsing configuration: %w", err)) + } + + if singletonCfg.Port == 0 { + panic(fmt.Errorf("test server port must be set")) + } + + return *singletonCfg +} diff --git a/loadtest/http.go b/loadtest/http.go new file mode 100644 index 00000000..c0e106ec --- /dev/null +++ b/loadtest/http.go @@ -0,0 +1,65 @@ +package loadtest + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/castai/cluster-controller/internal/castai" +) + +func NewHttpServer(ctx context.Context, cfg Config, testServer *CastAITestServer) error { + http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions", func(w http.ResponseWriter, r *http.Request) { + result, err := testServer.GetActions(r.Context(), "") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + response := &castai.GetClusterActionsResponse{ + Items: result, + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + if err := 
json.NewEncoder(w).Encode(response); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }) + + http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/{action_id}/ack", func(w http.ResponseWriter, r *http.Request) { + actionID := r.PathValue("action_id") + var req castai.AckClusterActionRequest + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = testServer.AckAction(r.Context(), actionID, &req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }) + + http.HandleFunc("/v1/kubernetes/clusters/{cluster_id}/actions/logs", func(w http.ResponseWriter, r *http.Request) { + var req castai.LogEntry + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + err = testServer.SendLog(r.Context(), &req) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }) + + //nolint:gosec // Missing timeouts are not a real issue here. + return http.ListenAndServe(fmt.Sprintf(":%d", cfg.Port), nil) +} diff --git a/loadtest/scenarios/k8s_objects.go b/loadtest/scenarios/k8s_objects.go new file mode 100644 index 00000000..90c6c0af --- /dev/null +++ b/loadtest/scenarios/k8s_objects.go @@ -0,0 +1,169 @@ +package scenarios + +import ( + "fmt" + + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" +) + +const ( + DefaultKwokMarker = "kwok.x-k8s.io/node" + KwokMarkerValue = "fake-node" +) + +type KwokConfig struct { + // Label should match what kwok is configured to use via --manage-nodes-with-label-selector + // Default is DefaultKwokMarker. Value is always KwokMarkerValue. 
+ Label string + + // Annotation should match what kwok is configured to use via --manage-nodes-with-annotation-selector + // Default is DefaultKwokMarker. Value is always KwokMarkerValue. + Annotation string +} + +// NewKwokNode creates a fake node with reasonable defaults. +// Can be customized but the marker label/annotation must be present. +// Tainted by default with DefaultKwokMarker to avoid running real pods on it. +// Requires that a kwok-controller is running to actually simulate the node on apply. +func NewKwokNode(cfg KwokConfig, nodeName string) *corev1.Node { + kwokLabel := DefaultKwokMarker + if cfg.Label != "" { + kwokLabel = cfg.Label + } + kwokAnnotation := DefaultKwokMarker + if cfg.Annotation != "" { + kwokAnnotation = cfg.Annotation + } + + defaultLabels := map[string]string{ + kwokLabel: KwokMarkerValue, + corev1.LabelArchStable: "amd64", + corev1.LabelHostname: nodeName, + corev1.LabelOSStable: "linux", + "kubernetes.io/role": "fake-node", + "type": "kwok", + } + + defaultAnnotations := map[string]string{ + kwokAnnotation: KwokMarkerValue, + } + + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + Labels: defaultLabels, + Annotations: defaultAnnotations, + }, + Spec: corev1.NodeSpec{ + Taints: []corev1.Taint{ + { + Key: DefaultKwokMarker, + Value: KwokMarkerValue, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(32, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(256*1024*1024*1024, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewMilliQuantity(32, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(256*1024*1024*1024, resource.BinarySI), + corev1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + NodeInfo: corev1.NodeSystemInfo{ + 
KubeletVersion: "kwok", + OperatingSystem: "linux", + Architecture: "amd64", + }, + Phase: corev1.NodeRunning, + }, + } +} + +// DeploymentWithStuckPDB creates a 1-replica deployment with "stuck PDB" that is never satisfiable, i.e. no pods can be evicted. +// Deployment cannot run in reality, it uses fake container. +// Deployment deploys on kwok fake nodes by default. +func DeploymentWithStuckPDB(deploymentName string) (*appsv1.Deployment, *policyv1.PodDisruptionBudget) { + labelApp := "appname" + labelValue := fmt.Sprintf("%s-stuck-pdb", deploymentName) + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: deploymentName, + Namespace: metav1.NamespaceDefault, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: lo.ToPtr(int32(1)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + labelApp: labelValue, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + labelApp: labelValue, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "fake-container", + Image: "does-not-exist", + }, + }, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: DefaultKwokMarker, + Operator: corev1.NodeSelectorOpIn, + Values: []string{KwokMarkerValue}, + }, + }, + }, + }, + }, + }, + }, + Tolerations: []corev1.Toleration{ + { + Key: DefaultKwokMarker, + Operator: corev1.TolerationOpExists, + Effect: corev1.TaintEffectNoSchedule, + }, + }, + }, + }, + }, + } + + pdb := &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pdb", deploymentName), + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + labelApp: labelValue, + }, + }, + MaxUnavailable: 
lo.ToPtr(intstr.FromInt32(0)), + }, + } + + return deployment, pdb +} diff --git a/loadtest/scenarios/pod_events.go b/loadtest/scenarios/pod_events.go new file mode 100644 index 00000000..af8ad46b --- /dev/null +++ b/loadtest/scenarios/pod_events.go @@ -0,0 +1,74 @@ +package scenarios + +import ( + "context" + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/internal/castai" +) + +func PodEvents(count int, log *slog.Logger) TestScenario { + return &podEventsScenario{ + totalEvents: count, + log: log, + } +} + +type podEventsScenario struct { + totalEvents int + log *slog.Logger +} + +func (p *podEventsScenario) Name() string { + return "pod events" +} + +func (p *podEventsScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + // nothing to prepare for this test, pod does not have to exist to create events. + return nil +} + +func (p *podEventsScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + // nothing to clean for this test, events are dropped automatically after certain time. + + return nil +} + +func (p *podEventsScenario) Run(ctx context.Context, namespace string, _ kubernetes.Interface, executor ActionExecutor) error { + p.log.Info(fmt.Sprintf("Starting creating %d events for different pods", p.totalEvents)) + actions := make([]castai.ClusterAction, 0, p.totalEvents) + for i := range p.totalEvents { + actions = append(actions, castai.ClusterAction{ + ID: uuid.NewString(), + ActionCreateEvent: &castai.ActionCreateEvent{ + Reporter: "provisioning.cast.ai", + ObjectRef: corev1.ObjectReference{ + Kind: "Pod", + // Actions are executed async on CC, meaning they are acked even if rejected by server. + // This means we can't rely on the test namespace as it'll disappear before all events are processed. 
+ // So we use a namespace that _will_ be there. + Namespace: corev1.NamespaceDefault, + Name: "Dummy-pod", + UID: types.UID(uuid.New().String()), + APIVersion: "v1", + }, + EventTime: time.Now(), + EventType: "Warning", + // Reason is different so events won't be aggregated by CC's event broadcaster. + Reason: fmt.Sprintf("Just because! %d", i), + Action: "During node creation.", + Message: "Oh common, you can do better.", + }, + }) + } + executor.ExecuteActions(ctx, actions) + + return nil +} diff --git a/loadtest/scenarios/scenario.go b/loadtest/scenarios/scenario.go new file mode 100644 index 00000000..c59770a9 --- /dev/null +++ b/loadtest/scenarios/scenario.go @@ -0,0 +1,117 @@ +package scenarios + +import ( + "context" + "fmt" + "log/slog" + "math/rand" + "time" + + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/castai/cluster-controller/internal/castai" +) + +// TODO Spend more than 2 seconds thinking about names + +type ActionExecutor interface { + // ExecuteActions is expected to execute all actions and wait for ack before returning; otherwise cleanups might run too early. + ExecuteActions(ctx context.Context, actions []castai.ClusterAction) +} + +type TestScenario interface { + Name() string + // Preparation should create any necessary resources in the cluster for the test so it runs in realistic env. + Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error + // Cleanup should delete any items created by the preparation or the test itself. + // It might be called even if Preparation or Run did not complete so it should handle those cases gracefully. + // The scenario's namespace is deleted at the end but ideally scenarios delete their resources as well, + // otherwise namespace deletion can take very long to propagate. 
// RunScenario executes a single scenario end to end inside a dedicated namespace.
//
// Steps, in order:
//  1. Pick a random namespace name and fail if a namespace with that name
//     already exists or cannot be looked up.
//  2. Create the namespace and schedule its deletion.
//  3. Schedule scenario.Cleanup, then call scenario.Preparation.
//  4. Call scenario.Run with a context limited to 30 minutes.
//
// Deferred calls run in reverse order, so scenario.Cleanup runs before the
// namespace is deleted. Both use their own 5-minute context derived from
// context.Background so they still run after ctx was cancelled. Failures in
// either deferred step are only logged; the returned error reflects namespace
// setup, Preparation or Run.
func RunScenario(
	ctx context.Context,
	scenario TestScenario,
	actioner ActionExecutor,
	logger *slog.Logger,
	clientset kubernetes.Interface,
) error {
	//nolint:gosec // No point to use crypto/rand.
	namespaceForTest := fmt.Sprintf("test-namespace-%d", rand.Int31())
	// Every log line from here on carries the namespace and scenario name.
	logger = logger.With("namespace", namespaceForTest, "scenario", scenario.Name())

	// Prepare the namespace to run the test in.
	logger.Info("Preparing namespace for test")
	_, err := clientset.CoreV1().Namespaces().Get(ctx, namespaceForTest, metav1.GetOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("failed to get namespace for test %v: %w", namespaceForTest, err)
	}
	// At this point err is either nil (namespace exists) or NotFound; only NotFound lets us continue.
	if !apierrors.IsNotFound(err) {
		return fmt.Errorf("namespace %v already exists and could be in use, cannot continue", namespaceForTest)
	}

	logger.Info("Namespace does not exist, will create")
	_, err = clientset.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
		ObjectMeta: metav1.ObjectMeta{
			Name: namespaceForTest,
		},
	}, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("failed to create namespace %v: %w", namespaceForTest, err)
	}
	// Registered first, therefore executed last: the namespace goes away after the scenario's own cleanup.
	defer func() {
		// Cleanup uses different context so it runs even when the overall one is already cancelled
		ctxForCleanup, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()

		logger.Info("Deleting namespace for test")
		err := clientset.CoreV1().Namespaces().Delete(ctxForCleanup, namespaceForTest, metav1.DeleteOptions{
			GracePeriodSeconds: lo.ToPtr(int64(0)),
			PropagationPolicy:  lo.ToPtr(metav1.DeletePropagationBackground),
		})
		if err != nil {
			logger.Error(fmt.Sprintf("Failed to delete namespace for test %v: %v", namespaceForTest, err))
			return
		}
		logger.Info("Successfully deleted namespace for test")
	}()
	logger.Info("Namespace created")

	logger.Info("Starting test scenario")

	logger.Info("Running preparation function")
	// We defer the cleanup before running preparation or run because each can "fail" in the middle and leave hanging resources.
	defer func() {
		// Cleanup uses different context so it runs even when the overall one is already cancelled
		ctxForCleanup, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()

		logger.Info("Running cleanup function")
		err := scenario.Cleanup(ctxForCleanup, namespaceForTest, clientset)
		if err != nil {
			// Cleanup failures are logged only; they do not change the result of RunScenario.
			logger.Error("failed ot run cleanup", "error", err)
		}
	}()

	err = scenario.Preparation(ctx, namespaceForTest, clientset)
	if err != nil {
		return fmt.Errorf("failed to run preparation function: %w", err)
	}

	// Only the Run phase is bounded by the 30-minute limit; Preparation above uses ctx as is.
	scenarioCtx, cancel := context.WithTimeout(ctx, 30*time.Minute)
	defer cancel()

	logger.Info("Starting scenario execution")
	err = scenario.Run(scenarioCtx, namespaceForTest, clientset, actioner)
	if err != nil {
		return fmt.Errorf("failed to run scenario: %w", err)
	}

	return nil
}
+ + nodesToDrain []*corev1.Node +} + +func (s *stuckDrainScenario) Name() string { + return "drain node with stuck pdb" +} + +func (s *stuckDrainScenario) Preparation(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + s.nodesToDrain = make([]*corev1.Node, 0, s.nodeCount) + for i := range s.nodeCount { + nodeName := fmt.Sprintf("kwok-stuck-drain-%d", i) + s.log.Info(fmt.Sprintf("Creating node %s", nodeName)) + node := NewKwokNode(KwokConfig{}, nodeName) + + _, err := clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("failed to create fake node: %w", err) + } + if err != nil && apierrors.IsAlreadyExists(err) { + s.log.Warn("node already exists, will reuse but potential conflict between test runs", "nodeName", nodeName) + } + s.nodesToDrain = append(s.nodesToDrain, node) + + s.log.Info(fmt.Sprintf("Creating deployment on node %s", nodeName)) + deployment, pdb := DeploymentWithStuckPDB(fmt.Sprintf("fake-deployment-%s-%d", node.Name, i)) + deployment.ObjectMeta.Namespace = namespace + //nolint:gosec // Not afraid of overflow here. + deployment.Spec.Replicas = lo.ToPtr(int32(s.deploymentReplicas)) + deployment.Spec.Template.Spec.NodeName = nodeName + pdb.ObjectMeta.Namespace = namespace + + _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create fake deployment: %w", err) + } + + _, err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("failed to create fake pod disruption budget: %w", err) + } + + // Wait for deployment to become ready, otherwise we might start draining before the pod is up. 
+ progressed := WaitUntil(ctx, 30*time.Second, func() bool { + d, err := clientset.AppsV1().Deployments(namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + if err != nil { + s.log.Warn("failed to get deployment after creating", "err", err) + return false + } + return d.Status.ReadyReplicas == *d.Spec.Replicas + }) + if !progressed { + return fmt.Errorf("deployment %s did not progress to ready state in time", deployment.Name) + } + } + + return nil +} + +func (s *stuckDrainScenario) Cleanup(ctx context.Context, namespace string, clientset kubernetes.Interface) error { + var errs []error + // We iterate through all nodes as they are not deleted with the ns and can leak => so we want do delete as many as possible. + for _, n := range s.nodesToDrain { + s.log.Info(fmt.Sprintf("Deleting node %s", n.Name)) + err := clientset.CoreV1().Nodes().Delete(ctx, n.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + s.log.Warn("failed to delete fake node, will continue with other nodes", "nodeName", n.Name) + errs = append(errs, err) + } + } + if len(errs) > 0 { + return errors.Join(errs...) + } + + // We assume no other tests are using the same NS so just delete all. 
+ deploymentsInNS, err := clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list deployments: %w", err) + } + + for _, deployment := range deploymentsInNS.Items { + s.log.Info(fmt.Sprintf("Deleting deployment %s", deployment.Name)) + err = clientset.AppsV1().Deployments(namespace).Delete(ctx, deployment.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete fake deployment: %w", err) + } + } + + pdbsInNS, err := clientset.PolicyV1().PodDisruptionBudgets(namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("failed to list pod disruption budgets: %w", err) + } + + for _, pdb := range pdbsInNS.Items { + s.log.Info(fmt.Sprintf("Deleting PDB %s", pdb.Name)) + err = clientset.PolicyV1().PodDisruptionBudgets(namespace).Delete(ctx, pdb.Name, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete fake pod disruption budget: %w", err) + } + } + + s.log.Info("Finished up cleaning nodes for drain, deployments and PDBs.") + return nil +} + +func (s *stuckDrainScenario) Run(ctx context.Context, _ string, _ kubernetes.Interface, executor ActionExecutor) error { + s.log.Info(fmt.Sprintf("Starting drain action creation with %d nodes", len(s.nodesToDrain))) + + actions := make([]castai.ClusterAction, 0, len(s.nodesToDrain)) + for _, node := range s.nodesToDrain { + actions = append(actions, castai.ClusterAction{ + ID: uuid.NewString(), + CreatedAt: time.Now().UTC(), + ActionDrainNode: &castai.ActionDrainNode{ + NodeName: node.Name, + NodeID: "", + DrainTimeoutSeconds: 60, + Force: false, + }, + }) + } + + executor.ExecuteActions(ctx, actions) + + return nil +} diff --git a/loadtest/scenarios/util.go b/loadtest/scenarios/util.go new file mode 100644 index 00000000..8ad3f034 --- /dev/null +++ b/loadtest/scenarios/util.go @@ -0,0 +1,24 @@ +package scenarios + +import ( + 
// WaitUntil polls condition every 500ms until it reports true, duration has
// elapsed, or ctx is done.
//
// It returns true as soon as condition returns true and false otherwise. The
// condition is not evaluated when ctx is already done or duration has already
// expired. Waiting between polls is interruptible: cancellation and the
// deadline are honored immediately instead of after the current sleep.
func WaitUntil(ctx context.Context, duration time.Duration, condition func() bool) bool {
	const pollInterval = 500 * time.Millisecond

	// Fold the deadline into the context so a single Done channel covers both
	// caller cancellation and the timeout.
	ctx, cancel := context.WithTimeout(ctx, duration)
	defer cancel()

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		if ctx.Err() != nil {
			return false
		}
		if condition() {
			return true
		}

		select {
		case <-ctx.Done():
			return false
		case <-ticker.C:
		}
	}
}