forked from llvm/llvm-project
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrics.py
294 lines (241 loc) · 10.1 KB
/
metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import requests
import time
import os
from dataclasses import dataclass
import sys
import logging
import github
from github import Github
from github import Auth
# InfluxDB-line-protocol push endpoint used by upload_metrics().
GRAFANA_URL = (
    "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
)
# Repository whose workflow runs are scraped.
GITHUB_PROJECT = "llvm/llvm-project"
# Only workflow runs with these names are sampled.
WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
# Seconds to sleep between scrape iterations in main().
SCRAPE_INTERVAL_SECONDS = 5 * 60
@dataclass
class JobMetrics:
    """Metrics for one completed workflow job, serialized for Grafana upload."""

    # "<workflow name>-<job name>" identifier; becomes the metric name.
    job_name: str
    # Seconds the job spent queued (started_at - created_at).
    queue_time: int
    # Seconds the job spent running (completed_at - started_at).
    run_time: int
    # 1 if the job and all non-skipped steps succeeded, 0 otherwise.
    status: int
    # Job creation time in nanoseconds since the epoch (Grafana expects ns).
    created_at_ns: int
    # ID of the workflow run this job belongs to.
    workflow_id: int
    # Name of the workflow this job belongs to.
    workflow_name: str
@dataclass
class GaugeMetric:
    """A single point-in-time gauge sample, serialized for Grafana upload."""

    # Metric name (lowercased and underscored before upload).
    name: str
    # Current value of the gauge.
    value: int
    # Sample time in nanoseconds since the epoch (Grafana expects ns).
    time_ns: int
def get_sampled_workflow_metrics(github_repo: github.Repository):
    """Gets global statistics about the Github workflow queue.

    Args:
      github_repo: A github repo object to use to query the relevant information.

    Returns:
      Returns a list of GaugeMetric objects, containing the relevant metrics
      about the workflow.
    """
    queued_job_counts = {}
    running_job_counts = {}
    # Other states are available (pending, waiting, etc), but the meaning
    # is not documented (See #70540).
    # "queued" seems to be the info we want.
    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for queued_workflow_job in queued_workflow.jobs():
            job_name = queued_workflow_job.name
            # Workflows marked as queued can potentially only have some jobs
            # queued, so make sure to also count jobs currently in progress.
            if queued_workflow_job.status == "queued":
                queued_job_counts[job_name] = queued_job_counts.get(job_name, 0) + 1
            elif queued_workflow_job.status == "in_progress":
                running_job_counts[job_name] = running_job_counts.get(job_name, 0) + 1
    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
        if running_workflow.name not in WORKFLOWS_TO_TRACK:
            continue
        for running_workflow_job in running_workflow.jobs():
            job_name = running_workflow_job.name
            if running_workflow_job.status != "in_progress":
                continue
            running_job_counts[job_name] = running_job_counts.get(job_name, 0) + 1
    workflow_metrics = []
    for job_name, count in queued_job_counts.items():
        workflow_metrics.append(
            GaugeMetric(f"workflow_queue_size_{job_name}", count, time.time_ns())
        )
    for job_name, count in running_job_counts.items():
        workflow_metrics.append(
            GaugeMetric(f"running_workflow_count_{job_name}", count, time.time_ns())
        )
    # Always send a heartbeat metric so we can monitor if this container is
    # still able to log to Grafana.
    workflow_metrics.append(
        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
    )
    return workflow_metrics
