Skip to content

Commit 75fbf33

Browse files
authored
Merge pull request #23 from buildkite/fix-issue-17-with-collector
Correctly filter stats by queue
2 parents 38d1b98 + 1a0f312 commit 75fbf33

File tree

2 files changed

+126
-22
lines changed

2 files changed

+126
-22
lines changed

collector/collector.go

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func New(c *bk.Client, opts Opts) *Collector {
4343
return &Collector{
4444
Opts: opts,
4545
buildService: c.Builds,
46+
agentService: c.Agents,
4647
}
4748
}
4849

@@ -70,17 +71,6 @@ func (c *Collector) Collect() (*Result, error) {
7071
return nil, err
7172
}
7273

73-
if c.Opts.Queue != "" {
74-
if cnt, ok := res.Queues[c.Opts.Queue]; ok {
75-
return &Result{
76-
Queues: map[string]map[string]int{
77-
c.Opts.Queue: cnt,
78-
},
79-
}, nil
80-
}
81-
return &Result{}, nil
82-
}
83-
8474
return res, nil
8575
}
8676

@@ -128,7 +118,7 @@ func queue(j *bk.Job) string {
128118
return "default"
129119
}
130120

131-
func uniqueQueues(builds []bk.Build) []string {
121+
func getBuildQueues(builds ...bk.Build) []string {
132122
queueMap := map[string]struct{}{}
133123
for _, b := range builds {
134124
for _, j := range b.Jobs {
@@ -153,18 +143,39 @@ func (c *Collector) addHistoricalMetrics(r *Result) error {
153143
})
154144

155145
return finishedBuilds.Pages(func(v interface{}) bool {
156-
for _, queue := range uniqueQueues(v.([]bk.Build)) {
157-
if _, ok := r.Queues[queue]; !ok {
158-
r.Queues[queue] = newCounts()
159-
}
160-
}
161146
for _, build := range v.([]bk.Build) {
147+
queues := c.filterQueues(getBuildQueues(v.([]bk.Build)...)...)
148+
149+
if len(queues) == 0 {
150+
log.Printf("Skipping build, no jobs match queue filter %v", c.Queue)
151+
continue
152+
}
153+
154+
for _, queue := range queues {
155+
if _, ok := r.Queues[queue]; !ok {
156+
r.Queues[queue] = newCounts()
157+
}
158+
}
159+
162160
r.Pipelines[*build.Pipeline.Name] = newCounts()
163161
}
164162
return true
165163
})
166164
}
167165

166+
func (c *Collector) filterQueues(queues ...string) []string {
167+
if c.Queue == "" {
168+
return queues
169+
}
170+
var filtered = []string{}
171+
for _, queue := range queues {
172+
if queue == c.Queue {
173+
filtered = append(filtered, queue)
174+
}
175+
}
176+
return filtered
177+
}
178+
168179
func (c *Collector) addBuildAndJobMetrics(r *Result) error {
169180
currentBuilds := c.listBuildsByOrg(c.Opts.OrgSlug, bk.BuildsListOptions{
170181
State: []string{"scheduled", "running"},
@@ -176,10 +187,15 @@ func (c *Collector) addBuildAndJobMetrics(r *Result) error {
176187
return currentBuilds.Pages(func(v interface{}) bool {
177188
for _, build := range v.([]bk.Build) {
178189
if c.Opts.Debug {
179-
log.Printf("Adding build to stats (id=%q, pipeline=%q, branch=%q, state=%q)",
190+
log.Printf("Processing build (id=%q, pipeline=%q, branch=%q, state=%q)",
180191
*build.ID, *build.Pipeline.Name, *build.Branch, *build.State)
181192
}
182193

194+
if filtered := c.filterQueues(getBuildQueues(build)...); len(filtered) == 0 {
195+
log.Printf("Skipping build, no jobs match queue filter %v", c.Queue)
196+
continue
197+
}
198+
183199
pipeline := *build.Pipeline.Name
184200

185201
if _, ok := r.Pipelines[pipeline]; !ok {
@@ -213,6 +229,11 @@ func (c *Collector) addBuildAndJobMetrics(r *Result) error {
213229
*job.ID, *build.Pipeline.Name, queue(job), *job.Type, state)
214230
}
215231

232+
if filtered := c.filterQueues(queue(job)); len(filtered) == 0 {
233+
log.Printf("Skipping job, doesn't match queue filter %v", c.Queue)
234+
continue
235+
}
236+
216237
if _, ok := r.Queues[queue(job)]; !ok {
217238
r.Queues[queue(job)] = newCounts()
218239
}
@@ -275,9 +296,11 @@ func (c *Collector) addAgentMetrics(r *Result) error {
275296
r.Totals[TotalAgentCount] = 0
276297

277298
for queue := range r.Queues {
278-
r.Queues[queue][BusyAgentCount] = 0
279-
r.Queues[queue][IdleAgentCount] = 0
280-
r.Queues[queue][TotalAgentCount] = 0
299+
if filtered := c.filterQueues(queue); len(filtered) > 0 {
300+
r.Queues[queue][BusyAgentCount] = 0
301+
r.Queues[queue][IdleAgentCount] = 0
302+
r.Queues[queue][TotalAgentCount] = 0
303+
}
281304
}
282305

283306
err := p.Pages(func(v interface{}) bool {
@@ -292,6 +315,11 @@ func (c *Collector) addAgentMetrics(r *Result) error {
292315
}
293316
}
294317

318+
if filtered := c.filterQueues(queue); len(filtered) == 0 {
319+
log.Printf("Skipping agent, doesn't match queue filter %v", c.Queue)
320+
continue
321+
}
322+
295323
if _, ok := r.Queues[queue]; !ok {
296324
r.Queues[queue] = newCounts()
297325
r.Queues[queue][BusyAgentCount] = 0

collector/collector_test.go

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func newTestCollector() *Collector {
4343
}
4444
}
4545

46-
func TestCollectorWithRunningBuilds(t *testing.T) {
46+
func TestCollectorWithRunningBuildsForAllQueues(t *testing.T) {
4747
c := newTestCollector()
4848

4949
res, err := c.Collect()
@@ -103,6 +103,82 @@ func TestCollectorWithRunningBuilds(t *testing.T) {
103103
{"Pipeline.alpacas", res.Pipelines["alpacas"], IdleAgentCount, 0},
104104
}
105105

106+
for queue, _ := range res.Queues {
107+
switch queue {
108+
case "default", "deploy":
109+
continue
110+
default:
111+
t.Fatalf("Unexpected queue %s", queue)
112+
}
113+
}
114+
115+
for _, tc := range testCases {
116+
t.Run(fmt.Sprintf("%s/%s", tc.Group, tc.Key), func(t *testing.T) {
117+
if tc.Counts[tc.Key] != tc.Expected {
118+
t.Fatalf("%s was %d; want %d", tc.Key, tc.Counts[tc.Key], tc.Expected)
119+
}
120+
})
121+
}
122+
}
123+
124+
func TestCollectorWithRunningBuildsForASingleQueue(t *testing.T) {
125+
c := newTestCollector()
126+
c.Queue = "default"
127+
128+
res, err := c.Collect()
129+
if err != nil {
130+
t.Fatal(err)
131+
}
132+
133+
testCases := []struct {
134+
Group string
135+
Counts map[string]int
136+
Key string
137+
Expected int
138+
}{
139+
{"Totals", res.Totals, RunningBuildsCount, 1},
140+
{"Totals", res.Totals, ScheduledBuildsCount, 1},
141+
{"Totals", res.Totals, RunningJobsCount, 1},
142+
{"Totals", res.Totals, ScheduledJobsCount, 1},
143+
{"Totals", res.Totals, UnfinishedJobsCount, 2},
144+
{"Totals", res.Totals, TotalAgentCount, 1},
145+
{"Totals", res.Totals, BusyAgentCount, 1},
146+
{"Totals", res.Totals, IdleAgentCount, 0},
147+
148+
{"Queue.default", res.Queues["default"], RunningBuildsCount, 1},
149+
{"Queue.default", res.Queues["default"], ScheduledBuildsCount, 1},
150+
{"Queue.default", res.Queues["default"], RunningJobsCount, 1},
151+
{"Queue.default", res.Queues["default"], ScheduledJobsCount, 1},
152+
{"Queue.default", res.Queues["default"], UnfinishedJobsCount, 2},
153+
{"Queue.default", res.Queues["default"], TotalAgentCount, 1},
154+
{"Queue.default", res.Queues["default"], BusyAgentCount, 1},
155+
{"Queue.default", res.Queues["default"], IdleAgentCount, 0},
156+
157+
{"Pipeline.llamas", res.Pipelines["llamas"], RunningBuildsCount, 1},
158+
{"Pipeline.llamas", res.Pipelines["llamas"], ScheduledBuildsCount, 0},
159+
{"Pipeline.llamas", res.Pipelines["llamas"], RunningJobsCount, 1},
160+
{"Pipeline.llamas", res.Pipelines["llamas"], ScheduledJobsCount, 0},
161+
{"Pipeline.llamas", res.Pipelines["llamas"], UnfinishedJobsCount, 1},
162+
{"Pipeline.llamas", res.Pipelines["llamas"], TotalAgentCount, 0},
163+
{"Pipeline.llamas", res.Pipelines["llamas"], BusyAgentCount, 0},
164+
{"Pipeline.llamas", res.Pipelines["llamas"], IdleAgentCount, 0},
165+
166+
{"Pipeline.alpacas", res.Pipelines["alpacas"], RunningBuildsCount, 0},
167+
{"Pipeline.alpacas", res.Pipelines["alpacas"], ScheduledBuildsCount, 1},
168+
{"Pipeline.alpacas", res.Pipelines["alpacas"], RunningJobsCount, 0},
169+
{"Pipeline.alpacas", res.Pipelines["alpacas"], ScheduledJobsCount, 1},
170+
{"Pipeline.alpacas", res.Pipelines["alpacas"], UnfinishedJobsCount, 1},
171+
{"Pipeline.alpacas", res.Pipelines["alpacas"], TotalAgentCount, 0},
172+
{"Pipeline.alpacas", res.Pipelines["alpacas"], BusyAgentCount, 0},
173+
{"Pipeline.alpacas", res.Pipelines["alpacas"], IdleAgentCount, 0},
174+
}
175+
176+
for queue, _ := range res.Queues {
177+
if queue != "default" {
178+
t.Fatalf("Unexpected queue %s", queue)
179+
}
180+
}
181+
106182
for _, tc := range testCases {
107183
t.Run(fmt.Sprintf("%s/%s", tc.Group, tc.Key), func(t *testing.T) {
108184
if tc.Counts[tc.Key] != tc.Expected {

0 commit comments

Comments
 (0)