@@ -6,9 +6,12 @@ import (
66 "fmt"
77 "os"
88 "log"
9+ "net/http"
910 "github.com/confluentinc/confluent-kafka-go/kafka"
10- "google.golang.org/protobuf/proto"
1111 "github.com/teslamotors/fleet-telemetry/protos"
12+ "github.com/prometheus/client_golang/prometheus"
13+ "github.com/prometheus/client_golang/prometheus/promhttp"
14+ "google.golang.org/protobuf/proto"
1215)
1316
1417func main () {
@@ -51,13 +54,31 @@ func main() {
5154 for topic := range metadata .Topics {
5255 fmt .Println (topic )
5356 }
54-
57+
5558 // Subscribe to the topic
5659 err = consumer .Subscribe ("tesla_V" , nil )
5760 if err != nil {
5861 log .Fatalf ("Failed to subscribe to topic: %s" , err )
5962 }
6063
64+ // Initialize Prometheus metrics
65+ vehicleDataGauge := prometheus .NewGaugeVec (
66+ prometheus.GaugeOpts {
67+ Name : "vehicle_data" ,
68+ Help : "Vehicle data metrics" ,
69+ },
70+ []string {"field" , "vin" },
71+ )
72+
73+ // Register the metric
74+ prometheus .MustRegister (vehicleDataGauge )
75+
76+ // Start HTTP server for Prometheus metrics
77+ go func () {
78+ http .Handle ("/metrics" , promhttp .Handler ())
79+ log .Fatal (http .ListenAndServe (":2112" , nil ))
80+ }()
81+
6182 // Consume messages
6283 fmt .Println ("Waiting for messages..." )
6384
@@ -70,18 +91,92 @@ func main() {
7091 }
7192
7293 // Deserialize the Protobuf message
73- vehicleData := & protos.Payload {} // Use the correct message type
94+ vehicleData := & protos.Payload {}
7495 if err := proto .Unmarshal (msg .Value , vehicleData ); err != nil {
7596 log .Printf ("Failed to unmarshal Protobuf message: %v\n " , err )
7697 continue
7798 }
7899
79- // Output the message to console
80- vehicleDataJSON , err := json .MarshalIndent (vehicleData , "" , " " )
81- if err != nil {
82- log .Printf ("Failed to marshal vehicle data to JSON: %v\n " , err )
83- continue
100+ // Process each Datum in the Payload
101+ for _ , datum := range vehicleData .Data {
102+ fieldName := datum .Key .String () // Get the field name from the enum
103+ value := datum .Value
104+
105+ var numericValue float64
106+
107+ switch v := value .Value .(type ) {
108+ case * protos.Value_DoubleValue :
109+ numericValue = v .DoubleValue
110+ case * protos.Value_FloatValue :
111+ numericValue = float64 (v .FloatValue )
112+ case * protos.Value_IntValue :
113+ numericValue = float64 (v .IntValue )
114+ case * protos.Value_LongValue :
115+ numericValue = float64 (v .LongValue )
116+ case * protos.Value_BooleanValue :
117+ if v .BooleanValue {
118+ numericValue = 1.0
119+ } else {
120+ numericValue = 0.0
121+ }
122+ case * protos.Value_ChargingValue :
123+ numericValue = float64 (v .ChargingValue .Number ())
124+ case * protos.Value_ShiftStateValue :
125+ numericValue = float64 (v .ShiftStateValue .Number ())
126+ case * protos.Value_LaneAssistLevelValue :
127+ numericValue = float64 (v .LaneAssistLevelValue .Number ())
128+ case * protos.Value_ScheduledChargingModeValue :
129+ numericValue = float64 (v .ScheduledChargingModeValue .Number ())
130+ case * protos.Value_SentryModeStateValue :
131+ numericValue = float64 (v .SentryModeStateValue .Number ())
132+ case * protos.Value_SpeedAssistLevelValue :
133+ numericValue = float64 (v .SpeedAssistLevelValue .Number ())
134+ case * protos.Value_BmsStateValue :
135+ numericValue = float64 (v .BmsStateValue .Number ())
136+ case * protos.Value_BuckleStatusValue :
137+ numericValue = float64 (v .BuckleStatusValue .Number ())
138+ case * protos.Value_CarTypeValue :
139+ numericValue = float64 (v .CarTypeValue .Number ())
140+ case * protos.Value_ChargePortValue :
141+ numericValue = float64 (v .ChargePortValue .Number ())
142+ case * protos.Value_ChargePortLatchValue :
143+ numericValue = float64 (v .ChargePortLatchValue .Number ())
144+ case * protos.Value_CruiseStateValue :
145+ numericValue = float64 (v .CruiseStateValue .Number ())
146+ case * protos.Value_DriveInverterStateValue :
147+ numericValue = float64 (v .DriveInverterStateValue .Number ())
148+ case * protos.Value_HvilStatusValue :
149+ numericValue = float64 (v .HvilStatusValue .Number ())
150+ case * protos.Value_WindowStateValue :
151+ numericValue = float64 (v .WindowStateValue .Number ())
152+ case * protos.Value_SeatFoldPositionValue :
153+ numericValue = float64 (v .SeatFoldPositionValue .Number ())
154+ case * protos.Value_TractorAirStatusValue :
155+ numericValue = float64 (v .TractorAirStatusValue .Number ())
156+ case * protos.Value_FollowDistanceValue :
157+ numericValue = float64 (v .FollowDistanceValue .Number ())
158+ case * protos.Value_ForwardCollisionSensitivityValue :
159+ numericValue = float64 (v .ForwardCollisionSensitivityValue .Number ())
160+ case * protos.Value_GuestModeMobileAccessValue :
161+ numericValue = float64 (v .GuestModeMobileAccessValue .Number ())
162+ case * protos.Value_TrailerAirStatusValue :
163+ numericValue = float64 (v .TrailerAirStatusValue .Number ())
164+ case * protos.Value_DetailedChargeStateValue :
165+ numericValue = float64 (v .DetailedChargeStateValue .Number ())
166+ case * protos.Value_LocationValue :
167+ // Handle LocationValue separately
168+ lat := v .LocationValue .Latitude
169+ lon := v .LocationValue .Longitude
170+ vehicleDataGauge .WithLabelValues (fieldName + "_latitude" , vehicleData .Vin ).Set (lat )
171+ vehicleDataGauge .WithLabelValues (fieldName + "_longitude" , vehicleData .Vin ).Set (lon )
172+ continue // Skip the rest since we've handled this case
173+ default :
174+ // Skip non-numeric or unsupported types
175+ continue
176+ }
177+
178+ // Update the metric
179+ vehicleDataGauge .WithLabelValues (fieldName , vehicleData .Vin ).Set (numericValue )
84180 }
85- fmt .Printf ("Received message: %s\n " , vehicleDataJSON )
86181 }
87182}
0 commit comments