Skip to content

Commit 376e332

Browse files
committed
fix: warehouse skip processing destinations
1 parent b5af4c2 commit 376e332

File tree

4 files changed

+75
-7
lines changed

4 files changed

+75
-7
lines changed

warehouse/internal/repo/upload.go

+16-5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ type scanFn func(dest ...any) error
7878
type ProcessOptions struct {
7979
SkipIdentifiers []string
8080
SkipWorkspaces []string
81+
SkipDestinations []string
8182
AllowMultipleSourcesForJobsPickup bool
8283
}
8384

@@ -283,12 +284,12 @@ func (u *Uploads) GetToProcess(ctx context.Context, destType string, limit int,
283284
partitionIdentifierSQL := `destination_id, namespace`
284285

285286
if len(opts.SkipIdentifiers) > 0 {
286-
skipIdentifiersSQL = `AND ((destination_id || '_' || namespace)) != ALL($5)`
287+
skipIdentifiersSQL = `AND ((destination_id || '_' || namespace)) != ALL($6)`
287288
}
288289

289290
if opts.AllowMultipleSourcesForJobsPickup {
290291
if len(opts.SkipIdentifiers) > 0 {
291-
skipIdentifiersSQL = `AND ((source_id || '_' || destination_id || '_' || namespace)) != ALL($5)`
292+
skipIdentifiersSQL = `AND ((source_id || '_' || destination_id || '_' || namespace)) != ALL($6)`
292293
}
293294
partitionIdentifierSQL = fmt.Sprintf(`%s, %s`, "source_id", partitionIdentifierSQL)
294295
}
@@ -308,7 +309,8 @@ func (u *Uploads) GetToProcess(ctx context.Context, destType string, limit int,
308309
t.status != $2 AND
309310
t.status != $3 %s AND
310311
COALESCE(metadata->>'nextRetryTime', NOW()::text)::timestamptz <= NOW() AND
311-
workspace_id <> ALL ($4)
312+
workspace_id <> ALL ($4) AND
313+
destination_id <> ALL ($5)
312314
) grouped_uploads
313315
WHERE
314316
grouped_uploads.row_number = 1
@@ -330,12 +332,16 @@ func (u *Uploads) GetToProcess(ctx context.Context, destType string, limit int,
330332
if opts.SkipWorkspaces == nil {
331333
opts.SkipWorkspaces = []string{}
332334
}
335+
if opts.SkipDestinations == nil {
336+
opts.SkipDestinations = []string{}
337+
}
333338

334339
args := []any{
335340
destType,
336341
model.ExportedData,
337342
model.Aborted,
338343
pq.Array(opts.SkipWorkspaces),
344+
pq.Array(opts.SkipDestinations),
339345
}
340346

341347
if len(opts.SkipIdentifiers) > 0 {
@@ -388,22 +394,27 @@ func (u *Uploads) UploadJobsStats(ctx context.Context, destType string, opts Pro
388394
status != $3 AND
389395
status != $4 AND
390396
COALESCE((metadata->>'nextRetryTime')::TIMESTAMPTZ, $1::TIMESTAMPTZ) <= $1::TIMESTAMPTZ AND
391-
workspace_id <> ALL ($5)`
397+
workspace_id <> ALL ($5) AND
398+
destination_id <> ALL ($6)`
392399

393400
if opts.SkipWorkspaces == nil {
394401
opts.SkipWorkspaces = []string{}
395402
}
403+
if opts.SkipDestinations == nil {
404+
opts.SkipDestinations = []string{}
405+
}
396406

397407
args := []any{
398408
u.now(),
399409
destType,
400410
model.ExportedData,
401411
model.Aborted,
402412
pq.Array(opts.SkipWorkspaces),
413+
pq.Array(opts.SkipDestinations),
403414
}
404415

405416
if len(opts.SkipIdentifiers) > 0 {
406-
query += `AND ((destination_id || '_' || namespace)) != ALL($6)`
417+
query += `AND ((destination_id || '_' || namespace)) != ALL($7)`
407418
args = append(args, pq.Array(opts.SkipIdentifiers))
408419
}
409420

warehouse/internal/repo/upload_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,40 @@ func TestUploads_GetToProcess(t *testing.T) {
449449
require.NoError(t, err)
450450
require.Len(t, toProcess, 0)
451451
})
452+
t.Run("skip destinations", func(t *testing.T) {
453+
t.Parallel()
454+
455+
var (
456+
db = setupDB(t)
457+
repoUpload = repo.NewUploads(db)
458+
priority = 100
459+
460+
uploads []model.Upload
461+
)
462+
463+
uploads = append(uploads,
464+
prepareUpload(db, sourceID, model.Waiting, priority,
465+
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
466+
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
467+
),
468+
prepareUpload(db, sourceID, model.ExportingDataFailed, priority,
469+
time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
470+
time.Date(2021, 1, 1, 1, 0, 0, 0, time.UTC),
471+
),
472+
)
473+
require.Len(t, uploads, 2)
474+
475+
toProcess, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{})
476+
require.NoError(t, err)
477+
require.Len(t, toProcess, 1)
478+
require.Equal(t, uploads[0].ID, toProcess[0].ID)
479+
480+
toProcess, err = repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
481+
SkipDestinations: []string{destID},
482+
})
483+
require.NoError(t, err)
484+
require.Len(t, toProcess, 0)
485+
})
452486

