Skip to content

Commit 1adf9ae

Browse files
Reducing scheduler startup memory allocations (#453) (#4491)
* ARMADA-3668 Numerous Scheduler Optimisations. Changes include: making transaction upserts faster and more memory efficient for the startup case, moving the last use of job repository jobs earlier in syncState to allow for garbage collection, and interning the Job ID on run. * Correcting annotation and nodeSelector interning so the maps are not modified in place. * Exposing cumulative interned strings as a metric. * Fixing tests; correctly calculating the latest job and run serial in the scheduler. Summary of changes: - Populating `Txn` jobs in a map at startup before creating immutable data structures, reducing garbage - Exposing the `scheduler_interned_strings` metric to help investigate production memory usage - Moving the last use of `updatedJobs` and `updatedRuns` earlier so they can be GCed Co-authored-by: Mustafa Ilyas <Mustafa.Ilyas@gresearch.co.uk>
1 parent bb49417 commit 1adf9ae

File tree

6 files changed

+159
-40
lines changed

6 files changed

+159
-40
lines changed

internal/common/metrics/scheduler_metrics.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,13 @@ var QueuePriceBandPhaseBidDesc = prometheus.NewDesc(
250250
nil,
251251
)
252252

253+
var JobDBCumulativeInternedStrings = prometheus.NewDesc(
254+
MetricPrefix+"scheduler_interned_strings",
255+
"A cumulative count of interned strings",
256+
[]string{},
257+
nil,
258+
)
259+
253260
var (
254261
queueLabelMetricName = MetricPrefix + "queue_labels"
255262
queueLabelMetricDescription = "Queue labels"
@@ -295,6 +302,7 @@ var AllDescs = []*prometheus.Desc{
295302
QueuePriorityDesc,
296303
QueueLabelDesc,
297304
QueuePriceBandPhaseBidDesc,
305+
JobDBCumulativeInternedStrings,
298306
}
299307

300308
func Describe(out chan<- *prometheus.Desc) {
@@ -538,6 +546,10 @@ func NewQueuePriceBandBidMetric(value float64, pool string, queue string, phase,
538546
return prometheus.MustNewConstMetric(QueuePriceBandPhaseBidDesc, prometheus.GaugeValue, value, pool, queue, queue, phase, priceBand)
539547
}
540548

549+
func NewJobDBCumulativeInternedStrings(value float64) prometheus.Metric {
550+
return prometheus.MustNewConstMetric(JobDBCumulativeInternedStrings, prometheus.CounterValue, value)
551+
}
552+
541553
func NewQueueLabelsMetric(queue string, labels map[string]string) prometheus.Metric {
542554
metricLabels := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))
543555
values := make([]string, 0, len(labels)+len(queueLabelDefaultLabels))

internal/common/stringinterner/stringintern.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package stringinterner
22

33
import (
4+
"sync/atomic"
5+
46
lru "github.com/hashicorp/golang-lru"
57
"github.com/pkg/errors"
68
)
@@ -11,6 +13,8 @@ import (
1113
// StringInterner is backed by an LRU so that only the most recently interned strings are kept.
1214
type StringInterner struct {
1315
lru *lru.Cache
16+
// Exposed as a metric to aid in diagnosing cause of high memory usage
17+
cumulativeInserts atomic.Uint64
1418
}
1519

1620
// New return a new *StringInterner backed by a LRU of the given size.
@@ -19,14 +23,23 @@ func New(cacheSize uint32) *StringInterner {
1923
if err != nil {
2024
panic(errors.WithStack(err).Error())
2125
}
22-
return &StringInterner{lru: lru}
26+
return &StringInterner{
27+
lru: lru,
28+
cumulativeInserts: atomic.Uint64{},
29+
}
2330
}
2431

2532
// Intern ensures the string is cached and returns the cached string
2633
func (interner *StringInterner) Intern(s string) string {
2734
if existing, ok, _ := interner.lru.PeekOrAdd(s, s); ok {
2835
return existing.(string)
2936
} else {
37+
interner.cumulativeInserts.Add(1)
3038
return s
3139
}
3240
}
41+
42+
// CumulativeInsertCount - tracking this value can help us to diagnose the cause of high memory usage in the scheduler.
43+
func (interner *StringInterner) CumulativeInsertCount() uint64 {
44+
return interner.cumulativeInserts.Load()
45+
}

internal/scheduler/jobdb/jobdb.go

Lines changed: 106 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -258,13 +258,17 @@ func safeGetRequirements(schedulingInfo *internaltypes.JobSchedulingInfo) map[st
258258
func (jobDb *JobDb) internJobSchedulingInfoStrings(info *internaltypes.JobSchedulingInfo) *internaltypes.JobSchedulingInfo {
259259
info.PriorityClass = jobDb.stringInterner.Intern(info.PriorityClass)
260260
pr := info.PodRequirements
261+
newAnnotations := make(map[string]string, len(pr.Annotations))
261262
for k, v := range pr.Annotations {
262-
pr.Annotations[jobDb.stringInterner.Intern(k)] = jobDb.stringInterner.Intern(v)
263+
newAnnotations[jobDb.stringInterner.Intern(k)] = jobDb.stringInterner.Intern(v)
263264
}
265+
pr.Annotations = newAnnotations
264266

267+
newNodeSelector := make(map[string]string, len(pr.NodeSelector))
265268
for k, v := range pr.NodeSelector {
266-
pr.NodeSelector[jobDb.stringInterner.Intern(k)] = jobDb.stringInterner.Intern(v)
269+
newNodeSelector[jobDb.stringInterner.Intern(k)] = jobDb.stringInterner.Intern(v)
267270
}
271+
pr.NodeSelector = newNodeSelector
268272

269273
for idx, toleration := range pr.Tolerations {
270274
pr.Tolerations[idx] = v1.Toleration{
@@ -339,6 +343,10 @@ func (jobDb *JobDb) WriteTxn() *Txn {
339343
}
340344
}
341345

346+
func (jobDb *JobDb) CumulativeInternedStringsCount() uint64 {
347+
return jobDb.stringInterner.CumulativeInsertCount()
348+
}
349+
342350
// Txn is a JobDb Transaction. Transactions provide a consistent view of the database, allowing readers to
343351
// perform multiple actions without the database changing from underneath them.
344352
// Write transactions also allow callers to perform write operations that will not be visible to other users
@@ -559,14 +567,32 @@ func (txn *Txn) Upsert(jobs []*Job) error {
559567
// gangs
560568
go func() {
561569
defer wg.Done()
562-
for _, job := range jobs {
563-
if job.IsInGang() {
564-
key := gangKey{queue: job.Queue(), gangId: job.GetGangInfo().Id()}
570+
if hasJobs {
571+
for _, job := range jobs {
572+
if job.IsInGang() {
573+
key := gangKey{queue: job.Queue(), gangId: job.GetGangInfo().Id()}
574+
575+
if _, present := txn.jobsByGangKey[key]; !present {
576+
txn.jobsByGangKey[key] = immutable.NewSet[string](JobIdHasher{})
577+
}
578+
txn.jobsByGangKey[key] = txn.jobsByGangKey[key].Add(job.Id())
579+
}
580+
}
581+
} else {
582+
jobsByGangKey := map[gangKey]map[string]bool{}
583+
for _, job := range jobs {
584+
if job.IsInGang() {
585+
key := gangKey{queue: job.Queue(), gangId: job.GetGangInfo().Id()}
565586

566-
if _, present := txn.jobsByGangKey[key]; !present {
567-
txn.jobsByGangKey[key] = immutable.NewSet[string](JobIdHasher{})
587+
if _, present := jobsByGangKey[key]; !present {
588+
jobsByGangKey[key] = map[string]bool{}
589+
}
590+
jobsByGangKey[key][job.Id()] = true
568591
}
569-
txn.jobsByGangKey[key] = txn.jobsByGangKey[key].Add(job.Id())
592+
}
593+
594+
for key, jobsInGang := range jobsByGangKey {
595+
txn.jobsByGangKey[key] = immutable.NewSet[string](JobIdHasher{}, maps.Keys(jobsInGang)...)
570596
}
571597
}
572598
}()
@@ -575,28 +601,65 @@ func (txn *Txn) Upsert(jobs []*Job) error {
575601
// To enable iterating over them in the order they should be scheduled.
576602
go func() {
577603
defer wg.Done()
578-
for _, job := range jobs {
579-
if job.Queued() {
580-
newQueue, ok := txn.jobsByQueue[job.queue]
581-
if !ok {
582-
q := emptyList
583-
newQueue = q
604+
if hasJobs {
605+
for _, job := range jobs {
606+
if job.Queued() {
607+
newQueue, ok := txn.jobsByQueue[job.queue]
608+
if !ok {
609+
newQueue = emptyList
610+
}
611+
txn.jobsByQueue[job.queue] = newQueue.Add(job)
612+
613+
for _, pool := range job.Pools() {
614+
_, present := txn.jobsByPoolAndQueue[pool]
615+
if !present {
616+
queues := map[string]immutable.SortedSet[*Job]{}
617+
txn.jobsByPoolAndQueue[pool] = queues
618+
}
619+
_, present = txn.jobsByPoolAndQueue[pool][job.queue]
620+
if !present {
621+
jobs := immutable.NewSortedSet[*Job](MarketJobPriorityComparer{Pool: pool})
622+
txn.jobsByPoolAndQueue[pool][job.queue] = jobs
623+
}
624+
txn.jobsByPoolAndQueue[pool][job.queue] = txn.jobsByPoolAndQueue[pool][job.queue].Add(job)
625+
}
584626
}
585-
newQueue = newQueue.Add(job)
586-
txn.jobsByQueue[job.queue] = newQueue
627+
}
628+
} else {
629+
jobsByQueue := map[string]map[*Job]bool{}
630+
jobsByPoolAndQueue := map[string]map[string]map[*Job]bool{}
587631

588-
for _, pool := range job.Pools() {
589-
_, present := txn.jobsByPoolAndQueue[pool]
590-
if !present {
591-
queues := map[string]immutable.SortedSet[*Job]{}
592-
txn.jobsByPoolAndQueue[pool] = queues
632+
for _, job := range jobs {
633+
if job.Queued() {
634+
if _, ok := jobsByQueue[job.queue]; !ok {
635+
jobsByQueue[job.queue] = map[*Job]bool{}
593636
}
594-
_, present = txn.jobsByPoolAndQueue[pool][job.queue]
595-
if !present {
596-
jobs := immutable.NewSortedSet[*Job](MarketJobPriorityComparer{Pool: pool})
597-
txn.jobsByPoolAndQueue[pool][job.queue] = jobs
637+
jobsByQueue[job.queue][job] = true
638+
639+
for _, pool := range job.Pools() {
640+
if _, present := jobsByPoolAndQueue[pool]; !present {
641+
jobsByPoolAndQueue[pool] = map[string]map[*Job]bool{}
642+
}
643+
if _, present := jobsByPoolAndQueue[pool][job.queue]; !present {
644+
jobsByPoolAndQueue[pool][job.queue] = map[*Job]bool{}
645+
}
646+
jobsByPoolAndQueue[pool][job.queue][job] = true
647+
}
648+
}
649+
}
650+
651+
for queue, jobsForQueue := range jobsByQueue {
652+
txn.jobsByQueue[queue] = immutable.NewSortedSet[*Job](JobPriorityComparer{}, maps.Keys(jobsForQueue)...)
653+
}
654+
655+
for pool, jobsForPool := range jobsByPoolAndQueue {
656+
if _, ok := txn.jobsByPoolAndQueue[pool]; !ok {
657+
txn.jobsByPoolAndQueue[pool] = map[string]immutable.SortedSet[*Job]{}
658+
}
659+
for queue, jobsForQueueInPool := range jobsForPool {
660+
if _, ok := txn.jobsByPoolAndQueue[pool][queue]; !ok {
661+
txn.jobsByPoolAndQueue[pool][queue] = immutable.NewSortedSet[*Job](MarketJobPriorityComparer{Pool: pool}, maps.Keys(jobsForQueueInPool)...)
598662
}
599-
txn.jobsByPoolAndQueue[pool][job.queue] = txn.jobsByPoolAndQueue[pool][job.queue].Add(job)
600663
}
601664
}
602665
}
@@ -605,11 +668,24 @@ func (txn *Txn) Upsert(jobs []*Job) error {
605668
// Unvalidated jobs
606669
go func() {
607670
defer wg.Done()
608-
for _, job := range jobs {
609-
if !job.Validated() {
610-
unvalidatedJobs := txn.unvalidatedJobs.Add(job)
611-
txn.unvalidatedJobs = &unvalidatedJobs
671+
if hasJobs {
672+
for _, job := range jobs {
673+
if !job.Validated() {
674+
unvalidatedJobs := txn.unvalidatedJobs.Add(job)
675+
txn.unvalidatedJobs = &unvalidatedJobs
676+
}
677+
}
678+
} else {
679+
unvalidatedJobs := map[*Job]bool{}
680+
681+
for _, job := range jobs {
682+
if !job.Validated() {
683+
unvalidatedJobs[job] = true
684+
}
612685
}
686+
687+
unvalidatedJobsImmutable := immutable.NewSet[*Job](JobHasher{}, maps.Keys(unvalidatedJobs)...)
688+
txn.unvalidatedJobs = &unvalidatedJobsImmutable
613689
}
614690
}()
615691

internal/scheduler/metrics.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ func (c *MetricsCollector) refresh(ctx *armadacontext.Context) error {
141141
if err != nil {
142142
return err
143143
}
144-
allMetrics := append(queueMetrics, clusterMetrics...)
144+
jobDbMetrics := c.updateJobDBMetrics()
145+
allMetrics := append(append(queueMetrics, clusterMetrics...), jobDbMetrics...)
145146
c.state.Store(allMetrics)
146147
ctx.Debugf("Refreshed prometheus metrics in %s", time.Since(start))
147148
return nil
@@ -487,6 +488,10 @@ func (c *MetricsCollector) updateClusterMetrics(ctx *armadacontext.Context) ([]p
487488
return clusterMetrics, nil
488489
}
489490

491+
func (c *MetricsCollector) updateJobDBMetrics() []prometheus.Metric {
492+
return []prometheus.Metric{commonmetrics.NewJobDBCumulativeInternedStrings(float64(c.jobDb.CumulativeInternedStringsCount()))}
493+
}
494+
490495
func addToResourceListMap[K comparable](m map[K]resource.ComputeResources, key K, value resource.ComputeResources) {
491496
if _, exists := m[key]; !exists {
492497
m[key] = resource.ComputeResources{}

internal/scheduler/metrics_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func TestMetricsCollector_TestCollect_QueueMetrics(t *testing.T) {
106106
},
107107
},
108108
"queued metrics for requeued job": {
109-
// This job was been requeued and has a terminated run
109+
// This job was requeued and has a terminated run
110110
// The queue duration stats should count from the time the last run finished instead of job creation time
111111
initialJobs: []*jobdb.Job{jobWithTerminatedRun},
112112
queues: []*api.Queue{queue},
@@ -293,6 +293,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
293293
commonmetrics.NewClusterFarmCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
294294
commonmetrics.NewClusterFarmCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
295295
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
296+
commonmetrics.NewJobDBCumulativeInternedStrings(0.0),
296297
},
297298
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
298299
},
@@ -317,6 +318,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
317318
commonmetrics.NewClusterFarmCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-2"),
318319
commonmetrics.NewClusterFarmCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-2"),
319320
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
321+
commonmetrics.NewJobDBCumulativeInternedStrings(0.0),
320322
},
321323
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
322324
},
@@ -333,6 +335,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
333335
commonmetrics.NewClusterFarmCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
334336
commonmetrics.NewClusterFarmCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
335337
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
338+
commonmetrics.NewJobDBCumulativeInternedStrings(0.0),
336339
},
337340
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
338341
},
@@ -355,6 +358,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
355358
commonmetrics.NewClusterFarmCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
356359
commonmetrics.NewClusterFarmCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
357360
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
361+
commonmetrics.NewJobDBCumulativeInternedStrings(0.0),
358362
},
359363
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
360364
},
@@ -373,6 +377,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
373377
commonmetrics.NewClusterFarmCapacity(32, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
374378
commonmetrics.NewClusterFarmCapacity(256*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
375379
commonmetrics.NewClusterCordonedStatus(0.0, "cluster-1", "", ""),
380+
commonmetrics.NewJobDBCumulativeInternedStrings(0.0),
376381
},
377382
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
378383
},
@@ -383,6 +388,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
383388
expected: []prometheus.Metric{
384389
commonmetrics.NewClusterAvailableCapacity(10, "floating", "pool", "test-floating-resource", ""),
385390
commonmetrics.NewClusterTotalCapacity(10, "floating", "pool", "test-floating-resource", ""),
391+
commonmetrics.NewJobDBCumulativeInternedStrings(0.0),
386392
},
387393
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{},
388394
},
@@ -399,6 +405,7 @@ func TestMetricsCollector_TestCollect_ClusterMetrics(t *testing.T) {
399405
commonmetrics.NewClusterFarmCapacity(64, "cluster-1", testfixtures.TestPool, "cpu", "type-1"),
400406
commonmetrics.NewClusterFarmCapacity(512*1024*1024*1024, "cluster-1", testfixtures.TestPool, "memory", "type-1"),
401407
commonmetrics.NewClusterCordonedStatus(1.0, "cluster-1", "bad executor", ""),
408+
commonmetrics.NewJobDBCumulativeInternedStrings(0.0),
402409
},
403410
expectedExecutorSettings: []*schedulerobjects.ExecutorSettings{
404411
{

internal/scheduler/scheduler.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,17 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context, initial, fullJobGc boo
420420
}
421421
}
422422

423+
latestJobSerial := s.jobsSerial
424+
latestRunsSerial := s.runsSerial
425+
426+
// Update serial to include these updates.
427+
if len(updatedJobs) > 0 {
428+
latestJobSerial = updatedJobs[len(updatedJobs)-1].Serial
429+
}
430+
if len(updatedRuns) > 0 {
431+
latestRunsSerial = updatedRuns[len(updatedRuns)-1].Serial
432+
}
433+
423434
// Reconcile any differences between the updated jobs and runs.
424435
jsts, err := s.jobDb.ReconcileDifferences(txn, updatedJobs, updatedRuns)
425436
if err != nil {
@@ -462,13 +473,8 @@ func (s *Scheduler) syncState(ctx *armadacontext.Context, initial, fullJobGc boo
462473

463474
txn.Commit()
464475

465-
// Update serial to include these updates.
466-
if len(updatedJobs) > 0 {
467-
s.jobsSerial = updatedJobs[len(updatedJobs)-1].Serial
468-
}
469-
if len(updatedRuns) > 0 {
470-
s.runsSerial = updatedRuns[len(updatedRuns)-1].Serial
471-
}
476+
s.jobsSerial = latestJobSerial
477+
s.runsSerial = latestRunsSerial
472478

473479
return jobDbJobs, jsts, nil
474480
}

0 commit comments

Comments
 (0)