Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
.DS_Store
.idea
.run
*.iml
bin
.env
e2e/**/castai-*.json

.aider*
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,9 @@ generate-e2e-client:
go generate ./e2e/client
.PHONY: generate-e2e-client

generate-api-client:
go generate internal/castai/api/generate.go
.PHONY: generate-api-client

deploy-loadtest: release
IMAGE_REPOSITORY=$(DOCKER_REPOSITORY) IMAGE_TAG=$(VERSION) ./hack/loadtest/deploy.sh
8 changes: 7 additions & 1 deletion cmd/controller/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/castai/cluster-controller/internal/config"
"github.com/castai/cluster-controller/internal/controller"
"github.com/castai/cluster-controller/internal/controller/logexporter"
"github.com/castai/cluster-controller/internal/controller/metricexporter"
"github.com/castai/cluster-controller/internal/helm"
"github.com/castai/cluster-controller/internal/k8sversion"
"github.com/castai/cluster-controller/internal/metrics"
Expand All @@ -55,7 +56,7 @@ func run(ctx context.Context) error {
log.Fatalf("failed to create castai client: %v", err)
}

client := castai.NewClient(logger, cl, cfg.ClusterID)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be empty locally, is it a problem?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pod_name label value will be empty, but it's not a problem.

client := castai.NewClient(logger, cl, cfg.ClusterID, cfg.SelfPod.Name)

logexporter.SetupLogExporter(logger, client)

Expand Down Expand Up @@ -159,6 +160,11 @@ func runController(
}
}()

if cfg.Metrics.ExportEnabled {
metricExporter := metricexporter.New(log, client, cfg.Metrics.ExportInterval)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a log that this is enabled so we know

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a log in metricExporter.Run():

me.log.Infof("starting metrics exporter with interval %v", me.exportInterval)

go metricExporter.Run(ctx)
}

