-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy pathleviathan.service.ts
More file actions
115 lines (94 loc) · 2.79 KB
/
Copy pathleviathan.service.ts
File metadata and controls
115 lines (94 loc) · 2.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import {
CancelJobRequest,
createClient,
createConnectTransport,
DockerFile,
JobLogRequest,
JobService,
LabData,
LabFile,
LabService,
NewJobRequest,
NewLabRequest,
SubmissionFile,
UploadLabFiles,
UploadSubmissionFiles,
} from 'leviathan-node-sdk'
// Base URL of the Leviathan service. Read from the LEVIATHAN_URL environment
// variable; because `||` is used, an unset OR empty value falls back to the
// local default below.
const leviUrl = process.env.LEVIATHAN_URL || 'http://localhost:9221'
// Logged once at module load so the effective endpoint is visible at startup.
console.log(`Leviathan url set to ${leviUrl}`)
// Single transport shared by both service clients created below.
const transport = createConnectTransport({
baseUrl: leviUrl,
httpVersion: '2',
})
// Client for the job service (newJob / streamStatus / getStatus / cancelJob are used in this file).
const jobService = createClient(JobService, transport)
// Client for the lab service (newLab is used in this file).
const labService = createClient(LabService, transport)
/**
 * Wraps the in-memory contents of an uploaded file in a `Blob`, using the
 * upload's mimetype as the Blob type.
 *
 * @param multerFile - uploaded file; its `buffer` and `mimetype` fields are read
 * @returns a Blob holding the bytes of `multerFile.buffer`
 * @throws TypeError if `multerFile.buffer` is missing. Without this check
 *   `new Blob([undefined])` would silently produce a Blob containing the
 *   literal text "undefined" instead of the file's bytes.
 */
export function bufferToBlob(multerFile: Express.Multer.File): Blob {
  if (multerFile.buffer == null) {
    throw new TypeError('Cannot convert uploaded file to Blob: file has no in-memory buffer')
  }
  return new Blob([multerFile.buffer], { type: multerFile.mimetype })
}
/**
 * Uploads the submission files and then starts a new job for the given lab.
 *
 * @param labId - id of the lab the submission belongs to
 * @param submission - files to upload before the job is created
 * @returns the `jobId` from the new-job response
 */
export async function sendSubmission(labId: bigint, submission: Array<SubmissionFile>) {
  // The upload yields an id that the job request uses to locate the files.
  const uploadedFolderId = await UploadSubmissionFiles(leviUrl, submission)
  const request = {
    tmpSubmissionFolderId: uploadedFolderId,
    labID: labId,
  } as NewJobRequest
  const { jobId } = await jobService.newJob(request)
  return jobId
}
/**
 * Uploads a lab's dockerfile and supporting files, then registers the lab.
 *
 * @param lab - lab data sent as `labData` in the new-lab request
 * @param dockerfile - dockerfile uploaded together with the lab files
 * @param labFiles - additional files uploaded for the lab
 * @returns the `labId` from the new-lab response
 */
export async function createNewLab(lab: LabData, dockerfile: DockerFile, labFiles: Array<LabFile>) {
  // The upload yields an id that the lab request uses to locate the files.
  const uploadedFolderId = await UploadLabFiles(leviUrl, dockerfile, labFiles)
  const request = {
    labData: lab,
    tmpFolderId: uploadedFolderId,
  } as NewLabRequest
  const { labId } = await labService.newLab(request)
  return labId
}
/**
 * Opens a status stream for a job.
 *
 * The stream ends by itself once the job is done; to stop it earlier, call
 * `controller.abort()` on the returned controller.
 *
 * @param jobId - identifier of the job to follow
 * @returns the status stream plus the AbortController whose signal is attached to it
 * @see waitForJob - for a usage example
 */
export function streamJob(jobId: string) {
  const controller = new AbortController()
  const request = { jobId } as JobLogRequest
  const dataStream = jobService.streamStatus(request, { signal: controller.signal })
  return { dataStream, controller }
}
/**
 * Fetches the current status of a job together with its logs.
 *
 * @param jobId - identifier of the job to query
 * @returns `info` (the response's `jobInfo` without its `$unknown` and
 *   `$typeName` fields) and `logs` (the response's `logs`)
 * @throws Error if the response carries no `jobInfo`
 */
export async function getStatus(jobId: string) {
  const resp = await jobService.getStatus(<JobLogRequest>{ jobId })
  // jobInfo is optional on the response (waitForJob guards against it too);
  // fail with a clear message instead of an opaque destructuring error.
  if (!resp.jobInfo) {
    throw new Error(`Leviathan returned no job info for job ${jobId}`)
  }
  // strip out grpc metadata
  const { $unknown, $typeName, ...info } = resp.jobInfo
  return { info, logs: resp.logs }
}
/**
 * Consumes the job's status stream until it ends, then returns the most
 * recent job info and logs that were received.
 *
 * Chunks without `jobInfo` are reported via `console.warn` and skipped. If no
 * chunk ever carried `jobInfo`, the returned `jobInfo` has empty-string fields
 * and `logs` is `''`.
 *
 * @param jobId - identifier of the job to wait for
 * @see streamJob - is used under the hood
 */
export async function waitForJob(jobId: string) {
  const { dataStream } = streamJob(jobId)

  // Latest state seen so far; overwritten on every usable chunk.
  let jobInfo: { jobId: string; status: string; statusMessage: string } = {
    jobId: '',
    status: '',
    statusMessage: '',
  }
  let logs = ''

  for await (const update of dataStream) {
    const state = update.jobInfo
    if (state) {
      // Drop the message metadata fields, keep only the job fields.
      const { $unknown, $typeName, ...latest } = state
      console.debug('Job', latest)
      jobInfo = latest
      logs = update.logs
    } else {
      console.warn('Empty job state')
    }
  }

  return { jobInfo, logs }
}
/**
 * Sends a cancel request for a job and resolves once the call has completed.
 *
 * @param jobId - identifier of the job to cancel
 */
export async function cancelJob(jobId: string) {
  const request = { jobId } as CancelJobRequest
  await jobService.cancelJob(request)
}