Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import api.submit.{
JobSubmitRequestItem,
JobSubmitResponse,
JobSubmitResponseItem,
PreemptionResult,
Queue,
QueueDeleteRequest,
QueueGetRequest,
Expand Down Expand Up @@ -268,8 +269,8 @@ private class SubmitMockServer(

def preemptJobs(
request: JobPreemptRequest
): scala.concurrent.Future[com.google.protobuf.empty.Empty] = {
Future.successful(new Empty)
): scala.concurrent.Future[PreemptionResult] = {
Future.successful(new PreemptionResult)
}

def reprioritizeJobs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ describe("PreemptDialog", () => {
mockServer.setPostJobsResponse([jobs[0]])
const { getByRole, findByText } = renderComponent()

mockServer.setPreemptJobsResponse()
mockServer.setPreemptJobsResponse([jobs[0].jobId], [])

await enterPreemptReason("Reason for preemption")

await action(getByRole)
await findByText(/Successfully requested preemption of selected jobs. See table for job statuses./i)
await findByText(/Successfully requested preemption for: job-id-0/i)
})

it("allows user to refetch jobs", async () => {
Expand Down Expand Up @@ -170,7 +170,7 @@ describe("PreemptDialog", () => {
mockServer.setPostJobsResponse([jobs[0]])
const { getByRole, findByText, findByRole } = renderComponent()

mockServer.setPreemptJobsResponse(500, "Internal Server Error")
mockServer.setPreemptJobsResponse([], [{ jobId: jobs[0].jobId, errorReason: "Internal Server Error" }])

await enterPreemptReason("Reason for preemption")

Expand Down
20 changes: 19 additions & 1 deletion internal/lookoutui/src/pages/jobs/components/PreemptDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import { usePreemptJobs } from "../../../services/lookout/usePreemptJobs"
import dialogStyles from "./DialogStyles.module.css"
import { JobStatusTable } from "./JobStatusTable"

const MAX_JOB_IDS_TO_DISPLAY = 4

interface PreemptDialogProps {
onClose: () => void
selectedItemFilters: JobFiltersWithExcludes[]
Expand Down Expand Up @@ -72,13 +74,29 @@ export const PreemptDialog = ({ onClose, selectedItemFilters }: PreemptDialogPro
})

if (response.failedJobIds.length === 0) {
openSnackbar("Successfully requested preemption of selected jobs. See table for job statuses.", "success")
const ids = response.successfulJobIds
if (ids.length <= MAX_JOB_IDS_TO_DISPLAY) {
openSnackbar(`Successfully requested preemption for: ${ids.join(", ")}`, "success")
} else {
const displayed = ids.slice(0, MAX_JOB_IDS_TO_DISPLAY).join(", ")
openSnackbar(
`Successfully requested preemption for ${ids.length} jobs: ${displayed}, and ${ids.length - MAX_JOB_IDS_TO_DISPLAY} more`,
"success",
)
}
} else {
openSnackbar("Some preemption requests failed. See table for job statuses.", "warning")
}

const newResponseStatus = { ...jobIdsToPreemptResponses }

response.successfulJobIds.forEach((jobId) => {
newResponseStatus[jobId] = "Success"
})
response.failedJobIds.forEach(({ jobId, errorReason }) => {
newResponseStatus[jobId] = errorReason
})

setJobIdsToPreemptResponses(newResponseStatus)
setHasAttemptedPreempt(true)
} finally {
Expand Down
22 changes: 10 additions & 12 deletions internal/lookoutui/src/services/lookout/mocks/mockServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,19 @@ export class MockServer {
)
}

setPreemptJobsResponse(status_code: number = 200, status_text: string = "") {
setPreemptJobsResponse(successfulJobIds: JobId[], failedJobIds: { jobId: JobId; errorReason: string }[] = []) {
this.server.use(
http.post(PREEMPT_JOBS_ENDPOINT, async () => {
if (status_code !== 200) {
return HttpResponse.json(
{
code: 500,
message: "Internal server error",
details: [],
},
{ status: status_code, statusText: status_text },
)
const preemptionResults: Record<JobId, string> = {}
for (const jobId of successfulJobIds) {
preemptionResults[jobId] = ""
}
for (const { jobId, errorReason } of failedJobIds) {
preemptionResults[jobId] = errorReason
}
// The actual API returns google.protobuf.Empty which is just an empty object
return HttpResponse.json({})
return HttpResponse.json({
preemptionResults,
})
}),
)
}
Expand Down
55 changes: 19 additions & 36 deletions internal/lookoutui/src/services/lookout/usePreemptJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ const config = getConfig()
const maxJobsPerRequest = 10000

export type UpdateJobsResponse = {
status_code: number
status_text?: string
successfulJobIds: JobId[]
failedJobIds: {
jobId: JobId
Expand Down Expand Up @@ -55,18 +53,13 @@ export const usePreemptJobs = () => {
if (config.fakeDataEnabled) {
await new Promise((r) => setTimeout(r, 1_000))
return {
status_code: 200,
successfulJobIds: jobs.map((job) => job.jobId),
failedJobIds: [],
}
}

try {
const response: UpdateJobsResponse = {
status_code: 200,
successfulJobIds: jobs.map((job) => job.jobId),
failedJobIds: [],
}
const response: UpdateJobsResponse = { successfulJobIds: [], failedJobIds: [] }

const chunks = createJobBatches(jobs, maxJobsPerRequest)

Expand All @@ -75,8 +68,7 @@ export const usePreemptJobs = () => {
for (const [jobSet, batches] of jobSets) {
for (const batch of batches) {
apiResponsePromises.push({
promise: submitApi.preemptJobsRaw({
// Use raw to access status code
promise: submitApi.preemptJobs({
body: {
jobIds: batch,
queue: queue,
Expand All @@ -92,39 +84,30 @@ export const usePreemptJobs = () => {

for (const apiResponsePromise of apiResponsePromises) {
try {
const apiResponse = await apiResponsePromise.promise
const statusCode = apiResponse.raw.status
const statusText = apiResponse.raw.statusText
const apiResponse = (await apiResponsePromise.promise)?.preemptionResults

// If any request fails, capture the error for each job in the batch
if (statusCode < 200 || statusCode >= 300) {
response.status_code = statusCode
response.status_text = statusText

// Mark all jobs in this batch as failed
if (_.isNil(apiResponse)) {
const errorMessage = "No preemption results found in response body"
for (const jobId of apiResponsePromise.jobIds) {
response.failedJobIds.push({
jobId: jobId,
errorReason: statusText || `Request failed with status ${statusCode}`,
})
response.failedJobIds.push({ jobId, errorReason: errorMessage })
}
return response
} else {
// Otherwise, all jobs in this batch are successful
response.successfulJobIds.push(...apiResponsePromise.jobIds)
for (const jobId of apiResponsePromise.jobIds) {
if (jobId in apiResponse) {
const emptyOrError = apiResponse[jobId]
if (emptyOrError === "") {
response.successfulJobIds.push(jobId)
} else {
response.failedJobIds.push({ jobId, errorReason: emptyOrError })
}
} else {
response.successfulJobIds.push(jobId)
}
}
}
} catch (e) {
const text = await getErrorMessage(e)
response.status_code = 500
response.status_text = text

for (const jobId of apiResponsePromise.jobIds) {
response.failedJobIds.push({
jobId: jobId,
errorReason: text,
})
}
return response
apiResponsePromise.jobIds.forEach((jobId) => response.failedJobIds.push({ jobId, errorReason: text }))
}
}

Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/mocks/api.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions internal/server/submit/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (s *Server) CancelJobs(grpcCtx context.Context, req *api.JobCancelRequest)
}, nil
}

func (s *Server) PreemptJobs(grpcCtx context.Context, req *api.JobPreemptRequest) (*types.Empty, error) {
func (s *Server) PreemptJobs(grpcCtx context.Context, req *api.JobPreemptRequest) (*api.PreemptionResult, error) {
ctx := armadacontext.FromGrpcCtx(grpcCtx)
err := validation.ValidateQueueAndJobSet(req)
if err != nil {
Expand All @@ -211,6 +211,9 @@ func (s *Server) PreemptJobs(grpcCtx context.Context, req *api.JobPreemptRequest
return nil, err
}

// results maps job ids to strings containing error messages.
results := make(map[string]string)

sequence, err := preemptJobEventSequenceForJobIds(s.clock, req.JobIds, req.Queue, req.JobSetId, userId, req.Reason, groups)
if err != nil {
return nil, err
Expand All @@ -222,7 +225,13 @@ func (s *Server) PreemptJobs(grpcCtx context.Context, req *api.JobPreemptRequest
return nil, status.Error(codes.Internal, "Failed to send message")
}

return &types.Empty{}, nil
for _, jobId := range req.JobIds {
results[jobId] = "" // empty string indicates no error
}

return &api.PreemptionResult{
PreemptionResults: results,
}, nil
}

func preemptJobEventSequenceForJobIds(clock clock.Clock, jobIds []string, q, jobSet, userId, reason string, groups []string) (*armadaevents.EventSequence, error) {
Expand Down
17 changes: 16 additions & 1 deletion internal/server/submit/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ func TestPreemptJobs(t *testing.T) {
req: &api.JobPreemptRequest{JobIds: []string{jobId1, jobId2}, Queue: testfixtures.DefaultQueue.Name, JobSetId: testfixtures.DefaultJobset, Reason: "preemption-reason"},
expectedEvents: testfixtures.CreatePreemptJobSequenceEvents([]string{jobId1, jobId2}, "preemption-reason"),
},
"Preempt single job": {
req: &api.JobPreemptRequest{JobIds: []string{jobId1}, Queue: testfixtures.DefaultQueue.Name, JobSetId: testfixtures.DefaultJobset, Reason: "single preemption"},
expectedEvents: testfixtures.CreatePreemptJobSequenceEvents([]string{jobId1}, "single preemption"),
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -333,8 +337,19 @@ func TestPreemptJobs(t *testing.T) {
capturedEventSequence = es
})

_, err := server.PreemptJobs(ctx, tc.req)
result, err := server.PreemptJobs(ctx, tc.req)
assert.NoError(t, err)
assert.NotNil(t, result)
assert.NotNil(t, result.PreemptionResults)

// Verify all requested job IDs are in the results map
assert.Equal(t, len(tc.req.JobIds), len(result.PreemptionResults))
for _, jobId := range tc.req.JobIds {
errMsg, exists := result.PreemptionResults[jobId]
assert.True(t, exists, "Job ID %s should be in preemption results", jobId)
assert.Empty(t, errMsg, "Error message should be empty for successful preemption of job %s", jobId)
}

assert.Equal(t, expectedEventSequence, capturedEventSequence)
cancel()
})
Expand Down
16 changes: 15 additions & 1 deletion pkg/api/api.swagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,9 @@ func SwaggerJsonTemplate() string {
" \"responses\": {\n" +
" \"200\": {\n" +
" \"description\": \"A successful response.\",\n" +
" \"schema\": {}\n" +
" \"schema\": {\n" +
" \"$ref\": \"#/definitions/apiPreemptionResult\"\n" +
" }\n" +
" },\n" +
" \"default\": {\n" +
" \"description\": \"An unexpected error response.\",\n" +
Expand Down Expand Up @@ -2103,6 +2105,18 @@ func SwaggerJsonTemplate() string {
" }\n" +
" }\n" +
" },\n" +
" \"apiPreemptionResult\": {\n" +
" \"type\": \"object\",\n" +
" \"title\": \"swagger:model\",\n" +
" \"properties\": {\n" +
" \"preemptionResults\": {\n" +
" \"type\": \"object\",\n" +
" \"additionalProperties\": {\n" +
" \"type\": \"string\"\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"apiPriorityClassPoolResourceLimits\": {\n" +
" \"type\": \"object\",\n" +
" \"properties\": {\n" +
Expand Down
16 changes: 15 additions & 1 deletion pkg/api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,9 @@
"responses": {
"200": {
"description": "A successful response.",
"schema": {}
"schema": {
"$ref": "#/definitions/apiPreemptionResult"
}
},
"default": {
"description": "An unexpected error response.",
Expand Down Expand Up @@ -2092,6 +2094,18 @@
}
}
},
"apiPreemptionResult": {
"type": "object",
"title": "swagger:model",
"properties": {
"preemptionResults": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
},
"apiPriorityClassPoolResourceLimits": {
"type": "object",
"properties": {
Expand Down
Loading