httpMux := http.NewServeMux()
var checks []healthz.HealthChecker
checks = append(checks, healthzAction)
Expand Down
2 changes: 1 addition & 1 deletion cmd/monitor/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func run(ctx context.Context) error {
if err != nil {
log.Fatalf("failed to create castai client: %v", err)
}
client := castai.NewClient(logger, cl, cfg.ClusterID)
client := castai.NewClient(logger, cl, cfg.ClusterID, cfg.SelfPod.Name)

logexporter.SetupLogExporter(logger, client)

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/google/uuid v1.6.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/prometheus/client_golang v1.21.1
github.com/prometheus/client_model v0.6.1
github.com/samber/lo v1.47.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
Expand Down Expand Up @@ -63,7 +64,7 @@ require (
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/cli v25.0.1+incompatible // indirect
github.com/docker/distribution v2.8.3+incompatible // indirect
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/docker v28.3.3+incompatible // indirect
github.com/docker/docker-credential-helpers v0.9.3 // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-events v0.0.0-20250114142523-c867878c5e32 // indirect
Expand Down Expand Up @@ -108,7 +109,7 @@ require (
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
Expand All @@ -129,7 +130,6 @@ require (
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rubenv/sql-migrate v1.7.1 // indirect
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBi
github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v28.3.3+incompatible h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI=
github.com/docker/docker v28.3.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker-credential-helpers v0.9.3 h1:gAm/VtF9wgqJMoxzT3Gj5p4AqIjCBS4wrsOh9yRqcz8=
github.com/docker/docker-credential-helpers v0.9.3/go.mod h1:x+4Gbw9aGmChi3qTLZj8Dfn0TD20M/fuWy0E5+WDeCo=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
Expand Down Expand Up @@ -297,8 +299,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
Expand Down Expand Up @@ -547,6 +549,7 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
37 changes: 18 additions & 19 deletions internal/actions/drain_node_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
podFailedDeletionErr := &podFailedActionError{}
t.Parallel()
type fields struct {
clientSet func() *fake.Clientset
clientSet func(t *testing.T) *fake.Clientset
}
type args struct {
action *castai.ClusterAction
Expand All @@ -265,7 +265,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
name: "nil",
args: args{},
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return fake.NewClientset()
},
},
Expand All @@ -279,7 +279,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
},
},
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return fake.NewClientset()
},
},
Expand All @@ -291,7 +291,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
action: newActionDrainNode("", nodeID, providerID, 1, true),
},
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
},
},
Expand All @@ -304,7 +304,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
nil, nil, nil, nil, nil),
},
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
},
},
Expand All @@ -313,7 +313,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "action with another node id and provider id - skip drain",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
},
},
Expand All @@ -325,7 +325,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "action with proper node id and another provider id - skip drain",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
},
},
Expand All @@ -337,7 +337,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "action with another node id and proper provider id - skip drain",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
},
},
Expand All @@ -349,7 +349,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "drain node successfully",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
prependEvictionReaction(t, c, true, false)
return c
Expand All @@ -364,7 +364,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "skip drain when node not found",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
},
},
Expand All @@ -376,7 +376,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "when eviction fails for a pod and force=false, leaves node cordoned and skip deletion",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
prependEvictionReaction(t, c, false, false)
return c
Expand All @@ -392,7 +392,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "when eviction timeout is reached and force=false, leaves node cordoned and skip deletion",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
prependEvictionReaction(t, c, false, true)
return c
Expand All @@ -408,7 +408,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "eviction fails and force=true, force remove pods: timeout during eviction",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(*testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
prependEvictionReaction(t, c, false, true)
actualCalls := 0
Expand Down Expand Up @@ -446,7 +446,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "eviction fails and force=true, force remove pods: failed pod during eviction",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
prependEvictionReaction(t, c, false, false)
actualCalls := 0
Expand Down Expand Up @@ -484,7 +484,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "eviction fails and force=true, at least one pod fails to delete due to internal error, should return error",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
c.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
deleteAction := action.(ktest.DeleteActionImpl)
Expand Down Expand Up @@ -512,7 +512,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "eviction fails and force=true, timeout during deletion should be retried and returned",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
actualDeleteCalls := 0
c.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
Expand Down Expand Up @@ -541,7 +541,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
{
name: "force=true, failed eviction for PDBs should be retried until timeout before deleting",
fields: fields{
clientSet: func() *fake.Clientset {
clientSet: func(t *testing.T) *fake.Clientset {
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
c.PrependReactor("create", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
if action.GetSubresource() != "eviction" {
Expand Down Expand Up @@ -574,12 +574,11 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
h := &DrainNodeHandler{
log: logrus.New(),
clientset: tt.fields.clientSet(),
clientset: tt.fields.clientSet(t),
cfg: tt.args.cfg,
}
err := h.Handle(context.Background(), tt.args.action)
Expand Down
73 changes: 72 additions & 1 deletion internal/castai/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"time"

"github.com/go-resty/resty/v2"
dto "github.com/prometheus/client_model/go"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
"golang.org/x/net/http2"

Expand All @@ -28,6 +30,7 @@ type CastAIClient interface {
GetActions(ctx context.Context, k8sVersion string) ([]*ClusterAction, error)
AckAction(ctx context.Context, actionID string, req *AckClusterActionRequest) error
SendLog(ctx context.Context, e *LogEntry) error
SendMetrics(ctx context.Context, gatherTime time.Time, metricFamilies []*dto.MetricFamily) error
}

type LogEntry struct {
Expand All @@ -43,14 +46,16 @@ type Client struct {
log *logrus.Logger
rest *resty.Client
clusterID string
podName string
}

// NewClient returns new Client for communicating with Cast AI.
func NewClient(log *logrus.Logger, rest *resty.Client, clusterID string) *Client {
func NewClient(log *logrus.Logger, rest *resty.Client, clusterID, podName string) *Client {
return &Client{
log: log,
rest: rest,
clusterID: clusterID,
podName: podName,
}
}

Expand Down Expand Up @@ -140,6 +145,23 @@ func (c *Client) SendLog(ctx context.Context, e *LogEntry) error {
return nil
}

func (c *Client) SendMetrics(ctx context.Context, gatherTime time.Time, metricFamilies []*dto.MetricFamily) error {
req := convertPrometheusMetricFamilies(gatherTime, c.podName, metricFamilies)

resp, err := c.rest.R().
SetBody(req).
SetContext(ctx).
Post(fmt.Sprintf("/v1/clusters/%s/components/%s/metrics", c.clusterID, "cluster-controller"))
if err != nil {
return fmt.Errorf("sending metrics: %w", err)
}
if resp.IsError() {
return fmt.Errorf("sending metrics: request error status_code=%d body=%s", resp.StatusCode(), resp.Body())
}

return nil
}

func (c *Client) GetActions(ctx context.Context, k8sVersion string) ([]*ClusterAction, error) {
res := &GetClusterActionsResponse{}
resp, err := c.rest.R().
Expand Down Expand Up @@ -169,3 +191,52 @@ func (c *Client) AckAction(ctx context.Context, actionID string, req *AckCluster
}
return nil
}

func convertPrometheusMetricFamilies(gatherTime time.Time, podName string, metricFamilies []*dto.MetricFamily) *PrometheusWriteRequest {
timestamp := gatherTime.UnixMilli()

timeseries := []PrometheusTimeseries{}
for _, family := range metricFamilies {
for _, metric := range family.Metric {
// Right now we support only export of counter metrics.
if metric.Counter == nil {
continue
}

timeserie := PrometheusTimeseries{
Labels: []PrometheusLabel{
{
Name: "__name__",
Value: family.GetName(),
},
{
Name: "pod_name",
Value: podName,
},
},
}
for _, label := range metric.Label {
if label.Name == nil {
continue
}

timeserie.Labels = append(timeserie.Labels, PrometheusLabel{
Name: *label.Name,
Value: lo.FromPtr(label.Value),
})
}

timeserie.Samples = []PrometheusSample{}
timeserie.Samples = append(timeserie.Samples, PrometheusSample{
Timestamp: timestamp,
Value: metric.Counter.GetValue(),
})

timeseries = append(timeseries, timeserie)
}
}

return &PrometheusWriteRequest{
Timeseries: timeseries,
}
}
Loading
Loading