Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ test:
generate-e2e-client:
go generate ./e2e/client
.PHONY: generate-e2e-client

# TODO: Make this less simplistic
run-loadtest:
# TODO: Required because of reusing config
API_KEY=dummy API_URL=http://example.com CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 go run . test-server
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ Option 1. Deploy controller in Kind cluster.
--set clusterID="your-cluster-id" | kubectl apply -f - -n castai-agent
```

### Load tests
See [docs](loadtest/README.md)

## Community

- [Twitter](https://twitter.com/cast_ai)
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/castai/cluster-controller/cmd/controller"
"github.com/castai/cluster-controller/cmd/monitor"
"github.com/castai/cluster-controller/cmd/testserver"
)

var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -40,6 +41,7 @@ func Execute(ctx context.Context) {
func init() {
rootCmd.AddCommand(controller.NewCmd())
rootCmd.AddCommand(monitor.NewCmd())
rootCmd.AddCommand(testserver.NewCmd())
}

func fatal(err error) {
Expand Down
16 changes: 16 additions & 0 deletions cmd/testserver/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package testserver

import "github.com/spf13/cobra"

// Use is the sub-command name under which the load-test server is registered.
const Use = "test-server"

// NewCmd builds the cobra command that starts the load-test server.
// The server runs until the command's context is cancelled.
func NewCmd() *cobra.Command {
	return &cobra.Command{
		Use: Use,
		RunE: func(cmd *cobra.Command, args []string) error {
			return run(cmd.Context())
		},
	}
}
109 changes: 109 additions & 0 deletions cmd/testserver/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package testserver

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"
"time"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/castai/cluster-controller/loadtest"
"github.com/castai/cluster-controller/loadtest/scenarios"
)

// run wires up the load-test environment: it starts the cluster-hub mock
// HTTP server, builds a Kubernetes client, and executes all configured test
// scenarios concurrently. It returns the joined errors of all scenarios.
func run(ctx context.Context) error {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	cfg := loadtest.GetConfig()
	logger.Info("creating test server")

	testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{
		MaxActionsPerCall:        1000,
		TimeoutWaitingForActions: 60 * time.Second,
	})

	clientSet, err := createK8SClient(cfg, logger)
	if err != nil {
		return err
	}

	go func() {
		logger.Info("Starting HTTP server for test")
		// Use a locally scoped error: the original reassigned the outer `err`
		// from this goroutine, racing with the main goroutine's reads of it.
		if serveErr := loadtest.NewHttpServer(ctx, cfg, testServer); serveErr != nil {
			logger.Error("", "err", serveErr)
			panic(serveErr)
		}
	}()

	testScenarios := []scenarios.TestScenario{
		// scenarios.PodEvents(5000, logger),
		// scenarios.StuckDrain(100, 60, logger),
		scenarios.StuckDrain(10, 1, logger),
	}

	var wg sync.WaitGroup
	wg.Add(len(testScenarios))
	errs := make(chan error, len(testScenarios))

	for i, test := range testScenarios {
		// Pass the loop variables explicitly so each goroutine gets its own
		// copy (required on Go < 1.22; harmless on newer versions).
		go func(i int, test scenarios.TestScenario) {
			defer wg.Done()
			logger.Info(fmt.Sprintf("Starting test scenario %d", i))

			errs <- scenarios.RunScenario(ctx, test, testServer, logger, clientSet)
		}(i, test)
	}

	logger.Info("Waiting for test scenarios to finish")
	wg.Wait()

	close(errs)
	receivedErrors := make([]error, 0)
	for err := range errs {
		if err != nil {
			receivedErrors = append(receivedErrors, err)
		}
	}
	logger.Info(fmt.Sprintf("All test scenarios are done, received (%d) errors, exiting", len(receivedErrors)))

	return errors.Join(receivedErrors...)
}

// createK8SClient builds a Kubernetes clientset, either from the kubeconfig
// file referenced by cfg.KubeConfig or, when that is empty, from the
// in-cluster service-account configuration.
func createK8SClient(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, error) {
	restConfig, err := buildRESTConfig(cfg, logger)
	if err != nil {
		return nil, err
	}

	// Single clientset-construction path; the original duplicated this call
	// (and its error wrapping) in both branches.
	clientSet, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
	}
	return clientSet, nil
}

// buildRESTConfig resolves the rest.Config from either the in-cluster
// environment or the kubeconfig file named in cfg.KubeConfig.
func buildRESTConfig(cfg loadtest.Config, logger *slog.Logger) (*rest.Config, error) {
	if cfg.KubeConfig == "" {
		logger.Info("Using in-cluster configuration")
		restConfig, err := rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("error creating in-cluster config: %w", err)
		}
		return restConfig, nil
	}

	logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig))
	data, err := os.ReadFile(cfg.KubeConfig)
	if err != nil {
		return nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err)
	}

	restConfig, err := clientcmd.RESTConfigFromKubeConfig(data)
	if err != nil {
		return nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err)
	}
	return restConfig, nil
}
28 changes: 28 additions & 0 deletions loadtest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Load testing Cluster controller

Load test requires 3 components:
- Test server that simulates cluster-hub and the scenarios.
- Kwok controller to simulate nodes/pods
- Cluster controller itself.

For now the setup is very basic: each component must be started manually on your local machine.

Start kwok:
```
kwok --kubeconfig=~/.kube/config \
--manage-all-nodes=false \
--manage-nodes-with-annotation-selector=kwok.x-k8s.io/node=fake-node \
--node-lease-duration-seconds=40 \
--cidr=10.0.0.1/24 \
--node-ip=10.0.0.1
```

Run the test server on port 8080 against your current kubeconfig context:
```
KUBECONFIG=~/.kube/config PORT=8080 go run . test-server
```

After starting, start cluster controller with some dummy values and point it to the test server:
```
API_KEY=dummy API_URL=http://localhost:8080 CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 LEADER_ELECTION_NAMESPACE=default METRICS_ENABLED=true go run .
```
150 changes: 150 additions & 0 deletions loadtest/castai.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package loadtest

import (
"context"
"fmt"
"log/slog"
"sync"
"time"

"github.com/google/uuid"
"github.com/samber/lo"

"github.com/castai/cluster-controller/internal/castai"
)

// CastAITestServer acts as a simple cluster-hub mock replacement.
// It exposes a way to "push" actions to the cluster controller via ExecuteActions
// and can be used as an implementation of the server interface that the cluster
// controller expects to call.
type CastAITestServer struct {
	log *slog.Logger
	// actionsPushChannel buffers actions queued by ExecuteActions until the
	// controller polls them via GetActions.
	actionsPushChannel chan castai.ClusterAction
	cfg                TestServerConfig

	// logMx guards actionsLog.
	logMx sync.Mutex
	// actionsLog maps a pending action ID to the owner channel that is
	// notified (by AckAction) when that action completes.
	actionsLog map[string]chan string
}

// NewTestServer constructs a CastAITestServer with an empty action store and
// a generously buffered push channel.
func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer {
	server := &CastAITestServer{
		log:                logger,
		cfg:                cfg,
		actionsPushChannel: make(chan castai.ClusterAction, 10000),
		actionsLog:         make(map[string]chan string),
	}
	return server
}

// ExecuteActions pushes the list of actions to the queue for cluster controller to process.
// This method returns when all actions are acked or context is cancelled.
func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai.ClusterAction) {
	// owner channel has 1:n relationship with the actions. It handles the ack
	// notifications. It is buffered to len(actions) so AckAction never blocks
	// sending into it.
	ownerChannel := make(chan string, len(actions))

	for _, action := range actions {
		// Generate an ID when the caller did not provide one, so acks can be
		// matched back to this owner channel.
		if action.ID == "" {
			action.ID = uuid.NewString()
		}
		// Register the receiver before queueing the action; otherwise a fast
		// ack could arrive for an action that has no receiver yet.
		c.addActionToStore(action.ID, ownerChannel)
		c.actionsPushChannel <- action
	}

	// Read from owner channel until len(actions) times, then close and return.
	finished := 0
	for {
		select {
		case <-ctx.Done():
			// NOTE(review): on cancellation the un-acked entries stay in
			// actionsLog and ownerChannel is never closed — acceptable for a
			// test server that is shutting down, but worth confirming.
			c.log.Info(fmt.Sprintf("Received signal to stop finished with cause (%q) and err (%v). Closing executor.", context.Cause(ctx), ctx.Err()))
			return
		case finishedAction := <-ownerChannel:
			c.removeActionFromStore(finishedAction)
			finished++
			if finished == len(actions) {
				close(ownerChannel)
				return
			}
		}
	}
}

/* Start Cluster-hub mock implementation */

// GetActions implements the cluster-hub poll endpoint. It blocks until at
// least one action is queued (or the configured timeout / context expiry),
// then opportunistically drains up to MaxActionsPerCall actions, giving up
// after a short quiet period.
func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai.ClusterAction, error) {
	c.log.Info(fmt.Sprintf("GetActions called, have %d items in buffer", len(c.actionsPushChannel)))
	result := make([]*castai.ClusterAction, 0)

	// Block until the first action shows up; an empty poll mimics cluster-hub
	// holding the request open and then returning nothing.
	select {
	case action := <-c.actionsPushChannel:
		result = append(result, &action)
	case <-time.After(c.cfg.TimeoutWaitingForActions):
		c.log.Info(fmt.Sprintf("No actions to return in %v", c.cfg.TimeoutWaitingForActions))
		return nil, nil
	case <-ctx.Done():
		return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err())
	}

	// Keep draining until we hit the per-call cap or go quiet for 50ms.
	for len(result) < c.cfg.MaxActionsPerCall {
		select {
		case action := <-c.actionsPushChannel:
			result = append(result, &action)
		case <-time.After(50 * time.Millisecond):
			// Not enough items arrived in time; flush what we have.
			c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(result)))
			return result, nil
		case <-ctx.Done():
			return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err())
		}
	}

	c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(result)))
	return result, nil
}

// AckAction implements the cluster-hub ack endpoint: it marks the action as
// finished by notifying the owner channel registered in ExecuteActions.
// Returns an error when the action is unknown (already acked or never queued).
func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req *castai.AckClusterActionRequest) error {
	// Guard against a nil request body so a misbehaving client cannot panic
	// the mock (the original dereferenced req.Error unconditionally).
	var errMsg string
	hasError := false
	if req != nil {
		hasError = req.Error != nil
		errMsg = lo.FromPtr(req.Error)
	}
	c.log.DebugContext(ctx, fmt.Sprintf("action %q acknowledged; has error: %v; error: %v", actionID, hasError, errMsg))

	receiver := c.getActionReceiver(actionID)
	if receiver == nil {
		return fmt.Errorf("action %q does not have a receiver", actionID)
	}
	// Notify owner that this action was done. The channel is buffered to the
	// batch size, so this send does not block.
	receiver <- actionID

	return nil
}

// SendLog implements the cluster-hub log sink. Entries are intentionally
// discarded; metrics could be tracked here in the future.
func (c *CastAITestServer) SendLog(_ context.Context, _ *castai.LogEntry) error {
	return nil
}

/* End Cluster-hub mock implementation */

// addActionToStore registers the owner channel that must be notified once the
// action with the given ID is acked.
func (c *CastAITestServer) addActionToStore(actionID string, owner chan string) {
	c.logMx.Lock()
	defer c.logMx.Unlock()
	c.actionsLog[actionID] = owner
}

// removeActionFromStore forgets a completed action; deleting a missing key is
// a no-op, so double removal is harmless.
func (c *CastAITestServer) removeActionFromStore(actionID string) {
	c.logMx.Lock()
	defer c.logMx.Unlock()
	delete(c.actionsLog, actionID)
}

// getActionReceiver looks up the owner channel for an action ID, returning
// nil (and logging) when the action is unknown.
func (c *CastAITestServer) getActionReceiver(actionID string) chan string {
	c.logMx.Lock()
	defer c.logMx.Unlock()

	if receiver, found := c.actionsLog[actionID]; found {
		return receiver
	}
	c.log.Error(fmt.Sprintf("Receiver for action %s is no longer there, possibly shutting down", actionID))
	return nil
}
49 changes: 49 additions & 0 deletions loadtest/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package loadtest

import (
"fmt"
"time"

"github.com/spf13/viper"
)

// Config for the HTTP server.
type Config struct {
	// Port is the port for the mock server to listen on. Must be non-zero.
	Port int

	// KubeConfig can point to a kubeconfig file. If empty, the in-cluster
	// client configuration will be assumed.
	KubeConfig string
}

// TestServerConfig has settings for the mock server instance.
type TestServerConfig struct {
	// MaxActionsPerCall is the upper limit of actions to return in one CastAITestServer.GetActions call.
	MaxActionsPerCall int
	// TimeoutWaitingForActions controls how long to wait for at least 1 action to appear on the server side.
	// This mimics cluster-hub behavior of not returning early when there are no pending actions,
	// keeping the poll request "running" instead.
	TimeoutWaitingForActions time.Duration
}

// singletonCfg caches the parsed configuration after the first successful
// GetConfig call.
var singletonCfg *Config

// GetConfig loads the test-server configuration from environment variables
// (PORT, KUBECONFIG) on first use and returns the cached value afterwards.
// It panics when the configuration cannot be parsed or the mandatory port is
// missing.
//
// NOTE(review): lazy init is not thread-safe; acceptable for single-goroutine
// startup of a test tool.
func GetConfig() Config {
	if singletonCfg != nil {
		return *singletonCfg
	}

	_ = viper.BindEnv("port", "PORT")
	_ = viper.BindEnv("kubeconfig", "KUBECONFIG")

	// Decode into a local first: the original assigned singletonCfg before
	// validation, so a port panic left an invalid config cached and later
	// calls would silently return it. It also passed &singletonCfg (a
	// **Config) to Unmarshal, relying on an extra pointer-deref level.
	cfg := &Config{}
	if err := viper.Unmarshal(cfg); err != nil {
		panic(fmt.Errorf("parsing configuration: %w", err))
	}

	if cfg.Port == 0 {
		panic(fmt.Errorf("test server port must be set"))
	}

	singletonCfg = cfg
	return *singletonCfg
}
Loading
Loading