Skip to content

Commit 1e2798f

Browse files
authored
KUBE-417: Add a way to do load testing of CC (#169)
* Starting loadtest code * Manual load test of drain node * Basic skeleton of running load test scenarios * Add testserver cobra command * Simple make step for now * Wait for ready deployments in stuck drain test * Change how test server works so scenario can wait for actions to be acked and not cleanup early. * Cleanup * Move scenarios to interface and struct * Re-enable events in default run * Linter * Remove SendAKSINitData * Remove dependency on main config file and use loadtest one. * Quick docs on running locally. * Use separate context for cleanups * Remove buffer config * Remove unused commented code * Defer cleanup before running prepare so it cleans up half-baked prepares. * Logging adjustment. * Small adjustments * Linter fix. * Remove empty func
1 parent 816de29 commit 1e2798f

File tree

14 files changed

+968
-0
lines changed

14 files changed

+968
-0
lines changed

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,8 @@ test:
3838
generate-e2e-client:
3939
go generate ./e2e/client
4040
.PHONY: generate-e2e-client
41+
42+
# TODO: Make this less simplistic
43+
run-loadtest:
44+
# TODO: Required because of reusing config
45+
API_KEY=dummy API_URL=http://example.com CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 go run . test-server

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ Option 1. Deploy controller in Kind cluster.
6767
--set clusterID="your-cluster-id" | kubectl apply -f - -n castai-agent
6868
```
6969

70+
### Load tests
71+
See [docs](loadtest/README.md)
72+
7073
## Community
7174

7275
- [Twitter](https://twitter.com/cast_ai)

cmd/root.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99

1010
"github.com/castai/cluster-controller/cmd/controller"
1111
"github.com/castai/cluster-controller/cmd/monitor"
12+
"github.com/castai/cluster-controller/cmd/testserver"
1213
)
1314

1415
var rootCmd = &cobra.Command{
@@ -40,6 +41,7 @@ func Execute(ctx context.Context) {
4041
func init() {
4142
rootCmd.AddCommand(controller.NewCmd())
4243
rootCmd.AddCommand(monitor.NewCmd())
44+
rootCmd.AddCommand(testserver.NewCmd())
4345
}
4446

4547
func fatal(err error) {

cmd/testserver/command.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package testserver
2+
3+
import "github.com/spf13/cobra"
4+
5+
const Use = "test-server"
6+
7+
func NewCmd() *cobra.Command {
8+
cmd := &cobra.Command{
9+
Use: Use,
10+
RunE: func(cmd *cobra.Command, args []string) error {
11+
return run(cmd.Context())
12+
},
13+
}
14+
15+
return cmd
16+
}

cmd/testserver/run.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package testserver

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"sync"
	"time"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/castai/cluster-controller/loadtest"
	"github.com/castai/cluster-controller/loadtest/scenarios"
)

// run wires up the mock cluster-hub test server, starts its HTTP endpoint,
// and executes all configured load-test scenarios concurrently.
// It returns the joined errors of all scenarios (nil if none failed).
func run(ctx context.Context) error {
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
	cfg := loadtest.GetConfig()
	logger.Info("creating test server")

	testServer := loadtest.NewTestServer(logger, loadtest.TestServerConfig{
		MaxActionsPerCall:        1000,
		TimeoutWaitingForActions: 60 * time.Second,
	})

	clientSet, err := createK8SClient(cfg, logger)
	if err != nil {
		return err
	}

	go func() {
		logger.Info("Starting HTTP server for test")
		// Use a locally scoped error here: the outer `err` is also written by
		// the main goroutine, so sharing it would be a data race.
		if err := loadtest.NewHttpServer(ctx, cfg, testServer); err != nil {
			logger.Error("", "err", err)
			panic(err)
		}
	}()

	testScenarios := []scenarios.TestScenario{
		// scenarios.PodEvents(5000, logger),
		// scenarios.StuckDrain(100, 60, logger),
		scenarios.StuckDrain(10, 1, logger),
	}

	var wg sync.WaitGroup
	wg.Add(len(testScenarios))
	// Buffered so every scenario goroutine can report without blocking.
	errs := make(chan error, len(testScenarios))

	for i, test := range testScenarios {
		// Pass the loop variables as arguments so each goroutine gets its own
		// copy (required for correctness before Go 1.22's per-iteration vars).
		go func(i int, test scenarios.TestScenario) {
			defer wg.Done()
			logger.Info(fmt.Sprintf("Starting test scenario %d", i))

			errs <- scenarios.RunScenario(ctx, test, testServer, logger, clientSet)
		}(i, test)
	}

	logger.Info("Waiting for test scenarios to finish")
	wg.Wait()

	close(errs)
	receivedErrors := make([]error, 0)
	for err := range errs {
		if err != nil {
			receivedErrors = append(receivedErrors, err)
		}
	}
	logger.Info(fmt.Sprintf("All test scenarios are done, received (%d) errors, exiting", len(receivedErrors)))

	return errors.Join(receivedErrors...)
}
78+
79+
func createK8SClient(cfg loadtest.Config, logger *slog.Logger) (*kubernetes.Clientset, error) {
80+
if cfg.KubeConfig == "" {
81+
logger.Info("Using in-cluster configuration")
82+
restConfig, err := rest.InClusterConfig()
83+
if err != nil {
84+
return nil, fmt.Errorf("error creating in-cluster config: %w", err)
85+
}
86+
clientSet, err := kubernetes.NewForConfig(restConfig)
87+
if err != nil {
88+
return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
89+
}
90+
return clientSet, nil
91+
}
92+
93+
logger.Info(fmt.Sprintf("Using kubeconfig from %q", cfg.KubeConfig))
94+
data, err := os.ReadFile(cfg.KubeConfig)
95+
if err != nil {
96+
return nil, fmt.Errorf("reading kubeconfig at %s: %w", cfg.KubeConfig, err)
97+
}
98+
99+
restConfig, err := clientcmd.RESTConfigFromKubeConfig(data)
100+
if err != nil {
101+
return nil, fmt.Errorf("creating rest config from %q: %w", cfg.KubeConfig, err)
102+
}
103+
104+
clientSet, err := kubernetes.NewForConfig(restConfig)
105+
if err != nil {
106+
return nil, fmt.Errorf("obtaining kubernetes clientset: %w", err)
107+
}
108+
return clientSet, nil
109+
}

loadtest/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Load testing Cluster controller
2+
3+
Load test requires 3 components:
4+
- Test server that simulates cluster-hub and the scenarios.
5+
- Kwok controller to simulate nodes/pods
6+
- Cluster controller itself.
7+
8+
Right now this is extremely basic, and you have to run these components manually on your local machine.
9+
10+
Start kwok:
11+
```
12+
kwok --kubeconfig=~/.kube/config \
13+
--manage-all-nodes=false \
14+
--manage-nodes-with-annotation-selector=kwok.x-k8s.io/node=fake-node \
15+
--node-lease-duration-seconds=40 \
16+
--cidr=10.0.0.1/24 \
17+
--node-ip=10.0.0.1
18+
```
19+
20+
Run the test server on port 8080 against your current kubeconfig context:
21+
```
22+
KUBECONFIG=~/.kube/config PORT=8080 go run . test-server
23+
```
24+
25+
Once the test server is running, start the cluster controller with some dummy values and point it at the test server:
26+
```
27+
API_KEY=dummy API_URL=http://localhost:8080 CLUSTER_ID=D30A163C-C5DF-4CC8-985C-D1449398295E KUBECONFIG=~/.kube/config LOG_LEVEL=4 LEADER_ELECTION_NAMESPACE=default METRICS_ENABLED=true go run .
28+
```

loadtest/castai.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package loadtest

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/samber/lo"

	"github.com/castai/cluster-controller/internal/castai"
)

// CastAITestServer acts as simple cluster hub mock replacement.
// It exposes a way to "push" actions to the cluster controller via GetActionsPushChannel
// and can be used as an implementation of the server interface that cluster controller expects to call.
type CastAITestServer struct {
	log *slog.Logger
	// actionsPushChannel buffers actions pushed by scenarios until the
	// cluster controller polls them via GetActions.
	actionsPushChannel chan castai.ClusterAction
	cfg                TestServerConfig

	// logMx guards actionsLog.
	logMx sync.Mutex
	// actionsLog maps an in-flight action ID to the ack channel of the
	// ExecuteActions call that owns it; AckAction delivers there.
	actionsLog map[string]chan string
}

// NewTestServer creates a CastAITestServer ready to accept pushed actions
// and to serve the cluster-hub mock interface.
func NewTestServer(logger *slog.Logger, cfg TestServerConfig) *CastAITestServer {
	return &CastAITestServer{
		log: logger,
		// Large buffer so scenario pushes rarely block on a slow poller.
		actionsPushChannel: make(chan castai.ClusterAction, 10000),
		cfg:                cfg,
		actionsLog:         make(map[string]chan string),
	}
}
36+
37+
// ExecuteActions pushes the list of actions to the queue for cluster controller to process.
38+
// This method returns when all actions are acked or context is cancelled.
39+
func (c *CastAITestServer) ExecuteActions(ctx context.Context, actions []castai.ClusterAction) {
40+
// owner channel has 1:n relationship with the actions. It handles the ack
41+
ownerChannel := make(chan string, len(actions))
42+
43+
for _, action := range actions {
44+
if action.ID == "" {
45+
action.ID = uuid.NewString()
46+
}
47+
c.addActionToStore(action.ID, ownerChannel)
48+
c.actionsPushChannel <- action
49+
}
50+
51+
// Read from owner channel until len(actions) times, then close and return.
52+
finished := 0
53+
for {
54+
select {
55+
case <-ctx.Done():
56+
c.log.Info(fmt.Sprintf("Received signal to stop finished with cause (%q) and err (%v). Closing executor.", context.Cause(ctx), ctx.Err()))
57+
return
58+
case finishedAction := <-ownerChannel:
59+
c.removeActionFromStore(finishedAction)
60+
finished++
61+
if finished == len(actions) {
62+
close(ownerChannel)
63+
return
64+
}
65+
}
66+
}
67+
}
68+
69+
/* Start Cluster-hub mock implementation */

// GetActions blocks until at least one action is available (or the configured
// TimeoutWaitingForActions elapses, mimicking cluster-hub's long poll), then
// opportunistically drains up to MaxActionsPerCall actions and returns them.
func (c *CastAITestServer) GetActions(ctx context.Context, _ string) ([]*castai.ClusterAction, error) {
	c.log.Info(fmt.Sprintf("GetActions called, have %d items in buffer", len(c.actionsPushChannel)))
	actionsToReturn := make([]*castai.ClusterAction, 0)

	// Wait for at least one action to arrive from whoever is pushing them.
	// If none arrive, we simulate the "empty poll" case of cluster-hub and return empty list.
	select {
	case x := <-c.actionsPushChannel:
		actionsToReturn = append(actionsToReturn, &x)
	case <-time.After(c.cfg.TimeoutWaitingForActions):
		c.log.Info(fmt.Sprintf("No actions to return in %v", c.cfg.TimeoutWaitingForActions))
		return nil, nil
	case <-ctx.Done():
		return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err())
	}

	// Attempt to drain up to max items from the channel.
	for len(actionsToReturn) < c.cfg.MaxActionsPerCall {
		select {
		case x := <-c.actionsPushChannel:
			actionsToReturn = append(actionsToReturn, &x)
		case <-time.After(50 * time.Millisecond):
			// 50ms of silence is treated as "queue is drained for now".
			c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn)))
			// If we haven't received enough items, just flush.
			return actionsToReturn, nil
		case <-ctx.Done():
			return nil, fmt.Errorf("context done with cause (%w), err (%w)", context.Cause(ctx), ctx.Err())
		}
	}

	c.log.Info(fmt.Sprintf("Returning %d actions for processing", len(actionsToReturn)))
	return actionsToReturn, nil
}
104+
105+
func (c *CastAITestServer) AckAction(ctx context.Context, actionID string, req *castai.AckClusterActionRequest) error {
106+
errMsg := lo.FromPtr(req.Error)
107+
c.log.DebugContext(ctx, fmt.Sprintf("action %q acknowledged; has error: %v; error: %v", actionID, req.Error != nil, errMsg))
108+
109+
receiver := c.getActionReceiver(actionID)
110+
if receiver == nil {
111+
return fmt.Errorf("action %q does not have a receiver", actionID)
112+
}
113+
// Notify owner that this action was done.
114+
receiver <- actionID
115+
116+
return nil
117+
}
118+
119+
func (c *CastAITestServer) SendLog(ctx context.Context, e *castai.LogEntry) error {
120+
// No-op for now, maybe track metrics in the future?
121+
return nil
122+
}
123+
124+
/* End Cluster-hub mock implementation */
125+
126+
func (c *CastAITestServer) addActionToStore(actionID string, receiver chan string) {
127+
c.logMx.Lock()
128+
defer c.logMx.Unlock()
129+
130+
c.actionsLog[actionID] = receiver
131+
}
132+
133+
func (c *CastAITestServer) removeActionFromStore(actionID string) {
134+
c.logMx.Lock()
135+
defer c.logMx.Unlock()
136+
137+
delete(c.actionsLog, actionID)
138+
}
139+
140+
func (c *CastAITestServer) getActionReceiver(actionID string) chan string {
141+
c.logMx.Lock()
142+
defer c.logMx.Unlock()
143+
144+
receiver, ok := c.actionsLog[actionID]
145+
if !ok {
146+
c.log.Error(fmt.Sprintf("Receiver for action %s is no longer there, possibly shutting down", actionID))
147+
return nil
148+
}
149+
return receiver
150+
}

loadtest/config.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package loadtest

import (
	"fmt"
	"sync"
	"time"

	"github.com/spf13/viper"
)

// Config for the HTTP server.
type Config struct {
	// Port where the mock server to listen on.
	Port int

	// KubeConfig can point to a kubeconfig file. If empty, InCluster client will be assumed.
	KubeConfig string
}

// TestServerConfig has settings for the mock server instance.
type TestServerConfig struct {
	// MaxActionsPerCall is the upper limit of actions to return in one CastAITestServer.GetActions call.
	MaxActionsPerCall int
	// TimeoutWaitingForActions controls how long to wait for at least 1 action to appear on server side.
	// This mimics CH behavior of not returning early if there are no pending actions and keeping the request "running".
	TimeoutWaitingForActions time.Duration
}

var (
	singletonCfg *Config
	// cfgOnce guards the one-time lazy initialization of singletonCfg so
	// GetConfig is safe under concurrent use.
	cfgOnce sync.Once
)

// GetConfig lazily loads the test-server configuration from environment
// variables (PORT, KUBECONFIG) exactly once and returns a copy.
// It panics when the configuration cannot be parsed or the port is unset.
func GetConfig() Config {
	cfgOnce.Do(func() {
		_ = viper.BindEnv("port", "PORT")
		_ = viper.BindEnv("kubeconfig", "KUBECONFIG")

		cfg := &Config{}
		// Pass the *Config itself (not a **Config) to viper.
		if err := viper.Unmarshal(cfg); err != nil {
			panic(fmt.Errorf("parsing configuration: %w", err))
		}

		if cfg.Port == 0 {
			panic(fmt.Errorf("test server port must be set"))
		}
		singletonCfg = cfg
	})

	return *singletonCfg
}

0 commit comments

Comments
 (0)