Skip to content

Commit 9418066

Browse files
committed
DS-1044: Fix EMR event handling to also handle "CANCELLING" state
Signed-off-by: Benjamin Rögner <benjamin.roegner@here.com>
1 parent 7d9ef3b commit 9418066

2 files changed

Lines changed: 8 additions & 5 deletions

File tree

xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/Job.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ private Future<Void> startExecution(boolean resume) {
258258
* @return A future providing a boolean telling whether the action was performed already.
259259
*/
260260
public Future<Boolean> cancel() {
261+
logger.info("[{}] Cancelling job ...", getId());
261262
getStatus().setState(CANCELLING);
262263

263264
return storeStatus(null)
@@ -306,15 +307,15 @@ public Future<Void> updateStepStatus(String stepId, RuntimeInfo status, boolean
306307
State existingStepState = step.getStatus().getState();
307308

308309
step.getStatus()
309-
.withState(status.getState())
310-
.withErrorCode(status.getErrorCode())
311-
.withErrorCause(status.getErrorCause())
312-
.withErrorMessage(status.getErrorMessage());
310+
.withState(status.getState())
311+
.withErrorCode(status.getErrorCode())
312+
.withErrorCause(status.getErrorCause())
313+
.withErrorMessage(status.getErrorMessage());
313314

314315
return updateStep(step, existingStepState, cancelOnFailure);
315316
}
316317

317-
private Future<Void> updateStep(Step<?> step, State previousStepState, boolean cancelOnFailure) {
318+
private Future<Void> updateStep(Step step, State previousStepState, boolean cancelOnFailure) {
318319
//TODO: Once the state was SUCCEEDED it should not be mutable at all anymore
319320
if (previousStepState != null && !step.getStatus().getState().isFinal() && previousStepState.isFinal())
320321
//In case the step was already marked to have a final state, ignore any subsequent non-final updates to it
@@ -364,6 +365,7 @@ private Future<Void> updatePreviousAttempts(Step step) {
364365
* @return A future providing a boolean telling whether the action was performed already.
365366
*/
366367
public Future<Boolean> resume() {
368+
logger.info("[{}] Resuming job ...", getId());
367369
if (isResumable()) {
368370
getStatus().setState(RESUMING);
369371
getSteps().stepStream().forEach(step -> {

xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobAdminApi.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ private void processEmrJobStateChangeEvent(JsonObject event) {
319319
State newStepState = switch (emrJobStatus) {
320320
case "RUNNING" -> RUNNING;
321321
case "SUCCESS" -> SUCCEEDED;
322+
case "CANCELLING" -> CANCELLING;
322323
case "CANCELLED" -> CANCELLED;
323324
case "FAILED" -> FAILED;
324325
default -> null;

0 commit comments

Comments
 (0)