Skip to content

Commit 23464e8

Browse files
committed
check if files are already processed if yes skip download also
1 parent 555411b commit 23464e8

2 files changed

Lines changed: 35 additions & 38 deletions

File tree

database/crud.go

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package database
22

33
import (
44
"context"
5-
"errors"
65

76
"go.opentelemetry.io/otel"
87
"go.opentelemetry.io/otel/attribute"
@@ -11,6 +10,17 @@ import (
1110
"hangout.com/core/storage-service/logger"
1211
)
1312

13+
// IsAlreadyProcessed checks if the file with the given filename has process_status SUCCESS.
14+
func (dbConn *DatabaseConnectionPool) IsAlreadyProcessed(ctx context.Context, filename string) (bool, error) {
15+
var currentStatus model.ProcessStatus
16+
row := dbConn.pool.QueryRow(ctx, "SELECT process_status FROM media WHERE filename = $1", filename)
17+
err := row.Scan(&currentStatus)
18+
if err != nil {
19+
return false, err
20+
}
21+
return currentStatus == model.SUCCESS, nil
22+
}
23+
1424
func (dbConn *DatabaseConnectionPool) UpdateProcessingStatus(ctx context.Context, filename string, processStatus model.ProcessStatus, log logger.Log) error {
1525
tr := otel.Tracer("hangout.storage.database")
1626
ctx, span := tr.Start(ctx, "UpdateProcessingStatus")
@@ -21,25 +31,6 @@ func (dbConn *DatabaseConnectionPool) UpdateProcessingStatus(ctx context.Context
2131
)
2232
defer span.End()
2333

24-
// Check current process_status
25-
var currentStatus model.ProcessStatus
26-
row := dbConn.pool.QueryRow(ctx, "SELECT process_status FROM media WHERE filename = $1", filename)
27-
err := row.Scan(&currentStatus)
28-
if err != nil {
29-
span.RecordError(err)
30-
span.SetStatus(codes.Error, err.Error())
31-
log.Error(ctx, "could not fetch current process status", "error", err)
32-
return err
33-
}
34-
35-
if currentStatus == model.SUCCESS {
36-
err := errors.New("cannot update process_status: already SUCCESS")
37-
span.RecordError(err)
38-
span.SetStatus(codes.Error, err.Error())
39-
log.Error(ctx, "process_status is already SUCCESS, not updating", "filename", filename)
40-
return err
41-
}
42-
4334
query := `UPDATE media SET process_status = $1 where filename = $2`
4435
cmdTag, err := dbConn.pool.Exec(ctx, query, processStatus, filename)
4536
if err != nil {

worker/worker-pool.go

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,29 +68,35 @@ func (worker *WorkerPool) do(workerId int, file *files.File, workerLogger logger
6868

6969
workerLogger.Info(ctx, "starting file processing", "file-name", file.Filename, "user-id", file.UserId)
7070

71-
// download the given file from cloud storage
72-
cloudstorage.Download(ctx, s3Client, file, worker.cfg, workerLogger)
73-
// process the file
74-
err := file.Process(ctx, worker.cfg, worker.dbConnPool, workerLogger)
71+
// Check if already processed
72+
isProcessed, err := worker.dbConnPool.IsAlreadyProcessed(ctx, file.Filename)
7573
if err != nil {
76-
if err.Error() == "cannot update process_status: already SUCCESS" {
77-
// Acknowledge and skip upload
78-
if file.KafkaSession != nil && file.KafkaMessage != nil {
79-
file.KafkaSession.MarkMessage(file.KafkaMessage, "")
80-
}
81-
workerLogger.Info(ctx, "file already processed, skipping upload", "file-name", file.Filename)
82-
} else {
83-
workerLogger.Error(ctx, "could not process file", "error", err.Error())
84-
// do not acknowledge, so it can be retried
85-
}
86-
} else {
87-
// Success: upload and acknowledge
88-
cloudstorage.UploadDir(ctx, s3Client, file, worker.cfg, workerLogger)
74+
workerLogger.Error(ctx, "error checking process status", "error", err.Error())
75+
// Optionally: do not acknowledge, so it can be retried
76+
return
77+
}
78+
if isProcessed {
79+
workerLogger.Info(ctx, "file already processed, acknowledging and skipping", "file-name", file.Filename)
8980
if file.KafkaSession != nil && file.KafkaMessage != nil {
9081
file.KafkaSession.MarkMessage(file.KafkaMessage, "")
9182
}
92-
workerLogger.Info(ctx, "finished file processing", "file-name", file.Filename)
83+
return
84+
}
85+
86+
// Not processed: download, process, upload, then acknowledge
87+
cloudstorage.Download(ctx, s3Client, file, worker.cfg, workerLogger)
88+
err = file.Process(ctx, worker.cfg, worker.dbConnPool, workerLogger)
89+
if err != nil {
90+
workerLogger.Error(ctx, "could not process file", "error", err.Error())
91+
// Optionally: do not acknowledge, so it can be retried
92+
return
93+
}
94+
95+
cloudstorage.UploadDir(ctx, s3Client, file, worker.cfg, workerLogger)
96+
if file.KafkaSession != nil && file.KafkaMessage != nil {
97+
file.KafkaSession.MarkMessage(file.KafkaMessage, "")
9398
}
99+
workerLogger.Info(ctx, "finished file processing", "file-name", file.Filename)
94100
}
95101

96102
// Wait ensures all workers complete processing before the program exits

0 commit comments

Comments
 (0)