Skip to content

Commit 6b32b84

Browse files
committed
rpc-based step execution
1 parent 49fe070 commit 6b32b84

File tree

7 files changed

+430
-24
lines changed

7 files changed

+430
-24
lines changed

Diff for: packages/k8s-workflow/gha-runner-rpc.py

+174
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
#!/usr/bin/env python3
2+
3+
# Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
4+
# SPDX-License-Identifier: Apache-2.0
5+
6+
# This implements a very simple RPC server that should be running on the job container of the workflow pod,
7+
# and used by the k8s hook to execute steps in the workflow on the workflow pod.
8+
9+
# It supports a running a single RPC call at a time, and will return an error if a new call is made while
10+
# another one is still running (which is a valid assumption, as the runner is expected to execute one step at a time).
11+
12+
13+
from concurrent.futures import ThreadPoolExecutor
14+
from dataclasses import dataclass
15+
import time
16+
from flask import Flask, jsonify, request
17+
from threading import Thread
18+
from waitress import serve
19+
20+
import argparse
21+
import json
22+
import logging
23+
import os
24+
import signal
25+
import subprocess
26+
27+
import logging
28+
import json_logging
29+
30+
app = Flask(__name__)
31+
app.logger.setLevel(logging.DEBUG)
32+
json_logging.init_flask(enable_json=True)
33+
json_logging.init_request_instrument(app)
34+
35+
@dataclass
36+
class Response:
37+
id: str
38+
status: str
39+
pid: int = None
40+
returncode: int = None
41+
error: str = None
42+
43+
def readLines(path, fromLine, maxLines):
44+
try:
45+
with open(path, 'r') as f:
46+
return [x for i, x in enumerate(f) if i >= fromLine and x.endswith('\n') and i < fromLine + maxLines]
47+
except Exception as e:
48+
app.logger.warning(f"Error reading file {path}: {e}")
49+
return []
50+
51+
class State:
52+
def __init__(self):
53+
self.latest_id = None
54+
self.status = Response(id = "", status = "idle")
55+
self.worker = ThreadPoolExecutor(max_workers=1)
56+
self.future = None
57+
self.process = None
58+
self.out = None
59+
60+
def __run(self, id, path):
61+
self.latest_id = id
62+
try:
63+
app.logger.debug(f"Running id {id}")
64+
logsfilename = f"/logs/{id}.out"
65+
self.out = open(logsfilename, "w")
66+
self.process = subprocess.Popen(['sh', '-e', path], start_new_session=True, stdout=self.out, stderr=self.out)
67+
app.logger.debug(f"Process for id {id} started with pid {self.process.pid}")
68+
self.status = Response(
69+
id = id,
70+
status = 'running',
71+
pid = self.process.pid
72+
)
73+
self.process.wait()
74+
self.out.close()
75+
app.logger.debug(f"Process for id {id} finished (return code {self.process.returncode})")
76+
self.status = Response(
77+
id = id,
78+
status = 'completed',
79+
returncode = self.process.returncode,
80+
)
81+
except Exception as e:
82+
app.logger.error(f"Error starting process: {e}")
83+
self.status = Response(
84+
id = id,
85+
status = 'failed',
86+
error = str(e),
87+
returncode = -1,
88+
)
89+
90+
91+
def exec(self, id, path):
92+
if self.future and not self.future.done():
93+
app.logger.error(f"A job is already running (ID {self.latest_id})")
94+
return Response(
95+
id = id,
96+
status = 'failed',
97+
error = f"A job is already running (ID {self.latest_id})",
98+
returncode = -1,
99+
)
100+
101+
app.logger.debug(f"Queueing job {id} with path {path}")
102+
self.status = Response(id = id, status = "pending")
103+
self.future = self.worker.submit(self.__run, id, path)
104+
return self.status
105+
106+
def cancel(self):
107+
if not self.future:
108+
return Response(
109+
id = '',
110+
status = 'failed',
111+
error = 'No job has been started yet',
112+
)
113+
elif self.future.done():
114+
# The job is already done, no need to cancel
115+
return self.status
116+
else:
117+
app.logger.debug(f"Cancelling {self.latest_id} (pid {self.process.pid})")
118+
os.killpg(os.getpgid(self.process.pid), signal.SIGINT)
119+
120+
return Response(
121+
id = self.latest_id,
122+
status = 'cancelling',
123+
pid = self.process.pid
124+
)
125+
126+
state = State()
127+
128+
# Post a new job
129+
@app.route('/', methods=['POST'])
130+
def call():
131+
data = json.loads(request.data)
132+
if 'id' not in data or 'path' not in data:
133+
return jsonify(Response(
134+
id = '',
135+
status = 'failed',
136+
error = 'Missing id or path in request',
137+
))
138+
id = data['id']
139+
path = data['path']
140+
return jsonify(state.exec(id, path))
141+
142+
# Cancel the current job
143+
@app.route('/', methods=['DELETE'])
144+
def cancel():
145+
return jsonify(state.cancel())
146+
147+
# Get the current status
148+
@app.route('/')
149+
def status():
150+
app.logger.debug(f"Status: {state.status}")
151+
return jsonify(state.status)
152+
153+
# Get the logs of a given job
154+
@app.route('/logs')
155+
def logs():
156+
if 'id' not in request.args:
157+
return 'Missing id in request', 400
158+
id = request.args.get('id')
159+
fromLine = int(request.args.get('fromLine', 0))
160+
maxLines = int(request.args.get('maxLines', 1000))
161+
path = f"/logs/{id}.out"
162+
return jsonify(readLines(path, fromLine, maxLines))
163+
164+
165+
if __name__ == '__main__':
166+
167+
parser = argparse.ArgumentParser()
168+
parser.add_argument('--dev', action='store_true', help='Run in Flask development mode')
169+
args = parser.parse_args()
170+
if args.dev:
171+
app.run(host='0.0.0.0', port=8080, debug=True)
172+
else:
173+
serve(app, host='0.0.0.0', port=8080, threads=1)
174+

