Skip to content

Bump jsonpath-plus from 10.1.0 to 10.3.0 in /packages/k8s #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions packages/k8s-workflow/gha-runner-rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#!/usr/bin/env python3

# Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# This implements a very simple RPC server that should be running on the job container of the workflow pod,
# and used by the k8s hook to execute steps in the workflow on the workflow pod.

# It supports running a single RPC call at a time, and will return an error if a new call is made while
# another one is still running (which is a valid assumption, as the runner is expected to execute one step at a time).


# Standard library
import argparse
import json
import logging
import os
import signal
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from threading import Thread
from typing import Optional

# Third-party
import json_logging
from flask import Flask, jsonify, request
from waitress import serve

app = Flask(__name__)
app.logger.setLevel(logging.DEBUG)
json_logging.init_flask(enable_json=True)
json_logging.init_request_instrument(app)

@dataclass
class Response:
id: str
status: str
pid: int = None
returncode: int = None
error: str = None

def readLines(path, fromLine, maxLines):
try:
with open(path, 'r') as f:
return [x for i, x in enumerate(f) if i >= fromLine and x.endswith('\n') and i < fromLine + maxLines]
except Exception as e:
app.logger.warning(f"Error reading file {path}: {e}")
return []

class State:
def __init__(self):
self.latest_id = None
self.status = Response(id = "", status = "idle")
self.worker = ThreadPoolExecutor(max_workers=1)
self.future = None
self.process = None
self.out = None

def __run(self, id, path):
self.latest_id = id
try:
app.logger.debug(f"Running id {id}")
logsfilename = f"/logs/{id}.out"
self.out = open(logsfilename, "w")
self.process = subprocess.Popen(['sh', '-e', path], start_new_session=True, stdout=self.out, stderr=self.out)
app.logger.debug(f"Process for id {id} started with pid {self.process.pid}")
self.status = Response(
id = id,
status = 'running',
pid = self.process.pid
)
self.process.wait()
self.out.close()
app.logger.debug(f"Process for id {id} finished (return code {self.process.returncode})")
self.status = Response(
id = id,
status = 'completed',
returncode = self.process.returncode,
)
except Exception as e:
app.logger.error(f"Error starting process: {e}")
self.status = Response(
id = id,
status = 'failed',
error = str(e),
returncode = -1,
)


def exec(self, id, path):
if self.future and not self.future.done():
app.logger.error(f"A job is already running (ID {self.latest_id})")
return Response(
id = id,
status = 'failed',
error = f"A job is already running (ID {self.latest_id})",
returncode = -1,
)

app.logger.debug(f"Queueing job {id} with path {path}")
self.status = Response(id = id, status = "pending")
self.future = self.worker.submit(self.__run, id, path)
return self.status

def cancel(self):
if not self.future:
return Response(
id = '',
status = 'failed',
error = 'No job has been started yet',
)
elif self.future.done():
# The job is already done, no need to cancel
return self.status
else:
app.logger.debug(f"Cancelling {self.latest_id} (pid {self.process.pid})")
os.killpg(os.getpgid(self.process.pid), signal.SIGINT)

return Response(
id = self.latest_id,
status = 'cancelling',
pid = self.process.pid
)

state = State()

# Post a new job
@app.route('/', methods=['POST'])
def call():
data = json.loads(request.data)
if 'id' not in data or 'path' not in data:
return jsonify(Response(
id = '',
status = 'failed',
error = 'Missing id or path in request',
))
id = data['id']
path = data['path']
return jsonify(state.exec(id, path))

# Cancel the current job
@app.route('/', methods=['DELETE'])
def cancel():
return jsonify(state.cancel())

# Get the current status
@app.route('/')
def status():
app.logger.debug(f"Status: {state.status}")
return jsonify(state.status)

# Get the logs of a given job
@app.route('/logs')
def logs():
if 'id' not in request.args:
return 'Missing id in request', 400
id = request.args.get('id')
fromLine = int(request.args.get('fromLine', 0))
maxLines = int(request.args.get('maxLines', 1000))
path = f"/logs/{id}.out"
return jsonify(readLines(path, fromLine, maxLines))


if __name__ == '__main__':

parser = argparse.ArgumentParser()
parser.add_argument('--dev', action='store_true', help='Run in Flask development mode')
args = parser.parse_args()
if args.dev:
app.run(host='0.0.0.0', port=8080, debug=True)
else:
serve(app, host='0.0.0.0', port=8080, threads=1)

