44 "context"
55 "errors"
66 "fmt"
7+ "log/slog"
78 "net/http"
89 "os"
910 "os/signal"
@@ -14,14 +15,17 @@ import (
1415 "github.com/sirupsen/logrus"
1516 "k8s.io/apimachinery/pkg/labels"
1617 "k8s.io/apimachinery/pkg/selection"
17- "k8s.io/client-go/kubernetes "
18+ "k8s.io/client-go/dynamic "
1819 "k8s.io/client-go/tools/clientcmd"
1920 "k8s.io/client-go/util/flowcontrol"
2021
2122 "github.com/castai/gpu-metrics-exporter/internal/castai"
2223 "github.com/castai/gpu-metrics-exporter/internal/config"
2324 "github.com/castai/gpu-metrics-exporter/internal/exporter"
2425 "github.com/castai/gpu-metrics-exporter/internal/server"
26+ "github.com/castai/gpu-metrics-exporter/internal/workload"
27+ "github.com/castai/logging"
28+ "github.com/castai/metrics"
2529)
2630
2731var (
@@ -38,19 +42,30 @@ func main() {
3842 log .Fatal (err )
3943 }
4044
41- logLevel , err := logrus . ParseLevel (cfg .LogLevel )
45+ logLevel , err := parseLogLevel (cfg .LogLevel )
4246 if err != nil {
43- log .Fatal (err )
47+ log .Warnf ("failed to parse log level, defaulting to 'info': %v" , err )
48+ logLevel = slog .LevelInfo
4449 }
45- log .SetLevel (logLevel )
4650
47- if err := run (cfg , log ); err != nil && ! errors .Is (err , context .Canceled ) {
51+ castaiLogger := logging .New (logging .NewTextHandler (logging.TextHandlerConfig {
52+ Output : os .Stdout ,
53+ Level : logLevel ,
54+ }))
55+
56+ if err := run (cfg , castaiLogger ); err != nil && ! errors .Is (err , context .Canceled ) {
4857 log .Fatal (err )
4958 }
5059}
5160
52- func run (cfg * config.Config , log logrus.FieldLogger ) error {
53- mux := server .NewServerMux (log )
// parseLogLevel converts the configured log level string into a slog.Level.
//
// The string is first handed to slog.Level.UnmarshalText, so every spelling
// slog understands is accepted unchanged. When slog rejects the value, the
// level names that logrus.ParseLevel accepts but slog does not are matched
// case-insensitively, so existing configuration keeps working:
//
//	"trace"          -> slog.LevelDebug (slog has no level below debug)
//	"warning"        -> slog.LevelWarn
//	"fatal", "panic" -> slog.LevelError (most severe predefined slog level)
//
// For any other value it returns slog.LevelInfo together with the error
// reported by slog, so callers may either fail or fall back to the default.
func parseLogLevel(level string) (slog.Level, error) {
	var lvl slog.Level
	err := lvl.UnmarshalText([]byte(level))
	if err == nil {
		return lvl, nil
	}

	// ASCII lower-casing is enough here: every alias below is plain ASCII,
	// and it keeps the matching case-insensitive like slog's own parsing.
	lowered := []byte(level)
	for i, c := range lowered {
		if 'A' <= c && c <= 'Z' {
			lowered[i] = c + ('a' - 'A')
		}
	}

	switch string(lowered) {
	case "trace":
		return slog.LevelDebug, nil
	case "warning":
		return slog.LevelWarn, nil
	case "fatal", "panic":
		return slog.LevelError, nil
	}

	// Unknown name: report slog's error and return the zero level (info).
	return slog.LevelInfo, err
}
66+
67+ func run (cfg * config.Config , log * logging.Logger ) error {
68+ mux := server .NewServerMux ()
5469
5570 srv := & http.Server {
5671 Addr : fmt .Sprintf (":%d" , cfg .HTTPListenPort ),
@@ -75,19 +90,45 @@ func run(cfg *config.Config, log logrus.FieldLogger) error {
7590 cancel ()
7691 }()
7792
78- clientset , err := newKubernetesClientset (cfg )
93+ dynClient , err := newDynamicClient (cfg )
7994 if err != nil {
80- log .Fatal ( err )
95+ log .WithField ( "error" , err . Error ()). Fatal ( "failed to create kubernetes dynamic client" )
8196 }
8297
8398 labelSelector , err := selectorFromMap (cfg .DCGMLabels )
8499 if err != nil {
85- log .Fatal (err )
100+ log .WithField ("error" , err .Error ()).Fatal ("failed to create get label selector" )
101+ }
102+
103+ metricClient , err := metrics .NewMetricClient (
104+ metrics.Config {
105+ APIAddr : cfg .TelemetryURL ,
106+ APIToken : cfg .APIKey ,
107+ ClusterID : cfg .ClusterID ,
108+ }, log )
109+ if err != nil {
110+ log .WithField ("error" , err .Error ()).Warn ("failed to create metrics client" )
111+ }
112+
113+ if metricClient != nil {
114+ go func () {
115+ if err := metricClient .Start (ctx ); err != nil && ! errors .Is (err , context .Canceled ) {
116+ log .WithField ("error" , err .Error ()).Error ("error in metrics client" )
117+ }
118+ }()
86119 }
87120
88121 client := setupCastAIClient (log , cfg )
89122 scraper := exporter .NewScraper (& http.Client {}, log )
90- mapper := exporter .NewMapper (cfg .NodeName )
123+ workloadResolver , err := workload .NewResolver (dynClient , workload.Config {
124+ LabelKeys : []string {"workloads.cast.ai/custom-workload" },
125+ CacheSize : 512 ,
126+ })
127+ if err != nil {
128+ log .WithField ("error" , err .Error ()).Fatal ("failed to create workload resolver" )
129+ }
130+
131+ mapper := exporter .NewMapper (cfg .NodeName , workloadResolver , log )
91132 ex := exporter .NewExporter (exporter.Config {
92133 ExportInterval : cfg .ExportInterval ,
93134 Selector : labelSelector .String (),
@@ -96,7 +137,7 @@ func run(cfg *config.Config, log logrus.FieldLogger) error {
96137 DCGMExporterHost : cfg .DCGMHost ,
97138 Enabled : true ,
98139 NodeName : cfg .NodeName ,
99- }, clientset , log , scraper , mapper , client )
140+ }, dynClient , log , scraper , mapper , client , metricClient )
100141
101142 go func () {
102143 if err := ex .Start (ctx ); err != nil && ! errors .Is (err , context .Canceled ) {
@@ -108,19 +149,14 @@ func run(cfg *config.Config, log logrus.FieldLogger) error {
108149 return srv .ListenAndServe ()
109150}
110151
111- func newKubernetesClientset (cfg * config.Config ) (* kubernetes.Clientset , error ) {
112- config , err := clientcmd .BuildConfigFromFlags ("" , cfg .KubeConfigPath )
113- if err != nil {
114- return nil , err
115- }
116- config .RateLimiter = flowcontrol .NewTokenBucketRateLimiter (float32 (10 ), 25 )
117-
118- clientset , err := kubernetes .NewForConfig (config )
152+ func newDynamicClient (cfg * config.Config ) (dynamic.Interface , error ) {
153+ restConfig , err := clientcmd .BuildConfigFromFlags ("" , cfg .KubeConfigPath )
119154 if err != nil {
120155 return nil , err
121156 }
157+ restConfig .RateLimiter = flowcontrol .NewTokenBucketRateLimiter (float32 (10 ), 25 )
122158
123- return clientset , nil
159+ return dynamic . NewForConfig ( restConfig )
124160}
125161
126162func selectorFromMap (labelMap map [string ]string ) (labels.Selector , error ) {
@@ -138,7 +174,7 @@ func selectorFromMap(labelMap map[string]string) (labels.Selector, error) {
138174 return selector .Add (requirements ... ), nil
139175}
140176
141- func setupCastAIClient (log logrus. FieldLogger , cfg * config.Config ) castai.Client {
177+ func setupCastAIClient (log * logging. Logger , cfg * config.Config ) castai.Client {
142178 clientConfig := castai.Config {
143179 ClusterID : cfg .ClusterID ,
144180 APIKey : cfg .APIKey ,
0 commit comments