Skip to content

Commit 8acea26

Browse files
authored
Merge pull request #127 from depot/feat/sync-shutdown-version-2
feat: add synchronous shutdown
2 parents d568875 + 6fa6e4f commit 8acea26

File tree

5 files changed

+143
-30
lines changed

5 files changed

+143
-30
lines changed

proto/depot/cloud/v3/machine.proto

+4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ service MachineService {
77
rpc PingMachineHealth(PingMachineHealthRequest) returns (PingMachineHealthResponse);
88
rpc ReportMachineHealth(stream ReportMachineHealthRequest) returns (ReportMachineHealthResponse);
99
rpc Usage(UsageRequest) returns (UsageResponse);
10+
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse);
1011
}
1112

1213
message RegisterMachineRequest {
@@ -159,3 +160,6 @@ message Cache {
159160
}
160161

161162
message UsageResponse {}
163+
164+
message ShutdownRequest {}
165+
message ShutdownResponse {}

src/gen/ts/depot/cloud/v3/machine_connect.ts

+11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import {
1111
RegisterMachineResponse,
1212
ReportMachineHealthRequest,
1313
ReportMachineHealthResponse,
14+
ShutdownRequest,
15+
ShutdownResponse,
1416
UsageRequest,
1517
UsageResponse,
1618
} from './machine_pb'
@@ -57,5 +59,14 @@ export const MachineService = {
5759
O: UsageResponse,
5860
kind: MethodKind.Unary,
5961
},
62+
/**
63+
* @generated from rpc depot.cloud.v3.MachineService.Shutdown
64+
*/
65+
shutdown: {
66+
name: 'Shutdown',
67+
I: ShutdownRequest,
68+
O: ShutdownResponse,
69+
kind: MethodKind.Unary,
70+
},
6071
},
6172
} as const

src/gen/ts/depot/cloud/v3/machine_pb.ts

+66
Original file line numberDiff line numberDiff line change
@@ -1140,3 +1140,69 @@ export class UsageResponse extends Message<UsageResponse> {
11401140
return proto3.util.equals(UsageResponse, a, b)
11411141
}
11421142
}
1143+
1144+
/**
1145+
* @generated from message depot.cloud.v3.ShutdownRequest
1146+
*/
1147+
export class ShutdownRequest extends Message<ShutdownRequest> {
1148+
constructor(data?: PartialMessage<ShutdownRequest>) {
1149+
super()
1150+
proto3.util.initPartial(data, this)
1151+
}
1152+
1153+
static readonly runtime: typeof proto3 = proto3
1154+
static readonly typeName = 'depot.cloud.v3.ShutdownRequest'
1155+
static readonly fields: FieldList = proto3.util.newFieldList(() => [])
1156+
1157+
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ShutdownRequest {
1158+
return new ShutdownRequest().fromBinary(bytes, options)
1159+
}
1160+
1161+
static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ShutdownRequest {
1162+
return new ShutdownRequest().fromJson(jsonValue, options)
1163+
}
1164+
1165+
static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ShutdownRequest {
1166+
return new ShutdownRequest().fromJsonString(jsonString, options)
1167+
}
1168+
1169+
static equals(
1170+
a: ShutdownRequest | PlainMessage<ShutdownRequest> | undefined,
1171+
b: ShutdownRequest | PlainMessage<ShutdownRequest> | undefined,
1172+
): boolean {
1173+
return proto3.util.equals(ShutdownRequest, a, b)
1174+
}
1175+
}
1176+
1177+
/**
1178+
* @generated from message depot.cloud.v3.ShutdownResponse
1179+
*/
1180+
export class ShutdownResponse extends Message<ShutdownResponse> {
1181+
constructor(data?: PartialMessage<ShutdownResponse>) {
1182+
super()
1183+
proto3.util.initPartial(data, this)
1184+
}
1185+
1186+
static readonly runtime: typeof proto3 = proto3
1187+
static readonly typeName = 'depot.cloud.v3.ShutdownResponse'
1188+
static readonly fields: FieldList = proto3.util.newFieldList(() => [])
1189+
1190+
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ShutdownResponse {
1191+
return new ShutdownResponse().fromBinary(bytes, options)
1192+
}
1193+
1194+
static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ShutdownResponse {
1195+
return new ShutdownResponse().fromJson(jsonValue, options)
1196+
}
1197+
1198+
static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ShutdownResponse {
1199+
return new ShutdownResponse().fromJsonString(jsonString, options)
1200+
}
1201+
1202+
static equals(
1203+
a: ShutdownResponse | PlainMessage<ShutdownResponse> | undefined,
1204+
b: ShutdownResponse | PlainMessage<ShutdownResponse> | undefined,
1205+
): boolean {
1206+
return proto3.util.equals(ShutdownResponse, a, b)
1207+
}
1208+
}

src/tasks/buildkit.ts

