Skip to content

Commit 05738f6

Browse files
authored
Merge pull request #82 from opticSquid/fix/process-bugs
Skips already processed files even if evet is replayed, deletes local directory after s3 upload
2 parents 54debe3 + 23464e8 commit 05738f6

4 files changed

Lines changed: 64 additions & 8 deletions

File tree

cloudstorage/fileIO.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,35 @@ func UploadDir(ctx context.Context, s3Client *s3.Client, event *files.File, cfg
176176
})
177177

178178
if err != nil {
179-
log.Error(ctx, "Error walking the folder", "folder", currentDir, "error", err)
179+
log.Error(ctx, "Error walking the directory", "directory", currentDir, "error", err)
180180
} else {
181181
log.Info(ctx, "Folder uploaded successfully", "directory", currentDir)
182+
span.SetStatus(codes.Ok, "Folder uploaded successfully")
183+
log.Info(ctx, "Deleting the local directory", "directory", currentDir)
184+
deleteUploadedDirFromLocal(ctx, event, log)
182185
}
183186
}
184187

188+
func deleteUploadedDirFromLocal(ctx context.Context, file *files.File, log logger.Log) {
189+
tr := otel.Tracer("hangout.storage.cloudstorage")
190+
ctx, span := tr.Start(ctx, "DeleteUploadedFiles")
191+
defer span.End()
192+
193+
span.SetAttributes(
194+
attribute.String("file.name", file.Filename),
195+
attribute.Int("file.userId", int(file.UserId)),
196+
)
197+
198+
baseFilename := strings.Split(file.Filename, ".")[0]
199+
dirPath := "/tmp/" + baseFilename
200+
err := os.RemoveAll(dirPath)
201+
if err != nil {
202+
log.Error(ctx, "Could not delete local directory", "directory", dirPath, "error", err)
203+
span.RecordError(err)
204+
}
205+
log.Info(ctx, "Local directory deleted successfully", "file", file.Filename, "directory", dirPath)
206+
}
207+
185208
func getContentType(extension string) string {
186209
switch extension {
187210
case ".mpd":

database/crud.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,17 @@ import (
1010
"hangout.com/core/storage-service/logger"
1111
)
1212

13+
// IsAlreadyProcessed checks if the file with the given filename has process_status SUCCESS.
14+
func (dbConn *DatabaseConnectionPool) IsAlreadyProcessed(ctx context.Context, filename string) (bool, error) {
15+
var currentStatus model.ProcessStatus
16+
row := dbConn.pool.QueryRow(ctx, "SELECT process_status FROM media WHERE filename = $1", filename)
17+
err := row.Scan(&currentStatus)
18+
if err != nil {
19+
return false, err
20+
}
21+
return currentStatus == model.SUCCESS, nil
22+
}
23+
1324
func (dbConn *DatabaseConnectionPool) UpdateProcessingStatus(ctx context.Context, filename string, processStatus model.ProcessStatus, log logger.Log) error {
1425
tr := otel.Tracer("hangout.storage.database")
1526
ctx, span := tr.Start(ctx, "UpdateProcessingStatus")

files/file.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,14 @@ func (f *File) Process(workerContext context.Context, cfg *koanf.Koanf, dbConnPo
5151
} else {
5252
mediaFile := &pipeline.Video{Filename: f.Filename}
5353
log.Info(ctx, "marking file status as PROCESSING in db", "filename", f.Filename)
54-
dbConnPool.UpdateProcessingStatus(ctx, f.Filename, model.PROCESSING, log)
55-
err := mediaFile.ProcessMedia(ctx, cfg, log)
54+
err := dbConnPool.UpdateProcessingStatus(ctx, f.Filename, model.PROCESSING, log)
55+
if err != nil {
56+
log.Error(ctx, "could not mark file as PROCESSING in db", "filename", f.Filename)
57+
span.RecordError(err)
58+
span.SetStatus(codes.Error, err.Error())
59+
return err
60+
}
61+
err = mediaFile.ProcessMedia(ctx, cfg, log)
5662
if err != nil {
5763
log.Error(ctx, "marking file status as FAILED in db", "filename", f.Filename)
5864
dbConnPool.UpdateProcessingStatus(ctx, f.Filename, model.FAIL, log)

worker/worker-pool.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,33 @@ func (worker *WorkerPool) do(workerId int, file *files.File, workerLogger logger
6868

6969
workerLogger.Info(ctx, "starting file processing", "file-name", file.Filename, "user-id", file.UserId)
7070

71-
// download the given file from cloud storage
71+
// Check if already processed
72+
isProcessed, err := worker.dbConnPool.IsAlreadyProcessed(ctx, file.Filename)
73+
if err != nil {
74+
workerLogger.Error(ctx, "error checking process status", "error", err.Error())
75+
// Optionally: do not acknowledge, so it can be retried
76+
return
77+
}
78+
if isProcessed {
79+
workerLogger.Info(ctx, "file already processed, acknowledging and skipping", "file-name", file.Filename)
80+
if file.KafkaSession != nil && file.KafkaMessage != nil {
81+
file.KafkaSession.MarkMessage(file.KafkaMessage, "")
82+
}
83+
return
84+
}
85+
86+
// Not processed: download, process, upload, then acknowledge
7287
cloudstorage.Download(ctx, s3Client, file, worker.cfg, workerLogger)
73-
// process the file
74-
err := file.Process(ctx, worker.cfg, worker.dbConnPool, workerLogger)
88+
err = file.Process(ctx, worker.cfg, worker.dbConnPool, workerLogger)
7589
if err != nil {
7690
workerLogger.Error(ctx, "could not process file", "error", err.Error())
91+
// Optionally: do not acknowledge, so it can be retried
92+
return
7793
}
78-
// upload the given file to cloud storage
94+
7995
cloudstorage.UploadDir(ctx, s3Client, file, worker.cfg, workerLogger)
8096
if file.KafkaSession != nil && file.KafkaMessage != nil {
81-
file.KafkaSession.MarkMessage(file.KafkaMessage, "") // Acknowledge the Kafka event and mark as completed
97+
file.KafkaSession.MarkMessage(file.KafkaMessage, "")
8298
}
8399
workerLogger.Info(ctx, "finished file processing", "file-name", file.Filename)
84100
}

0 commit comments

Comments
 (0)