Skip to content

Commit b0bc42c

Browse files
feat: add support for processing of upload_v2 job type by slave
1 parent 7dc3a45 commit b0bc42c

File tree

17 files changed

+1330
-477
lines changed

17 files changed

+1330
-477
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ require (
248248
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
249249
github.com/jmespath/go-jmespath v0.4.0 // indirect
250250
github.com/klauspost/asmfmt v1.3.2 // indirect
251-
github.com/klauspost/compress v1.18.0 // indirect
251+
github.com/klauspost/compress v1.18.0
252252
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
253253
github.com/kr/fs v0.1.0 // indirect
254254
github.com/leodido/go-urn v1.4.0 // indirect

integration_test/warehouse/warehouse_test.go

+61-47
Original file line numberDiff line numberDiff line change
@@ -83,57 +83,71 @@ func TestMain(m *testing.M) {
8383

8484
func TestUploads(t *testing.T) {
8585
t.Run("tracks loading", func(t *testing.T) {
86-
db, minioResource, whClient := setupServer(t, false, nil, nil)
87-
88-
var (
89-
ctx = context.Background()
90-
events = 100
91-
jobs = 1
92-
)
86+
testCases := []struct {
87+
batchStagingFiles bool
88+
}{
89+
{batchStagingFiles: false},
90+
{batchStagingFiles: true},
91+
}
92+
for _, tc := range testCases {
93+
if tc.batchStagingFiles {
94+
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
95+
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.queryWithUploadID.enable"), "true")
96+
}
97+
db, minioResource, whClient := setupServer(t, false, nil, nil)
9398

94-
eventsPayload := strings.Join(lo.RepeatBy(events, func(int) string {
95-
return fmt.Sprintf(`{"data":{"id":%q,"user_id":%q,"received_at":"2023-05-12T04:36:50.199Z"},"metadata":{"columns":{"id":"string","user_id":"string","received_at":"datetime"}, "table": "tracks"}}`,
96-
uuid.New().String(),
97-
uuid.New().String(),
99+
var (
100+
ctx = context.Background()
101+
events = 100
102+
jobs = 1
98103
)
99-
}), "\n")
104+
eventsPayload := strings.Join(lo.RepeatBy(events, func(int) string {
105+
return fmt.Sprintf(`{"data":{"id":%q,"user_id":%q,"received_at":"2023-05-12T04:36:50.199Z"},"metadata":{"columns":{"id":"string","user_id":"string","received_at":"datetime"}, "table": "tracks"}}`,
106+
uuid.New().String(),
107+
uuid.New().String(),
108+
)
109+
}), "\n")
100110

101-
require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
102-
WorkspaceID: workspaceID,
103-
SourceID: sourceID,
104-
DestinationID: destinationID,
105-
Location: prepareStagingFile(t, ctx, minioResource, eventsPayload).ObjectName,
106-
TotalEvents: events,
107-
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
108-
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
109-
UseRudderStorage: false,
110-
DestinationRevisionID: destinationID,
111-
Schema: map[string]map[string]string{
112-
"tracks": {
113-
"id": "string",
114-
"user_id": "string",
115-
"received_at": "datetime",
111+
require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
112+
WorkspaceID: workspaceID,
113+
SourceID: sourceID,
114+
DestinationID: destinationID,
115+
Location: prepareStagingFile(t, ctx, minioResource, eventsPayload).ObjectName,
116+
TotalEvents: events,
117+
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
118+
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
119+
UseRudderStorage: false,
120+
BytesPerTable: map[string]int64{
121+
"tracks": int64(len(eventsPayload)),
116122
},
117-
},
118-
}))
119-
requireStagingFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
120-
{A: "source_id", B: sourceID},
121-
{A: "destination_id", B: destinationID},
122-
{A: "status", B: succeeded},
123-
}...)
124-
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
125-
{A: "status", B: exportedData},
126-
{A: "wh_uploads.source_id", B: sourceID},
127-
{A: "wh_uploads.destination_id", B: destinationID},
128-
{A: "wh_uploads.namespace", B: namespace},
129-
}...)
130-
requireUploadJobsCount(t, ctx, db, jobs, []lo.Tuple2[string, any]{
131-
{A: "source_id", B: sourceID},
132-
{A: "destination_id", B: destinationID},
133-
{A: "namespace", B: namespace},
134-
{A: "status", B: exportedData},
135-
}...)
136-
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
123+
DestinationRevisionID: destinationID,
124+
Schema: map[string]map[string]string{
125+
"tracks": {
126+
"id": "string",
127+
"user_id": "string",
128+
"received_at": "datetime",
129+
},
130+
},
131+
}))
132+
requireStagingFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
133+
{A: "source_id", B: sourceID},
134+
{A: "destination_id", B: destinationID},
135+
{A: "status", B: succeeded},
136+
}...)
137+
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
138+
{A: "status", B: exportedData},
139+
{A: "wh_uploads.source_id", B: sourceID},
140+
{A: "wh_uploads.destination_id", B: destinationID},
141+
{A: "wh_uploads.namespace", B: namespace},
142+
}...)
143+
requireUploadJobsCount(t, ctx, db, jobs, []lo.Tuple2[string, any]{
144+
{A: "source_id", B: sourceID},
145+
{A: "destination_id", B: destinationID},
146+
{A: "namespace", B: namespace},
147+
{A: "status", B: exportedData},
148+
}...)
149+
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
150+
}
137151
})
138152
t.Run("user and identifies loading", func(t *testing.T) {
139153
db, minioResource, whClient := setupServer(t, false, nil, nil)

services/notifier/repo.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func scanJob(scan scanFn, job *Job) error {
275275
}
276276
if jobTypeRaw.Valid {
277277
switch jobTypeRaw.String {
278-
case string(JobTypeUpload), string(JobTypeAsync):
278+
case string(JobTypeUpload), string(JobTypeAsync), string(JobTypeUploadV2):
279279
job.Type = JobType(jobTypeRaw.String)
280280
default:
281281
return fmt.Errorf("scanning: unknown job type: %s", jobTypeRaw.String)

warehouse/integrations/datalake/datalake_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func TestIntegration(t *testing.T) {
8787
destType string
8888
conf map[string]interface{}
8989
schemaTTLInMinutes int
90+
batchStagingFiles bool
9091
prerequisite func(t testing.TB, ctx context.Context)
9192
configOverride map[string]any
9293
verifySchema func(*testing.T, filemanager.FileManager, string)
@@ -109,6 +110,7 @@ func TestIntegration(t *testing.T) {
109110
"syncFrequency": "30",
110111
},
111112
schemaTTLInMinutes: 0,
113+
batchStagingFiles: true,
112114
prerequisite: func(t testing.TB, ctx context.Context) {
113115
t.Helper()
114116
createMinioBucket(t, ctx, s3EndPoint, s3AccessKeyID, s3AccessKey, s3BucketName, s3Region)
@@ -360,6 +362,10 @@ func TestIntegration(t *testing.T) {
360362
t.Setenv("STORAGE_EMULATOR_HOST", fmt.Sprintf("localhost:%d", c.Port("gcs", 4443)))
361363
t.Setenv("RSERVER_WORKLOAD_IDENTITY_TYPE", "GKE")
362364
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.schemaTTLInMinutes"), strconv.Itoa(tc.schemaTTLInMinutes))
365+
if tc.batchStagingFiles {
366+
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
367+
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.queryWithUploadID.enable"), "true")
368+
}
363369

364370
whth.BootstrapSvc(t, workspaceConfig, httpPort, jobsDBPort)
365371

warehouse/integrations/testhelper/staging.go

+9
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,14 @@ func prepareStagingPayload(t testing.TB, testConfig *TestConfig, stagingFile str
261261
stagingFileInfo, err := os.Stat(stagingFile)
262262
require.NoError(t, err)
263263

264+
bytesPerTable := make(map[string]int64)
265+
for _, event := range stagingEvents {
266+
tableName := event.Metadata.Table
267+
eventJSON, err := jsonrs.Marshal(event.Data)
268+
require.NoError(t, err)
269+
bytesPerTable[tableName] += int64(len(eventJSON))
270+
}
271+
264272
payload := warehouseclient.StagingFile{
265273
WorkspaceID: testConfig.WorkspaceID,
266274
Schema: schemaMap,
@@ -275,6 +283,7 @@ func prepareStagingPayload(t testing.TB, testConfig *TestConfig, stagingFile str
275283
SourceTaskRunID: testConfig.TaskRunID,
276284
SourceJobRunID: testConfig.JobRunID,
277285
TimeWindow: warehouseutils.GetTimeWindow(receivedAt),
286+
BytesPerTable: bytesPerTable,
278287
}
279288
return payload
280289
}

warehouse/internal/loadfiles/loadfiles.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
211211
}()
212212

213213
if !lf.AllowUploadV2JobCreation(job) {
214+
lf.Logger.Infof("V2 job creation disabled. Processing %d staging files", len(toProcessStagingFiles))
214215
err = lf.createUploadJobs(ctx, job, toProcessStagingFiles, publishBatchSize, uniqueLoadGenID)
215216
if err != nil {
216217
return 0, 0, fmt.Errorf("creating upload jobs: %w", err)
@@ -234,11 +235,13 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
234235
g, gCtx := errgroup.WithContext(ctx)
235236
if len(v1Files) > 0 {
236237
g.Go(func() error {
238+
lf.Logger.Infof("V2 job creation enabled. Processing %d v1 staging files", len(v1Files))
237239
return lf.createUploadJobs(gCtx, job, v1Files, publishBatchSize, uniqueLoadGenID)
238240
})
239241
}
240242
if len(v2Files) > 0 {
241243
g.Go(func() error {
244+
lf.Logger.Infof("V2 job creation enabled. Processing %d v2 staging files", len(v2Files))
242245
return lf.createUploadV2Jobs(gCtx, job, v2Files, publishBatchSize, uniqueLoadGenID)
243246
})
244247
}
@@ -329,7 +332,7 @@ func (lf *LoadFileGenerator) publishToNotifier(
329332

330333
destID := job.Upload.DestinationID
331334
destType := job.Upload.DestinationType
332-
lf.Logger.Infof("[WH]: Publishing %d staging files for %s:%s to notifier", len(messages), destType, destID)
335+
lf.Logger.Infof("[WH]: Publishing %d jobs for %s:%s to notifier", len(messages), obskit.DestinationType(destType), obskit.DestinationID(destID))
333336

334337
ch, err := lf.Notifier.Publish(ctx, &notifier.PublishRequest{
335338
Payloads: messages,
@@ -401,7 +404,6 @@ func (lf *LoadFileGenerator) processNotifierResponse(ctx context.Context, ch <-c
401404
return nil
402405
}
403406

404-
// Unlike upload type job, for v2 we are not setting the status of staging files
405407
func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <-chan *notifier.PublishResponse, job *model.UploadJob, chunk []*model.StagingFile) error {
406408
responses, ok := <-ch
407409
if !ok {
@@ -437,6 +439,12 @@ func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <
437439
if err := lf.LoadRepo.Insert(ctx, loadFiles); err != nil {
438440
return fmt.Errorf("inserting load files: %w", err)
439441
}
442+
stagingFileIds := lo.Map(chunk, func(file *model.StagingFile, _ int) int64 {
443+
return file.ID
444+
})
445+
if err := lf.StageRepo.SetStatuses(ctx, stagingFileIds, warehouseutils.StagingFileSucceededState); err != nil {
446+
return fmt.Errorf("setting staging file status to succeeded: %w", err)
447+
}
440448
return nil
441449
}
442450

@@ -479,8 +487,9 @@ func (lf *LoadFileGenerator) createUploadV2Jobs(ctx context.Context, job *model.
479487
}
480488
g, gCtx := errgroup.WithContext(ctx)
481489
stagingFileGroups := lf.GroupStagingFiles(stagingFiles, lf.Conf.GetInt("Warehouse.loadFiles.maxSizeInMB", 128))
482-
for _, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
483-
for _, group := range fileGroups {
490+
for i, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
491+
for j, group := range fileGroups {
492+
lf.Logger.Infof("chunk %d, group %d, size %d", i, j, len(group))
484493
baseReq := lf.prepareBaseJobRequest(job, uniqueLoadGenID, group[0], destinationRevisionIDMap)
485494

486495
stagingFileInfos := make([]StagingFileInfo, len(group))
@@ -642,7 +651,7 @@ func (lf *LoadFileGenerator) groupBySize(files []*model.StagingFile, maxSizeMB i
642651
sizes: make(map[string]int64),
643652
})
644653

645-
lf.Logger.Infof("maxTable: %s, maxTableSize: %d", maxTable.name, maxTable.size)
654+
lf.Logger.Infon("[groupBySize]", logger.NewStringField("maxTableName", maxTable.name), logger.NewIntField("maxTableSizeInBytes", maxTable.size))
646655

647656
// Sorting ensures that minimum batches are created
648657
slices.SortFunc(files, func(a, b *model.StagingFile) int {

warehouse/internal/loadfiles/loadfiles_test.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -935,7 +935,6 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
935935
ctx := context.Background()
936936

937937
t.Run("worker partial failure", func(t *testing.T) {
938-
t.Skip("enable the test once partial failure is implemented/handled as part of processing upload_v2 job")
939938
notifier := &mockNotifier{
940939
t: t,
941940
tables: tables,
@@ -965,6 +964,12 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
965964
}
966965
}
967966

967+
timeWindow := time.Now().Add(time.Hour)
968+
// Setting time window so that these 2 files are grouped together in a single upload_v2 job
969+
stagingFiles[0].TimeWindow = timeWindow
970+
stagingFiles[1].TimeWindow = timeWindow
971+
972+
// Batch 1 should fail, batch 2 should succeed
968973
stagingFiles[0].Location = "abort"
969974

970975
startID, endID, err := lf.ForceCreateLoadFiles(ctx, &model.UploadJob{
@@ -973,15 +978,11 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
973978
StagingFiles: stagingFiles,
974979
})
975980
require.NoError(t, err)
976-
require.Equal(t, int64(1), startID)
977-
978-
require.Len(t,
979-
loadRepo.store,
980-
len(tables)*(len(stagingFiles)-1),
981-
)
982981

982+
require.Len(t, loadRepo.store, len(tables))
983983
require.Equal(t, loadRepo.store[0].ID, startID)
984984
require.Equal(t, loadRepo.store[len(loadRepo.store)-1].ID, endID)
985+
require.Equal(t, loadRepo.store[0].TotalRows, 8)
985986
})
986987

987988
t.Run("worker failures for all", func(t *testing.T) {

warehouse/internal/loadfiles/mock_notifier_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,7 @@ func (n *mockNotifier) publishV2(payload *notifier.PublishRequest) (<-chan *noti
9696
loadFileUploads = append(loadFileUploads, loadfiles.LoadFileUpload{
9797
TableName: tableName,
9898
Location: req.UniqueLoadGenID + "/" + tableName,
99-
TotalRows: 10,
100-
ContentLength: 1000,
99+
TotalRows: len(req.StagingFiles),
101100
DestinationRevisionID: destinationRevisionID,
102101
UseRudderStorage: req.UseRudderStorage,
103102
})

0 commit comments

Comments (0)