# Patch summary (leading text before the first "diff --git" header is skipped by patch tools).
#
# AbstractFlinkService.java
#   * cancelSessionJob: for SuspendMode.STATELESS and SuspendMode.CANCEL the method now returns
#     CancelResult.pending() only when cancelJobOrError(...) returns true. When it returns false
#     the switch is left via `break` instead of returning early; the code between the end of the
#     switch and `return CancelResult.completed(savepointPath)` is not part of these hunks.
#   * cancelJobOrError: return type changes from void to boolean.
#       - true  : the job was already cancelling, or the cancel path completed without a handled
#                 "missing"/"terminated" error and the state was set to JobStatus.CANCELLING.
#       - false : the job is missing and ignoreMissing is set, or the job is already terminated.
#       - UpgradeFailureException is still thrown for a missing job when ignoreMissing is false
#         and for any other cancellation error.
#   * isJobTerminated: besides the existing RestClientException / HttpResponseStatus.CONFLICT
#     check, it now also returns true when the lower-cased exception message contains
#     "already reached another terminal state".
#     NOTE(review): String::toLowerCase uses the JVM default locale; the search phrase contains
#     the letter 'i', so toLowerCase(Locale.ROOT) may be the safer choice - confirm.
#   * getMetrics: gains an @Override annotation; no other change is visible.
#
# AbstractFlinkServiceTest.java
#   * cancelErrorHandling: the branch that calls cancelSessionJob(job, SuspendMode.STATELESS, ...)
#     now asserts state FINISHED and a null job id instead of state CANCELLING.
#   * cancelErrorHandlingWithTerminalStateMessage (new): the cancel function fails with a
#     RestClientException (HttpResponseStatus.BAD_REQUEST) whose message says the job "has already
#     reached another terminal state (FAILED)"; the test asserts state FINISHED and a null job id.
#
# NOTE(review): in this copy of the patch the line breaks are collapsed and generic type
# arguments look stripped (e.g. "Map getMetrics(... List metricNames)") - presumably an
# extraction artifact; verify against the original commit before applying.
#
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java index 18fc3690da..9c5e509149 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java @@ -370,9 +370,12 @@ public CancelResult cancelSessionJob( switch (suspendMode) { case STATELESS: case CANCEL: - cancelJobOrError(clusterClient, status, suspendMode == SuspendMode.STATELESS); - // This is async we need to return and re-observe - return CancelResult.pending(); + if (cancelJobOrError( + clusterClient, status, suspendMode == SuspendMode.STATELESS)) { + // This is async we need to return and re-observe + return CancelResult.pending(); + } + break; case SAVEPOINT: savepointPath = savepointJobOrError(clusterClient, status, conf); break; @@ -383,14 +386,14 @@ public CancelResult cancelSessionJob( return CancelResult.completed(savepointPath); } - public void cancelJobOrError( + public boolean cancelJobOrError( RestClusterClient clusterClient, CommonStatus status, boolean ignoreMissing) { var jobID = JobID.fromHexString(status.getJobStatus().getJobId()); if (ReconciliationUtils.isJobCancelling(status)) { LOG.info("Job already cancelling"); - return; + return true; } LOG.info("Cancelling job"); try { @@ -402,6 +405,7 @@ public void cancelJobOrError( if (isJobMissing(e)) { if (ignoreMissing) { LOG.info("Job already missing"); + return false; } else { throw new UpgradeFailureException( "Cannot find job when trying to cancel", @@ -410,6 +414,7 @@ public void cancelJobOrError( } } else if (isJobTerminated(e)) { LOG.info("Job already terminated"); + return false; } else { LOG.warn("Error while cancelling job", e); throw new UpgradeFailureException( @@ -417,6 +422,7 
@@ public void cancelJobOrError( } } status.getJobStatus().setState(JobStatus.CANCELLING); + return true; } public String savepointJobOrError( @@ -487,9 +493,16 @@ public static boolean isJobTerminated(Exception e) { return true; } - return findThrowable(e, RestClientException.class) + if (findThrowable(e, RestClientException.class) .map(RestClientException::getHttpResponseStatus) .map(respCode -> HttpResponseStatus.CONFLICT == respCode) + .orElse(false)) { + return true; + } + + return Optional.ofNullable(ExceptionUtils.getExceptionMessage(e)) + .map(String::toLowerCase) + .map(msg -> msg.contains("already reached another terminal state")) .orElse(false); } @@ -1048,6 +1061,7 @@ private static String createEmptyJar() { } } + @Override public Map getMetrics( Configuration conf, String jobId, List metricNames) throws Exception { try (var clusterClient = getClusterClient(conf)) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java index f77ccba5c4..90173c5f72 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java @@ -360,10 +360,36 @@ public void cancelErrorHandling(int statusCode) throws Exception { assertEquals(RUNNING, jobStatus.getState()); } else { flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration()); - assertEquals(CANCELLING, jobStatus.getState()); + assertEquals(FINISHED, jobStatus.getState()); + assertNull(jobStatus.getJobId()); } } + @Test + public void cancelErrorHandlingWithTerminalStateMessage() throws Exception { + var testingClusterClient = + new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME); + 
testingClusterClient.setCancelFunction( + jobID -> + CompletableFuture.failedFuture( + new RuntimeException( + new RestClientException( + "Job cancellation failed because the job has already reached another terminal state (FAILED).", + HttpResponseStatus.BAD_REQUEST)))); + var flinkService = new TestingService(testingClusterClient); + + JobID jobID = JobID.generate(); + var job = TestUtils.buildSessionJob(); + var jobStatus = job.getStatus().getJobStatus(); + jobStatus.setJobId(jobID.toHexString()); + jobStatus.setState(RUNNING); + ReconciliationUtils.updateStatusForDeployedSpec(job, new Configuration()); + + flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new Configuration()); + assertEquals(FINISHED, jobStatus.getState()); + assertNull(jobStatus.getJobId()); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void cancelJobWithSavepointUpgradeModeTest(boolean deleteAfterSavepoint)