
Commit 9f66a9e

jeanschmidt, ZainRizvi, and Camyll authored
Adds scaleUpHealing chron (#6412)
# TLDR

This change introduces a new lambda, `${var.environment}-scale-up-chron`, along with the TypeScript code and the required Terraform changes.

# What is changing?

This PR introduces the TypeScript code for the new lambda and the related Terraform changes to run it every 30 minutes, with a 15-minute timeout. Its permissions and access are the same as those of scaleUp. The lambda queries HUD at a URL specified in the user configuration `retry_scale_up_chron_hud_query_url` and gets a list of instance types with the number of jobs enqueued for each. It then synchronously tries to deploy those runners.

It introduces two new parameters in the main module:

* `retry_scale_up_chron_hud_query_url`, which for now should point to https://hud.pytorch.org/api/clickhouse/queued_jobs_aggregate?parameters=%5B%5D, and only in the installations that will benefit from it (both the Meta and Linux Foundation PROD clusters, NOT canary); when this variable is set to an empty string (the default), the cron is not installed.
* `scale_config_org`, which should point to the org where the scale-config files are defined. In our case it is `pytorch`.

[example of the change](https://github.com/pytorch-labs/pytorch-gha-infra/pull/622/files)

# Why are we changing this?

We're introducing this change to help recover lost infra-scaling requests. It has been proven for a while that during GitHub API outages we fail to receive new-job webhooks or fail to provision new runners. Most of the time our retry mechanism can handle the situation, but when we are not receiving webhooks, or other more esoteric problems occur, there is no way to recover. With this change, every 30 minutes, jobs that have been enqueued for longer than 30 minutes for one of the autoscaled instance types trigger the creation of those instances.

A few design decisions:

1. Why rely on HUD? HUD already has this information, so it is simple to just get it from there.
2. Why not send a scale message and let scaleUp handle it? We want isolation, so that we can easily circuit-break the creation of enqueued instances. This isolation also guarantees that if the scaler is failing to deploy a given instance type, this mechanism won't risk flooding or overflowing the main scaler, which has to deal with all the other types.
3. Why randomise the instance creation order? So that if some instance type is problematic, we are not completely preventing the recovery of the other instance types (just interfering with it). We also gain some time between creations of instances of the same type, allowing for smoother operation.
4. Why a new lambda? See number 2.

# If something goes wrong?

Given that we did as much as possible to ensure maximal isolation between the regular scaler and the cron recovery scaler introduced here, we are not foreseeing any gaps that could break the main scaler and, as a consequence, introduce system breakages. Having said that, if you need to revert these changes from production, just follow the steps in https://docs.google.com/document/d/1nq3dx-_8wasii1koCkXJDSo3uz_0Ee8DzIS2-j2TOpA/edit?tab=t.0#heading=h.jwflgevrww4j

---------

Co-authored-by: Zain Rizvi <[email protected]>
Co-authored-by: Camyll Harajli <[email protected]>
1 parent 8737643 commit 9f66a9e

17 files changed: +789 −16 lines
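The new lambda's source (`src/scale-runners/scale-up-chron.ts`) is part of this commit but not shown in the excerpt below. As a rough orientation, a minimal sketch of the flow described in the commit message might look like the following; the `QueuedJob` shape and the `provisionRunner` helper are assumptions for illustration, not the actual implementation.

```typescript
// Hypothetical sketch of the scale-up-chron flow; names and response shape are assumptions.
import axios from 'axios';

interface QueuedJob {
  org: string;                    // assumed shape of the HUD queued_jobs_aggregate response
  runner_label: string;
  num_queued_jobs: number;
  min_queue_time_minutes: number;
}

async function scaleUpChronSketch(
  hudQueryUrl: string,
  minQueueTimeMinutes: number,
  provisionRunner: (org: string, runnerType: string) => Promise<void>,
): Promise<void> {
  // 1. Ask HUD which runner types have jobs sitting in the queue.
  const response = await axios.get<QueuedJob[]>(hudQueryUrl);

  // 2. Keep only jobs enqueued longer than the threshold (30 minutes by default).
  const stale = response.data.filter((job) => job.min_queue_time_minutes >= minQueueTimeMinutes);

  // 3. Crudely randomise the order so one problematic runner type cannot starve the others.
  const shuffled = [...stale].sort(() => Math.random() - 0.5);

  // 4. Synchronously provision the missing runners.
  for (const job of shuffled) {
    for (let i = 0; i < job.num_queued_jobs; i++) {
      await provisionRunner(job.org, job.runner_label);
    }
  }
}
```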

terraform-aws-github-runner/main.tf

+3

@@ -104,6 +104,7 @@ module "runners" {
   environment = var.environment
   tags        = local.tags

+  scale_config_org       = var.scale_config_org
   scale_config_repo      = var.scale_config_repo
   scale_config_repo_path = var.scale_config_repo_path

@@ -112,6 +113,8 @@ module "runners" {
     encrypt = var.encrypt_secrets
   }

+  retry_scale_up_chron_hud_query_url = var.retry_scale_up_chron_hud_query_url
+
   must_have_issues_labels = var.must_have_issues_labels
   cant_have_issues_labels = var.cant_have_issues_labels

terraform-aws-github-runner/modules/runners/lambdas/runners/jest.config.js

+4 −1

@@ -11,5 +11,8 @@ module.exports = {
       lines: 93,
       statements: 94
     }
-  }
+  },
+  moduleNameMapper: {
+    axios: 'axios/dist/node/axios.cjs', // Allow axios to work in tests
+  },
 };

terraform-aws-github-runner/modules/runners/lambdas/runners/package.json

+1

@@ -42,6 +42,7 @@
     "@types/uuid": "^9.0.1",
     "async-mutex": "^0.4.0",
     "aws-sdk": "^2.863.0",
+    "axios": "^1.7.7",
     "cron-parser": "^3.3.0",
     "generic-pool": "^3.9.0",
     "lru-cache": "^6.0.0",

terraform-aws-github-runner/modules/runners/lambdas/runners/src/lambda.test.ts

+38 −10

@@ -1,11 +1,12 @@
-import { scaleDown as scaleDownL, scaleUp as scaleUpL } from './lambda';
+import { scaleDown as scaleDownL, scaleUp as scaleUpL, scaleUpChron as scaleUpChronL } from './lambda';

 import nock from 'nock';
 import { Config } from './scale-runners/config';
 import { Context, SQSEvent, ScheduledEvent } from 'aws-lambda';
 import { mocked } from 'ts-jest/utils';
 import { scaleDown } from './scale-runners/scale-down';
 import { scaleUp, RetryableScalingError } from './scale-runners/scale-up';
+import { scaleUpChron } from './scale-runners/scale-up-chron';
 import { sqsSendMessages, sqsDeleteMessageBatch } from './scale-runners/sqs';
 import * as MetricsModule from './scale-runners/metrics';

@@ -21,8 +22,10 @@ jest.mock('aws-sdk', () => ({
 jest.mock('./scale-runners/scale-down');
 jest.mock('./scale-runners/scale-up');
 jest.mock('./scale-runners/sqs');
+jest.mock('./scale-runners/scale-up-chron');

-const metrics = new MetricsModule.ScaleUpMetrics();
+const mockScaleUpMetrics = new MetricsModule.ScaleUpMetrics();
+const mockScaleUpChronMetrics = new MetricsModule.ScaleUpChronMetrics();

 beforeEach(() => {
   jest.resetModules();
@@ -34,7 +37,7 @@ beforeEach(() => {
 describe('scaleUp', () => {
   beforeEach(() => {
     jest.spyOn(global.Math, 'random').mockReturnValue(1.0);
-    jest.spyOn(MetricsModule, 'ScaleUpMetrics').mockReturnValue(metrics);
+    jest.spyOn(MetricsModule, 'ScaleUpMetrics').mockReturnValue(mockScaleUpMetrics);
   });

   afterEach(() => {
@@ -55,8 +58,8 @@ describe('scaleUp', () => {
       callback,
     );
     expect(mockedScaleUp).toBeCalledTimes(2);
-    expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 }, metrics);
-    expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 2 }, metrics);
+    expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 }, mockScaleUpMetrics);
+    expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 2 }, mockScaleUpMetrics);
     expect(callback).toBeCalledTimes(1);
     expect(callback).toBeCalledWith(null);
   });
@@ -88,12 +91,12 @@ describe('scaleUp', () => {
       callback,
     );
     expect(mockedScaleUp).toBeCalledTimes(1);
-    expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 }, metrics);
+    expect(mockedScaleUp).toBeCalledWith('aws:sqs', { id: 1 }, mockScaleUpMetrics);
     expect(callback).toBeCalledTimes(1);
     expect(callback).toBeCalledWith('Failed handling SQS event');

     expect(sqsDeleteMessageBatch).toBeCalledTimes(1);
-    expect(sqsDeleteMessageBatch).toBeCalledWith(metrics, evts);
+    expect(sqsDeleteMessageBatch).toBeCalledWith(mockScaleUpMetrics, evts);
   });

   it('stochasticOvershoot when retryCount > 5', async () => {
@@ -137,7 +140,7 @@ describe('scaleUp', () => {
       },
     ];
     expect(sqsSendMessages).toBeCalledTimes(1);
-    expect(sqsSendMessages).toBeCalledWith(metrics, expected, 'asdf');
+    expect(sqsSendMessages).toBeCalledWith(mockScaleUpMetrics, expected, 'asdf');

     expect(sqsDeleteMessageBatch).toBeCalledTimes(0);
   });
@@ -205,10 +208,10 @@ describe('scaleUp', () => {
       },
     ];
     expect(sqsSendMessages).toBeCalledTimes(1);
-    expect(sqsSendMessages).toBeCalledWith(metrics, expected, 'asdf');
+    expect(sqsSendMessages).toBeCalledWith(mockScaleUpMetrics, expected, 'asdf');

     expect(sqsDeleteMessageBatch).toBeCalledTimes(1);
-    expect(sqsDeleteMessageBatch).toBeCalledWith(metrics, records);
+    expect(sqsDeleteMessageBatch).toBeCalledWith(mockScaleUpMetrics, records);
   });
 });

@@ -231,3 +234,28 @@ describe('scaleDown', () => {
     expect(callback).toBeCalledWith('Failed');
   });
 });
+
+describe('scaleUpChron', () => {
+  beforeEach(() => {
+    jest.spyOn(MetricsModule, 'ScaleUpChronMetrics').mockReturnValue(mockScaleUpChronMetrics);
+  });
+
+  it('succeeds', async () => {
+    const mockedScaleUpChron = mocked(scaleUpChron).mockResolvedValue(undefined);
+    const callback = jest.fn();
+    await scaleUpChronL({} as unknown as ScheduledEvent, {} as unknown as Context, callback);
+    expect(mockedScaleUpChron).toBeCalledTimes(1);
+    expect(mockedScaleUpChron).toBeCalledWith(mockScaleUpChronMetrics);
+    expect(callback).toBeCalledTimes(1);
+    expect(callback).toBeCalledWith(null);
+  });
+
+  it('fails', async () => {
+    const mockedScaleUpChron = mocked(scaleUpChron).mockRejectedValue(Error('error'));
+    const callback = jest.fn();
+    await scaleUpChronL({} as unknown as ScheduledEvent, {} as unknown as Context, callback);
+    expect(mockedScaleUpChron).toBeCalledTimes(1);
+    expect(callback).toBeCalledTimes(1);
+    expect(callback).toBeCalledWith('Failed');
+  });
+});

terraform-aws-github-runner/modules/runners/lambdas/runners/src/lambda.ts

+39 −1

@@ -2,9 +2,15 @@ import { ActionRequestMessage, RetryableScalingError, scaleUp as scaleUpR } from
 import { Context, SQSEvent, SQSRecord, ScheduledEvent } from 'aws-lambda';

 import { Config } from './scale-runners/config';
-import { ScaleUpMetrics, sendMetricsAtTimeout, sendMetricsTimeoutVars } from './scale-runners/metrics';
+import {
+  ScaleUpMetrics,
+  ScaleUpChronMetrics,
+  sendMetricsAtTimeout,
+  sendMetricsTimeoutVars,
+} from './scale-runners/metrics';
 import { getDelayWithJitterRetryCount, stochaticRunOvershoot } from './scale-runners/utils';
 import { scaleDown as scaleDownR } from './scale-runners/scale-down';
+import { scaleUpChron as scaleUpChronR } from './scale-runners/scale-up-chron';
 import { sqsSendMessages, sqsDeleteMessageBatch } from './scale-runners/sqs';

 async function sendRetryEvents(evtFailed: Array<[SQSRecord, boolean, number]>, metrics: ScaleUpMetrics) {
@@ -155,3 +161,35 @@ export async function scaleDown(event: ScheduledEvent, context: Context, callbac
     return callback('Failed');
   }
 }
+
+// eslint-disable-next-line @typescript-eslint/no-explicit-any
+export async function scaleUpChron(event: ScheduledEvent, context: Context, callback: any) {
+  // we maintain open connections to redis, so the event pool is only cleaned when the SIGTERM is sent
+  context.callbackWaitsForEmptyEventLoop = false;
+
+  const metrics = new ScaleUpChronMetrics();
+  const sndMetricsTimout: sendMetricsTimeoutVars = {
+    metrics: metrics,
+  };
+  sndMetricsTimout.setTimeout = setTimeout(
+    sendMetricsAtTimeout(sndMetricsTimout),
+    (Config.Instance.lambdaTimeout - 10) * 1000,
+  );
+
+  try {
+    await scaleUpChronR(metrics);
+    return callback(null);
+  } catch (e) {
+    console.error(e);
+    return callback('Failed');
+  } finally {
+    try {
+      clearTimeout(sndMetricsTimout.setTimeout);
+      sndMetricsTimout.metrics = undefined;
+      sndMetricsTimout.setTimeout = undefined;
+      await metrics.sendMetrics();
+    } catch (e) {
+      console.error(`Error sending metrics: ${e}`);
+    }
+  }
+}
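For reference, the new handler can be exercised the same way the unit tests above call it. This is a hedged local-invocation sketch only; it assumes the environment variables the lambda normally receives (and therefore `Config.Instance`) are already in place.

```typescript
import { Context, ScheduledEvent } from 'aws-lambda';
import { scaleUpChron } from './lambda';

// Fake ScheduledEvent/Context, mirroring lambda.test.ts; not a real EventBridge payload.
async function invokeLocally(): Promise<void> {
  await scaleUpChron({} as unknown as ScheduledEvent, {} as unknown as Context, (err: string | null) => {
    // The handler calls back with null on success and 'Failed' when scaleUpChronR throws.
    console.log(err === null ? 'scaleUpChron succeeded' : `scaleUpChron reported: ${err}`);
  });
}

invokeLocally().catch(console.error);
```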

terraform-aws-github-runner/modules/runners/lambdas/runners/src/scale-runners/config.ts

+9 −1

@@ -36,8 +36,11 @@ export class Config {
   readonly retryScaleUpRecordQueueUrl: string | undefined;
   readonly runnerGroupName: string | undefined;
   readonly runnersExtraLabels: undefined | string;
+  readonly scaleConfigOrg: string;
   readonly scaleConfigRepo: string;
   readonly scaleConfigRepoPath: string;
+  readonly scaleUpMinQueueTimeMinutes: number;
+  readonly scaleUpChronRecordQueueUrl: string | undefined;
   readonly secretsManagerSecretsId: string | undefined;
   readonly sSMParamCleanupAgeDays: number;
   readonly sSMParamMaxCleanupAllowance: number;
@@ -93,9 +96,14 @@ export class Config {
     this.retryScaleUpRecordDelayS = Number(process.env.RETRY_SCALE_UP_RECORD_DELAY_S || '0');
     /* istanbul ignore next */
     this.retryScaleUpRecordJitterPct = Number(process.env.RETRY_SCALE_UP_RECORD_JITTER_PCT || '0');
-    this.retryScaleUpRecordQueueUrl = process.env.RETRY_SCALE_UP_RECORD_QUEUE_URL;
+    this.retryScaleUpRecordQueueUrl = process.env.RETRY_SCALE_UP_CHRON_RECORD_QUEUE_URL;
+    this.scaleUpChronRecordQueueUrl = process.env.SCALE_UP_CHRON_HUD_QUERY_URL;
+    this.scaleUpMinQueueTimeMinutes = process.env.SCALE_UP_MIN_QUEUE_TIME_MINUTES
+      ? Number(process.env.SCALE_UP_MIN_QUEUE_TIME_MINUTES)
+      : 30;
     this.runnerGroupName = process.env.RUNNER_GROUP_NAME;
     this.runnersExtraLabels = process.env.RUNNER_EXTRA_LABELS;
+    this.scaleConfigOrg = process.env.SCALE_CONFIG_ORG || '';
     /* istanbul ignore next */
     this.scaleConfigRepo = process.env.SCALE_CONFIG_REPO || '';
     if (this.enableOrganizationRunners && !this.scaleConfigRepo) {
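A small sketch of how the new `Config` fields map to environment variables, e.g. for a local run. The variable names come from the diff above; the values are illustrative only, and the lazily-built `Config.Instance` singleton is an assumption based on its use in lambda.ts.

```typescript
import { Config } from './scale-runners/config';

// Illustrative values; set these before Config.Instance is first accessed.
process.env.SCALE_CONFIG_ORG = 'pytorch';
process.env.SCALE_UP_CHRON_HUD_QUERY_URL =
  'https://hud.pytorch.org/api/clickhouse/queued_jobs_aggregate?parameters=%5B%5D';
process.env.SCALE_UP_MIN_QUEUE_TIME_MINUTES = '45'; // optional; Config falls back to 30 when unset

const config = Config.Instance; // assumed lazy singleton, as used in lambda.ts
console.log(config.scaleConfigOrg);             // 'pytorch'
console.log(config.scaleUpChronRecordQueueUrl); // the HUD query URL
console.log(config.scaleUpMinQueueTimeMinutes); // 45
```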

terraform-aws-github-runner/modules/runners/lambdas/runners/src/scale-runners/metrics.ts

+60 −2

@@ -913,8 +913,8 @@ export class Metrics {
 }

 export class ScaleUpMetrics extends Metrics {
-  constructor() {
-    super('scaleUp');
+  constructor(lambdaName: string | undefined = undefined) {
+    super(lambdaName || 'scaleUp');
   }

   /* istanbul ignore next */
@@ -1630,6 +1630,64 @@ export class ScaleDownMetrics extends Metrics {
   }
 }

+export class ScaleUpChronMetrics extends ScaleUpMetrics {
+  constructor() {
+    super('scaleUpChron');
+  }
+
+  queuedRunnerStats(org: string, runnerType: string, numQueuedJobs: number) {
+    const dimensions = new Map([
+      ['Org', org],
+      ['RunnerType', runnerType],
+      ['numQueuedJobs', numQueuedJobs.toString()],
+    ]);
+    this.addEntry('gh.scaleupchron.queuedRunners', 3, dimensions);
+  }
+
+  queuedRunnerFailure(error: string) {
+    const dimensions = new Map([['error', error]]);
+    this.countEntry('gh.scaleupchron.queuedRunners.failure', 1, dimensions);
+  }
+
+  /* istanbul ignore next */
+  getQueuedJobsEndpointSuccess(ms: number) {
+    this.countEntry(`gh.calls.total`, 1);
+    this.countEntry(`gh.calls.getQueuedJobsEndpoint.count`, 1);
+    this.countEntry(`gh.calls.getQueuedJobsEndpoint.success`, 1);
+    this.addEntry(`gh.calls.getQueuedJobsEndpoint.wallclock`, ms);
+  }
+
+  /* istanbul ignore next */
+  getQueuedJobsEndpointFailure(ms: number) {
+    this.countEntry(`gh.calls.total`, 1);
+    this.countEntry(`gh.calls.getQueuedJobsEndpoint.count`, 1);
+    this.countEntry(`gh.calls.getQueuedJobsEndpoint.failure`, 1);
+    this.addEntry(`gh.calls.getQueuedJobsEndpoint.wallclock`, ms);
+  }
+
+  scaleUpInstanceSuccess() {
+    this.scaleUpSuccess();
+    this.countEntry('run.scaleupchron.success');
+  }
+
+  scaleUpInstanceFailureNonRetryable(error: string) {
+    const dimensions = new Map([['error', error]]);
+    // should we add more information about this or do we not care since it'll be requeued?
+    this.countEntry('run.scaleupchron.failure.nonRetryable', 1, dimensions);
+  }
+
+  scaleUpInstanceFailureRetryable(error: string) {
+    const dimensions = new Map([['error', error]]);
+
+    // should we add more information about this or do we not care since it'll be requeued?
+    this.countEntry('run.scaleupchron.failure.retryable', 1, dimensions);
+  }
+
+  scaleUpInstanceNoOp() {
+    this.countEntry('run.scaleupchron.noop');
+  }
+}
+
 export interface sendMetricsTimeoutVars {
   metrics?: Metrics;
   setTimeout?: ReturnType<typeof setTimeout>;
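The method signatures above suggest usage along the following lines from the chron code. This is only an illustrative sketch; the real call sites live in `scale-up-chron.ts`, which is not shown in this excerpt, and `provisionRunner` is a hypothetical placeholder.

```typescript
import { ScaleUpChronMetrics } from './scale-runners/metrics';

// Hypothetical helper standing in for the real runner-provisioning call.
async function provisionRunner(org: string, runnerType: string): Promise<void> {
  // ... create the runner here ...
}

async function recoverQueuedRunner(org: string, runnerType: string, numQueuedJobs: number): Promise<void> {
  const metrics = new ScaleUpChronMetrics(); // reports under the 'scaleUpChron' lambda name

  metrics.queuedRunnerStats(org, runnerType, numQueuedJobs);
  try {
    await provisionRunner(org, runnerType);
    metrics.scaleUpInstanceSuccess();
  } catch (e) {
    // Pick the retryable or non-retryable counter depending on the error.
    metrics.scaleUpInstanceFailureRetryable(`${e}`);
  } finally {
    await metrics.sendMetrics(); // flush, as lambda.ts does in its finally block
  }
}
```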
