Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,12 @@ public CancelResult cancelSessionJob(
switch (suspendMode) {
case STATELESS:
case CANCEL:
cancelJobOrError(clusterClient, status, suspendMode == SuspendMode.STATELESS);
// This is async we need to return and re-observe
return CancelResult.pending();
if (cancelJobOrError(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern on the scope: this PR only hardens the FlinkSessionJob cancel path (AbstractFlinkService#cancelSessionJob / the session-job finalizer), but AbstractFlinkService#cancelJob, used for the FlinkDeployment (application cluster) path, is left untouched and can hit the exact same failure mode described in FLINK-37766:

Could we:

  1. Extend the new "already terminal / not found" handling to AbstractFlinkService#cancelJob?
  2. Add a unit test mirroring the new session-job test, but against cancelJob, with a mocked JM response returning the terminal-state error (and ideally a 404), asserting the finalizer completes cleanly.
  3. Update the PR description (and ideally the JIRA) to make clear the fix covers both CR types?

clusterClient, status, suspendMode == SuspendMode.STATELESS)) {
// This is async we need to return and re-observe
return CancelResult.pending();
}
break;
case SAVEPOINT:
savepointPath = savepointJobOrError(clusterClient, status, conf);
break;
Expand All @@ -383,14 +386,14 @@ public CancelResult cancelSessionJob(
return CancelResult.completed(savepointPath);
}

public void cancelJobOrError(
public boolean cancelJobOrError(
RestClusterClient<String> clusterClient,
CommonStatus<?> status,
boolean ignoreMissing) {
var jobID = JobID.fromHexString(status.getJobStatus().getJobId());
if (ReconciliationUtils.isJobCancelling(status)) {
LOG.info("Job already cancelling");
return;
return true;
}
LOG.info("Cancelling job");
try {
Expand All @@ -402,6 +405,7 @@ public void cancelJobOrError(
if (isJobMissing(e)) {
if (ignoreMissing) {
LOG.info("Job already missing");
return false;
} else {
throw new UpgradeFailureException(
"Cannot find job when trying to cancel",
Expand All @@ -410,13 +414,15 @@ public void cancelJobOrError(
}
} else if (isJobTerminated(e)) {
LOG.info("Job already terminated");
return false;
} else {
LOG.warn("Error while cancelling job", e);
throw new UpgradeFailureException(
"Cancellation Error", EventRecorder.Reason.CleanupFailed.name(), e);
}
}
status.getJobStatus().setState(JobStatus.CANCELLING);
return true;
}

public String savepointJobOrError(
Expand Down Expand Up @@ -487,9 +493,16 @@ public static boolean isJobTerminated(Exception e) {
return true;
}

return findThrowable(e, RestClientException.class)
if (findThrowable(e, RestClientException.class)
.map(RestClientException::getHttpResponseStatus)
.map(respCode -> HttpResponseStatus.CONFLICT == respCode)
.orElse(false)) {
return true;
}

return Optional.ofNullable(ExceptionUtils.getExceptionMessage(e))
.map(String::toLowerCase)
.map(msg -> msg.contains("already reached another terminal state"))
.orElse(false);
}

Expand Down Expand Up @@ -1048,6 +1061,7 @@ private static String createEmptyJar() {
}
}

@Override
public Map<String, String> getMetrics(
Configuration conf, String jobId, List<String> metricNames) throws Exception {
try (var clusterClient = getClusterClient(conf)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,36 @@ public void cancelErrorHandling(int statusCode) throws Exception {
assertEquals(RUNNING, jobStatus.getState());
} else {
flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration());
assertEquals(CANCELLING, jobStatus.getState());
assertEquals(FINISHED, jobStatus.getState());
assertNull(jobStatus.getJobId());
}
}

@Test
public void cancelErrorHandlingWithTerminalStateMessage() throws Exception {
var testingClusterClient =
new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
testingClusterClient.setCancelFunction(
jobID ->
CompletableFuture.failedFuture(
new RuntimeException(
new RestClientException(
"Job cancellation failed because the job has already reached another terminal state (FAILED).",
HttpResponseStatus.BAD_REQUEST))));
var flinkService = new TestingService(testingClusterClient);

JobID jobID = JobID.generate();
var job = TestUtils.buildSessionJob();
var jobStatus = job.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration());

flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration());
assertEquals(FINISHED, jobStatus.getState());
assertNull(jobStatus.getJobId());
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint)
Expand Down