Skip to content

Commit c8d42d5

Browse files
committed
Update create table
1 parent 13e043c commit c8d42d5

File tree

2 files changed

+31
-13
lines changed

2 files changed

+31
-13
lines changed

kustomization/fleet-telemetry-consumer/deployment.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ spec:
1515
spec:
1616
containers:
1717
- name: fleet-telemetry-consumer
18-
image: quay.io/rajsinghcpre/fleet-telemetry-consumer:v0.0.25
18+
image: quay.io/rajsinghcpre/fleet-telemetry-consumer:v0.0.26
1919
# Removed the command that uses the config file
2020
volumeMounts:
2121
- name: data

main.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,36 @@ func main() {
105105
log.Fatal(http.ListenAndServe(prometheusAddr, nil))
106106
}()
107107

108+
// **Create the telemetry table before starting goroutines**
109+
if postgresEnabled == "true" {
110+
connStr := fmt.Sprintf(
111+
"host=%s port=%s user=%s password=%s dbname=%s sslmode=%s",
112+
postgresHost, postgresPort, postgresUser, postgresPassword, postgresDBName, postgresSSLMode,
113+
)
114+
db, err := sql.Open("postgres", connStr)
115+
if err != nil {
116+
log.Fatalf("failed to connect to PostgreSQL: %s", err)
117+
}
118+
defer db.Close()
119+
120+
// Ensure the table exists
121+
err = createTelemetryTable(db)
122+
if err != nil {
123+
log.Fatalf("failed to create telemetry table: %s", err)
124+
}
125+
}
126+
108127
// If KAFKA_ENABLED is true
109128
if kafkaEnabled == "true" {
110129
wg.Add(1)
111130
go func() {
112131
defer wg.Done()
113-
consumeFromKafka(ctx, awsEnabled, kafkaBootstrapServers, kafkaGroupID, kafkaTopic, kafkaAutoOffsetReset, awsAccessKeyID, awsSecretAccessKey, awsBucketName, awsBucketRegion, awsBucketHost, awsBucketPort, awsBucketProtocol, localBasePath, localEnabled, postgresEnabled, postgresHost, postgresPort, postgresUser, postgresPassword, postgresDBName, postgresSSLMode)
132+
consumeFromKafka(
133+
ctx, awsEnabled, kafkaBootstrapServers, kafkaGroupID, kafkaTopic, kafkaAutoOffsetReset,
134+
awsAccessKeyID, awsSecretAccessKey, awsBucketName, awsBucketRegion,
135+
awsBucketHost, awsBucketPort, awsBucketProtocol, localBasePath, localEnabled,
136+
postgresEnabled, postgresHost, postgresPort, postgresUser, postgresPassword, postgresDBName, postgresSSLMode,
137+
)
114138
}()
115139
}
116140

@@ -121,7 +145,11 @@ func main() {
121145
wg.Add(1)
122146
go func() {
123147
defer wg.Done()
124-
err := loadOldData(ctx, loadDays, awsAccessKeyID, awsSecretAccessKey, awsBucketName, awsBucketRegion, awsBucketHost, awsBucketPort, awsBucketProtocol, postgresHost, postgresPort, postgresUser, postgresPassword, postgresDBName, postgresSSLMode)
148+
err := loadOldData(
149+
ctx, loadDays, awsAccessKeyID, awsSecretAccessKey, awsBucketName, awsBucketRegion,
150+
awsBucketHost, awsBucketPort, awsBucketProtocol,
151+
postgresHost, postgresPort, postgresUser, postgresPassword, postgresDBName, postgresSSLMode,
152+
)
125153
if err != nil {
126154
log.Printf("Error loading old data: %s", err)
127155
}
@@ -181,11 +209,6 @@ func consumeFromKafka(ctx context.Context, awsEnabled string, kafkaBootstrapServ
181209
log.Fatalf("failed to connect to PostgreSQL: %s", err)
182210
}
183211
defer db.Close()
184-
// Ensure the table exists
185-
err = createTelemetryTable(db)
186-
if err != nil {
187-
log.Fatalf("failed to create telemetry table: %s", err)
188-
}
189212
}
190213

191214
for {
@@ -386,11 +409,6 @@ func loadOldData(ctx context.Context, loadDays int, awsAccessKeyID string, awsSe
386409
}
387410
defer db.Close()
388411

389-
err = createTelemetryTable(db)
390-
if err != nil {
391-
return fmt.Errorf("failed to create telemetry table: %s", err)
392-
}
393-
394412
// For each day in LOAD_DAYS
395413
now := time.Now()
396414
for i := 0; i < loadDays; i++ {

0 commit comments

Comments
 (0)