Skip to content

Commit b1f4ce0

Browse files
authored
feat(jobsdb): introduce jobMinRowsLeftMigrateThres (#5704)
# Description Introduces the `JobsDB.jobMinRowsLeftMigrateThres` configuration property, for handling compaction of jobsdb datasets that contain jobs for multiple destinations, where one destination is slower than the others. In such a case, jobsdb datasets end up containing some pending jobs (e.g. 40%), but a dataset doesn't get migrated because it is neither small (the total number of jobs is counted when deciding whether a table is small) nor does it have an adequate percentage of completed jobs (70%). With this new option, tables with fewer than 40K pending jobs (i.e. the default threshold of 0.4 multiplied by the default `maxDSSize` of 100,000) become migration candidates and will be migrated (i.e. compacted) alongside some other dataset that is a candidate for migration, as long as one exists. ## Additional items - Making jobsdb configuration properties table-prefix-aware, i.e. making it possible to have different settings for `gw`, `rt`, `batch_rt`, etc. ## Linear Ticket resolves PIPE-2017 ## Security - [x] The code changed/added as part of this pull request won't create any security issues with how the software is being used.
1 parent 69f1baf commit b1f4ce0

File tree

6 files changed

+79
-99
lines changed

6 files changed

+79
-99
lines changed

app/apphandlers/embeddedAppHandler.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
237237
jobsdb.WithDSLimit(a.config.arcDSLimit),
238238
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
239239
jobsdb.WithStats(statsFactory),
240-
jobsdb.WithJobMaxAge(
241-
func() time.Duration {
242-
return config.GetDuration("archival.jobRetention", 24, time.Hour)
243-
},
244-
),
240+
jobsdb.WithJobMaxAge(config.GetReloadableDurationVar(24, time.Hour, "archival.jobRetention")),
245241
jobsdb.WithDBHandle(dbPool),
246242
)
247243
defer archivalDB.Close()

app/apphandlers/processorAppHandler.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -223,11 +223,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
223223
jobsdb.WithDSLimit(a.config.arcDSLimit),
224224
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
225225
jobsdb.WithStats(statsFactory),
226-
jobsdb.WithJobMaxAge(
227-
func() time.Duration {
228-
return config.GetDuration("archival.jobRetention", 24, time.Hour)
229-
},
230-
),
226+
jobsdb.WithJobMaxAge(config.GetReloadableDurationVar(24, time.Hour, "archival.jobRetention")),
231227
jobsdb.WithDBHandle(dbPool),
232228
)
233229
defer archivalDB.Close()

