44 "context"
55 "errors"
66 "fmt"
7+ "log/slog"
78 "net/http"
89 "os"
910 "os/signal"
@@ -14,14 +15,17 @@ import (
1415 "github.com/sirupsen/logrus"
1516 "k8s.io/apimachinery/pkg/labels"
1617 "k8s.io/apimachinery/pkg/selection"
17- "k8s.io/client-go/kubernetes "
18+ "k8s.io/client-go/dynamic "
1819 "k8s.io/client-go/tools/clientcmd"
1920 "k8s.io/client-go/util/flowcontrol"
2021
2122 "github.com/castai/gpu-metrics-exporter/internal/castai"
2223 "github.com/castai/gpu-metrics-exporter/internal/config"
2324 "github.com/castai/gpu-metrics-exporter/internal/exporter"
2425 "github.com/castai/gpu-metrics-exporter/internal/server"
26+ "github.com/castai/gpu-metrics-exporter/internal/workload"
27+ "github.com/castai/logging"
28+ "github.com/castai/metrics"
2529)
2630
2731var (
3034 Version = "local"
3135)
3236
37+ const (
38+ workloadCacheSize = 512
39+ workloadsLabelKey = "workloads.cast.ai/custom-workload"
40+ )
41+
3342func main () {
3443 log := logrus .New ()
3544
@@ -38,19 +47,30 @@ func main() {
3847 log .Fatal (err )
3948 }
4049
41- logLevel , err := logrus . ParseLevel (cfg .LogLevel )
50+ logLevel , err := parseLogLevel (cfg .LogLevel )
4251 if err != nil {
43- log .Fatal (err )
52+ log .Warnf ("failed to parse log level, defaulting to 'info': %v" , err )
53+ logLevel = slog .LevelInfo
4454 }
45- log .SetLevel (logLevel )
4655
47- if err := run (cfg , log ); err != nil && ! errors .Is (err , context .Canceled ) {
56+ castaiLogger := logging .New (logging .NewTextHandler (logging.TextHandlerConfig {
57+ Output : os .Stdout ,
58+ Level : logLevel ,
59+ }))
60+
61+ if err := run (cfg , castaiLogger ); err != nil && ! errors .Is (err , context .Canceled ) {
4862 log .Fatal (err )
4963 }
5064}
5165
52- func run (cfg * config.Config , log logrus.FieldLogger ) error {
53- mux := server .NewServerMux (log )
66+ func parseLogLevel (level string ) (slog.Level , error ) {
67+ var lvl slog.Level
68+ err := lvl .UnmarshalText ([]byte (level ))
69+ return lvl , err
70+ }
71+
72+ func run (cfg * config.Config , log * logging.Logger ) error {
73+ mux := server .NewServerMux ()
5474
5575 srv := & http.Server {
5676 Addr : fmt .Sprintf (":%d" , cfg .HTTPListenPort ),
@@ -75,19 +95,45 @@ func run(cfg *config.Config, log logrus.FieldLogger) error {
7595 cancel ()
7696 }()
7797
78- clientset , err := newKubernetesClientset (cfg )
98+ dynClient , err := newDynamicClient (cfg )
7999 if err != nil {
80- log .Fatal ( err )
100+ log .WithField ( "error" , err . Error ()). Fatal ( "failed to create kubernetes dynamic client" )
81101 }
82102
83103 labelSelector , err := selectorFromMap (cfg .DCGMLabels )
84104 if err != nil {
85- log .Fatal (err )
105+ log .WithField ("error" , err .Error ()).Fatal ("failed to create get label selector" )
106+ }
107+
108+ metricClient , err := metrics .NewMetricClient (
109+ metrics.Config {
110+ APIAddr : cfg .TelemetryURL ,
111+ APIToken : cfg .APIKey ,
112+ ClusterID : cfg .ClusterID ,
113+ }, log )
114+ if err != nil {
115+ log .WithField ("error" , err .Error ()).Warn ("failed to create metrics client" )
116+ }
117+
118+ if metricClient != nil {
119+ go func () {
120+ if err := metricClient .Start (ctx ); err != nil && ! errors .Is (err , context .Canceled ) {
121+ log .WithField ("error" , err .Error ()).Error ("error in metrics client" )
122+ }
123+ }()
86124 }
87125
88126 client := setupCastAIClient (log , cfg )
89127 scraper := exporter .NewScraper (& http.Client {}, log )
90- mapper := exporter .NewMapper (cfg .NodeName )
128+ workloadResolver , err := workload .NewResolver (dynClient , workload.Config {
129+ LabelKeys : []string {workloadsLabelKey },
130+ CacheSize : workloadCacheSize ,
131+ })
132+ if err != nil {
133+ log .WithField ("error" , err .Error ()).Fatal ("failed to create workload resolver" )
134+ }
135+
136+ mapper := exporter .NewMapper (cfg .NodeName , workloadResolver , log )
91137 ex := exporter .NewExporter (exporter.Config {
92138 ExportInterval : cfg .ExportInterval ,
93139 Selector : labelSelector .String (),
@@ -96,7 +142,7 @@ func run(cfg *config.Config, log logrus.FieldLogger) error {
96142 DCGMExporterHost : cfg .DCGMHost ,
97143 Enabled : true ,
98144 NodeName : cfg .NodeName ,
99- }, clientset , log , scraper , mapper , client )
145+ }, dynClient , log , scraper , mapper , client , metricClient )
100146
101147 go func () {
102148 if err := ex .Start (ctx ); err != nil && ! errors .Is (err , context .Canceled ) {
@@ -108,19 +154,14 @@ func run(cfg *config.Config, log logrus.FieldLogger) error {
108154 return srv .ListenAndServe ()
109155}
110156
111- func newKubernetesClientset (cfg * config.Config ) (* kubernetes.Clientset , error ) {
112- config , err := clientcmd .BuildConfigFromFlags ("" , cfg .KubeConfigPath )
113- if err != nil {
114- return nil , err
115- }
116- config .RateLimiter = flowcontrol .NewTokenBucketRateLimiter (float32 (10 ), 25 )
117-
118- clientset , err := kubernetes .NewForConfig (config )
157+ func newDynamicClient (cfg * config.Config ) (dynamic.Interface , error ) {
158+ restConfig , err := clientcmd .BuildConfigFromFlags ("" , cfg .KubeConfigPath )
119159 if err != nil {
120160 return nil , err
121161 }
162+ restConfig .RateLimiter = flowcontrol .NewTokenBucketRateLimiter (float32 (10 ), 25 )
122163
123- return clientset , nil
164+ return dynamic . NewForConfig ( restConfig )
124165}
125166
126167func selectorFromMap (labelMap map [string ]string ) (labels.Selector , error ) {
@@ -138,7 +179,7 @@ func selectorFromMap(labelMap map[string]string) (labels.Selector, error) {
138179 return selector .Add (requirements ... ), nil
139180}
140181
141- func setupCastAIClient (log logrus. FieldLogger , cfg * config.Config ) castai.Client {
182+ func setupCastAIClient (log * logging. Logger , cfg * config.Config ) castai.Client {
142183 clientConfig := castai.Config {
143184 ClusterID : cfg .ClusterID ,
144185 APIKey : cfg .APIKey ,
0 commit comments