def get_per_workflow_metrics(
    github_repo: github.Repository, workflows_to_track: dict[str, int]
):
    """Gets the metrics for specified Github workflows.

    This function takes in a list of workflows to track, and optionally the
    workflow ID of the last tracked invocation. It grabs the relevant data
    from Github, returning it to the caller.

    Args:
      github_repo: A github repo object to use to query the relevant information.
      workflows_to_track: A dictionary mapping workflow names to the last
        invocation ID where metrics have been collected, or None to collect the
        most recent result.

    Returns:
      Returns a list of JobMetrics objects, containing the relevant metrics
      about the workflow.
    """
    workflow_metrics = []
    workflows_to_include = set(workflows_to_track.keys())
    for workflow_run in iter(github_repo.get_workflow_runs()):
        # Stop scanning once every tracked workflow has been handled.
        if len(workflows_to_include) == 0:
            break
        if workflow_run.status != "completed":
            continue
        # This workflow was already sampled for this run, or is not tracked at
        # all. Ignoring.
        if workflow_run.name not in workflows_to_include:
            continue
        # There were no new workflow invocations since the previous scrape.
        # The API returns a sorted list with the most recent invocations first,
        # so we can stop looking for this particular workflow. Continue to grab
        # information on the other workflows of interest, if present.
        if workflows_to_track[workflow_run.name] == workflow_run.id:
            workflows_to_include.remove(workflow_run.name)
            continue
        workflow_jobs = workflow_run.jobs()
        if workflow_jobs.totalCount == 0:
            continue
        # No previous invocation recorded for this workflow: sample only this
        # (most recent) run, then stop tracking it for this scrape. The
        # already-sampled case (`== workflow_run.id`) was handled by the
        # `continue` above, so it does not need re-checking here.
        if workflows_to_track[workflow_run.name] is None:
            workflows_to_include.remove(workflow_run.name)
        for workflow_job in workflow_jobs:
            created_at = workflow_job.created_at
            started_at = workflow_job.started_at
            completed_at = workflow_job.completed_at
            job_result = int(workflow_job.conclusion == "success")
            if job_result:
                # We still might want to mark the job as a failure if one of the
                # steps failed. This is required due to us setting
                # continue-on-error in the premerge pipeline to prevent sending
                # emails while we are testing the infrastructure.
                # TODO(boomanaiden154): Remove this once the premerge pipeline
                # is no longer in a testing state and we can directly assert
                # the workflow result.
                for step in workflow_job.steps:
                    if step.conclusion != "success" and step.conclusion != "skipped":
                        job_result = 0
                        break
            queue_time = started_at - created_at
            run_time = completed_at - started_at
            # Skip zero-length jobs; they carry no useful timing signal.
            # NOTE: use total_seconds() rather than timedelta.seconds — the
            # latter discards the days component, so jobs queued or running for
            # more than 24 hours would be badly under-reported (and exactly
            # 24-hour runs would be dropped here entirely).
            if int(run_time.total_seconds()) == 0:
                continue
            # The timestamp associated with the event is expected by Grafana to
            # be in nanoseconds.
            created_at_ns = int(created_at.timestamp()) * 10**9
            logging.info(
                f"Adding a job metric for job {workflow_job.id} in workflow {workflow_run.id}"
            )
            workflow_metrics.append(
                JobMetrics(
                    workflow_run.name + "-" + workflow_job.name,
                    int(queue_time.total_seconds()),
                    int(run_time.total_seconds()),
                    job_result,
                    created_at_ns,
                    workflow_run.id,
                    workflow_run.name,
                )
            )
    return workflow_metrics
def upload_metrics(workflow_metrics, metrics_userid, api_key):
    """Upload metrics to Grafana.

    Takes in a list of workflow metrics and then uploads them to Grafana
    through a REST request.

    Args:
      workflow_metrics: A list of metrics to upload to Grafana.
      metrics_userid: The userid to use for the upload.
      api_key: The API key to use for the upload.

    Raises:
      ValueError: If a metric is neither a GaugeMetric nor a JobMetrics.
    """
    if len(workflow_metrics) == 0:
        logging.info("No metrics found to upload.")
        return
    # Serialize each metric into the InfluxDB line protocol the push endpoint
    # expects: "<name> <field>=<value>[,...] <timestamp_ns>".
    metrics_batch = []
    for workflow_metric in workflow_metrics:
        if isinstance(workflow_metric, GaugeMetric):
            name = workflow_metric.name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
            )
        elif isinstance(workflow_metric, JobMetrics):
            name = workflow_metric.job_name.lower().replace(" ", "_")
            metrics_batch.append(
                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
            )
        else:
            raise ValueError(
                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
            )
    request_data = "\n".join(metrics_batch)
    response = requests.post(
        GRAFANA_URL,
        headers={"Content-Type": "text/plain"},
        data=request_data,
        auth=(metrics_userid, api_key),
    )
    if response.status_code < 200 or response.status_code >= 300:
        # A failed upload means data loss in Grafana — log it at error level
        # (the original info-level message was easy to miss when triaging).
        logging.error(f"Failed to submit data to Grafana: {response.status_code}")
def main():
    """Scrape Github workflow metrics and push them to Grafana, forever."""
    # Authenticate with Github
    auth = Auth.Token(os.environ["GITHUB_TOKEN"])
    grafana_api_key = os.environ["GRAFANA_API_KEY"]
    grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]

    # Map each tracked workflow name to the ID of its last sampled invocation;
    # None until the first scrape records one.
    workflows_to_track = {}
    for workflow_to_track in WORKFLOWS_TO_TRACK:
        workflows_to_track[workflow_to_track] = None

    # Enter the main loop. Every five minutes we wake up and dump metrics for
    # the relevant jobs.
    while True:
        # A fresh client is constructed every iteration — presumably to avoid
        # a long-lived container reusing a stale session; confirm before
        # hoisting this out of the loop.
        github_object = Github(auth=auth)
        # Use the GITHUB_PROJECT constant instead of re-hardcoding the repo
        # name, so the two cannot silently drift apart.
        github_repo = github_object.get_repo(GITHUB_PROJECT)

        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
        current_metrics += get_sampled_workflow_metrics(github_repo)
        upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
        logging.info(f"Uploaded {len(current_metrics)} metrics")

        # Record the most recent invocation ID per workflow. Iterate in
        # reverse so the newest (first-listed) run's ID wins.
        for workflow_metric in reversed(current_metrics):
            if isinstance(workflow_metric, JobMetrics):
                workflows_to_track[
                    workflow_metric.workflow_name
                ] = workflow_metric.workflow_id

        time.sleep(SCRAPE_INTERVAL_SECONDS)
if __name__ == "__main__":
    # Configure root logging before entering the scrape loop so the per-job
    # info messages are visible in the container logs.
    logging.basicConfig(level=logging.INFO)
    main()