Skip to content

Commit 1042cdc

Browse files
authored
feat: export metrics to CastAI (#195)
1 parent ad43a14 commit 1042cdc

File tree

17 files changed

+559
-29
lines changed

17 files changed

+559
-29
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
.DS_Store
12
.idea
23
.run
34
*.iml
45
bin
56
.env
67
e2e/**/castai-*.json
8+
9+
.aider*

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,9 @@ generate-e2e-client:
4848
go generate ./e2e/client
4949
.PHONY: generate-e2e-client
5050

51+
generate-api-client:
52+
go generate internal/castai/api/generate.go
53+
.PHONY: generate-api-client
54+
5155
deploy-loadtest: release
5256
IMAGE_REPOSITORY=$(DOCKER_REPOSITORY) IMAGE_TAG=$(VERSION) ./hack/loadtest/deploy.sh

cmd/controller/run.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/castai/cluster-controller/internal/config"
3131
"github.com/castai/cluster-controller/internal/controller"
3232
"github.com/castai/cluster-controller/internal/controller/logexporter"
33+
"github.com/castai/cluster-controller/internal/controller/metricexporter"
3334
"github.com/castai/cluster-controller/internal/helm"
3435
"github.com/castai/cluster-controller/internal/k8sversion"
3536
"github.com/castai/cluster-controller/internal/metrics"
@@ -55,7 +56,7 @@ func run(ctx context.Context) error {
5556
log.Fatalf("failed to create castai client: %v", err)
5657
}
5758

58-
client := castai.NewClient(logger, cl, cfg.ClusterID)
59+
client := castai.NewClient(logger, cl, cfg.ClusterID, cfg.SelfPod.Name)
5960

6061
logexporter.SetupLogExporter(logger, client)
6162

@@ -159,6 +160,11 @@ func runController(
159160
}
160161
}()
161162

163+
if cfg.Metrics.ExportEnabled {
164+
metricExporter := metricexporter.New(log, client, cfg.Metrics.ExportInterval)
165+
go metricExporter.Run(ctx)
166+
}
167+
162168
httpMux := http.NewServeMux()
163169
var checks []healthz.HealthChecker
164170
checks = append(checks, healthzAction)

