
Commit ab5c56a

feat: add support for processing of upload_v2 job type by slave
1 parent 7dc3a45 commit ab5c56a

File tree: 12 files changed (+1155, -423 lines)


services/notifier/repo.go (+1, -1)
@@ -275,7 +275,7 @@ func scanJob(scan scanFn, job *Job) error {
 	}
 	if jobTypeRaw.Valid {
 		switch jobTypeRaw.String {
-		case string(JobTypeUpload), string(JobTypeAsync):
+		case string(JobTypeUpload), string(JobTypeAsync), string(JobTypeUploadV2):
 			job.Type = JobType(jobTypeRaw.String)
 		default:
 			return fmt.Errorf("scanning: unknown job type: %s", jobTypeRaw.String)

warehouse/internal/loadfiles/loadfiles.go (+14, -5)
@@ -211,6 +211,7 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
 	}()

 	if !lf.AllowUploadV2JobCreation(job) {
+		lf.Logger.Infof("V2 job creation disabled. Processing %d staging files", len(toProcessStagingFiles))
 		err = lf.createUploadJobs(ctx, job, toProcessStagingFiles, publishBatchSize, uniqueLoadGenID)
 		if err != nil {
 			return 0, 0, fmt.Errorf("creating upload jobs: %w", err)
@@ -234,11 +235,13 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
 	g, gCtx := errgroup.WithContext(ctx)
 	if len(v1Files) > 0 {
 		g.Go(func() error {
+			lf.Logger.Infof("V2 job creation enabled. Processing %d v1 staging files", len(v1Files))
 			return lf.createUploadJobs(gCtx, job, v1Files, publishBatchSize, uniqueLoadGenID)
 		})
 	}
 	if len(v2Files) > 0 {
 		g.Go(func() error {
+			lf.Logger.Infof("V2 job creation enabled. Processing %d v2 staging files", len(v2Files))
 			return lf.createUploadV2Jobs(gCtx, job, v2Files, publishBatchSize, uniqueLoadGenID)
 		})
 	}
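The hunk above fans the staging files out to two job creators concurrently: files eligible for upload_v2 jobs go through createUploadV2Jobs, the rest through the existing path. A self-contained sketch of the same errgroup fan-out pattern (the eligibility predicate and the job-creation bodies are placeholders, not this repository's implementation):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type stagingFile struct {
	ID int64
	V2 bool // placeholder predicate for "eligible for an upload_v2 job"
}

func createJobs(ctx context.Context, files []stagingFile, kind string) error {
	// Placeholder for createUploadJobs / createUploadV2Jobs.
	fmt.Printf("creating %s jobs for %d files\n", kind, len(files))
	return ctx.Err()
}

func main() {
	files := []stagingFile{{ID: 1}, {ID: 2, V2: true}, {ID: 3, V2: true}}

	var v1Files, v2Files []stagingFile
	for _, f := range files {
		if f.V2 {
			v2Files = append(v2Files, f)
		} else {
			v1Files = append(v1Files, f)
		}
	}

	// Run both creators concurrently; the first error cancels the shared context.
	g, gCtx := errgroup.WithContext(context.Background())
	if len(v1Files) > 0 {
		g.Go(func() error { return createJobs(gCtx, v1Files, "v1") })
	}
	if len(v2Files) > 0 {
		g.Go(func() error { return createJobs(gCtx, v2Files, "v2") })
	}
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```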
@@ -329,7 +332,7 @@ func (lf *LoadFileGenerator) publishToNotifier(

 	destID := job.Upload.DestinationID
 	destType := job.Upload.DestinationType
-	lf.Logger.Infof("[WH]: Publishing %d staging files for %s:%s to notifier", len(messages), destType, destID)
+	lf.Logger.Infof("[WH]: Publishing %d jobs for %s:%s to notifier", len(messages), obskit.DestinationType(destType), obskit.DestinationID(destID))

 	ch, err := lf.Notifier.Publish(ctx, &notifier.PublishRequest{
 		Payloads: messages,
@@ -401,7 +404,6 @@ func (lf *LoadFileGenerator) processNotifierResponse(ctx context.Context, ch <-c
 	return nil
 }

-// Unlike upload type job, for v2 we are not setting the status of staging files
 func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <-chan *notifier.PublishResponse, job *model.UploadJob, chunk []*model.StagingFile) error {
 	responses, ok := <-ch
 	if !ok {
@@ -437,6 +439,12 @@ func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <
 	if err := lf.LoadRepo.Insert(ctx, loadFiles); err != nil {
 		return fmt.Errorf("inserting load files: %w", err)
 	}
+	stagingFileIds := lo.Map(chunk, func(file *model.StagingFile, _ int) int64 {
+		return file.ID
+	})
+	if err := lf.StageRepo.SetStatuses(ctx, stagingFileIds, warehouseutils.StagingFileSucceededState); err != nil {
+		return fmt.Errorf("setting staging file status to succeeded: %w", err)
+	}
 	return nil
 }
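This hunk makes processNotifierResponseV2 mark staging files as succeeded once their load files are inserted, which is why the now-stale comment above it was removed. The ID extraction uses samber/lo's Map; a standalone sketch with the struct trimmed to the one field the example needs:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

type StagingFile struct {
	ID int64
}

func main() {
	chunk := []*StagingFile{{ID: 11}, {ID: 12}, {ID: 13}}

	// Project the chunk down to its IDs, as in the hunk above.
	ids := lo.Map(chunk, func(f *StagingFile, _ int) int64 {
		return f.ID
	})

	fmt.Println(ids) // [11 12 13]
}
```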

@@ -479,8 +487,9 @@ func (lf *LoadFileGenerator) createUploadV2Jobs(ctx context.Context, job *model.
 	}
 	g, gCtx := errgroup.WithContext(ctx)
 	stagingFileGroups := lf.GroupStagingFiles(stagingFiles, lf.Conf.GetInt("Warehouse.loadFiles.maxSizeInMB", 128))
-	for _, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
-		for _, group := range fileGroups {
+	for i, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
+		for j, group := range fileGroups {
+			lf.Logger.Infof("chunk %d, group %d, size %d", i, j, len(group))
 			baseReq := lf.prepareBaseJobRequest(job, uniqueLoadGenID, group[0], destinationRevisionIDMap)

 			stagingFileInfos := make([]StagingFileInfo, len(group))
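The loop above batches the size-grouped staging files with lo.Chunk before publishing. A minimal sketch of that chunking shape (group contents are illustrative):

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	stagingFileGroups := []string{"g1", "g2", "g3", "g4", "g5"}
	publishBatchSize := 2

	// Batches of at most publishBatchSize groups, mirroring the loop above.
	for i, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
		for j, group := range fileGroups {
			fmt.Printf("chunk %d, group %d: %s\n", i, j, group)
		}
	}
}
```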
@@ -642,7 +651,7 @@ func (lf *LoadFileGenerator) groupBySize(files []*model.StagingFile, maxSizeMB i
 		sizes: make(map[string]int64),
 	})

-	lf.Logger.Infof("maxTable: %s, maxTableSize: %d", maxTable.name, maxTable.size)
+	lf.Logger.Infon("[groupBySize]", logger.NewStringField("maxTableName", maxTable.name), logger.NewIntField("maxTableSizeInBytes", maxTable.size))

 	// Sorting ensures that minimum batches are created
 	slices.SortFunc(files, func(a, b *model.StagingFile) int {
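The last hunk swaps a printf-style Infof for rudder-go-kit's structured Infon with typed fields. A minimal sketch of that call, assuming the library's default logger factory setup:

```go
package main

import (
	"github.com/rudderlabs/rudder-go-kit/config"
	"github.com/rudderlabs/rudder-go-kit/logger"
)

func main() {
	// Assumed setup: rudder-go-kit's default logger factory.
	log := logger.NewFactory(config.Default).NewLogger()

	maxTableName, maxTableSizeInBytes := "tracks", int64(42_000_000)

	// Typed fields instead of printf formatting, as in the hunk above.
	log.Infon("[groupBySize]",
		logger.NewStringField("maxTableName", maxTableName),
		logger.NewIntField("maxTableSizeInBytes", maxTableSizeInBytes),
	)
}
```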

warehouse/internal/loadfiles/loadfiles_test.go (+8, -7)
@@ -935,7 +935,6 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
 	ctx := context.Background()

 	t.Run("worker partial failure", func(t *testing.T) {
-		t.Skip("enable the test once partial failure is implemented/handled as part of processing upload_v2 job")
 		notifier := &mockNotifier{
 			t:      t,
 			tables: tables,
@@ -965,6 +964,12 @@
 			}
 		}

+		timeWindow := time.Now().Add(time.Hour)
+		// Setting the time window so that these 2 files are grouped together in a single upload_v2 job
+		stagingFiles[0].TimeWindow = timeWindow
+		stagingFiles[1].TimeWindow = timeWindow
+
+		// Batch 1 should fail, batch 2 should succeed
 		stagingFiles[0].Location = "abort"

 		startID, endID, err := lf.ForceCreateLoadFiles(ctx, &model.UploadJob{
@@ -973,15 +978,11 @@
 			StagingFiles: stagingFiles,
 		})
 		require.NoError(t, err)
-		require.Equal(t, int64(1), startID)
-
-		require.Len(t,
-			loadRepo.store,
-			len(tables)*(len(stagingFiles)-1),
-		)

+		require.Len(t, loadRepo.store, len(tables))
 		require.Equal(t, loadRepo.store[0].ID, startID)
 		require.Equal(t, loadRepo.store[len(loadRepo.store)-1].ID, endID)
+		require.Equal(t, loadRepo.store[0].TotalRows, 8)
 	})

 	t.Run("worker failures for all", func(t *testing.T) {

warehouse/internal/loadfiles/mock_notifier_test.go (+1, -2)
@@ -96,8 +96,7 @@ func (n *mockNotifier) publishV2(payload *notifier.PublishRequest) (<-chan *noti
 		loadFileUploads = append(loadFileUploads, loadfiles.LoadFileUpload{
 			TableName:             tableName,
 			Location:              req.UniqueLoadGenID + "/" + tableName,
-			TotalRows:             10,
-			ContentLength:         1000,
+			TotalRows:             len(req.StagingFiles),
 			DestinationRevisionID: destinationRevisionID,
 			UseRudderStorage:      req.UseRudderStorage,
 		})

warehouse/router/upload.go (+62, -26)
@@ -103,6 +103,7 @@ type UploadJob struct {
 		columnsBatchSize                    int
 		longRunningUploadStatThresholdInMin time.Duration
 		skipPreviouslyFailedTables          bool
+		queryLoadFilesWithUploadID          config.ValueLoader[bool]
 	}

 	errorHandler ErrorHandler
@@ -196,6 +197,7 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
 	uj.config.maxUploadBackoff = f.conf.GetDurationVar(1800, time.Second, "Warehouse.maxUploadBackoff", "Warehouse.maxUploadBackoffInS")
 	uj.config.retryTimeWindow = f.conf.GetDurationVar(180, time.Minute, "Warehouse.retryTimeWindow", "Warehouse.retryTimeWindowInMins")
 	uj.config.skipPreviouslyFailedTables = f.conf.GetBool("Warehouse.skipPreviouslyFailedTables", false)
+	uj.config.queryLoadFilesWithUploadID = f.conf.GetReloadableBoolVar(false, "Warehouse.loadFiles.queryWithUploadID.enable")

 	uj.stats.uploadTime = uj.timerStat("upload_time")
 	uj.stats.userTablesLoadTime = uj.timerStat("user_tables_load_time")
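The new flag is registered as a reloadable config value, so it can be flipped at runtime without a restart; each Load() call re-reads the current value. A minimal sketch with rudder-go-kit's config package (the key name is from the diff):

```go
package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-go-kit/config"
)

func main() {
	conf := config.New()

	// Reloadable: Load() re-reads the live value on every call.
	queryWithUploadID := conf.GetReloadableBoolVar(false, "Warehouse.loadFiles.queryWithUploadID.enable")

	fmt.Println(queryWithUploadID.Load()) // false until the key is set
}
```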
@@ -827,8 +829,66 @@ func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options whutils.
 	if options.Limit != 0 {
 		limitSQL = fmt.Sprintf(`LIMIT %d`, options.Limit)
 	}
+	sqlStatement := job.getLoadFilesMetadataQuery(tableFilterSQL, limitSQL)

-	sqlStatement := fmt.Sprintf(`
+	job.logger.Debugf(`Fetching loadFileLocations: %v`, sqlStatement)
+	rows, err := job.db.QueryContext(ctx, sqlStatement)
+	if err != nil {
+		return nil, fmt.Errorf("query: %s\nfailed with Error : %w", sqlStatement, err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	for rows.Next() {
+		var location string
+		var metadata json.RawMessage
+		err := rows.Scan(&location, &metadata)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scan result from query: %s\nwith Error : %w", sqlStatement, err)
+		}
+		loadFiles = append(loadFiles, whutils.LoadFile{
+			Location: location,
+			Metadata: metadata,
+		})
+	}
+	if err = rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate query results: %s\nwith Error : %w", sqlStatement, err)
+	}
+	return
+}
+
+func (job *UploadJob) getLoadFilesMetadataQuery(tableFilterSQL, limitSQL string) string {
+	if job.config.queryLoadFilesWithUploadID.Load() {
+		return fmt.Sprintf(`
+		WITH row_numbered_load_files as (
+			SELECT
+				location,
+				metadata,
+				row_number() OVER (
+					PARTITION BY upload_id,
+					table_name
+					ORDER BY
+						id DESC
+				) AS row_number
+			FROM
+				%[1]s
+			WHERE upload_id = %[2]d %[3]s
+		)
+		SELECT
+			location,
+			metadata
+		FROM
+			row_numbered_load_files
+		WHERE
+			row_number = 1
+		%[4]s;
+	`,
+			whutils.WarehouseLoadFilesTable,
+			job.upload.ID,
+			tableFilterSQL,
+			limitSQL,
+		)
+	}
+	return fmt.Sprintf(`
 		WITH row_numbered_load_files as (
 			SELECT
 				location,
@@ -852,36 +912,12 @@ func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options whutils.
 		WHERE
 			row_number = 1
 		%[4]s;
-	`,
+		`,
 		whutils.WarehouseLoadFilesTable,
 		misc.IntArrayToString(job.stagingFileIDs, ","),
 		tableFilterSQL,
 		limitSQL,
 	)
-
-	job.logger.Debugf(`Fetching loadFileLocations: %v`, sqlStatement)
-	rows, err := job.db.QueryContext(ctx, sqlStatement)
-	if err != nil {
-		return nil, fmt.Errorf("query: %s\nfailed with Error : %w", sqlStatement, err)
-	}
-	defer func() { _ = rows.Close() }()
-
-	for rows.Next() {
-		var location string
-		var metadata json.RawMessage
-		err := rows.Scan(&location, &metadata)
-		if err != nil {
-			return nil, fmt.Errorf("failed to scan result from query: %s\nwith Error : %w", sqlStatement, err)
-		}
-		loadFiles = append(loadFiles, whutils.LoadFile{
-			Location: location,
-			Metadata: metadata,
-		})
-	}
-	if err = rows.Err(); err != nil {
-		return nil, fmt.Errorf("iterate query results: %s\nwith Error : %w", sqlStatement, err)
-	}
-	return
 }

 func (job *UploadJob) GetSampleLoadFileLocation(ctx context.Context, tableName string) (location string, err error) {
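Both branches of getLoadFilesMetadataQuery build the same deduplication query: rank each table's load files newest-first with row_number() and keep only row_number = 1, so retries that produced duplicate load files resolve to the latest one. The v2 branch filters by upload_id instead of the staging-file ID list. A sketch of how the v2 template expands (all concrete values are illustrative, and wh_load_files is an assumed expansion of whutils.WarehouseLoadFilesTable):

```go
package main

import "fmt"

func main() {
	// Illustrative expansion of the v2 query template above.
	query := fmt.Sprintf(`
		WITH row_numbered_load_files as (
			SELECT
				location, metadata,
				row_number() OVER (PARTITION BY upload_id, table_name ORDER BY id DESC) AS row_number
			FROM %[1]s
			WHERE upload_id = %[2]d %[3]s
		)
		SELECT location, metadata FROM row_numbered_load_files WHERE row_number = 1 %[4]s;`,
		"wh_load_files",             // assumed value of whutils.WarehouseLoadFilesTable
		42,                          // job.upload.ID
		"AND table_name = 'tracks'", // tableFilterSQL
		"LIMIT 10",                  // limitSQL
	)
	fmt.Println(query)
}
```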

warehouse/slave/slave_test.go (+20, -18)
@@ -112,24 +112,26 @@ func TestSlave(t *testing.T) {
 	}()

 	p := payload{
-		UploadID:                     1,
-		StagingFileID:                1,
-		StagingFileLocation:          jobLocation,
-		UploadSchema:                 schemaMap,
-		WorkspaceID:                  "test_workspace_id",
-		SourceID:                     "test_source_id",
-		SourceName:                   "test_source_name",
-		DestinationID:                "test_destination_id",
-		DestinationName:              "test_destination_name",
-		DestinationType:              "test_destination_type",
-		DestinationNamespace:         "test_destination_namespace",
-		DestinationRevisionID:        uuid.New().String(),
-		StagingDestinationRevisionID: uuid.New().String(),
-		DestinationConfig:            destConf,
-		StagingDestinationConfig:     map[string]interface{}{},
-		UniqueLoadGenID:              uuid.New().String(),
-		RudderStoragePrefix:          misc.GetRudderObjectStoragePrefix(),
-		LoadFileType:                 "csv",
+		basePayload: basePayload{
+			UploadID:                     1,
+			UploadSchema:                 schemaMap,
+			WorkspaceID:                  "test_workspace_id",
+			SourceID:                     "test_source_id",
+			SourceName:                   "test_source_name",
+			DestinationID:                "test_destination_id",
+			DestinationName:              "test_destination_name",
+			DestinationType:              "test_destination_type",
+			DestinationNamespace:         "test_destination_namespace",
+			DestinationRevisionID:        uuid.New().String(),
+			StagingDestinationRevisionID: uuid.New().String(),
+			DestinationConfig:            destConf,
+			StagingDestinationConfig:     map[string]interface{}{},
+			UniqueLoadGenID:              uuid.New().String(),
+			RudderStoragePrefix:          misc.GetRudderObjectStoragePrefix(),
+			LoadFileType:                 "csv",
+		},
+		StagingFileID:       1,
+		StagingFileLocation: jobLocation,
 	}

 	payloadJson, err := jsonrs.Marshal(p)
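The payload refactor pulls the fields shared between upload and upload_v2 jobs into an embedded basePayload, so the v2 payload can reuse them without the per-staging-file fields. A trimmed sketch of the embedding pattern (field set and JSON tags are assumptions; the names come from the diff):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Fields common to both job payloads, trimmed to two for the sketch.
type basePayload struct {
	UploadID    int64  `json:"upload_id"`
	WorkspaceID string `json:"workspace_id"`
}

// The v1-style payload embeds the shared fields and adds its own.
type payload struct {
	basePayload
	StagingFileID       int64  `json:"staging_file_id"`
	StagingFileLocation string `json:"staging_file_location"`
}

func main() {
	p := payload{
		basePayload:         basePayload{UploadID: 1, WorkspaceID: "test_workspace_id"},
		StagingFileID:       1,
		StagingFileLocation: "some/location",
	}
	// Promoted fields of an embedded struct marshal flat, so the wire
	// format is unchanged by the refactor.
	b, _ := json.Marshal(p)
	fmt.Println(string(b))
}
```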
2 binary files changed (2.34 KB each), contents not shown.
