Skip to content

Commit 76073a7

Browse files
authored
HMS-5571: add upload cleanup (#1087)
1 parent e93e6da commit 76073a7

File tree

8 files changed

+162
-5
lines changed

8 files changed

+162
-5
lines changed

cmd/external-repos/main.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ func main() {
103103
} else {
104104
log.Warn().Msg("Snapshotting disabled")
105105
}
106+
} else if args[1] == "upload-cleanup" {
107+
err = uploadCleanup(ctx, db.DB)
108+
if err != nil {
109+
log.Error().Err(err).Msgf("error starting upload cleanup tasks")
110+
}
106111
} else if args[1] == "process-repos" {
107112
err = enqueueIntrospectAllRepos(ctx)
108113
if err != nil {
@@ -128,6 +133,10 @@ func main() {
128133
log.Error().Err(err).Msg("error queueing delete snapshot tasks for snapshot cleanup")
129134
}
130135
}
136+
err = uploadCleanup(ctx, db.DB)
137+
if err != nil {
138+
log.Error().Err(err).Msgf("error starting upload cleanup tasks")
139+
}
131140
} else if args[1] == "pulp-orphan-cleanup" {
132141
batchSize := 5
133142
if len(args) > 2 {
@@ -140,7 +149,7 @@ func main() {
140149
if !config.PulpConfigured() {
141150
log.Error().Msg("cannot run orphan cleanup if pulp is not configured")
142151
}
143-
err := pulpOrphanCleanup(ctx, db.DB, batchSize)
152+
err = pulpOrphanCleanup(ctx, db.DB, batchSize)
144153
if err != nil {
145154
log.Error().Err(err).Msg("error starting pulp orphan cleanup tasks")
146155
}
@@ -520,3 +529,44 @@ func pulpOrphanCleanup(ctx context.Context, db *gorm.DB, batchSize int) error {
520529
}
521530
return nil
522531
}
532+
533+
func uploadCleanup(ctx context.Context, db *gorm.DB) error {
534+
var err error
535+
daoReg := dao.GetDaoRegistry(db)
536+
537+
log.Info().Msg("===== Starting upload cleanup =====")
538+
539+
uploads, err := daoReg.Uploads.ListUploadsForCleanup(ctx)
540+
if err != nil {
541+
return fmt.Errorf("error listing uploads for cleanup: %w", err)
542+
}
543+
544+
var cleanupCounter int
545+
for _, upload := range uploads {
546+
orgID := upload.OrgID
547+
548+
domainName, err := daoReg.Domain.Fetch(ctx, orgID)
549+
if err != nil {
550+
log.Error().Err(err).Msgf("error fetching domain name for org %v", orgID)
551+
continue
552+
}
553+
logger := log.Logger.With().Str("org_id", orgID).Str("pulp_domain_name", domainName).Logger()
554+
pulpClient := pulp_client.GetPulpClientWithDomain(domainName)
555+
556+
uploadHref := "/api/pulp/" + domainName + "/api/v3/uploads/" + upload.UploadUUID + "/"
557+
_, err = pulpClient.DeleteUpload(ctx, uploadHref)
558+
if err != nil {
559+
logger.Error().Err(err).Msgf("error deleting pulp upload with uuid: %v", upload.UploadUUID)
560+
continue
561+
}
562+
563+
err = daoReg.Uploads.DeleteUpload(ctx, upload.UploadUUID)
564+
if err != nil {
565+
logger.Error().Err(err).Msgf("error deleting upload")
566+
continue
567+
}
568+
cleanupCounter++
569+
}
570+
log.Info().Msgf("Cleaned up %v uploads", cleanupCounter)
571+
return nil
572+
}

pkg/clients/pulp_client/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,5 @@ type PulpClient interface {
9595
CreateUpload(ctx context.Context, size int64) (*zest.UploadResponse, int, error)
9696
UploadChunk(ctx context.Context, uploadHref string, contentRange string, file *os.File, sha256 string) (*zest.UploadResponse, int, error)
9797
FinishUpload(ctx context.Context, uploadHref string, sha256 string) (*zest.AsyncOperationResponse, int, error)
98+
DeleteUpload(ctx context.Context, uploadHref string) (int, error)
9899
}

pkg/clients/pulp_client/pulp_client_mock.go

Lines changed: 28 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/clients/pulp_client/uploads.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@ package pulp_client
22

33
import (
44
"context"
5+
"io"
56
"net/http"
67
"os"
8+
"strings"
79

810
zest "github.com/content-services/zest/release/v2025"
11+
"github.com/rs/zerolog/log"
912
)
1013

1114
// CreateUpload Creates an upload
@@ -63,3 +66,29 @@ func (r *pulpDaoImpl) FinishUpload(ctx context.Context, uploadHref string, sha25
6366
}
6467
return readResp, statusCode, nil
6568
}
69+
70+
func (r *pulpDaoImpl) DeleteUpload(ctx context.Context, uploadHref string) (int, error) {
71+
ctx, client := getZestClient(ctx)
72+
statusCode := http.StatusInternalServerError
73+
var body []byte
74+
var readErr error
75+
76+
httpResp, err := client.UploadsAPI.UploadsDelete(ctx, uploadHref).Execute()
77+
if httpResp != nil {
78+
statusCode = httpResp.StatusCode
79+
defer httpResp.Body.Close()
80+
81+
body, readErr = io.ReadAll(httpResp.Body)
82+
if readErr != nil {
83+
log.Logger.Error().Err(readErr).Msg("DeleteUpload: could not read http body")
84+
}
85+
}
86+
if err != nil {
87+
// want to differentiate between resource not found and page not found
88+
if statusCode == http.StatusNotFound && strings.Contains(string(body), "No Upload matches") {
89+
return statusCode, nil
90+
}
91+
return statusCode, errorWithResponseBody("deleting upload", httpResp, err)
92+
}
93+
return statusCode, nil
94+
}

pkg/dao/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,4 +226,5 @@ type UploadDao interface {
226226
StoreChunkUpload(ctx context.Context, orgID string, uploadUUID string, sha256 string) error
227227
GetExistingUploadIDAndCompletedChunks(ctx context.Context, orgID string, sha256 string, chunkSize int64) (string, []string, error)
228228
DeleteUpload(ctx context.Context, uploadUUID string) error
229+
ListUploadsForCleanup(ctx context.Context) ([]models.Upload, error)
229230
}

pkg/dao/uploads.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ func (t uploadDaoImpl) StoreFileUpload(ctx context.Context, orgID string, upload
2727
upload.UploadUUID = uploadUUID
2828
upload.Sha256 = sha256
2929
upload.ChunkSize = chunkSize
30-
3130
upload.ChunkList = []string{}
3231

3332
db := t.db.Model(models.Upload{}).WithContext(ctx).Create(&upload)
@@ -77,3 +76,14 @@ func (t uploadDaoImpl) DeleteUpload(ctx context.Context, uploadUUID string) erro
7776
}
7877
return nil
7978
}
79+
80+
func (t uploadDaoImpl) ListUploadsForCleanup(ctx context.Context) ([]models.Upload, error) {
81+
var uploads []models.Upload
82+
err := t.db.WithContext(ctx).
83+
Where("created_at < current_date - INTERVAL '1' day").
84+
Find(&uploads).Error
85+
if err != nil {
86+
return nil, err
87+
}
88+
return uploads, nil
89+
}

