Skip to content

Commit 4a5b652

Browse files
committed
Final clean up
1 parent 4a9a7a4 commit 4a5b652

File tree

2 files changed

+47
-80
lines changed

2 files changed

+47
-80
lines changed

kustomization/fleet-telemetry-consumer/deployment.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ spec:
1515
spec:
1616
containers:
1717
- name: fleet-telemetry-consumer
18-
image: quay.io/rajsinghcpre/fleet-telemetry-consumer:v0.0.11
18+
image: quay.io/rajsinghcpre/fleet-telemetry-consumer:v0.0.12
1919
command: ["/fleet-telemetry-consumer", "-config", "/etc/fleet-telemetry-consumer/config.json"]
2020
volumeMounts:
2121
- name: config-volume

main.go

Lines changed: 46 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,15 @@ package main
22

33
import (
44
"bytes"
5-
"context"
65
"encoding/json"
76
"flag"
87
"fmt"
98
"log"
109
"math"
1110
"net/http"
1211
"os"
13-
"os/signal"
1412
"path/filepath"
1513
"strconv"
16-
"syscall"
1714
"time"
1815

1916
"github.com/aws/aws-sdk-go/aws"
@@ -53,9 +50,9 @@ type KafkaConfig struct {
5350

5451
// Service encapsulates the application's dependencies
5552
type Service struct {
56-
Config Config
57-
S3Client *s3.S3
58-
KafkaConsumer *kafka.Consumer
53+
Config Config
54+
S3Client *s3.S3
55+
KafkaConsumer *kafka.Consumer
5956
PrometheusGauge *prometheus.GaugeVec
6057
}
6158

@@ -65,7 +62,7 @@ func NewService(cfg Config) (*Service, error) {
6562
Config: cfg,
6663
}
6764

68-
// Initialize S3 if AWS config is provided
65+
// Initialize AWS S3 if configuration is provided
6966
if service.Config.AWS != nil {
7067
s3Client, err := configureS3(service.Config.AWS)
7168
if err != nil {
@@ -182,7 +179,7 @@ func loadConfig(path string) (Config, error) {
182179

183180
// uploadToS3 uploads data to the specified S3 bucket with a timestamped key
184181
func uploadToS3(s3Svc *s3.S3, bucket string, data []byte) error {
185-
timestamp := time.Now().Format("2006/01/02/15-04-05.000")
182+
timestamp := time.Now().Format("2006/01/02/15-04-05.000")
186183
key := filepath.Join(timestamp, "data.json")
187184

188185
input := &s3.PutObjectInput{
@@ -275,7 +272,7 @@ func handleDoorValues(doors *protos.Doors, gauge *prometheus.GaugeVec, vin strin
275272
}
276273

277274
// startPrometheusServer launches the Prometheus metrics HTTP server
278-
func startPrometheusServer(ctx context.Context, addr string) {
275+
func startPrometheusServer(addr string) {
279276
mux := http.NewServeMux()
280277
mux.Handle("/metrics", promhttp.Handler())
281278

@@ -284,20 +281,9 @@ func startPrometheusServer(ctx context.Context, addr string) {
284281
Handler: mux,
285282
}
286283

287-
go func() {
288-
log.Printf("Starting Prometheus metrics server at %s/metrics", addr)
289-
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
290-
log.Fatalf("Prometheus HTTP server failed: %v", err)
291-
}
292-
}()
293-
294-
// Shutdown the server gracefully on context cancellation
295-
<-ctx.Done()
296-
log.Println("Shutting down Prometheus HTTP server...")
297-
ctxShutDown, cancel := context.WithTimeout(context.Background(), 5*time.Second)
298-
defer cancel()
299-
if err := server.Shutdown(ctxShutDown); err != nil {
300-
log.Fatalf("Prometheus server Shutdown Failed:%+v", err)
284+
log.Printf("Starting Prometheus metrics server at %s/metrics", addr)
285+
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
286+
log.Fatalf("Prometheus HTTP server failed: %v", err)
301287
}
302288
}
303289

@@ -319,76 +305,57 @@ func main() {
319305
log.Fatalf("Service initialization error: %v", err)
320306
}
321307

322-
// Create a context that is cancelled on interrupt signals
323-
ctx, cancel := context.WithCancel(context.Background())
324-
defer cancel()
325-
326-
// Handle OS signals for graceful shutdown
327-
go func() {
328-
sigChan := make(chan os.Signal, 1)
329-
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
330-
sig := <-sigChan
331-
log.Printf("Received signal '%v', initiating shutdown...", sig)
332-
cancel()
333-
}()
334-
335308
// Start Prometheus metrics server
336-
go startPrometheusServer(ctx, ":2112")
309+
go startPrometheusServer(":2112")
337310

338311
// Begin consuming Kafka messages
339312
log.Println("Starting Kafka message consumption...")
340313
for {
341-
select {
342-
case <-ctx.Done():
343-
log.Println("Shutdown signal received. Exiting message consumption loop.")
344-
return
345-
default:
346-
msg, err := service.KafkaConsumer.ReadMessage(-1)
347-
if err != nil {
348-
// Handle Kafka consumer errors
349-
if kafkaError, ok := err.(kafka.Error); ok && kafkaError.Code() == kafka.ErrAllBrokersDown {
350-
log.Printf("Kafka broker is down: %v", err)
351-
time.Sleep(5 * time.Second) // Wait before retrying
352-
continue
353-
}
354-
log.Printf("Error while consuming message: %v", err)
314+
msg, err := service.KafkaConsumer.ReadMessage(-1)
315+
if err != nil {
316+
// Handle Kafka consumer errors
317+
if kafkaError, ok := err.(kafka.Error); ok && kafkaError.Code() == kafka.ErrAllBrokersDown {
318+
log.Printf("Kafka broker is down: %v", err)
319+
time.Sleep(5 * time.Second) // Wait before retrying
355320
continue
356321
}
322+
log.Printf("Error while consuming message: %v", err)
323+
continue
324+
}
357325

358-
// Deserialize the Protobuf message
359-
vehicleData := &protos.Payload{}
360-
if err := proto.Unmarshal(msg.Value, vehicleData); err != nil {
361-
log.Printf("Failed to unmarshal Protobuf message: %v", err)
362-
continue
363-
}
326+
// Deserialize the Protobuf message
327+
vehicleData := &protos.Payload{}
328+
if err := proto.Unmarshal(msg.Value, vehicleData); err != nil {
329+
log.Printf("Failed to unmarshal Protobuf message: %v", err)
330+
continue
331+
}
364332

365-
// Marshal vehicleData to JSON
366-
vehicleDataJSON, err := protojson.Marshal(vehicleData)
367-
if err != nil {
368-
log.Printf("Failed to marshal vehicleData to JSON: %v", err)
369-
continue
370-
}
333+
// Marshal vehicleData to JSON
334+
vehicleDataJSON, err := protojson.Marshal(vehicleData)
335+
if err != nil {
336+
log.Printf("Failed to marshal vehicleData to JSON: %v", err)
337+
continue
338+
}
371339

372-
log.Printf("Received Vehicle Data: %s", string(vehicleDataJSON))
340+
log.Printf("Received Vehicle Data: %s", string(vehicleDataJSON))
373341

374-
// Process each Datum in the Payload
375-
for _, datum := range vehicleData.Data {
376-
fieldName := datum.Key.String()
377-
value := datum.Value
378-
processValue(fieldName, value, service.PrometheusGauge, vehicleData.Vin)
379-
}
342+
// Process each Datum in the Payload
343+
for _, datum := range vehicleData.Data {
344+
fieldName := datum.Key.String()
345+
value := datum.Value
346+
processValue(fieldName, value, service.PrometheusGauge, vehicleData.Vin)
347+
}
380348

381-
// Upload to S3 if enabled
382-
if service.S3Client != nil {
383-
if err := uploadToS3(service.S3Client, service.Config.AWS.Bucket, vehicleDataJSON); err != nil {
384-
log.Printf("Failed to upload vehicle data to S3: %v", err)
385-
}
349+
// Upload to S3 if enabled
350+
if service.S3Client != nil {
351+
if err := uploadToS3(service.S3Client, service.Config.AWS.Bucket, vehicleDataJSON); err != nil {
352+
log.Printf("Failed to upload vehicle data to S3: %v", err)
386353
}
354+
}
387355

388-
// Commit the message offset after successful processing
389-
if _, err := service.KafkaConsumer.CommitMessage(msg); err != nil {
390-
log.Printf("Failed to commit Kafka message: %v", err)
391-
}
356+
// Commit the message offset after successful processing
357+
if _, err := service.KafkaConsumer.CommitMessage(msg); err != nil {
358+
log.Printf("Failed to commit Kafka message: %v", err)
392359
}
393360
}
394361
}

0 commit comments

Comments
 (0)