Diff for: packages/k8s/src/hooks/cleanup-job.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { prunePods, pruneSecrets } from '../k8s'
1+
import { prunePods, pruneServices, pruneSecrets } from '../k8s'
22

33
export async function cleanupJob(): Promise<void> {
4-
await Promise.all([prunePods(), pruneSecrets()])
4+
await Promise.all([prunePods(), pruneServices(), pruneSecrets()])
55
}

Diff for: packages/k8s/src/hooks/prepare-job.ts

+25-7
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ import {
1212
containerPorts,
1313
createPod,
1414
isPodContainerAlpine,
15-
prunePods,
15+
prunePodsAndServices,
1616
waitForPodPhases,
17-
getPrepareJobTimeoutSeconds
17+
getPrepareJobTimeoutSeconds,
18+
createService,
1819
} from '../k8s'
1920
import {
2021
containerVolumes,
@@ -27,6 +28,7 @@ import {
2728
fixArgs
2829
} from '../k8s/utils'
2930
import { CONTAINER_EXTENSION_PREFIX, JOB_CONTAINER_NAME } from './constants'
31+
import { waitForRpcStatus } from '../k8s/rpc'
3032

3133
export async function prepareJob(
3234
args: PrepareJobArgs,
@@ -36,7 +38,7 @@ export async function prepareJob(
3638
throw new Error('Job Container is required.')
3739
}
3840

39-
await prunePods()
41+
await prunePodsAndServices()
4042

4143
const extension = readExtensionFromFile()
4244
await copyExternalsToRoot()
@@ -78,7 +80,7 @@ export async function prepareJob(
7880
extension
7981
)
8082
} catch (err) {
81-
await prunePods()
83+
await prunePodsAndServices()
8284
core.debug(`createPod failed: ${JSON.stringify(err)}`)
8385
const message = (err as any)?.response?.body?.message || err
8486
throw new Error(`failed to create job pod: ${message}`)
@@ -87,6 +89,17 @@ export async function prepareJob(
8789
if (!createdPod?.metadata?.name) {
8890
throw new Error('created pod should have metadata.name')
8991
}
92+
93+
let createdService: k8s.V1Service | undefined = undefined
94+
try {
95+
createdService = await createService(createdPod)
96+
} catch (err) {
97+
await prunePodsAndServices()
98+
core.debug(`createService failed: ${JSON.stringify(err)}`)
99+
const message = (err as any)?.response?.body?.message || err
100+
throw new Error(`failed to create job pod: ${message}`)
101+
}
102+
90103
core.debug(
91104
`Job pod created, waiting for it to come online ${createdPod?.metadata?.name}`
92105
)
@@ -98,8 +111,11 @@ export async function prepareJob(
98111
new Set([PodPhase.PENDING]),
99112
getPrepareJobTimeoutSeconds()
100113
)
114+
115+
await waitForRpcStatus(`http://${createdService?.metadata?.name}:8080`)
116+
101117
} catch (err) {
102-
await prunePods()
118+
await prunePodsAndServices()
103119
throw new Error(`pod failed to come online with error: ${err}`)
104120
}
105121

@@ -119,21 +135,23 @@ export async function prepareJob(
119135
throw new Error(`failed to determine if the pod is alpine: ${message}`)
120136
}
121137
core.debug(`Setting isAlpine to ${isAlpine}`)
122-
generateResponseFile(responseFile, args, createdPod, isAlpine)
138+
generateResponseFile(responseFile, args, createdPod, createdService, isAlpine)
123139
}
124140

125141
function generateResponseFile(
126142
responseFile: string,
127143
args: PrepareJobArgs,
128144
appPod: k8s.V1Pod,
145+
appService: k8s.V1Service,
129146
isAlpine
130147
): void {
131148
if (!appPod.metadata?.name) {
132149
throw new Error('app pod must have metadata.name specified')
133150
}
134151
const response = {
135152
state: {
136-
jobPod: appPod.metadata.name
153+
jobPod: appPod.metadata.name,
154+
jobService: appService.metadata?.name,
137155
},
138156
context: {},
139157
isAlpine

Diff for: packages/k8s/src/hooks/run-script-step.ts

+7-10
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,16 @@
22
import * as fs from 'fs'
33
import * as core from '@actions/core'
44
import { RunScriptStepArgs } from 'hooklib'
5-
import { execPodStep } from '../k8s'
5+
import { rpcPodStep } from '../k8s/rpc'
66
import { writeEntryPointScript } from '../k8s/utils'
7-
import { JOB_CONTAINER_NAME } from './constants'
87

98
export async function runScriptStep(
109
args: RunScriptStepArgs,
1110
state,
1211
responseFile
1312
): Promise<void> {
1413
const { entryPoint, entryPointArgs, environmentVariables } = args
15-
const { containerPath, runnerPath } = writeEntryPointScript(
14+
const { containerPath, runnerPath, id } = writeEntryPointScript(
1615
args.workingDirectory,
1716
entryPoint,
1817
entryPointArgs,
@@ -23,16 +22,14 @@ export async function runScriptStep(
2322
args.entryPoint = 'sh'
2423
args.entryPointArgs = ['-e', containerPath]
2524
try {
26-
await execPodStep(
27-
[args.entryPoint, ...args.entryPointArgs],
28-
state.jobPod,
29-
JOB_CONTAINER_NAME
25+
await rpcPodStep(
26+
id,
27+
containerPath,
28+
state.jobService,
3029
)
3130
} catch (err) {
3231
core.debug(`execPodStep failed: ${JSON.stringify(err)}`)
3332
const message = (err as any)?.response?.body?.message || err
34-
throw new Error(`failed to run script step: ${message}`)
35-
} finally {
36-
fs.rmSync(runnerPath)
33+
throw new Error(`failed to run script step (id ${id}): ${message}`)
3734
}
3835
}

0 commit comments

Comments
 (0)