enterprise/reporting/error_index/error_index_reporting.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,11 +200,7 @@ func (eir *ErrorIndexReporter) DatabaseSyncer(c types.SyncerConfig) types.Report
200200
jobsdb.WithDSLimit(eir.conf.GetReloadableIntVar(0, 1, "Reporting.errorIndexReporting.dsLimit")),
201201
jobsdb.WithConfig(eir.conf),
202202
jobsdb.WithSkipMaintenanceErr(eir.conf.GetBool("Reporting.errorIndexReporting.skipMaintenanceError", false)),
203-
jobsdb.WithJobMaxAge(
204-
func() time.Duration {
205-
return eir.conf.GetDurationVar(24, time.Hour, "Reporting.errorIndexReporting.jobRetention")
206-
},
207-
),
203+
jobsdb.WithJobMaxAge(config.GetReloadableDurationVar(24, time.Hour, "Reporting.errorIndexReporting.jobRetention")),
208204
)
209205
if err := errIndexDB.Start(); err != nil {
210206
panic(fmt.Errorf("failed to start error index db: %w", err))

jobsdb/admin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (jd *Handle) doCleanup(ctx context.Context) error {
152152
fmt.Sprintf(
153153
deleteStmt,
154154
jd.tablePrefix,
155-
jd.config.GetInt("JobsDB.archivalTimeInDays", 10),
155+
jd.config.GetIntVar(10, 1, jd.configKeys("archivalTimeInDays")...),
156156
),
157157
)
158158
if err != nil {
@@ -173,7 +173,7 @@ func (jd *Handle) doCleanup(ctx context.Context) error {
173173
func (jd *Handle) abortOldJobs(ctx context.Context, dsList []dataSetT) error {
174174
jobState := "aborted"
175175
maxAgeStatusResponse := `{"reason": "job max age exceeded"}`
176-
maxAge := jd.conf.jobMaxAge()
176+
maxAge := jd.conf.jobMaxAge.Load()
177177
for _, ds := range dsList {
178178
res, err := jd.dbHandle.ExecContext(
179179
ctx,

jobsdb/jobsdb.go

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ type Handle struct {
508508
minDSRetentionPeriod config.ValueLoader[time.Duration]
509509
maxDSRetentionPeriod config.ValueLoader[time.Duration]
510510
refreshDSTimeout config.ValueLoader[time.Duration]
511-
jobMaxAge func() time.Duration
511+
jobMaxAge config.ValueLoader[time.Duration]
512512
writeCapacity chan struct{}
513513
readCapacity chan struct{}
514514
enableWriterQueue bool
@@ -525,10 +525,11 @@ type Handle struct {
525525

526526
migration struct {
527527
maxMigrateOnce, maxMigrateDSProbe config.ValueLoader[int]
528-
vacuumFullStatusTableThreshold func() int64
529-
vacuumAnalyzeStatusTableThreshold func() int64
530-
jobDoneMigrateThres, jobStatusMigrateThres func() float64
531-
jobMinRowsMigrateThres func() float64
528+
vacuumFullStatusTableThreshold config.ValueLoader[int64]
529+
vacuumAnalyzeStatusTableThreshold config.ValueLoader[int64]
530+
jobDoneMigrateThres, jobStatusMigrateThres config.ValueLoader[float64]
531+
jobMinRowsMigrateThres config.ValueLoader[float64]
532+
jobMinRowsLeftMigrateThres config.ValueLoader[float64]
532533
migrateDSLoopSleepDuration config.ValueLoader[time.Duration]
533534
migrateDSTimeout config.ValueLoader[time.Duration]
534535
}
@@ -712,9 +713,9 @@ func WithSkipMaintenanceErr(ignore bool) OptsFunc {
712713
}
713714
}
714715

715-
func WithJobMaxAge(maxAgeFunc func() time.Duration) OptsFunc {
716+
func WithJobMaxAge(jobMaxAge config.ValueLoader[time.Duration]) OptsFunc {
716717
return func(jd *Handle) {
717-
jd.conf.jobMaxAge = maxAgeFunc
718+
jd.conf.jobMaxAge = jobMaxAge
718719
}
719720
}
720721

@@ -775,7 +776,7 @@ func (jd *Handle) init() {
775776
}
776777

777778
if string(jd.conf.payloadColumnType) == "" {
778-
jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(string(JSONB), "JobsDB.payloadColumnType"))
779+
jd.conf.payloadColumnType = payloadColumnType(jd.config.GetStringVar(string(JSONB), jd.configKeys("payloadColumnType")...))
779780
}
780781

781782
if jd.stats == nil {
@@ -894,7 +895,7 @@ func (jd *Handle) workersAndAuxSetup() {
894895
cacheParameterFilters,
895896
func() time.Duration { return jd.conf.cacheExpiration.Load() },
896897
cache.WithWarnOnBranchInvalidation[ParameterFilterT](
897-
jd.config.GetReloadableBoolVar(defaultLogCacheBranchInvalidation, "JobsDB."+jd.tablePrefix+".logCacheBranchInvalidation", "JobsDB.logCacheBranchInvalidation"),
898+
jd.config.GetReloadableBoolVar(defaultLogCacheBranchInvalidation, jd.configKeys("logCacheBranchInvalidation")...),
898899
jd.logger),
899900
)
900901

@@ -907,78 +908,56 @@ func (jd *Handle) workersAndAuxSetup() {
907908

908909
func (jd *Handle) loadConfig() {
909910
// maxTableSizeInMB: Maximum Table size in MB
910-
jd.conf.maxTableSize = jd.config.GetReloadableInt64Var(300, 1000000, "JobsDB.maxTableSizeInMB")
911-
jd.conf.cacheExpiration = jd.config.GetReloadableDurationVar(120, time.Minute, []string{"JobsDB.cacheExpiration"}...)
911+
jd.conf.maxTableSize = jd.config.GetReloadableInt64Var(300, 1000000, jd.configKeys("maxTableSizeInMB")...)
912+
jd.conf.cacheExpiration = jd.config.GetReloadableDurationVar(120, time.Minute, jd.configKeys("cacheExpiration")...)
912913
// addNewDSLoopSleepDuration: How often is the loop (which checks for adding new DS) run
913-
jd.conf.addNewDSLoopSleepDuration = jd.config.GetReloadableDurationVar(5, time.Second, []string{"JobsDB.addNewDSLoopSleepDuration", "JobsDB.addNewDSLoopSleepDurationInS"}...)
914+
jd.conf.addNewDSLoopSleepDuration = jd.config.GetReloadableDurationVar(5, time.Second, jd.configKeys("addNewDSLoopSleepDuration")...)
914915
// refreshDSListLoopSleepDuration: How often is the loop (which refreshes DSList) run
915-
jd.conf.refreshDSListLoopSleepDuration = jd.config.GetReloadableDurationVar(10, time.Second, []string{"JobsDB.refreshDSListLoopSleepDuration", "JobsDB.refreshDSListLoopSleepDurationInS"}...)
916-
917-
enableWriterQueueKeys := []string{"JobsDB." + jd.tablePrefix + "." + "enableWriterQueue", "JobsDB." + "enableWriterQueue"}
918-
jd.conf.enableWriterQueue = jd.config.GetBoolVar(true, enableWriterQueueKeys...)
919-
enableReaderQueueKeys := []string{"JobsDB." + jd.tablePrefix + "." + "enableReaderQueue", "JobsDB." + "enableReaderQueue"}
920-
jd.conf.enableReaderQueue = jd.config.GetBoolVar(true, enableReaderQueueKeys...)
921-
maxWritersKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxWriters", "JobsDB." + "maxWriters"}
922-
jd.conf.maxWriters = jd.config.GetIntVar(3, 1, maxWritersKeys...)
923-
maxReadersKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxReaders", "JobsDB." + "maxReaders"}
924-
jd.conf.maxReaders = jd.config.GetIntVar(6, 1, maxReadersKeys...)
925-
maxOpenConnectionsKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxOpenConnections", "JobsDB." + "maxOpenConnections"}
926-
jd.conf.maxOpenConnections = jd.config.GetIntVar(20, 1, maxOpenConnectionsKeys...)
927-
analyzeThresholdKeys := []string{"JobsDB." + jd.tablePrefix + "." + "analyzeThreshold", "JobsDB." + "analyzeThreshold"}
928-
jd.conf.analyzeThreshold = jd.config.GetReloadableIntVar(30000, 1, analyzeThresholdKeys...)
929-
minDSRetentionPeriodKeys := []string{"JobsDB." + jd.tablePrefix + "." + "minDSRetention", "JobsDB." + "minDSRetention"}
930-
jd.conf.minDSRetentionPeriod = jd.config.GetReloadableDurationVar(0, time.Minute, minDSRetentionPeriodKeys...)
931-
maxDSRetentionPeriodKeys := []string{"JobsDB." + jd.tablePrefix + "." + "maxDSRetention", "JobsDB." + "maxDSRetention"}
932-
jd.conf.maxDSRetentionPeriod = jd.config.GetReloadableDurationVar(90, time.Minute, maxDSRetentionPeriodKeys...)
933-
jd.conf.refreshDSTimeout = jd.config.GetReloadableDurationVar(10, time.Minute, "JobsDB.refreshDS.timeout")
916+
jd.conf.refreshDSListLoopSleepDuration = jd.config.GetReloadableDurationVar(10, time.Second, jd.configKeys("refreshDSListLoopSleepDuration")...)
917+
918+
jd.conf.enableWriterQueue = jd.config.GetBoolVar(true, jd.configKeys("enableWriterQueue")...)
919+
jd.conf.enableReaderQueue = jd.config.GetBoolVar(true, jd.configKeys("enableReaderQueue")...)
920+
jd.conf.maxWriters = jd.config.GetIntVar(3, 1, jd.configKeys("maxWriters")...)
921+
jd.conf.maxReaders = jd.config.GetIntVar(6, 1, jd.configKeys("maxReaders")...)
922+
jd.conf.maxOpenConnections = jd.config.GetIntVar(20, 1, jd.configKeys("maxOpenConnections")...)
923+
jd.conf.analyzeThreshold = jd.config.GetReloadableIntVar(30000, 1, jd.configKeys("analyzeThreshold")...)
924+
jd.conf.minDSRetentionPeriod = jd.config.GetReloadableDurationVar(0, time.Minute, jd.configKeys("minDSRetention")...)
925+
jd.conf.maxDSRetentionPeriod = jd.config.GetReloadableDurationVar(90, time.Minute, jd.configKeys("maxDSRetention")...)
926+
jd.conf.refreshDSTimeout = jd.config.GetReloadableDurationVar(10, time.Minute, jd.configKeys("refreshDS.timeout")...)
934927

935928
// migrationConfig
936929

937930
// migrateDSLoopSleepDuration: How often is the loop (which checks for migrating DS) run
938-
jd.conf.migration.migrateDSLoopSleepDuration = jd.config.GetReloadableDurationVar(
939-
30, time.Second,
940-
[]string{
941-
"JobsDB.migrateDSLoopSleepDuration",
942-
"JobsDB.migrateDSLoopSleepDurationInS",
943-
}...,
944-
)
945-
jd.conf.migration.migrateDSTimeout = jd.config.GetReloadableDurationVar(
946-
10, time.Minute, "JobsDB.migrateDS.timeout",
947-
)
931+
jd.conf.migration.migrateDSLoopSleepDuration = jd.config.GetReloadableDurationVar(30, time.Second, jd.configKeys("migrateDSLoopSleepDuration", "migrateDSLoopSleepDurationInS")...)
932+
jd.conf.migration.migrateDSTimeout = jd.config.GetReloadableDurationVar(10, time.Minute, jd.configKeys("migrateDS.timeout")...)
948933
// jobDoneMigrateThres: A DS is migrated when this fraction of the jobs have been processed
949-
jd.conf.migration.jobDoneMigrateThres = func() float64 { return jd.config.GetFloat64("JobsDB.jobDoneMigrateThreshold", 0.7) }
934+
jd.conf.migration.jobDoneMigrateThres = jd.config.GetReloadableFloat64Var(0.8, jd.configKeys("jobDoneMigrateThreshold")...)
950935
// jobStatusMigrateThres: A DS is migrated if the job_status exceeds this (* no_of_jobs)
951-
jd.conf.migration.jobStatusMigrateThres = func() float64 { return jd.config.GetFloat64("JobsDB.jobStatusMigrateThreshold", 3) }
952-
// jobMinRowsMigrateThres: A DS with a low number of rows should be eligible for migration if the number of rows are
953-
// less than jobMinRowsMigrateThres percent of maxDSSize (e.g. if jobMinRowsMigrateThres is 5
954-
// then DSs that have less than 5% of maxDSSize are eligible for migration)
955-
jd.conf.migration.jobMinRowsMigrateThres = func() float64 { return jd.config.GetFloat64("JobsDB.jobMinRowsMigrateThreshold", 0.2) }
936+
jd.conf.migration.jobStatusMigrateThres = jd.config.GetReloadableFloat64Var(3, jd.configKeys("jobStatusMigrateThreshold")...)
937+
// jobMinRowsMigrateThres: A DS with a low number of total rows should be eligible for migration if the number of total rows is
938+
// less than jobMinRowsMigrateThres percent of maxDSSize (e.g. if jobMinRowsMigrateThres is 0.2
939+
// then DSs that have less than 20% of maxDSSize total rows are eligible for migration)
940+
jd.conf.migration.jobMinRowsMigrateThres = jd.config.GetReloadableFloat64Var(0.2, jd.configKeys("jobMinRowsMigrateThreshold")...)
941+
// jobMinRowsLeftMigrateThres: A DS with a low number of pending rows should be eligible for migration if the number of pending rows is
942+
// less than jobMinRowsLeftMigrateThres percent of maxDSSize (e.g. if jobMinRowsLeftMigrateThres is 0.5
943+
// then DSs that have less than 50% of maxDSSize pending rows are eligible for migration)
944+
jd.conf.migration.jobMinRowsLeftMigrateThres = jd.config.GetReloadableFloat64Var(0.4, jd.configKeys("jobMinRowsLeftMigrateThres")...)
956945
// maxMigrateOnce: Maximum number of DSs that are migrated together into one destination
957-
jd.conf.migration.maxMigrateOnce = jd.config.GetReloadableIntVar(
958-
10, 1, "JobsDB.maxMigrateOnce",
959-
)
946+
jd.conf.migration.maxMigrateOnce = jd.config.GetReloadableIntVar(10, 1, jd.configKeys("maxMigrateOnce")...)
960947
// maxMigrateDSProbe: Maximum number of DSs that are checked from left to right if they are eligible for migration
961-
jd.conf.migration.maxMigrateDSProbe = jd.config.GetReloadableIntVar(
962-
10, 1, "JobsDB.maxMigrateDSProbe",
963-
)
964-
jd.conf.migration.vacuumFullStatusTableThreshold = func() int64 {
965-
return jd.config.GetInt64("JobsDB.vacuumFullStatusTableThreshold", 500*bytesize.MB)
966-
}
967-
jd.conf.migration.vacuumAnalyzeStatusTableThreshold = func() int64 {
968-
return jd.config.GetInt64("JobsDB.vacuumAnalyzeStatusTableThreshold", 30000)
969-
}
948+
jd.conf.migration.maxMigrateDSProbe = jd.config.GetReloadableIntVar(10, 1, jd.configKeys("maxMigrateDSProbe")...)
949+
jd.conf.migration.vacuumFullStatusTableThreshold = jd.config.GetReloadableInt64Var(500*bytesize.MB, 1, jd.configKeys("vacuumFullStatusTableThreshold")...)
950+
jd.conf.migration.vacuumAnalyzeStatusTableThreshold = jd.config.GetReloadableInt64Var(30000, 1, jd.configKeys("vacuumAnalyzeStatusTableThreshold")...)
970951

971952
// masterBackupEnabled = true => all the jobsdb are eligible for backup
972-
jd.conf.backup.masterBackupEnabled = jd.config.GetReloadableBoolVar(
973-
true, "JobsDB.backup.enabled",
974-
)
953+
jd.conf.backup.masterBackupEnabled = jd.config.GetReloadableBoolVar(true, jd.configKeys("backup.enabled")...)
975954

976955
// maxDSSize: Maximum size of a DS. The process which adds new DS runs in the background
977956
// (every few seconds) so a DS may go beyond this size
978957
// passing `maxDSSize` by reference, so it can be hot reloaded
979-
jd.conf.MaxDSSize = jd.config.GetReloadableIntVar(100000, 1, "JobsDB.maxDSSize")
958+
jd.conf.MaxDSSize = jd.config.GetReloadableIntVar(100000, 1, jd.configKeys("maxDSSize")...)
980959

981-
jd.conf.indexOptimizations = jd.config.GetReloadableBoolVar(true, "JobsDB.indexOptimizations")
960+
jd.conf.indexOptimizations = jd.config.GetReloadableBoolVar(true, jd.configKeys("indexOptimizations")...)
982961

983962
if jd.TriggerAddNewDS == nil {
984963
jd.TriggerAddNewDS = func() <-chan time.Time {
@@ -999,10 +978,19 @@ func (jd *Handle) loadConfig() {
999978
}
1000979

1001980
if jd.conf.jobMaxAge == nil {
1002-
jd.conf.jobMaxAge = func() time.Duration {
1003-
return jd.config.GetDuration("JobsDB.jobMaxAge", 720, time.Hour)
1004-
}
981+
jd.conf.jobMaxAge = jd.config.GetReloadableDurationVar(720, time.Hour, jd.configKeys("jobMaxAge")...)
982+
}
983+
}
984+
985+
func (jd *Handle) configKeys(key string, additionalKeys ...string) []string {
986+
res := []string{
987+
"JobsDB." + jd.tablePrefix + "." + key,
988+
"JobsDB." + key,
989+
}
990+
for _, additionalKey := range additionalKeys {
991+
res = append(res, "JobsDB."+additionalKey)
1005992
}
993+
return res
1006994
}
1007995

1008996
// Start starts the jobsdb worker and housekeeping (migration, archive) threads.
@@ -1883,7 +1871,7 @@ FROM pending GROUP BY workspace_id, custom_val`
18831871

18841872
g, ctx := errgroup.WithContext(ctx)
18851873
const defaultConcurrency = 4
1886-
conc := jd.config.GetIntVar(defaultConcurrency, 1, "JobsDB.pileupCountConcurrency", "jobsdb.pileupcount.parallelism")
1874+
conc := jd.config.GetIntVar(defaultConcurrency, 1, jd.configKeys("pileupCountConcurrency")...)
18871875
if conc < 1 || conc > defaultConcurrency {
18881876
jd.logger.Warnn("GetPileUpCounts concurrency out of safe bounds, using default value",
18891877
logger.NewIntField("concurrency", int64(conc)),

0 commit comments

Comments
 (0)