453487
t.Run("ordering by priority", func(t *testing.T) {
454488
t.Parallel()
@@ -726,6 +760,22 @@ func TestUploads_Processing(t *testing.T) {
726760
require.Equal(t, []model.Upload{uploads[4]}, s)
727761
})
728762

763+
t.Run("skip destinations", func(t *testing.T) {
764+
t.Parallel()
765+
766+
s, err := repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
767+
SkipDestinations: []string{"destination_id", "destination_id_4"},
768+
})
769+
require.NoError(t, err)
770+
require.Empty(t, s)
771+
772+
s, err = repoUpload.GetToProcess(ctx, destType, 10, repo.ProcessOptions{
773+
SkipDestinations: []string{"destination_id"},
774+
})
775+
require.NoError(t, err)
776+
require.Equal(t, []model.Upload{uploads[4]}, s)
777+
})
778+
729779
t.Run("multiple sources", func(t *testing.T) {
730780
t.Parallel()
731781

warehouse/router/router.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type Router struct {
106106
warehouseSyncFreqIgnore config.ValueLoader[bool]
107107
cronTrackerRetries config.ValueLoader[int64]
108108
uploadBufferTimeInMin config.ValueLoader[time.Duration]
109+
skipDestinationIDs config.ValueLoader[[]string]
109110
}
110111

111112
stats struct {
@@ -410,6 +411,7 @@ func (r *Router) uploadsToProcess(ctx context.Context, availableWorkers int, ski
410411
uploads, err := r.uploadRepo.GetToProcess(ctx, r.destType, availableWorkers, repo.ProcessOptions{
411412
SkipIdentifiers: skipIdentifiers,
412413
SkipWorkspaces: r.tenantManager.DegradedWorkspaces(),
414+
SkipDestinations: r.config.skipDestinationIDs.Load(),
413415
AllowMultipleSourcesForJobsPickup: r.config.allowMultipleSourcesForJobsPickup,
414416
})
415417
if err != nil {
@@ -468,8 +470,9 @@ func (r *Router) uploadsToProcess(ctx context.Context, availableWorkers int, ski
468470
}
469471

470472
jobsStats, err := r.uploadRepo.UploadJobsStats(ctx, r.destType, repo.ProcessOptions{
471-
SkipIdentifiers: skipIdentifiers,
472-
SkipWorkspaces: r.tenantManager.DegradedWorkspaces(),
473+
SkipIdentifiers: skipIdentifiers,
474+
SkipWorkspaces: r.tenantManager.DegradedWorkspaces(),
475+
SkipDestinations: r.config.skipDestinationIDs.Load(),
473476
})
474477
if err != nil {
475478
return nil, fmt.Errorf("processing stats: %w", err)
@@ -713,6 +716,7 @@ func (r *Router) loadReloadableConfig(whName string) {
713716
r.config.warehouseSyncFreqIgnore = r.conf.GetReloadableBoolVar(false, "Warehouse.warehouseSyncFreqIgnore")
714717
r.config.cronTrackerRetries = r.conf.GetReloadableInt64Var(5, 1, "Warehouse.cronTrackerRetries")
715718
r.config.uploadBufferTimeInMin = r.conf.GetReloadableDurationVar(180, time.Minute, "Warehouse.uploadBufferTimeInMin")
719+
r.config.skipDestinationIDs = r.conf.GetReloadableStringSliceVar(nil, fmt.Sprintf(`Warehouse.%v.skipDestinationIDs`, whName))
716720
}
717721

718722
func (r *Router) loadStats() {

warehouse/router/router_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,7 @@ func TestRouter(t *testing.T) {
583583
r.statsFactory = stats.NOP
584584
r.conf = config.New()
585585
r.config.allowMultipleSourcesForJobsPickup = true
586+
r.config.skipDestinationIDs = config.SingleValueLoader([]string{})
586587
r.config.stagingFilesBatchSize = config.SingleValueLoader(100)
587588
r.config.warehouseSyncFreqIgnore = config.SingleValueLoader(true)
588589
r.destType = destinationType
@@ -718,6 +719,7 @@ func TestRouter(t *testing.T) {
718719
r.statsFactory = stats.NOP
719720
r.conf = config.New()
720721
r.config.allowMultipleSourcesForJobsPickup = true
722+
r.config.skipDestinationIDs = config.SingleValueLoader([]string{})
721723
r.config.stagingFilesBatchSize = config.SingleValueLoader(100)
722724
r.config.warehouseSyncFreqIgnore = config.SingleValueLoader(true)
723725
r.config.noOfWorkers = config.SingleValueLoader(10)
@@ -869,6 +871,7 @@ func TestRouter(t *testing.T) {
869871
r.statsFactory = stats.NOP
870872
r.conf = config.New()
871873
r.config.allowMultipleSourcesForJobsPickup = true
874+
r.config.skipDestinationIDs = config.SingleValueLoader([]string{})
872875
r.config.stagingFilesBatchSize = config.SingleValueLoader(100)
873876
r.config.warehouseSyncFreqIgnore = config.SingleValueLoader(true)
874877
r.config.noOfWorkers = config.SingleValueLoader(0)

0 commit comments

Comments
 (0)