cmd/monitor/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func run(ctx context.Context) error {
4040
if err != nil {
4141
log.Fatalf("failed to create castai client: %v", err)
4242
}
43-
client := castai.NewClient(logger, cl, cfg.ClusterID)
43+
client := castai.NewClient(logger, cl, cfg.ClusterID, cfg.SelfPod.Name)
4444

4545
logexporter.SetupLogExporter(logger, client)
4646

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ require (
1313
github.com/google/uuid v1.6.0
1414
github.com/kelseyhightower/envconfig v1.4.0
1515
github.com/prometheus/client_golang v1.21.1
16+
github.com/prometheus/client_model v0.6.1
1617
github.com/samber/lo v1.47.0
1718
github.com/sirupsen/logrus v1.9.3
1819
github.com/spf13/cobra v1.8.1
@@ -63,7 +64,7 @@ require (
6364
github.com/distribution/reference v0.6.0 // indirect
6465
github.com/docker/cli v25.0.1+incompatible // indirect
6566
github.com/docker/distribution v2.8.3+incompatible // indirect
66-
github.com/docker/docker v27.1.1+incompatible // indirect
67+
github.com/docker/docker v28.3.3+incompatible // indirect
6768
github.com/docker/docker-credential-helpers v0.9.3 // indirect
6869
github.com/docker/go-connections v0.5.0 // indirect
6970
github.com/docker/go-events v0.0.0-20250114142523-c867878c5e32 // indirect
@@ -108,7 +109,7 @@ require (
108109
github.com/magiconair/properties v1.8.7 // indirect
109110
github.com/mailru/easyjson v0.7.7 // indirect
110111
github.com/mattn/go-colorable v0.1.13 // indirect
111-
github.com/mattn/go-isatty v0.0.17 // indirect
112+
github.com/mattn/go-isatty v0.0.20 // indirect
112113
github.com/mattn/go-runewidth v0.0.9 // indirect
113114
github.com/mitchellh/copystructure v1.2.0 // indirect
114115
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
@@ -129,7 +130,6 @@ require (
129130
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
130131
github.com/pkg/errors v0.9.1 // indirect
131132
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
132-
github.com/prometheus/client_model v0.6.1 // indirect
133133
github.com/prometheus/common v0.62.0 // indirect
134134
github.com/prometheus/procfs v0.15.1 // indirect
135135
github.com/rubenv/sql-migrate v1.7.1 // indirect

go.sum

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ github.com/docker/distribution v2.8.3+incompatible h1:AtKxIZ36LoNK51+Z6RpzLpddBi
9696
github.com/docker/distribution v2.8.3+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
9797
github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
9898
github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
99+
github.com/docker/docker v28.3.3+incompatible h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI=
100+
github.com/docker/docker v28.3.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
99101
github.com/docker/docker-credential-helpers v0.9.3 h1:gAm/VtF9wgqJMoxzT3Gj5p4AqIjCBS4wrsOh9yRqcz8=
100102
github.com/docker/docker-credential-helpers v0.9.3/go.mod h1:x+4Gbw9aGmChi3qTLZj8Dfn0TD20M/fuWy0E5+WDeCo=
101103
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
@@ -297,8 +299,8 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
297299
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
298300
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
299301
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
300-
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
301-
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
302+
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
303+
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
302304
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
303305
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
304306
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
@@ -547,6 +549,7 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
547549
golang.org/x/sys v0.0.0-20220513210249-45d2b4557a2a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
548550
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
549551
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
552+
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
550553
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
551554
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
552555
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=

internal/actions/drain_node_handler_test.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
246246
podFailedDeletionErr := &podFailedActionError{}
247247
t.Parallel()
248248
type fields struct {
249-
clientSet func() *fake.Clientset
249+
clientSet func(t *testing.T) *fake.Clientset
250250
}
251251
type args struct {
252252
action *castai.ClusterAction
@@ -265,7 +265,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
265265
name: "nil",
266266
args: args{},
267267
fields: fields{
268-
clientSet: func() *fake.Clientset {
268+
clientSet: func(t *testing.T) *fake.Clientset {
269269
return fake.NewClientset()
270270
},
271271
},
@@ -279,7 +279,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
279279
},
280280
},
281281
fields: fields{
282-
clientSet: func() *fake.Clientset {
282+
clientSet: func(t *testing.T) *fake.Clientset {
283283
return fake.NewClientset()
284284
},
285285
},
@@ -291,7 +291,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
291291
action: newActionDrainNode("", nodeID, providerID, 1, true),
292292
},
293293
fields: fields{
294-
clientSet: func() *fake.Clientset {
294+
clientSet: func(t *testing.T) *fake.Clientset {
295295
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
296296
},
297297
},
@@ -304,7 +304,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
304304
nil, nil, nil, nil, nil),
305305
},
306306
fields: fields{
307-
clientSet: func() *fake.Clientset {
307+
clientSet: func(t *testing.T) *fake.Clientset {
308308
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
309309
},
310310
},
@@ -313,7 +313,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
313313
{
314314
name: "action with another node id and provider id - skip drain",
315315
fields: fields{
316-
clientSet: func() *fake.Clientset {
316+
clientSet: func(t *testing.T) *fake.Clientset {
317317
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
318318
},
319319
},
@@ -325,7 +325,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
325325
{
326326
name: "action with proper node id and another provider id - skip drain",
327327
fields: fields{
328-
clientSet: func() *fake.Clientset {
328+
clientSet: func(t *testing.T) *fake.Clientset {
329329
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
330330
},
331331
},
@@ -337,7 +337,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
337337
{
338338
name: "action with another node id and proper provider id - skip drain",
339339
fields: fields{
340-
clientSet: func() *fake.Clientset {
340+
clientSet: func(t *testing.T) *fake.Clientset {
341341
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
342342
},
343343
},
@@ -349,7 +349,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
349349
{
350350
name: "drain node successfully",
351351
fields: fields{
352-
clientSet: func() *fake.Clientset {
352+
clientSet: func(t *testing.T) *fake.Clientset {
353353
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
354354
prependEvictionReaction(t, c, true, false)
355355
return c
@@ -364,7 +364,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
364364
{
365365
name: "skip drain when node not found",
366366
fields: fields{
367-
clientSet: func() *fake.Clientset {
367+
clientSet: func(t *testing.T) *fake.Clientset {
368368
return setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
369369
},
370370
},
@@ -376,7 +376,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
376376
{
377377
name: "when eviction fails for a pod and force=false, leaves node cordoned and skip deletion",
378378
fields: fields{
379-
clientSet: func() *fake.Clientset {
379+
clientSet: func(t *testing.T) *fake.Clientset {
380380
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
381381
prependEvictionReaction(t, c, false, false)
382382
return c
@@ -392,7 +392,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
392392
{
393393
name: "when eviction timeout is reached and force=false, leaves node cordoned and skip deletion",
394394
fields: fields{
395-
clientSet: func() *fake.Clientset {
395+
clientSet: func(t *testing.T) *fake.Clientset {
396396
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
397397
prependEvictionReaction(t, c, false, true)
398398
return c
@@ -408,7 +408,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
408408
{
409409
name: "eviction fails and force=true, force remove pods: timeout during eviction",
410410
fields: fields{
411-
clientSet: func() *fake.Clientset {
411+
clientSet: func(*testing.T) *fake.Clientset {
412412
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
413413
prependEvictionReaction(t, c, false, true)
414414
actualCalls := 0
@@ -446,7 +446,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
446446
{
447447
name: "eviction fails and force=true, force remove pods: failed pod during eviction",
448448
fields: fields{
449-
clientSet: func() *fake.Clientset {
449+
clientSet: func(t *testing.T) *fake.Clientset {
450450
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
451451
prependEvictionReaction(t, c, false, false)
452452
actualCalls := 0
@@ -484,7 +484,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
484484
{
485485
name: "eviction fails and force=true, at least one pod fails to delete due to internal error, should return error",
486486
fields: fields{
487-
clientSet: func() *fake.Clientset {
487+
clientSet: func(t *testing.T) *fake.Clientset {
488488
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
489489
c.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
490490
deleteAction := action.(ktest.DeleteActionImpl)
@@ -512,7 +512,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
512512
{
513513
name: "eviction fails and force=true, timeout during deletion should be retried and returned",
514514
fields: fields{
515-
clientSet: func() *fake.Clientset {
515+
clientSet: func(t *testing.T) *fake.Clientset {
516516
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
517517
actualDeleteCalls := 0
518518
c.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
@@ -541,7 +541,7 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
541541
{
542542
name: "force=true, failed eviction for PDBs should be retried until timeout before deleting",
543543
fields: fields{
544-
clientSet: func() *fake.Clientset {
544+
clientSet: func(t *testing.T) *fake.Clientset {
545545
c := setupFakeClientWithNodePodEviction(nodeName, nodeID, providerID, podName)
546546
c.PrependReactor("create", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
547547
if action.GetSubresource() != "eviction" {
@@ -574,12 +574,11 @@ func TestDrainNodeHandler_Handle(t *testing.T) {
574574
},
575575
}
576576
for _, tt := range tests {
577-
tt := tt
578577
t.Run(tt.name, func(t *testing.T) {
579578
t.Parallel()
580579
h := &DrainNodeHandler{
581580
log: logrus.New(),
582-
clientset: tt.fields.clientSet(),
581+
clientset: tt.fields.clientSet(t),
583582
cfg: tt.args.cfg,
584583
}
585584
err := h.Handle(context.Background(), tt.args.action)

internal/castai/client.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"time"
1212

1313
"github.com/go-resty/resty/v2"
14+
dto "github.com/prometheus/client_model/go"
15+
"github.com/samber/lo"
1416
"github.com/sirupsen/logrus"
1517
"golang.org/x/net/http2"
1618

@@ -28,6 +30,7 @@ type CastAIClient interface {
2830
GetActions(ctx context.Context, k8sVersion string) ([]*ClusterAction, error)
2931
AckAction(ctx context.Context, actionID string, req *AckClusterActionRequest) error
3032
SendLog(ctx context.Context, e *LogEntry) error
33+
SendMetrics(ctx context.Context, gatherTime time.Time, metricFamilies []*dto.MetricFamily) error
3134
}
3235

3336
type LogEntry struct {
@@ -43,14 +46,16 @@ type Client struct {
4346
log *logrus.Logger
4447
rest *resty.Client
4548
clusterID string
49+
podName string
4650
}
4751

4852
// NewClient returns new Client for communicating with Cast AI.
49-
func NewClient(log *logrus.Logger, rest *resty.Client, clusterID string) *Client {
53+
func NewClient(log *logrus.Logger, rest *resty.Client, clusterID, podName string) *Client {
5054
return &Client{
5155
log: log,
5256
rest: rest,
5357
clusterID: clusterID,
58+
podName: podName,
5459
}
5560
}
5661

@@ -140,6 +145,23 @@ func (c *Client) SendLog(ctx context.Context, e *LogEntry) error {
140145
return nil
141146
}
142147

148+
func (c *Client) SendMetrics(ctx context.Context, gatherTime time.Time, metricFamilies []*dto.MetricFamily) error {
149+
req := convertPrometheusMetricFamilies(gatherTime, c.podName, metricFamilies)
150+
151+
resp, err := c.rest.R().
152+
SetBody(req).
153+
SetContext(ctx).
154+
Post(fmt.Sprintf("/v1/clusters/%s/components/%s/metrics", c.clusterID, "cluster-controller"))
155+
if err != nil {
156+
return fmt.Errorf("sending metrics: %w", err)
157+
}
158+
if resp.IsError() {
159+
return fmt.Errorf("sending metrics: request error status_code=%d body=%s", resp.StatusCode(), resp.Body())
160+
}
161+
162+
return nil
163+
}
164+
143165
func (c *Client) GetActions(ctx context.Context, k8sVersion string) ([]*ClusterAction, error) {
144166
res := &GetClusterActionsResponse{}
145167
resp, err := c.rest.R().
@@ -169,3 +191,52 @@ func (c *Client) AckAction(ctx context.Context, actionID string, req *AckCluster
169191
}
170192
return nil
171193
}
194+
195+
func convertPrometheusMetricFamilies(gatherTime time.Time, podName string, metricFamilies []*dto.MetricFamily) *PrometheusWriteRequest {
196+
timestamp := gatherTime.UnixMilli()
197+
198+
timeseries := []PrometheusTimeseries{}
199+
for _, family := range metricFamilies {
200+
for _, metric := range family.Metric {
201+
// Right now we support only export of counter metrics.
202+
if metric.Counter == nil {
203+
continue
204+
}
205+
206+
timeserie := PrometheusTimeseries{
207+
Labels: []PrometheusLabel{
208+
{
209+
Name: "__name__",
210+
Value: family.GetName(),
211+
},
212+
{
213+
Name: "pod_name",
214+
Value: podName,
215+
},
216+
},
217+
}
218+
for _, label := range metric.Label {
219+
if label.Name == nil {
220+
continue
221+
}
222+
223+
timeserie.Labels = append(timeserie.Labels, PrometheusLabel{
224+
Name: *label.Name,
225+
Value: lo.FromPtr(label.Value),
226+
})
227+
}
228+
229+
timeserie.Samples = []PrometheusSample{}
230+
timeserie.Samples = append(timeserie.Samples, PrometheusSample{
231+
Timestamp: timestamp,
232+
Value: metric.Counter.GetValue(),
233+
})
234+
235+
timeseries = append(timeseries, timeserie)
236+
}
237+
}
238+
239+
return &PrometheusWriteRequest{
240+
Timeseries: timeseries,
241+
}
242+
}

0 commit comments

Comments
 (0)