Skip to content

Commit 4adf2c8

Browse files
committed
Cleanup
1 parent 5dbc800 commit 4adf2c8

File tree

2 files changed

+82
-56
lines changed

2 files changed

+82
-56
lines changed

kustomization/fleet-telemetry-consumer/deployment.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ metadata:
44
name: fleet-telemetry-consumer
55
namespace: tesla
66
spec:
7-
replicas: 1
7+
replicas: 3
88
selector:
99
matchLabels:
1010
app: fleet-telemetry-consumer
@@ -15,7 +15,7 @@ spec:
1515
spec:
1616
containers:
1717
- name: fleet-telemetry-consumer
18-
image: quay.io/rajsinghcpre/fleet-telemetry-consumer:v0.0.17
18+
image: quay.io/rajsinghcpre/fleet-telemetry-consumer:v0.0.18
1919
# Removed the command that uses the config file
2020
volumeMounts:
2121
- name: data

main.go

Lines changed: 80 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -107,44 +107,18 @@ func NewService(cfg Config) (*Service, error) {
107107
}
108108

109109
// Initialize AWS S3 if enabled
110-
if cfg.AWS != nil && cfg.AWS.Enabled {
111-
s3Client, err := configureS3(cfg.AWS)
112-
if err != nil {
113-
return nil, fmt.Errorf("failed to configure S3: %w", err)
114-
}
115-
service.S3Client = s3Client
116-
117-
if err := testS3Connection(s3Client, cfg.AWS.Bucket); err != nil {
118-
return nil, fmt.Errorf("S3 connection test failed: %w", err)
119-
}
120-
log.Println("S3 connection established successfully.")
121-
} else {
122-
log.Println("AWS S3 uploads are disabled.")
110+
if err := initializeS3(service, cfg.AWS); err != nil {
111+
return nil, err
123112
}
124113

125114
// Initialize Kafka consumer
126-
consumer, err := configureKafka(cfg.Kafka)
127-
if err != nil {
128-
return nil, fmt.Errorf("failed to configure Kafka consumer: %w", err)
115+
if err := initializeKafka(service, cfg.Kafka); err != nil {
116+
return nil, err
129117
}
130-
service.KafkaConsumer = consumer
131118

132119
// Initialize PostgreSQL if enabled
133-
if cfg.PostgreSQL != nil && cfg.PostgreSQL.Enabled {
134-
db, err := configurePostgreSQL(cfg.PostgreSQL)
135-
if err != nil {
136-
return nil, fmt.Errorf("failed to configure PostgreSQL: %w", err)
137-
}
138-
service.DB = db
139-
log.Println("PostgreSQL connection established successfully.")
140-
141-
// Create table if it doesn't exist
142-
if err := createTelemetryTable(db); err != nil {
143-
return nil, fmt.Errorf("failed to create telemetry table: %w", err)
144-
}
145-
log.Println("Telemetry table is ready.")
146-
} else {
147-
log.Println("PostgreSQL storage is disabled.")
120+
if err := initializePostgreSQL(service, cfg.PostgreSQL); err != nil {
121+
return nil, err
148122
}
149123

150124
// Load existing S3 data if both S3 and PostgreSQL are enabled
@@ -381,8 +355,17 @@ func loadConfigFromEnv() (Config, error) {
381355
return cfg, nil
382356
}
383357

384-
// uploadToS3 uploads Protobuf data to the specified S3 bucket with a vin/year/month/day key structure
385-
func uploadToS3(s3Svc *s3.S3, bucket, vin string, data []byte, key string) error {
358+
// uploadToS3 uploads Protobuf data to the specified S3 bucket with a vin/year/month/day/createdAt key structure
359+
func uploadToS3(s3Svc *s3.S3, bucket, vin string, data []byte, createdAt time.Time) error {
360+
key := fmt.Sprintf("%s/%04d/%02d/%02d/%04d%02d%02dT%02d%02d%02d.%09dZ.pb",
361+
vin,
362+
createdAt.Year(),
363+
int(createdAt.Month()),
364+
createdAt.Day(),
365+
createdAt.Year(), int(createdAt.Month()), createdAt.Day(),
366+
createdAt.Hour(), createdAt.Minute(), createdAt.Second(), createdAt.Nanosecond(),
367+
)
368+
386369
input := &s3.PutObjectInput{
387370
Bucket: aws.String(bucket),
388371
Key: aws.String(key),
@@ -399,22 +382,25 @@ func uploadToS3(s3Svc *s3.S3, bucket, vin string, data []byte, key string) error
399382
return nil
400383
}
401384

402-
// backupLocally saves Protobuf data to the local filesystem with a vin/year/month/day folder structure
403-
func backupLocally(basePath, vin string, data []byte, key string) error {
404-
// Extract the filename from the key
405-
fileName := filepath.Base(key)
406-
385+
// backupLocally saves Protobuf data to the local filesystem with a vin/year/month/day/createdAt filename structure
386+
func backupLocally(basePath, vin string, data []byte, createdAt time.Time) error {
407387
// Create the directories as per the existing structure
408388
dirPath := filepath.Join(basePath,
409389
vin,
410-
fmt.Sprintf("%04d", time.Now().UTC().Year()),
411-
fmt.Sprintf("%02d", time.Now().UTC().Month()),
412-
fmt.Sprintf("%02d", time.Now().UTC().Day()),
390+
fmt.Sprintf("%04d", createdAt.Year()),
391+
fmt.Sprintf("%02d", createdAt.Month()),
392+
fmt.Sprintf("%02d", createdAt.Day()),
413393
)
414394
if err := os.MkdirAll(dirPath, os.ModePerm); err != nil {
415395
return fmt.Errorf("failed to create directories '%s': %w", dirPath, err)
416396
}
417397

398+
// Use createdAt as the filename
399+
fileName := fmt.Sprintf("%04d%02d%02dT%02d%02d%02d.%09dZ.pb",
400+
createdAt.Year(), int(createdAt.Month()), createdAt.Day(),
401+
createdAt.Hour(), createdAt.Minute(), createdAt.Second(), createdAt.Nanosecond(),
402+
)
403+
418404
// Full file path
419405
filePath := filepath.Join(dirPath, fileName)
420406
if err := os.WriteFile(filePath, data, 0644); err != nil {
@@ -613,26 +599,16 @@ func startConsumerLoop(service *Service, ctx context.Context, wg *sync.WaitGroup
613599
continue
614600
}
615601

616-
// Define the key with .pb extension
617-
key := fmt.Sprintf("%s/%04d/%02d/%02d/%04d%02d%02dT%02d%02d%02d.%06dZ.pb",
618-
vehicleData.Vin,
619-
createdAt.Year(),
620-
int(createdAt.Month()),
621-
createdAt.Day(),
622-
createdAt.Year(), int(createdAt.Month()), createdAt.Day(),
623-
createdAt.Hour(), createdAt.Minute(), createdAt.Second(), createdAt.Nanosecond()/1000,
624-
)
625-
626602
// Upload to S3 if enabled
627603
if service.S3Client != nil {
628-
if err := uploadToS3(service.S3Client, service.Config.AWS.Bucket, vehicleData.Vin, serializedData, key); err != nil {
604+
if err := uploadToS3(service.S3Client, service.Config.AWS.Bucket, vehicleData.Vin, serializedData, createdAt); err != nil {
629605
log.Printf("Failed to upload vehicle data to S3: %v", err)
630606
}
631607
}
632608

633609
// Backup locally if enabled
634610
if service.LocalBackupEnabled {
635-
if err := backupLocally(service.LocalBasePath, vehicleData.Vin, serializedData, key); err != nil {
611+
if err := backupLocally(service.LocalBasePath, vehicleData.Vin, serializedData, createdAt); err != nil {
636612
log.Printf("Failed to backup vehicle data locally: %v", err)
637613
}
638614
}
@@ -744,6 +720,56 @@ func loadExistingS3Data(service *Service) error {
744720
return nil
745721
}
746722

723+
// initializeS3 sets up the AWS S3 client if enabled
724+
func initializeS3(service *Service, s3Config *S3Config) error {
725+
if s3Config != nil && s3Config.Enabled {
726+
s3Client, err := configureS3(s3Config)
727+
if err != nil {
728+
return fmt.Errorf("failed to configure S3: %w", err)
729+
}
730+
service.S3Client = s3Client
731+
732+
if err := testS3Connection(s3Client, s3Config.Bucket); err != nil {
733+
return fmt.Errorf("S3 connection test failed: %w", err)
734+
}
735+
log.Println("S3 connection established successfully.")
736+
} else {
737+
log.Println("AWS S3 uploads are disabled.")
738+
}
739+
return nil
740+
}
741+
742+
// initializeKafka sets up the Kafka consumer
743+
func initializeKafka(service *Service, kafkaCfg KafkaConfig) error {
744+
consumer, err := configureKafka(kafkaCfg)
745+
if err != nil {
746+
return fmt.Errorf("failed to configure Kafka consumer: %w", err)
747+
}
748+
service.KafkaConsumer = consumer
749+
return nil
750+
}
751+
752+
// initializePostgreSQL sets up the PostgreSQL connection if enabled
753+
func initializePostgreSQL(service *Service, pgConfig *PostgreSQLConfig) error {
754+
if pgConfig != nil && pgConfig.Enabled {
755+
db, err := configurePostgreSQL(pgConfig)
756+
if err != nil {
757+
return fmt.Errorf("failed to configure PostgreSQL: %w", err)
758+
}
759+
service.DB = db
760+
log.Println("PostgreSQL connection established successfully.")
761+
762+
// Create table if it doesn't exist
763+
if err := createTelemetryTable(db); err != nil {
764+
return fmt.Errorf("failed to create telemetry table: %w", err)
765+
}
766+
log.Println("Telemetry table is ready.")
767+
} else {
768+
log.Println("PostgreSQL storage is disabled.")
769+
}
770+
return nil
771+
}
772+
747773
// main is the entry point of the application
748774
func main() {
749775
// Load configuration from environment variables

0 commit comments

Comments
 (0)