Skip to content

Commit 310734c

Browse files
Merge pull request #4562 from jmguzik/rebalancer2
Cluster profiles rebalancer as part of autoconfigbrancher + fix for dispatcher
2 parents cf55448 + befd747 commit 310734c

File tree

6 files changed

+224
-29
lines changed

6 files changed

+224
-29
lines changed

cmd/autoconfigbrancher/main.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/sirupsen/logrus"
14+
"gopkg.in/robfig/cron.v2"
1415

1516
"sigs.k8s.io/prow/cmd/generic-autobumper/bumper"
1617
"sigs.k8s.io/prow/pkg/config/secret"
@@ -35,12 +36,13 @@ const (
3536
type options struct {
3637
selfApprove bool
3738

38-
githubLogin string
39-
gitName string
40-
gitEmail string
41-
targetDir string
42-
assign string
43-
whitelist string
39+
githubLogin string
40+
gitName string
41+
gitEmail string
42+
targetDir string
43+
assign string
44+
whitelist string
45+
rebalancerCron string
4446

4547
promotion.FutureOptions
4648
flagutil.GitHubOptions
@@ -56,7 +58,7 @@ func parseOptions() options {
5658
fs.StringVar(&o.targetDir, "target-dir", "", "The directory containing the target repo.")
5759
fs.StringVar(&o.assign, "assign", githubTeam, "The github username or group name to assign the created pull request to.")
5860
fs.StringVar(&o.whitelist, "whitelist-file", "", "The path of the whitelisted repositories file.")
59-
61+
fs.StringVar(&o.rebalancerCron, "rebalancer-cron", "", "Cron expression defining how often rebalancer should run (plus/minus 1h time window). If not specified, rebalancer will not run.")
6062
fs.BoolVar(&o.selfApprove, "self-approve", false, "Self-approve the PR by adding the `approved` and `lgtm` labels. Requires write permissions on the repo.")
6163
o.AddFlags(fs)
6264
o.AllowAnonymous = true
@@ -142,6 +144,11 @@ func main() {
142144
logrus.WithError(err).Fatal("error getting GitHub client")
143145
}
144146

147+
rebalance, err := withinWindow(o.rebalancerCron)
148+
if err != nil {
149+
logrus.WithError(err).Fatal("failed to parse cron")
150+
}
151+
145152
logrus.Infof("Changing working directory to '%s' ...", o.targetDir)
146153
if err := os.Chdir(o.targetDir); err != nil {
147154
logrus.WithError(err).Fatal("Failed to change to root dir")
@@ -238,6 +245,17 @@ func main() {
238245
},
239246
}
240247

248+
if rebalance {
249+
steps = append([]step{{
250+
command: "/usr/bin/rebalancer",
251+
arguments: []string{
252+
"--profiles=aws,aws-2,aws-3",
253+
"--profiles=gcp-openshift-gce-devel-ci-2,gcp,gcp-3",
254+
"--prometheus-bearer-token-path=/etc/prometheus/token",
255+
},
256+
}}, steps...)
257+
}
258+
241259
stdout := bumper.HideSecretsWriter{Delegate: os.Stdout, Censor: secret.Censor}
242260
stderr := bumper.HideSecretsWriter{Delegate: os.Stderr, Censor: secret.Censor}
243261
author := fmt.Sprintf("%s <%s>", o.gitName, o.gitEmail)
@@ -300,3 +318,26 @@ func runSteps(steps []step, author string, stdout, stderr io.Writer) (needsPushi
300318

301319
return true, nil
302320
}
321+
322+
// withinWindow returns true if the schedule fires at any time
323+
// between now-1h and now+1h.
324+
func withinWindow(cronExpr string) (bool, error) {
325+
if cronExpr == "" {
326+
return false, nil
327+
}
328+
schedule, err := cron.Parse(cronExpr)
329+
if err != nil {
330+
return false, err
331+
}
332+
333+
now := time.Now()
334+
windowStart := now.Add(-1 * time.Hour)
335+
windowEnd := now.Add(+1 * time.Hour)
336+
337+
firstFire := schedule.Next(windowStart)
338+
339+
if firstFire.Before(windowEnd) {
340+
return true, nil
341+
}
342+
return false, nil
343+
}

cmd/prow-job-dispatcher/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func gatherOptions() options {
8484
fs.StringVar(&o.configPath, "config-path", "", "Path to the config file (core-services/sanitize-prow-jobs/_config.yaml in openshift/release)")
8585
fs.StringVar(&o.clusterConfigPath, "cluster-config-path", "core-services/sanitize-prow-jobs/_clusters.yaml", "Path to the config file (core-services/sanitize-prow-jobs/_clusters.yaml in openshift/release)")
8686
fs.StringVar(&o.jobsStoragePath, "jobs-storage-path", "", "Path to the file holding only job assignments in Gob format")
87-
fs.IntVar(&o.prometheusDaysBefore, "prometheus-days-before", 1, "Number [1,15] of days before. Time 00-00-00 of that day will be used as time to query Prometheus. E.g., 1 means 00-00-00 of yesterday.")
87+
fs.IntVar(&o.prometheusDaysBefore, "prometheus-days-before", 14, "Number [1,15] of days before. Time 00-00-00 of that day will be used as time to query Prometheus. E.g., 1 means 00-00-00 of yesterday.")
8888

8989
fs.BoolVar(&o.createPR, "create-pr", false, "Create a pull request to the change made with this tool.")
9090
fs.StringVar(&o.githubLogin, "github-login", githubLogin, "The GitHub username to use.")
@@ -676,7 +676,7 @@ func main() {
676676
}
677677
}
678678

679-
promVolumes, err := newPrometheusVolumes(o.PrometheusOptions, o.prometheusDaysBefore)
679+
promVolumes, err := dispatcher.NewPrometheusVolumes(o.PrometheusOptions, o.prometheusDaysBefore)
680680
if err != nil {
681681
logrus.WithError(err).Fatal("failed to create prometheus volumes")
682682
}
@@ -766,7 +766,7 @@ func main() {
766766
}
767767
return api.Cloud(info.Provider), nil
768768
})
769-
pjs, err := dispatchJobs(o.prowJobConfigDir, config, jobVolumes, blocked, promVolumes.calculateVolumeDistribution(configClusterMap), configClusterMap)
769+
pjs, err := dispatchJobs(o.prowJobConfigDir, config, jobVolumes, blocked, promVolumes.CalculateVolumeDistribution(configClusterMap), configClusterMap)
770770
if err != nil {
771771
logrus.WithError(err).Error("failed to dispatch")
772772
return

cmd/rebalancer/main.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"math"
6+
"os"
7+
"strings"
8+
9+
"github.com/sirupsen/logrus"
10+
11+
"sigs.k8s.io/prow/pkg/config/secret"
12+
13+
"github.com/openshift/ci-tools/pkg/api"
14+
"github.com/openshift/ci-tools/pkg/config"
15+
"github.com/openshift/ci-tools/pkg/dispatcher"
16+
)
17+
18+
type ProfilesFlag [][]string
19+
20+
type options struct {
21+
profiles ProfilesFlag
22+
prometheusDaysBefore int
23+
dispatcher.PrometheusOptions
24+
}
25+
26+
func (p *ProfilesFlag) Set(val string) error {
27+
parts := strings.Split(val, ",")
28+
*p = append(*p, parts)
29+
return nil
30+
}
31+
32+
func (p *ProfilesFlag) String() string {
33+
var groups []string
34+
for _, grp := range *p {
35+
groups = append(groups, strings.Join(grp, ","))
36+
}
37+
return strings.Join(groups, ";")
38+
}
39+
40+
func main() {
41+
o := options{}
42+
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
43+
fs.Var(&o.profiles, "profiles", "Comma-separated list of profiles; may be repeated")
44+
fs.IntVar(&o.prometheusDaysBefore, "prometheus-days-before", 14,
45+
"Number [1,15] of days before. Time 00-00-00 of that day will be used as time to query Prometheus. E.g., 1 means 00-00-00 of yesterday.")
46+
o.PrometheusOptions.AddFlags(fs)
47+
if err := fs.Parse(os.Args[1:]); err != nil {
48+
logrus.WithError(err).Fatalf("cannot parse args: '%s'", os.Args[1:])
49+
}
50+
51+
if o.PrometheusOptions.PrometheusPasswordPath != "" {
52+
if err := secret.Add(o.PrometheusOptions.PrometheusPasswordPath); err != nil {
53+
logrus.WithError(err).Fatal("Failed to start secrets agent")
54+
}
55+
}
56+
57+
if o.PrometheusOptions.PrometheusBearerTokenPath != "" {
58+
if err := secret.Add(o.PrometheusOptions.PrometheusBearerTokenPath); err != nil {
59+
logrus.WithError(err).Fatal("Failed to start secrets agent")
60+
}
61+
}
62+
promVolumes, err := dispatcher.NewPrometheusVolumes(o.PrometheusOptions, o.prometheusDaysBefore)
63+
if err != nil {
64+
logrus.WithError(err).Fatal("failed to create prometheus volumes")
65+
}
66+
67+
vol, err := promVolumes.GetJobVolumes()
68+
if err != nil {
69+
logrus.WithError(err).Fatal("failed to fetch prometheus volumes")
70+
}
71+
72+
buckets := make(map[api.ClusterProfile]float64)
73+
membership := make(map[api.ClusterProfile][]api.ClusterProfile)
74+
for _, group := range o.profiles {
75+
var profList []api.ClusterProfile
76+
for _, p := range group {
77+
prof := api.ClusterProfile(p)
78+
if _, exists := buckets[prof]; !exists {
79+
buckets[prof] = 0
80+
}
81+
profList = append(profList, prof)
82+
}
83+
for _, prof := range profList {
84+
membership[prof] = profList
85+
}
86+
}
87+
88+
configsPath := config.CiopConfigInRepoPath
89+
configs := make([]config.DataWithInfo, 0)
90+
91+
err = config.OperateOnCIOperatorConfigDir(configsPath, func(
92+
c *api.ReleaseBuildConfiguration,
93+
info *config.Info,
94+
) error {
95+
for i := range c.Tests {
96+
t := &c.Tests[i]
97+
ms := t.MultiStageTestConfiguration
98+
if ms == nil {
99+
continue
100+
}
101+
102+
current := ms.ClusterProfile
103+
group, ok := membership[current]
104+
if !ok || len(group) == 0 {
105+
continue
106+
}
107+
108+
var bestProf api.ClusterProfile
109+
minVal := math.MaxFloat64
110+
for _, prof := range group {
111+
if val := buckets[prof]; val < minVal {
112+
minVal = val
113+
bestProf = prof
114+
}
115+
}
116+
if ms.ClusterProfile != bestProf {
117+
logrus.Infof("reassigning test %q: %s -> %s", t.As, ms.ClusterProfile, bestProf)
118+
ms.ClusterProfile = bestProf
119+
}
120+
121+
weight := vol[getTestName(t, info)]
122+
if weight == 0 {
123+
continue
124+
}
125+
buckets[bestProf] += weight
126+
configs = append(configs, config.DataWithInfo{Configuration: *c, Info: *info})
127+
}
128+
return nil
129+
})
130+
if err != nil {
131+
logrus.WithError(err).Fatal("error distributing tests across profiles")
132+
}
133+
134+
for i := range configs {
135+
c := &configs[i]
136+
if err := c.CommitTo(configsPath); err != nil {
137+
logrus.WithError(err).Fatal("commit config")
138+
}
139+
}
140+
141+
for prof, val := range buckets {
142+
logrus.WithField("weight", val).WithField("profile", prof).Info("Calculated weight")
143+
}
144+
}
145+
146+
func getTestName(t *api.TestStepConfiguration, info *config.Info) string {
147+
test := ""
148+
if t.IsPeriodic() {
149+
test += "periodic-"
150+
} else if t.Postsubmit {
151+
test += "branch-ci-"
152+
} else {
153+
test += "pull-ci-"
154+
}
155+
test += info.Org + "-" + info.Repo + "-" + info.Branch + "-" + t.As
156+
if info.Variant != "" {
157+
test += "-" + info.Variant
158+
}
159+
return test
160+
}

images/autoconfigbrancher/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ ADD registry-replacer /usr/bin/registry-replacer
1212
ADD ci-operator-yaml-creator /usr/bin/ci-operator-yaml-creator
1313
ADD clusterimageset-updater /usr/bin/clusterimageset-updater
1414
ADD promoted-image-governor /usr/bin/promoted-image-governor
15+
#ADD rebalancer /usr/bin/rebalancer TODO: enable when tool merged
1516

1617
RUN microdnf install -y git && \
1718
microdnf clean all && \

cmd/prow-job-dispatcher/prometheus_volumes.go renamed to pkg/dispatcher/prometheus_volumes.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package main
1+
package dispatcher
22

33
import (
44
"context"
@@ -7,11 +7,8 @@ import (
77

88
promapi "github.com/prometheus/client_golang/api"
99
prometheusapi "github.com/prometheus/client_golang/api/prometheus/v1"
10-
"github.com/sirupsen/logrus"
1110

1211
"sigs.k8s.io/prow/pkg/config/secret"
13-
14-
"github.com/openshift/ci-tools/pkg/dispatcher"
1512
)
1613

1714
type prometheusVolumes struct {
@@ -22,7 +19,7 @@ type prometheusVolumes struct {
2219
m sync.Mutex
2320
}
2421

25-
func newPrometheusVolumes(promOptions dispatcher.PrometheusOptions, prometheusDaysBefore int) (prometheusVolumes, error) {
22+
func NewPrometheusVolumes(promOptions PrometheusOptions, prometheusDaysBefore int) (prometheusVolumes, error) {
2623
promClient, err := promOptions.NewPrometheusClient(secret.GetSecret)
2724
if err != nil {
2825
return prometheusVolumes{}, err
@@ -39,21 +36,19 @@ func (pv *prometheusVolumes) GetJobVolumes() (map[string]float64, error) {
3936
pv.m.Lock()
4037
defer pv.m.Unlock()
4138
if len(pv.jobVolumes) != 0 && time.Since(pv.timestamp) < 24*time.Hour {
42-
logrus.Info("Using cached job volumes")
4339
return pv.jobVolumes, nil
4440
}
4541
v1api := prometheusapi.NewAPI(pv.promClient)
4642
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
4743
defer cancel()
4844
y, m, d := time.Now().Add(-time.Duration(24*pv.prometheusDaysBefore) * time.Hour).Date()
4945
ts := time.Date(y, m, d, 0, 0, 0, 0, time.UTC)
50-
jv, err := dispatcher.GetJobVolumesFromPrometheus(ctx, v1api, ts)
46+
jv, err := GetJobVolumesFromPrometheus(ctx, v1api, ts)
5147
if err != nil {
5248
return nil, err
5349
}
5450
pv.jobVolumes = jv
5551
pv.timestamp = time.Now()
56-
logrus.Info("Fetched new job volumes")
5752
return pv.jobVolumes, nil
5853
}
5954

@@ -66,7 +61,7 @@ func (pv *prometheusVolumes) getTotalVolume() float64 {
6661
return totalVolume
6762
}
6863

69-
func (pv *prometheusVolumes) calculateVolumeDistribution(clusterMap dispatcher.ClusterMap) map[string]float64 {
64+
func (pv *prometheusVolumes) CalculateVolumeDistribution(clusterMap ClusterMap) map[string]float64 {
7065
totalCapacity := 0
7166
for _, cluster := range clusterMap {
7267
totalCapacity += cluster.Capacity

0 commit comments

Comments (0)