@@ -209,42 +209,61 @@ func consumeFromKafka(ctx context.Context, awsEnabled string, kafkaBootstrapServ
209209
210210 messagesConsumed .Inc ()
211211
212+ // **Enhanced log statement with message metadata**
213+ log .Printf ("Consumed message Topic:%s Partition:%d Offset:%d Key:%s Timestamp:%s" , * msg .TopicPartition .Topic , msg .TopicPartition .Partition , msg .TopicPartition .Offset , string (msg .Key ), msg .Timestamp .String ())
214+
212215 // Process message
213216 payload := & protos.Payload {}
214217 err = proto .Unmarshal (msg .Value , payload )
215218 if err != nil {
216- log .Printf ("failed to unmarshal protobuf message: %s" , err )
219+ log .Printf ("Failed to unmarshal protobuf message from Kafka at Offset:%d Partition:%d Error:%s" , msg . TopicPartition . Offset , msg . TopicPartition . Partition , err )
217220 messagesFailed .Inc ()
218221 continue
219222 }
220223
224+ // Log the VIN and CreatedAt from the payload
225+ log .Printf ("Processing Payload for VIN:%s CreatedAt:%s" , payload .Vin , payload .CreatedAt .AsTime ().String ())
226+
221227 // Store protobuf message in S3 as .pb file
222228 if awsEnabled == "true" {
223229 err := storeProtobufInS3 (s3Client , awsBucketName , payload )
224230 if err != nil {
225- log .Printf ("failed to store protobuf in S3: %s" , err )
231+ log .Printf ("Failed to store protobuf in S3 for VIN:%s Error: %s" , payload . Vin , err )
226232 messagesFailed .Inc ()
233+ } else {
234+ log .Printf ("Successfully stored protobuf in S3 for VIN:%s" , payload .Vin )
227235 }
228236 }
229237
230238 // Store protobuf message in local filesystem if LOCAL_ENABLED
231239 if localEnabled == "true" {
232240 err := storeProtobufLocally (localBasePath , payload )
233241 if err != nil {
234- log .Printf ("failed to store protobuf locally: %s" , err )
242+ log .Printf ("Failed to store protobuf locally for VIN:%s Error: %s" , payload . Vin , err )
235243 messagesFailed .Inc ()
244+ } else {
245+ log .Printf ("Successfully stored protobuf locally for VIN:%s" , payload .Vin )
236246 }
237247 }
238248
239249 // Marshal protobuf into JSON and store in PostgreSQL
240250 if postgresEnabled == "true" {
241251 err = storeProtobufInPostgres (db , payload )
242252 if err != nil {
243- log .Printf ("failed to store protobuf in PostgreSQL: %s" , err )
253+ log .Printf ("Failed to store protobuf in PostgreSQL for VIN:%s Error: %s" , payload . Vin , err )
244254 messagesFailed .Inc ()
255+ } else {
256+ log .Printf ("Successfully stored protobuf in PostgreSQL for VIN:%s" , payload .Vin )
245257 }
246258 }
247259
260+ // Commit offsets manually if auto-commit is disabled
261+ if _ , err := consumer .CommitMessage (msg ); err != nil {
262+ log .Printf ("Failed to commit message offset:%d Partition:%d Error:%s" , msg .TopicPartition .Offset , msg .TopicPartition .Partition , err )
263+ } else {
264+ log .Printf ("Committed message offset:%d Partition:%d" , msg .TopicPartition .Offset , msg .TopicPartition .Partition )
265+ }
266+
248267 messagesProcessed .Inc ()
249268 }
250269 }
@@ -277,6 +296,9 @@ func storeProtobufInS3(s3Client *s3.S3, bucketName string, payload *protos.Paylo
277296 return fmt .Errorf ("failed to put object in S3: %w" , err )
278297 }
279298
299+ // **Log the S3 object key for tracking**
300+ log .Printf ("Stored object in S3 Bucket:%s Key:%s for VIN:%s" , bucketName , objectKey , vin )
301+
280302 return nil
281303}
282304
@@ -310,6 +332,9 @@ func storeProtobufLocally(basePath string, payload *protos.Payload) error {
310332 return fmt .Errorf ("failed to write file %s: %w" , filePath , err )
311333 }
312334
335+ // **Log the file path for tracking**
336+ log .Printf ("Stored protobuf locally at Path:%s for VIN:%s" , filePath , vin )
337+
313338 return nil
314339}
315340
@@ -332,6 +357,9 @@ func storeProtobufInPostgres(db *sql.DB, payload *protos.Payload) error {
332357 return fmt .Errorf ("failed to insert into PostgreSQL: %w" , err )
333358 }
334359
360+ // **Log successful insertion into PostgreSQL**
361+ log .Printf ("Inserted data into PostgreSQL for VIN:%s CreatedAt:%s" , vin , createdAt .String ())
362+
335363 return nil
336364}
337365
@@ -371,8 +399,10 @@ func loadOldData(ctx context.Context, loadDays int, awsAccessKeyID string, awsSe
371399 // Process objects for the day
372400 err := listAndProcessObjects (ctx , s3Client , awsBucketName , day , db )
373401 if err != nil {
374- log .Printf ("failed to process objects for day %s: %s" , day .Format ("2006-01-02" ), err )
402+ log .Printf ("Failed to process objects for day %s: %s" , day .Format ("2006-01-02" ), err )
375403 continue
404+ } else {
405+ log .Printf ("Successfully processed data for date: %s" , day .Format ("2006-01-02" ))
376406 }
377407 }
378408
@@ -403,10 +433,13 @@ func listAndProcessObjects(ctx context.Context, s3Client *s3.S3, bucketName stri
403433 continue
404434 }
405435 if objDate .Year () == day .Year () && objDate .Month () == day .Month () && objDate .Day () == day .Day () {
436+ log .Printf ("Processing S3 object Key:%s LastModified:%s" , key , obj .LastModified .String ())
406437 err := processS3Object (s3Client , bucketName , key , db )
407438 if err != nil {
408- log .Printf ("failed to process object %s: %s" , key , err )
439+ log .Printf ("Failed to process object %s: %s" , key , err )
409440 continue
441+ } else {
442+ log .Printf ("Successfully processed object %s" , key )
410443 }
411444 }
412445
@@ -445,12 +478,18 @@ func processS3Object(s3Client *s3.S3, bucketName string, key string, db *sql.DB)
445478 return fmt .Errorf ("failed to unmarshal protobuf: %w" , err )
446479 }
447480
481+ // Log payload details
482+ log .Printf ("Processing payload from S3 object Key:%s VIN:%s CreatedAt:%s" , key , payload .Vin , payload .CreatedAt .AsTime ().String ())
483+
448484 // Store protobuf message in PostgreSQL
449485 err = storeProtobufInPostgres (db , payload )
450486 if err != nil {
451487 return fmt .Errorf ("failed to store protobuf in PostgreSQL: %w" , err )
452488 }
453489
490+ // Log successful processing
491+ log .Printf ("Successfully processed S3 object Key:%s and stored data in PostgreSQL for VIN:%s" , key , payload .Vin )
492+
454493 return nil
455494}
456495
0 commit comments