pkg/dao/uploads_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dao
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
"github.com/content-services/content-sources-backend/pkg/clients/pulp_client"
89
"github.com/content-services/content-sources-backend/pkg/models"
@@ -77,3 +78,39 @@ func (s *UploadsSuite) TestDeleteUpload() {
7778
require.Error(s.T(), err)
7879
assert.Equal(s.T(), "record not found", err.Error())
7980
}
81+
82+
func (s *UploadsSuite) TestListUploadsForCleanup() {
83+
uploadDao := s.uploadsDao()
84+
ctx := context.Background()
85+
86+
// Insert an upload with a timestamp older than 1 day
87+
oldUpload := models.Upload{
88+
CreatedAt: time.Now().AddDate(0, 0, -2), // 2 days ago
89+
UploadUUID: uuid.NewString(),
90+
OrgID: orgIDTest,
91+
ChunkSize: int64(1),
92+
Sha256: uuid.NewString(),
93+
ChunkList: []string{uuid.NewString()},
94+
}
95+
err := s.tx.Create(&oldUpload).Error
96+
require.NoError(s.T(), err)
97+
98+
// Insert an upload with a recent timestamp
99+
recentUpload := models.Upload{
100+
CreatedAt: time.Now(),
101+
UploadUUID: uuid.NewString(),
102+
OrgID: orgIDTest,
103+
ChunkSize: int64(1),
104+
Sha256: uuid.NewString(),
105+
ChunkList: []string{uuid.NewString()},
106+
}
107+
err = s.tx.Create(&recentUpload).Error
108+
require.NoError(s.T(), err)
109+
110+
uploads, err := uploadDao.ListUploadsForCleanup(ctx)
111+
require.NoError(s.T(), err)
112+
113+
// Assert that only the old upload was found
114+
assert.Equal(s.T(), 1, len(uploads))
115+
assert.Equal(s.T(), oldUpload.UploadUUID, uploads[0].UploadUUID)
116+
}

scripts/uploads.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ def generate_checksum(rpm_file):
3838
sha256_hash.update(byte_block)
3939
return sha256_hash.hexdigest()
4040

41-
def create_upload(size):
41+
def create_upload(size, rpm_sha256):
4242
# create the upload
43-
data = {'size': size}
43+
data = {'size': size, 'chunk_size': size, 'sha256': rpm_sha256}
4444
headers = {
4545
'x-rh-identity': IDENTITY_HEADER,
4646
'Content-Type': 'application/json'
@@ -187,7 +187,8 @@ def main():
187187
print(f'sha256 for rpm: {rpm_sha256}')
188188

189189
# create the upload
190-
upload_id = create_upload(rpm_size)
190+
print(f'rpm_size: {rpm_size}')
191+
upload_id = create_upload(rpm_size, rpm_sha256)
191192
print(f'upload href or uuid: {upload_id}')
192193
upload_ids += [(rpm_sha256, upload_id)]
193194

0 commit comments

Comments
 (0)