Commit 2579bab

Publish metrics like JobPickUpTimeSec and JobExecutionTimeSec
1 parent: 1d036c3

File tree: 3 files changed (+213 -42 lines)

docker/ci-scaler/README.md

Lines changed: 14 additions & 2 deletions
````diff
@@ -24,11 +24,11 @@ Example for docker compose:
 
 ```yml
 services:
-...
+  ...
   ci-scaler:
     image: ghcr.io/dimikot/ci-scaler:latest
    ports:
-    - 18088:8088/tcp
+      - 18088:8088/tcp
     environment:
       - GH_TOKEN
       - ASGS
@@ -38,10 +38,22 @@ services:
 
 One ci-scaler container may serve multiple GitHub repositories.
 
+## Debugging
+
 To enter the container, run e.g.:
 
 ```
+cd ..
 docker compose exec ci-scaler bash -l
 ```
 
+From there, to manually send webhook simulation requests to ci-scaler:
+
+```
+curl -XPOST http://localhost:8088/workflow_run/dimikot/ci-storage/ci-storage-dev
+curl -XPOST http://localhost:8088/workflow_job/dimikot/ci-storage/ci-storage-dev/queued/42
+curl -XPOST http://localhost:8088/workflow_job/dimikot/ci-storage/ci-storage-dev/in_progress/42
+curl -XPOST http://localhost:8088/workflow_job/dimikot/ci-storage/ci-storage-dev/completed/42
+```
+
 See also https://github.com/dimikot/ci-storage
````
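The simulation endpoints above make it possible to exercise the new timing metrics end to end without real GitHub deliveries. Below is a minimal sketch of a driver script, assuming it runs inside the container (or anywhere ci-scaler's port 8088 is reachable); the pauses and the job id 42 are arbitrary:

```python
# Hypothetical driver for the simulation endpoints above: replays one job
# lifecycle with pauses, so the derived timing metrics come out non-zero.
import time
import urllib.request

BASE = "http://localhost:8088/workflow_job/dimikot/ci-storage/ci-storage-dev"

for phase, pause_sec in [("queued", 0), ("in_progress", 5), ("completed", 10)]:
    time.sleep(pause_sec)
    req = urllib.request.Request(f"{BASE}/{phase}/42", method="POST")
    with urllib.request.urlopen(req) as response:
        print(phase, response.status, response.read().decode("utf-8"))
```

With this timeline (queued at t=0, in_progress at t=5, completed at t=15), the handler would derive JobPickUpTimeSec≈5, JobExecutionTimeSec≈10, and JobCompleteTimeSec≈15.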

docker/ci-scaler/guest/scaler/api_aws.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -44,6 +44,8 @@ def aws_metadata_curl(path: str) -> str | None:
             timeout=METADATA_TIMEOUT_SEC,
         ) as response:
             return response.read().decode("utf-8")
+    except TimeoutError:
+        return None
     except urllib.error.URLError:
         return None
```
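The new `except TimeoutError` branch matters because a timeout can surface from `urllib` in two shapes: a connect timeout gets wrapped into `urllib.error.URLError`, while a timeout during the response read raises a bare `TimeoutError` (`socket.timeout` has been an alias of it since Python 3.10), which the previous single `except` did not catch. A standalone sketch of both guards; the metadata URL and the 0.2 s timeout here are illustrative, not taken from this file:

```python
import urllib.error
import urllib.request

def metadata_curl(url: str) -> str | None:
    try:
        # A short timeout makes both failure modes easy to reproduce.
        with urllib.request.urlopen(url, timeout=0.2) as response:
            return response.read().decode("utf-8")
    except TimeoutError:
        # Raised directly when the response body read stalls.
        return None
    except urllib.error.URLError:
        # Raised for DNS/connect failures, including wrapped connect timeouts.
        return None

# Prints None when run outside EC2, where the metadata endpoint is unreachable.
print(metadata_curl("http://169.254.169.254/latest/meta-data/instance-id"))
```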

docker/ci-scaler/guest/scaler/handler_webhooks.py

Lines changed: 197 additions & 40 deletions
```diff
@@ -14,19 +14,25 @@
     gh_webhook_ensure_exists,
     gh_webhook_ping,
 )
-from api_aws import DRY_RUN_MSG, aws_autoscaling_increment_desired_capacity
+from api_aws import (
+    DRY_RUN_MSG,
+    aws_autoscaling_increment_desired_capacity,
+    aws_cloudwatch_put_metric_data,
+)
 from helpers import (
     ExpiringDict,
     PostJsonHttpRequestHandler,
     AsgSpec,
     log,
     logged_result,
 )
-from typing import Any
+from typing import Any, Literal, cast
 
 
 DUPLICATED_EVENTS_TTL = 3600
+JOB_TIMING_TTL = 7200  # 2 hours to track job timing data
 WORKFLOW_RUN_EVENT = "workflow_run"
+WORKFLOW_JOB_EVENT = "workflow_job"
 IGNORE_KEYS = [
     "zen",
     "hook_id",
@@ -52,16 +58,26 @@ class ServiceAction:
     iteration: int = 0
 
 
+@dataclasses.dataclass
+class JobTiming:
+    job_id: int
+    queued_at: float | None = None
+    started_at: float | None = None
+    completed_at: float | None = None
+    bumped: set[str] = dataclasses.field(default_factory=set)
+
+
 class HandlerWebhooks:
     def __init__(self, *, domain: str, asg_specs: list[AsgSpec]):
         self.domain = domain
         self.asg_specs = asg_specs
         self.webhooks: dict[str, Webhook] = {}
         self.service_action = ServiceAction(prev_at=int(time.time()))
         self.secret = gh_get_webhook_secret()
-        self.duplicated_events = ExpiringDict[tuple[int, int], float](
+        self.duplicated_events = ExpiringDict[tuple[int, str], float](
             ttl=DUPLICATED_EVENTS_TTL
         )
+        self.job_timings = ExpiringDict[int, JobTiming](ttl=JOB_TIMING_TTL)
         this = self
 
         class RequestHandler(PostJsonHttpRequestHandler):
@@ -80,7 +96,7 @@ def __enter__(self):
                 repository=repository,
                 url=url,
                 secret=self.secret,
-                events=[WORKFLOW_RUN_EVENT],
+                events=[WORKFLOW_RUN_EVENT, WORKFLOW_JOB_EVENT],
             )
             self.webhooks[repository] = Webhook(url=url, last_delivery_at=None)
             return self
@@ -116,13 +132,59 @@ def handle(
         data_bytes: bytes,
     ):
         action = data.get("action")
-        event_payload = data.get(WORKFLOW_RUN_EVENT)
-        name = event_payload.get("name") if event_payload else None
-        repository: str | None = data.get("repository", {}).get("full_name", None)
+        run_payload = data.get(WORKFLOW_RUN_EVENT)
+        job_payload = data.get(WORKFLOW_JOB_EVENT)
 
+        # For local debugging only! Allows to simulate a webhook with just
+        # querying an URL that includes the repo name and label:
+        # - /workflow_run/owner/repo/label
+        # - /workflow_job/owner/repo/label/{queued|in_progress|completed}/job_id
+        if (
+            handler.client_address[0] == "127.0.0.1"
+            and not action
+            and not run_payload
+            and not job_payload
+        ):
+            if match := re.match(
+                rf"^/{WORKFLOW_RUN_EVENT}/([^/]+/[^/]+)/([^/]+)/?$",
+                handler.path,
+            ):
+                return self._handle_workflow_run_in_progress(
+                    handler=handler,
+                    repository=match.group(1),
+                    labels={match.group(2): 1},
+                )
+            elif match := re.match(
+                rf"^/{WORKFLOW_JOB_EVENT}/([^/]+/[^/]+)/([^/]+)/([^/]+)/([^/]+)/?$",
+                handler.path,
+            ):
+                return self._handle_workflow_job_timing(
+                    handler=handler,
+                    repository=match.group(1),
+                    labels={match.group(2): 1},
+                    action=cast(Any, match.group(3)),
+                    job_id=int(match.group(4)),
+                    name=None,
+                )
+            else:
+                return handler.send_error(
+                    404,
+                    f"When accessing from localhost for debugging, the path must look like: "
+                    + f"/{WORKFLOW_RUN_EVENT}/owner/repo/label"
+                    + f" or "
+                    + f"/{WORKFLOW_JOB_EVENT}/owner/repo/label/{'{queued|in_progress|completed}'}/job_id"
+                    + f", but got {handler.path}",
+                )
+
+        repository: str | None = data.get("repository", {}).get("full_name", None)
         if repository in self.webhooks:
             self.webhooks[repository].last_delivery_at = int(time.time())
 
+        name = (
+            str(run_payload.get("name"))
+            if run_payload
+            else str(job_payload.get("name")) if job_payload else None
+        )
         keys = [k for k in data.keys() if k not in IGNORE_KEYS]
         if keys:
             handler.log_suffix = (
@@ -137,24 +199,6 @@ def handle(
         if not repository:
             return handler.send_json(202, message="ignoring event with no repository")
 
-        if handler.client_address[0] == "127.0.0.1" and not event_payload:
-            match = re.match(
-                rf"^/{WORKFLOW_RUN_EVENT}/([^/]+/[^/]+)/([^/]+)/?$",
-                handler.path,
-            )
-            if match:
-                return self._handle_workflow_run_in_progress(
-                    handler=handler,
-                    repository=match.group(1),
-                    labels={match.group(2): 1},
-                )
-            else:
-                return handler.send_error(
-                    404,
-                    f"When accessing from localhost for debugging, the path must "
-                    + f"look like: /{WORKFLOW_RUN_EVENT}/{'{owner}/{repo}/{label}'}, but got {handler.path}",
-                )
-
         assert self.secret
         error = verify_signature(
             secret=self.secret,
@@ -164,24 +208,23 @@ def handle(
         if error:
             return handler.send_error(403, error)
 
-        if event_payload:
+        if run_payload:
             if action != "requested" and action != "in_progress":
                 return handler.send_json(
                     202,
                     message='ignoring action != ["requested", "in_progress"]',
                 )
 
-            event_key = (int(event_payload["id"]), int(event_payload["run_attempt"]))
+            event_key = (int(run_payload["id"]), str(run_payload["run_attempt"]))
             processed_at = self.duplicated_events.get(event_key)
             if processed_at:
                 return handler.send_json(
                     202,
-                    message=f"this event has already been processed at {time.ctime(processed_at)}",
+                    message=f"ignoring event that has already been processed at {time.ctime(processed_at)}",
                 )
 
-            head_sha = str(event_payload["head_sha"])
-            path = str(event_payload["path"])
-
+            head_sha = str(run_payload["head_sha"])
+            path = str(run_payload["path"])
             message = f"{repository}: downloading {os.path.basename(path)} and parsing jobs list..."
             try:
                 workflow = gh_fetch_workflow(
@@ -194,12 +237,42 @@ def handle(
             except Exception as e:
                 return handler.send_error(500, f"{message} failed: {e}")
 
-            self._handle_workflow_run_in_progress(
+            self.duplicated_events[event_key] = time.time()
+            return self._handle_workflow_run_in_progress(
                 handler=handler,
                 repository=repository,
                 labels=labels,
             )
+
+        if job_payload:
+            if action != "queued" and action != "in_progress" and action != "completed":
+                return handler.send_json(
+                    202,
+                    message='ignoring action != ["queued", "in_progress", "completed"]',
+                )
+
+            event_key = (int(job_payload["id"]), action)
+            processed_at = self.duplicated_events.get(event_key)
+            if processed_at:
+                return handler.send_json(
+                    202,
+                    message=f"ignoring event that has already been processed at {time.ctime(processed_at)}",
+                )
+
             self.duplicated_events[event_key] = time.time()
+            return self._handle_workflow_job_timing(
+                handler=handler,
+                repository=repository,
+                labels={label: 1 for label in job_payload["labels"]},
+                action=action,
+                job_id=int(job_payload["id"]),
+                name=name,
+            )
+
+        return handler.send_json(
+            202,
+            message=f"ignoring event with no {WORKFLOW_RUN_EVENT} and {WORKFLOW_JOB_EVENT}",
+        )
 
     def _handle_workflow_run_in_progress(
         self,
@@ -218,20 +291,104 @@ def _handle_workflow_run_in_progress(
                 inc=inc,
             )
             has_aws = has_aws or res
-            messages.append(f"{asg_spec.label}:+{inc}")
-        if messages:
+            messages.append(f"{asg_spec}:+{inc}")
+
+        if not messages:
+            # Most likely, it's a GitHub-hosted action runner's label.
             return handler.send_json(
-                200,
-                message=f"{repository} desired capacity: {', '.join(messages)}"
-                + (f" {DRY_RUN_MSG}" if not has_aws else ""),
+                202,
+                message=f"ignoring event, since no matching auto-scaling group(s) found for repository {repository} and labels {[*labels.keys()]}",
             )
-        else:
-            # Most likely, it's a GitHub-hosted action runner's label.
+
+        return handler.send_json(
+            200,
+            message=f"updated desired capacity: {', '.join(messages)}"
+            + (f" {DRY_RUN_MSG}" if not has_aws else ""),
+        )
+
+    def _handle_workflow_job_timing(
+        self,
+        *,
+        handler: PostJsonHttpRequestHandler,
+        repository: str,
+        labels: dict[str, int],
+        action: Literal["queued", "in_progress", "completed"],
+        job_id: int,
+        name: str | None,
+    ):
+        asg_spec: AsgSpec | None = None
+        for asg in self.asg_specs:
+            if asg.repository == repository and asg.label in labels:
+                asg_spec = asg
+                break
+        if not asg_spec:
             return handler.send_json(
                 202,
-                message=f"Ignored: no matching auto-scaling group(s) found for repository {repository} and labels {[*labels.keys()]}",
+                message=f"ignoring event, since no matching auto-scaling group(s) found for repository {repository} and labels {[*labels.keys()]}",
+            )
+
+        timing = self.job_timings.get(job_id) or JobTiming(job_id=job_id)
+        self.job_timings[job_id] = timing
+
+        now = time.time()
+        if action == "queued":
+            timing.queued_at = now
+        elif action == "in_progress":
+            timing.started_at = now
+        elif action == "completed":
+            timing.completed_at = now
+
+        metrics: dict[str, int] = {}
+        if timing.started_at and timing.queued_at:
+            metrics["JobPickUpTimeSec"] = int(timing.started_at - timing.queued_at)
+        if timing.completed_at and timing.started_at:
+            metrics["JobExecutionTimeSec"] = int(
+                timing.completed_at - timing.started_at
+            )
+        if timing.completed_at and timing.queued_at:
+            metrics["JobCompleteTimeSec"] = int(timing.completed_at - timing.queued_at)
+
+        for metric in timing.bumped:
+            metrics.pop(metric, None)
+        timing.bumped.update(metrics.keys())
+
+        if metrics:
+            job_name = (
+                re.sub(
+                    r"^_+|_+$",  # e.g. "_some_text_" -> "some_text"
+                    "",
+                    re.sub(
+                        r"[^-_a-zA-Z0-9]+",  # e.g. "run lint" -> "run_lint"
+                        "_",
+                        re.sub(
+                            r"\s+\d+$",  # e.g. "test 6" -> "test x"
+                            " x",
+                            name.lower(),  # e.g. "Abc" -> "abc"
+                        ),
+                    ),
+                )
+                if name
+                else None
+            )
+            has_aws = aws_cloudwatch_put_metric_data(
+                metrics=metrics,
+                dimensions={
+                    "GH_REPOSITORY": asg_spec.repository,
+                    "GH_LABEL": asg_spec.label,
+                    **({"GH_JOB_NAME": job_name} if job_name else {}),
+                },
+            )
+            log(
+                f"{asg_spec}: job_id={job_id} job_name={job_name}: "
+                + " ".join(f"{k}={v}" for k, v in metrics.items())
+                + (f" {DRY_RUN_MSG}" if not has_aws else "")
             )
 
+        return handler.send_json(
+            200,
+            message=f"processed event for job_id={job_id}: {asg_spec}",
+        )
+
 
 def verify_signature(
     *,
```
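To see how `JobTiming` and its `bumped` set cooperate, here is a condensed standalone re-creation of the metric bookkeeping from `_handle_workflow_job_timing` (fake timestamps, no CloudWatch calls): each metric is computed from whatever timestamps have been recorded, but is emitted at most once per job, so redelivered or repeated webhook events do not publish duplicates.

```python
import dataclasses

@dataclasses.dataclass
class JobTiming:
    job_id: int
    queued_at: float | None = None
    started_at: float | None = None
    completed_at: float | None = None
    bumped: set[str] = dataclasses.field(default_factory=set)

def metrics_for(timing: JobTiming) -> dict[str, int]:
    # Mirrors the diff: derive whatever is computable from the timestamps...
    metrics: dict[str, int] = {}
    if timing.started_at and timing.queued_at:
        metrics["JobPickUpTimeSec"] = int(timing.started_at - timing.queued_at)
    if timing.completed_at and timing.started_at:
        metrics["JobExecutionTimeSec"] = int(timing.completed_at - timing.started_at)
    if timing.completed_at and timing.queued_at:
        metrics["JobCompleteTimeSec"] = int(timing.completed_at - timing.queued_at)
    # ...then drop everything already published ("bumped") earlier.
    for metric in timing.bumped:
        metrics.pop(metric, None)
    timing.bumped.update(metrics.keys())
    return metrics

t = JobTiming(job_id=42, queued_at=100.0, started_at=130.0)
print(metrics_for(t))  # {'JobPickUpTimeSec': 30}
t.completed_at = 310.0
print(metrics_for(t))  # {'JobExecutionTimeSec': 180, 'JobCompleteTimeSec': 210}
print(metrics_for(t))  # {} -- a redelivered "completed" event adds nothing
```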

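The `GH_JOB_NAME` CloudWatch dimension comes from the nested `re.sub` pipeline in the diff, isolated below with made-up sample inputs. Note the trailing-number fold: shard names like `test 1` through `test 16` all collapse to `test_x`, so they aggregate under a single metric dimension instead of exploding cardinality.

```python
import re

def sanitize_job_name(name: str) -> str:
    return re.sub(
        r"^_+|_+$",  # strip leading/trailing underscores
        "",
        re.sub(
            r"[^-_a-zA-Z0-9]+",  # collapse every other character run to "_"
            "_",
            re.sub(
                r"\s+\d+$",  # fold a trailing shard number: "test 6" -> "test x"
                " x",
                name.lower(),
            ),
        ),
    )

print(sanitize_job_name("Run Lint"))       # run_lint
print(sanitize_job_name("test 6"))         # test_x
print(sanitize_job_name(" (e2e) suite "))  # e2e_suite
```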