+48-26
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import * as fsp from 'fs/promises'
44
import {onShutdown, onShutdownError} from 'node-graceful-shutdown'
55
import {RegisterMachineResponse, RegisterMachineResponse_BuildKitTask} from '../gen/ts/depot/cloud/v3/machine_pb'
66
import {pathExists} from '../utils/common'
7+
import {client} from '../utils/grpc'
78
import {ensureMounted, fstrim, mountExecutor, unmapBlockDevice, unmountDevice} from '../utils/mounts'
89
import {reportHealth} from './health'
910
import {reportUsage} from './usage'
@@ -209,6 +210,9 @@ keepBytes = ${cacheSizeBytes}
209210
// Ignore this error, it's expected when the process is killed.
210211
} else if (isAbortError(error)) {
211212
// Ignore this error, it's expected when the process is killed.
213+
} else if (error instanceof Error && error.message.includes('Command failed with exit code 2')) {
214+
console.error(`BuildKit exited with panic: ${error}`)
215+
throw error
212216
} else {
213217
throw error
214218
}
@@ -237,41 +241,59 @@ keepBytes = ${cacheSizeBytes}
237241
console.log(`BuildKit exited with error: ${error}`)
238242
}
239243

240-
// Remove estargz cache because we will rely on the buildkit layer cache instead.
241-
await execa('rm', ['-rf', `${rootDir}/runc-stargz/snapshots/stargz`], {stdio: 'inherit'}).catch((err) => {
242-
console.error(err)
243-
})
244-
245-
// Print the time it takes to sync the filesystem.
246-
const start = Date.now()
247-
// sync the filesystem to ensure all data is written to disk.
248-
await execa('sync', {stdio: 'inherit'}).catch((err) => {
249-
console.error(err)
250-
})
251-
console.log(`sync took ${Date.now() - start}ms`)
252-
253-
for (const mount of task.mounts) {
254-
if (mount.cephVolume) {
255-
if (!task.disableFstrim) {
256-
await fstrim(mount.path)
257-
}
258-
await unmountDevice(mount.path)
259-
await unmapBlockDevice(mount.cephVolume.volumeName, mount.cephVolume.imageSpec)
260-
} else {
261-
await unmountDevice(mount.path)
262-
}
263-
}
244+
await shutdown(rootDir, task)
264245
})
265246

266247
try {
267-
await Promise.all([
248+
const [result] = await Promise.allSettled([
268249
buildkit,
269-
reportHealth({signal, headers, path: rootDir}),
250+
reportHealth({controller, headers, path: rootDir}),
270251
reportUsage({machineId, signal, headers}),
271252
])
253+
if (result.status === 'rejected') {
254+
throw result.reason
255+
}
256+
257+
// If we have successfully stopped buildkit, we can shutdown.
258+
await shutdown(rootDir, task)
272259
} catch (error) {
273260
throw error
274261
} finally {
275262
controller.abort()
276263
}
277264
}
265+
266+
async function shutdown(rootDir: string, task: RegisterMachineResponse_BuildKitTask) {
267+
// Remove estargz cache because we will rely on the buildkit layer cache instead.
268+
await execa('rm', ['-rf', `${rootDir}/runc-stargz/snapshots/stargz`], {stdio: 'inherit'}).catch((err) => {
269+
console.error(err)
270+
})
271+
272+
// Print the time it takes to sync the filesystem.
273+
const start = Date.now()
274+
// sync the filesystem to ensure all data is written to disk.
275+
await execa('sync', {stdio: 'inherit'}).catch((err) => {
276+
console.error(err)
277+
})
278+
console.log(`sync took ${Date.now() - start}ms`)
279+
280+
for (const mount of task.mounts) {
281+
if (mount.cephVolume) {
282+
if (!task.disableFstrim) {
283+
await fstrim(mount.path)
284+
}
285+
await unmountDevice(mount.path)
286+
await unmapBlockDevice(mount.cephVolume.volumeName, mount.cephVolume.imageSpec)
287+
} else {
288+
await unmountDevice(mount.path)
289+
}
290+
}
291+
292+
// Report shutdown to the API to indicate that the machine is no longer available.
293+
await reportShutdown()
294+
}
295+
296+
async function reportShutdown() {
297+
const signal = AbortSignal.timeout(5000)
298+
return await client.shutdown({}, {signal})
299+
}

src/tasks/health.ts

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
import {PlainMessage} from '@bufbuild/protobuf'
2+
import {isAbortError} from 'abort-controller-x'
23
import {execa} from 'execa'
34
import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb'
45
import {sleep} from '../utils/common'
56
import {stats} from '../utils/disk'
67
import {client} from '../utils/grpc'
78

89
export interface ReportHealthParams {
9-
signal: AbortSignal
10+
controller: AbortController
1011
headers: HeadersInit
1112
path: string
1213
}
1314

14-
export async function reportHealth({signal, headers, path}: ReportHealthParams) {
15+
export async function reportHealth({controller, headers, path}: ReportHealthParams) {
16+
const signal = controller.signal
17+
1518
while (!signal.aborted) {
1619
await waitForBuildKitWorkers(signal)
1720

@@ -34,9 +37,16 @@ export async function reportHealth({signal, headers, path}: ReportHealthParams)
3437
}
3538

3639
const res = await client.reportMachineHealth(stream(), {headers, signal})
37-
if (res.shouldTerminate) return
40+
if (res.shouldTerminate) {
41+
console.log('shutdown requested')
42+
controller.abort()
43+
44+
return
45+
}
3846
} catch (error) {
39-
console.log('Error reporting health:', error)
47+
if (!isAbortError(error)) {
48+
console.log('Error reporting health:', error)
49+
}
4050
}
4151
await sleep(1000)
4252
}

0 commit comments

Comments
 (0)