43 changes: 22 additions & 21 deletions packages/k8s/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/k8s/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"@actions/core": "^1.9.1",
"@actions/exec": "^1.1.1",
"@actions/io": "^1.1.2",
"@kubernetes/client-node": "^0.22.2",
"@kubernetes/client-node": "^0.22.3",
"hooklib": "file:../hooklib",
"js-yaml": "^4.1.0",
"shlex": "^2.1.2"
Expand Down
4 changes: 2 additions & 2 deletions packages/k8s/src/hooks/cleanup-job.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { prunePods, pruneSecrets } from '../k8s'
import { prunePods, pruneServices, pruneSecrets } from '../k8s'

export async function cleanupJob(): Promise<void> {
await Promise.all([prunePods(), pruneSecrets()])
await Promise.all([prunePods(), pruneServices(), pruneSecrets()])
}
36 changes: 27 additions & 9 deletions packages/k8s/src/hooks/prepare-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import {
containerPorts,
createPod,
isPodContainerAlpine,
prunePods,
prunePodsAndServices,
waitForPodPhases,
getPrepareJobTimeoutSeconds
getPrepareJobTimeoutSeconds,
createService,
} from '../k8s'
import {
containerVolumes,
Expand All @@ -27,6 +28,7 @@ import {
fixArgs
} from '../k8s/utils'
import { CONTAINER_EXTENSION_PREFIX, JOB_CONTAINER_NAME } from './constants'
import { waitForRpcStatus } from '../k8s/rpc'

export async function prepareJob(
args: PrepareJobArgs,
Expand All @@ -36,7 +38,7 @@ export async function prepareJob(
throw new Error('Job Container is required.')
}

await prunePods()
await prunePodsAndServices()

const extension = readExtensionFromFile()
await copyExternalsToRoot()
Expand All @@ -58,7 +60,7 @@ export async function prepareJob(
core.debug(`Adding service '${service.image}' to pod definition`)
return createContainerSpec(
service,
generateContainerName(service.image),
generateContainerName(service),
false,
extension
)
Expand All @@ -78,7 +80,7 @@ export async function prepareJob(
extension
)
} catch (err) {
await prunePods()
await prunePodsAndServices()
core.debug(`createPod failed: ${JSON.stringify(err)}`)
const message = (err as any)?.response?.body?.message || err
throw new Error(`failed to create job pod: ${message}`)
Expand All @@ -87,6 +89,17 @@ export async function prepareJob(
if (!createdPod?.metadata?.name) {
throw new Error('created pod should have metadata.name')
}

let createdService: k8s.V1Service | undefined = undefined
try {
createdService = await createService(createdPod)
} catch (err) {
await prunePodsAndServices()
core.debug(`createService failed: ${JSON.stringify(err)}`)
const message = (err as any)?.response?.body?.message || err
throw new Error(`failed to create job pod: ${message}`)
}

core.debug(
`Job pod created, waiting for it to come online ${createdPod?.metadata?.name}`
)
Expand All @@ -98,8 +111,11 @@ export async function prepareJob(
new Set([PodPhase.PENDING]),
getPrepareJobTimeoutSeconds()
)

await waitForRpcStatus(`http://${createdService?.metadata?.name}:8080`)

} catch (err) {
await prunePods()
await prunePodsAndServices()
throw new Error(`pod failed to come online with error: ${err}`)
}

Expand All @@ -119,21 +135,23 @@ export async function prepareJob(
throw new Error(`failed to determine if the pod is alpine: ${message}`)
}
core.debug(`Setting isAlpine to ${isAlpine}`)
generateResponseFile(responseFile, args, createdPod, isAlpine)
generateResponseFile(responseFile, args, createdPod, createdService, isAlpine)
}

function generateResponseFile(
responseFile: string,
args: PrepareJobArgs,
appPod: k8s.V1Pod,
appService: k8s.V1Service,
isAlpine
): void {
if (!appPod.metadata?.name) {
throw new Error('app pod must have metadata.name specified')
}
const response = {
state: {
jobPod: appPod.metadata.name
jobPod: appPod.metadata.name,
jobService: appService.metadata?.name,
},
context: {},
isAlpine
Expand All @@ -159,7 +177,7 @@ function generateResponseFile(

if (args.services?.length) {
const serviceContainerNames =
args.services?.map(s => generateContainerName(s.image)) || []
args.services?.map(s => generateContainerName(s)) || []

response.context['services'] = appPod?.spec?.containers
?.filter(c => serviceContainerNames.includes(c.name))
Expand Down
Loading