Skip to content

Commit 41108bf

Browse files
authored
Merge branch 'master' into chore.cleanup_router_oauthV2
2 parents 5343c36 + 5d2fae9 commit 41108bf

File tree

6 files changed

+53
-56
lines changed

6 files changed

+53
-56
lines changed

jobsdb/integration_test.go

+34-18
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ func TestJobsDB(t *testing.T) {
584584
}
585585
prefix := strings.ToLower(rand.String(5))
586586
c.Set("JobsDB.jobDoneMigrateThreshold", 0.7)
587-
c.Set("JobsDB.jobMinRowsMigrateThreshold", 0.6)
587+
c.Set("JobsDB.jobMinRowsLeftMigrateThreshold", 0.41)
588588
err := jobDB.Setup(ReadWrite, true, prefix)
589589
require.NoError(t, err)
590590
defer jobDB.TearDown()
@@ -622,7 +622,7 @@ func TestJobsDB(t *testing.T) {
622622
})
623623
require.NoError(t, err)
624624

625-
// process some jobs
625+
// process first 15 jobs
626626
for _, job := range jobsResult.Jobs[:15] {
627627
status := JobStatusT{
628628
JobID: job.JobID,
@@ -642,20 +642,36 @@ func TestJobsDB(t *testing.T) {
642642

643643
dsList = getDSList()
644644
require.Lenf(t, dsList, 5, "dsList length is not 5, got %+v", dsList)
645-
require.Equal(t, prefix+"_jobs_1_1", dsList[0].JobTable)
646-
require.Equal(t, prefix+"_jobs_2", dsList[1].JobTable)
647-
require.Equal(t, prefix+"_jobs_3", dsList[2].JobTable)
648-
require.Equal(t, prefix+"_jobs_4", dsList[3].JobTable)
649-
require.Equal(t, prefix+"_jobs_5", dsList[4].JobTable)
645+
require.Equal(t, prefix+"_jobs_1_1", dsList[0].JobTable) // 5 jobs
646+
require.Equal(t, prefix+"_jobs_2", dsList[1].JobTable) // 20 jobs
647+
require.Equal(t, prefix+"_jobs_3", dsList[2].JobTable) // 20 jobs
648+
require.Equal(t, prefix+"_jobs_4", dsList[3].JobTable) // 20 jobs
649+
require.Equal(t, prefix+"_jobs_5", dsList[4].JobTable) // 20 jobs
650+
651+
// process 1 more job
652+
for _, job := range jobsResult.Jobs[15:16] {
653+
status := JobStatusT{
654+
JobID: job.JobID,
655+
JobState: "succeeded",
656+
AttemptNum: 1,
657+
ExecTime: time.Now(),
658+
RetryTime: time.Now(),
659+
ErrorCode: "202",
660+
ErrorResponse: []byte(`{"success":"OK"}`),
661+
Parameters: []byte(`{}`),
662+
}
663+
err := jobDB.UpdateJobStatus(context.Background(), []*JobStatusT{&status}, []string{customVal}, []ParameterFilterT{})
664+
require.NoError(t, err)
665+
}
650666

651-
trigger() // jobs_1_1 will remain as is even though it is now a small table (5 < 10*0.6)
667+
trigger() // jobs_1_1 will remain as is even though it is now a small table 4/10 = 0.4 < 0.41
652668
dsList = getDSList()
653669
require.Lenf(t, dsList, 5, "dsList length is not 5, got %+v", dsList)
654-
require.Equal(t, prefix+"_jobs_1_1", dsList[0].JobTable)
655-
require.Equal(t, prefix+"_jobs_2", dsList[1].JobTable)
656-
require.Equal(t, prefix+"_jobs_3", dsList[2].JobTable)
657-
require.Equal(t, prefix+"_jobs_4", dsList[3].JobTable)
658-
require.Equal(t, prefix+"_jobs_5", dsList[4].JobTable)
670+
require.Equal(t, prefix+"_jobs_1_1", dsList[0].JobTable) // 4 jobs
671+
require.Equal(t, prefix+"_jobs_2", dsList[1].JobTable) // 20 jobs
672+
require.Equal(t, prefix+"_jobs_3", dsList[2].JobTable) // 20 jobs
673+
require.Equal(t, prefix+"_jobs_4", dsList[3].JobTable) // 20 jobs
674+
require.Equal(t, prefix+"_jobs_5", dsList[4].JobTable) // 20 jobs
659675

660676
// process some jobs
661677
jobsResult, err = jobDB.GetUnprocessed(context.Background(), GetQueryParams{
@@ -664,7 +680,7 @@ func TestJobsDB(t *testing.T) {
664680
ParameterFilters: []ParameterFilterT{},
665681
})
666682
require.NoError(t, err)
667-
for _, job := range jobsResult.Jobs[5:20] {
683+
for _, job := range jobsResult.Jobs[4:20] { // process 16 jobs from jobs_2
668684
status := JobStatusT{
669685
JobID: job.JobID,
670686
JobState: Succeeded.State,
@@ -682,10 +698,10 @@ func TestJobsDB(t *testing.T) {
682698
trigger() // both jobs_1_1 and jobs_2 would be migrated to jobs_2_1
683699
dsList = getDSList()
684700
require.Lenf(t, dsList, 4, "dsList length is not 4, got %+v", dsList)
685-
require.Equal(t, prefix+"_jobs_2_1", dsList[0].JobTable)
686-
require.Equal(t, prefix+"_jobs_3", dsList[1].JobTable)
687-
require.Equal(t, prefix+"_jobs_4", dsList[2].JobTable)
688-
require.Equal(t, prefix+"_jobs_5", dsList[3].JobTable)
701+
require.Equal(t, prefix+"_jobs_2_1", dsList[0].JobTable) // 8 jobs
702+
require.Equal(t, prefix+"_jobs_3", dsList[1].JobTable) // 20 jobs
703+
require.Equal(t, prefix+"_jobs_4", dsList[2].JobTable) // 20 jobs
704+
require.Equal(t, prefix+"_jobs_5", dsList[3].JobTable) // 20 jobs
689705
})
690706

691707
t.Run(`migrates only moves non-terminal jobs to a new DS`, func(t *testing.T) {

jobsdb/jobsdb.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,7 @@ type Handle struct {
527527
vacuumFullStatusTableThreshold config.ValueLoader[int64]
528528
vacuumAnalyzeStatusTableThreshold config.ValueLoader[int64]
529529
jobDoneMigrateThres, jobStatusMigrateThres config.ValueLoader[float64]
530-
jobMinRowsMigrateThres config.ValueLoader[float64]
531-
jobMinRowsLeftMigrateThres config.ValueLoader[float64]
530+
jobMinRowsLeftMigrateThreshold config.ValueLoader[float64]
532531
migrateDSLoopSleepDuration config.ValueLoader[time.Duration]
533532
migrateDSTimeout config.ValueLoader[time.Duration]
534533
}
@@ -933,14 +932,10 @@ func (jd *Handle) loadConfig() {
933932
jd.conf.migration.jobDoneMigrateThres = jd.config.GetReloadableFloat64Var(0.8, jd.configKeys("jobDoneMigrateThreshold")...)
934933
// jobStatusMigrateThres: A DS is migrated if the job_status exceeds this (* no_of_jobs)
935934
jd.conf.migration.jobStatusMigrateThres = jd.config.GetReloadableFloat64Var(3, jd.configKeys("jobStatusMigrateThreshold")...)
936-
// jobMinRowsMigrateThres: A DS with a low number of total rows should be eligible for migration if the number of total rows are
937-
// less than jobMinRowsMigrateThres percent of maxDSSize (e.g. if jobMinRowsMigrateThres is 0.2
938-
// then DSs that have less than 20% of maxDSSize total rows are eligible for migration)
939-
jd.conf.migration.jobMinRowsMigrateThres = jd.config.GetReloadableFloat64Var(0.2, jd.configKeys("jobMinRowsMigrateThreshold")...)
940-
// jobMinRowsMigrateThres: A DS with a low number of pending rows should be eligible for migration if the number of pending rows are
941-
// less than jobMinRowsLeftMigrateThres percent of maxDSSize (e.g. if jobMinRowsLeftMigrateThres is 0.5
935+
// jobMinRowsLeftMigrateThreshold: A DS with a low number of pending rows should be eligible for migration if the number of pending rows are
936+
// less than jobMinRowsLeftMigrateThreshold percent of maxDSSize (e.g. if jobMinRowsLeftMigrateThreshold is 0.5
942937
// then DSs that have less than 50% of maxDSSize pending rows are eligible for migration)
943-
jd.conf.migration.jobMinRowsLeftMigrateThres = jd.config.GetReloadableFloat64Var(0.4, jd.configKeys("jobMinRowsLeftMigrateThres")...)
938+
jd.conf.migration.jobMinRowsLeftMigrateThreshold = jd.config.GetReloadableFloat64Var(0.4, jd.configKeys("jobMinRowsLeftMigrateThreshold")...)
944939
// maxMigrateOnce: Maximum number of DSs that are migrated together into one destination
945940
jd.conf.migration.maxMigrateOnce = jd.config.GetReloadableIntVar(10, 1, jd.configKeys("maxMigrateOnce")...)
946941
// maxMigrateDSProbe: Maximum number of DSs that are checked from left to right if they are eligible for migration

jobsdb/migration.go

+3-11
Original file line numberDiff line numberDiff line change
@@ -657,18 +657,10 @@ func (jd *Handle) checkIfMigrateDS(ds dataSetT) (
657657
return true, false, recordsLeft, nil
658658
}
659659

660-
isSmallDS := float64(totalCount) < jd.conf.migration.jobMinRowsMigrateThres.Load()*float64(jd.conf.MaxDSSize.Load())
660+
needsPair = float64(recordsLeft) < jd.conf.migration.jobMinRowsLeftMigrateThreshold.Load()*float64(jd.conf.MaxDSSize.Load())
661661

662-
if float64(delCount)/float64(totalCount) > jd.conf.migration.jobDoneMigrateThres.Load() {
663-
return true, isSmallDS, recordsLeft, nil
664-
}
665-
666-
if isSmallDS {
667-
return true, true, recordsLeft, nil
668-
}
669-
670-
if float64(recordsLeft) < jd.conf.migration.jobMinRowsLeftMigrateThres.Load()*float64(jd.conf.MaxDSSize.Load()) {
671-
return true, true, recordsLeft, nil
662+
if needsPair || float64(delCount)/float64(totalCount) >= jd.conf.migration.jobDoneMigrateThres.Load() {
663+
return true, needsPair, recordsLeft, nil
672664
}
673665

674666
return false, false, recordsLeft, nil

jobsdb/migration_test.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func TestMigration(t *testing.T) {
106106
// should have enough statuses for a cleanup to be triggered
107107
// all non-terminal
108108
require.NoError(t, jobDB.Store(context.Background(), jobs[20:30]))
109-
for i := 0; i < 10; i++ {
109+
for range 10 {
110110
require.NoError(
111111
t,
112112
jobDB.UpdateJobStatus(
@@ -230,7 +230,7 @@ func TestMigration(t *testing.T) {
230230
c.Set("JobsDB."+tablePrefix+"."+"maxDSRetention", "1ms")
231231

232232
// 3 datasets with 10 jobs each, 1 dataset with 0 jobs
233-
for i := 0; i < 3; i++ {
233+
for i := range 3 {
234234
require.NoError(t, jobDB.Store(context.Background(), genJobs(defaultWorkspaceID, "test", 10, 1)))
235235
triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
236236
triggerAddNewDS <- time.Now() // waits for last loop to finish
@@ -239,10 +239,10 @@ func TestMigration(t *testing.T) {
239239

240240
// 1st ds 5 statuses each
241241
var jobs []*JobT
242-
for i := 0; i < 10; i++ {
242+
for i := range 10 {
243243
jobs = append(jobs, &JobT{JobID: int64(i + 1)})
244244
}
245-
for i := 0; i < 5; i++ {
245+
for range 5 {
246246
require.NoError(t, jobDB.UpdateJobStatus(context.Background(), genJobStatuses(jobs, "failed"), []string{"test"}, []ParameterFilterT{}))
247247
}
248248
var count int
@@ -251,10 +251,10 @@ func TestMigration(t *testing.T) {
251251

252252
// 2nd ds 10 statuses each
253253
jobs = nil
254-
for i := 0; i < 10; i++ {
254+
for i := range 10 {
255255
jobs = append(jobs, &JobT{JobID: int64(i + 11)})
256256
}
257-
for i := 0; i < 10; i++ {
257+
for range 10 {
258258
require.NoError(t, jobDB.UpdateJobStatus(context.Background(), genJobStatuses(jobs, "failed"), []string{"test"}, []ParameterFilterT{}))
259259
}
260260
require.NoError(t, jobDB.dbHandle.QueryRow(fmt.Sprintf(`SELECT COUNT(*) FROM %[1]s_job_status_2 `, tablePrefix)).Scan(&count))
@@ -317,10 +317,10 @@ func TestMigration(t *testing.T) {
317317
require.Less(t, newTableSizes[fmt.Sprintf("%s_job_status_2", tablePrefix)], originalTableSizes[fmt.Sprintf("%s_job_status_2", tablePrefix)])
318318

319319
// after adding some statuses to the 1st DS its table size shouldn't increase
320-
for i := 0; i < 10; i++ {
320+
for i := range 10 {
321321
jobs = append(jobs, &JobT{JobID: int64(i + 1)})
322322
}
323-
for i := 0; i < 4; i++ {
323+
for range 4 {
324324
require.NoError(t, jobDB.UpdateJobStatus(context.Background(), genJobStatuses(jobs, "failed"), []string{"test"}, []ParameterFilterT{}))
325325
}
326326

@@ -332,6 +332,7 @@ func TestMigration(t *testing.T) {
332332
config.Reset()
333333
c := config.New()
334334
c.Set("JobsDB.maxDSSize", 1)
335+
c.Set("JobsDB.jobMinRowsLeftMigrateThreshold", 0.2)
335336

336337
_ = startPostgres(t)
337338

@@ -418,13 +419,6 @@ func TestMigration(t *testing.T) {
418419
// add some more jobs to the new DS
419420
require.NoError(t, jobDB.Store(context.Background(), jobs[10:20]))
420421

421-
// triggerMigrateDS <- time.Now()
422-
// triggerMigrateDS <- time.Now()
423-
// var payloadType_1_1 string
424-
// err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, tablePrefix+"_jobs_1_1")).Scan(&payloadType_1_1)
425-
// require.NoError(t, err)
426-
// require.EqualValues(t, "bytea", payloadType_1_1)
427-
428422
triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
429423
triggerAddNewDS <- time.Now() // Second time, waits for the first loop to finish
430424
require.EqualValues(t, 3, jobDB.GetMaxDSIndex())
@@ -437,7 +431,7 @@ func TestMigration(t *testing.T) {
437431
// should have enough statuses for a cleanup to be triggered
438432
// all non-terminal
439433
require.NoError(t, jobDB.Store(context.Background(), jobs[20:30]))
440-
for i := 0; i < 10; i++ {
434+
for range 10 {
441435
require.NoError(
442436
t,
443437
jobDB.UpdateJobStatus(

processor/internal/transformer/destination_transformer/destination_transformer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func New(conf *config.Config, log logger.Logger, stat stats.Stats, opts ...Opt)
8282
handle.loggedEventsMu = sync.Mutex{}
8383
handle.loggedFileName = generateLogFileName()
8484

85-
handle.config.compactionEnabled = conf.GetReloadableBoolVar(true, "Processor.DestinationTransformer.compactionEnabled", "Transformer.compactionEnabled")
85+
handle.config.compactionEnabled = conf.GetReloadableBoolVar(false, "Processor.DestinationTransformer.compactionEnabled", "Transformer.compactionEnabled")
8686

8787
for _, opt := range opts {
8888
opt(handle)

router/transformer/transformer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ func (trans *handle) setup(destinationTimeout, transformTimeout time.Duration, c
576576
trans.proxyClientOAuth = oauthv2httpclient.NewOAuthHttpClient(&http.Client{Transport: trans.tr, Timeout: trans.destinationTimeout + trans.transformTimeout}, common.RudderFlowDelivery, cache, backendConfig, GetAuthErrorCategoryFromTransformProxyResponse, proxyClientOptionalArgs)
577577
trans.stats = stats.Default
578578
trans.transformRequestTimerStat = stats.Default.NewStat("router.transformer_request_time", stats.TimerType)
579-
trans.compactionEnabled = config.GetReloadableBoolVar(true, "Router.DestinationTransformer.compactionEnabled", "Transformer.compactionEnabled")
579+
trans.compactionEnabled = config.GetReloadableBoolVar(false, "Router.DestinationTransformer.compactionEnabled", "Transformer.compactionEnabled")
580580
if featuresService != nil {
581581
go func() {
582582
<-featuresService.Wait()

0 commit comments

Comments
 (0)