Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
904f154
Quickfix resolving of --tileInvalidations
mchrza Feb 27, 2025
5a1ec0a
Set TILE_INVALIDATION file as SYSTEM output
mchrza Feb 27, 2025
fb0f692
Fix log output.
mchrza Feb 27, 2025
922648c
Add prevention for double initialization of Configs.
mchrza Feb 27, 2025
0e6bd82
- Improve docs for Step#prepare()
roegi Feb 27, 2025
b78a895
Merge branch 'master' into job-dev
roegi Feb 27, 2025
15a1c8d
RunEmrJob: set contentType properly and fix trailing / issue.
mchrza Feb 28, 2025
26d6d08
Do not treat ambiguity during compilation interceptor selection as us…
roegi Mar 3, 2025
c46644f
Rename to UnexpectedCompilerError
roegi Mar 3, 2025
7f617b0
Merge branch 'master' into job-dev
roegi Mar 5, 2025
a3c53a4
Merge branch 'master' into job-dev
mujammil10 Mar 6, 2025
6e2a933
Merge branch 'master' into job-dev
roegi Mar 7, 2025
255629c
Merge branch 'master' into job-dev
mujammil10 Mar 11, 2025
e3ad05a
Fix database instance selection when READER is not available
mujammil10 Mar 11, 2025
be485db
Merge branch 'master' into job-dev
roegi Mar 13, 2025
b567ca2
Revert double initialization check.
mchrza Mar 17, 2025
202bac0
Merge remote-tracking branch 'origin/job-dev' into job-dev
mujammil10 Mar 25, 2025
3b6d9f3
Support re-using metadata for imports and add passthrough flag to Exp…
mujammil10 Mar 26, 2025
bd76a1d
Merge branch 'master' into job-dev
roegi Mar 27, 2025
2b196a5
Merge branch 'master' into job-dev
roegi Mar 27, 2025
2286ab9
Merge branch 'master' into job-dev
roegi Mar 28, 2025
f4a45c8
Merge branch 'master' into job-dev
roegi Mar 28, 2025
a868538
DS-1101: Implement named user input-sets for jobs
roegi Mar 24, 2025
c770739
DS-1101: Support passing inputs of type InputsFromS3 directly within …
roegi Mar 25, 2025
c2e9c3f
DS-1101: Adjust parameter references in RunEmrJob-step to work proper…
roegi Mar 27, 2025
57e3a6c
Merge branch 'master' into job-dev
roegi Mar 28, 2025
369f920
Merge branch 'master' into job-dev
roegi Mar 31, 2025
ee48650
Merge branch 'master' into job-dev
roegi Mar 31, 2025
c5d744f
Merge branch 'master' into userInputSets
roegi Apr 1, 2025
cb442be
Add new helper methods to create and start jobs.
mujammil10 Apr 2, 2025
3799114
Return jobId from JobTestBase#createJobAndPollStatus
mujammil10 Apr 3, 2025
f93ccac
DS-1103: Introduce general flag for a step that makes it to be ignore…
roegi Apr 2, 2025
b59ab73
Merge branch 'master' into userInputSets
roegi Apr 3, 2025
624c626
DS-1103: Improve GraphFusionTool#canonicalize() to also unwrap single…
roegi Apr 4, 2025
7ba817c
DS-1103: Fix GraphFusionTool#traverse() to not try updating immutable…
roegi Apr 4, 2025
8ae8ef8
Merge branch 'master' into userInputSets
roegi Apr 4, 2025
698a4a3
Fix stepId in OutputSet in DelegateStep and add graph fusion test
mujammil10 Apr 4, 2025
a6e386a
DS-1103: Fix GraphFusionTool#canonicalize() to also unwrap root-graph…
roegi Apr 7, 2025
9e8b69b
Merge branch 'master' into userInputSets
roegi Apr 7, 2025
f549c02
DS-1101: Fix inputs cache to include "setName" in the cache-key
roegi Apr 7, 2025
5ed3cd4
Merge branch 'refs/heads/master' into userInputSets
roegi Apr 7, 2025
44ffeb2
DS-1101: Minor code changes
roegi Apr 7, 2025
fad942e
DS-1101: Fix inputs metadata-cache to include "setName" in the cache-key
roegi Apr 7, 2025
dd0e94f
DS-1122: Fix issues with undeletable PG processes
roegi Apr 8, 2025
c4dfdba
Merge branch 'master' into userInputSets
roegi Apr 8, 2025
b9eeaed
DS-1122: Re-add CASCADE to schema deletion query
roegi Apr 8, 2025
f04d9bd
DS-1189: Remove obsolete fileCount from FeatureStatistics
roegi Apr 9, 2025
41bce8e
DS-1189: Make DelegateStep#setOutputSets public to be used by compile…
roegi Apr 9, 2025
f6f54d3
DS-1189: Fix step tests after removing internalStatistics from export…
roegi Apr 9, 2025
edfb778
DS-1189: Add test for transitive reusability to GraphFusionTests
roegi Apr 9, 2025
68135d6
Add JobInternalDelegateStep for delegating the steps within the same job
mujammil10 Apr 10, 2025
b23ea72
DS-1189: Some minor code improvements
roegi Apr 10, 2025
c5000e1
DS-1189: Some minor code improvements
roegi Apr 10, 2025
8961084
Merge branch 'master' into userInputSets
roegi Apr 10, 2025
a787c1e
DS-1189: Fix bug in DelegateStep that it could happen that an outputS…
roegi Apr 10, 2025
f6802f2
DS-1208: Perform input scanning asynchronously if inputs were provide…
roegi Apr 11, 2025
1ff85bc
DS-1208: Wait for asynchronous input scanning before submitting & sta…
roegi Apr 14, 2025
2439f20
DS-1208: Always try submitting the job without inputs even when speci…
roegi Apr 14, 2025
0341465
Merge branch 'master' into userInputSets
roegi Apr 14, 2025
f8c2e96
DS-1064: Fix input caching issues in Job API
roegi Apr 14, 2025
47a1d22
DS-1064: Ensure synchronized writes to metadata / inputs cache in Job…
roegi Apr 14, 2025
8b82938
Remove obsolete "passthrough" implementation from ExportSpaceToFiles …
mujammil10 Apr 15, 2025
3327e24
DS-1208: Rename InputSet#stepId => #providerId
roegi Apr 15, 2025
23fb8b8
DS-1208: Remove obsolete OutputSet constructor
roegi Apr 15, 2025
fb67b01
DS-1208: Small improvement of param name
roegi Apr 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import static com.here.xyz.jobs.RuntimeInfo.State.RUNNING;
import static com.here.xyz.jobs.RuntimeInfo.State.SUBMITTED;
import static com.here.xyz.jobs.RuntimeInfo.State.SUCCEEDED;
import static com.here.xyz.jobs.steps.Step.InputSet.DEFAULT_INPUT_SET_NAME;
import static com.here.xyz.jobs.steps.inputs.Input.inputS3Prefix;
import static com.here.xyz.jobs.steps.resources.Load.addLoads;
import static com.here.xyz.util.Random.randomAlpha;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonView;
import com.here.xyz.XyzSerializable;
import com.here.xyz.jobs.RuntimeInfo.State;
Expand Down Expand Up @@ -71,6 +73,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -88,6 +91,8 @@ public class Job implements XyzSerializable {
private long updatedAt;
@JsonView({Public.class, Static.class})
private long keepUntil;
@JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
private Map<String, Input> inputs;
//Caller defined properties:
@JsonView(Static.class)
private String owner;
Expand Down Expand Up @@ -212,7 +217,7 @@ private Future<Void> prepareStep(Step step) {
protected Future<Boolean> validate() {
logger.info("[{}] Validating job ...", getId());
//TODO: Collect exceptions and forward them accordingly as one exception object with (potentially) multiple error objects inside
return Future.all(Job.forEach(getSteps().stepStream().toList(), step -> validateStep(step)))
return Future.all(Job.forEach(nonFinalSteps().toList(), step -> validateStep(step)))
.compose(cf -> Future.succeededFuture(cf.list().stream().allMatch(isReady -> (boolean) isReady)));
}

Expand All @@ -234,7 +239,7 @@ public Future<Void> start() {
return Future.failedFuture(new IllegalStateException("Job can not be started as it's not in SUBMITTED state."));

getStatus().setState(PENDING);
getSteps().stepStream().forEach(step -> step.getStatus().setState(PENDING));
nonFinalSteps().forEach(step -> step.getStatus().setState(PENDING));

long t1 = Core.currentTimeMillis();
return store()
Expand Down Expand Up @@ -286,12 +291,16 @@ public Step getStepById(String stepId) {
return getSteps().getStep(stepId);
}

private Stream<Step> nonFinalSteps() {
return getSteps().stepStream().filter(step -> !step.getStatus().getState().isFinal());
}

/**
* Updates the status of a step at this job by replacing it with the specified one.
* @param step
* @return
*/
public Future<Void> updateStep(Step<?> step) {
public Future<Void> updateStep(Step step) {
final Step existingStep = getStepById(step.getId());
if (existingStep == null)
throw new IllegalArgumentException("The provided step with ID " + step.getGlobalStepId() + " was not found.");
Expand Down Expand Up @@ -490,9 +499,13 @@ private Future<Map<ExecutionResource, Double>> calculateResourceLoads(Step step)
}

public UploadUrl createUploadUrl(boolean compressed) {
return createUploadUrl(compressed, DEFAULT_INPUT_SET_NAME);
}

public UploadUrl createUploadUrl(boolean compressed, String setName) {
return new UploadUrl()
.withCompressed(compressed)
.withS3Key(inputS3Prefix(getId()) + "/" + UUID.randomUUID() + (compressed ? ".gz" : ""));
.withS3Key(inputS3Prefix(getId(), setName) + "/" + UUID.randomUUID() + (compressed ? ".gz" : ""));
}

public Future<Void> consumeInput(ModelBasedInput input) {
Expand All @@ -510,8 +523,8 @@ private Future<Void> deleteInputs() {
return Future.succeededFuture();
}

public Future<List<Input>> loadInputs() {
return ASYNC.run(() -> Input.loadInputs(getId()));
public Future<List<Input>> loadInputs(String setName) {
return ASYNC.run(() -> Input.loadInputs(getId(), setName));
}

public Future<List<Output>> loadOutputs() {
Expand Down Expand Up @@ -637,6 +650,19 @@ public Job withKeepUntil(long keepUntil) {
return this;
}

public Map<String, Input> getInputs() {
return inputs;
}

public void setInputs(Map<String, Input> inputs) {
this.inputs = inputs;
}

public Job withInputs(Map<String, Input> inputs) {
setInputs(inputs);
return this;
}

public String getOwner() {
return owner;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.vertx.core.http.HttpMethod.DELETE;
import static io.vertx.core.http.HttpMethod.GET;
import static io.vertx.core.http.HttpMethod.POST;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.here.xyz.XyzSerializable;
Expand All @@ -42,7 +45,6 @@
import com.here.xyz.jobs.steps.execution.JobExecutor;
import com.here.xyz.util.service.HttpException;
import io.vertx.core.Future;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
Expand All @@ -60,12 +62,12 @@ public class JobAdminApi extends JobApiBase {
private static final String ADMIN_STATE_MACHINE_EVENTS = "/admin/state/events";

public JobAdminApi(Router router) {
router.route(HttpMethod.GET, ADMIN_JOBS).handler(handleErrors(this::getJobs));
router.route(HttpMethod.GET, ADMIN_JOB).handler(handleErrors(this::getJob));
router.route(HttpMethod.DELETE, ADMIN_JOBS).handler(handleErrors(this::deleteJob));
router.route(HttpMethod.POST, ADMIN_JOB_STEPS).handler(handleErrors(this::postStep));
router.route(HttpMethod.GET, ADMIN_JOB_STEP).handler(handleErrors(this::getStep));
router.route(HttpMethod.POST, ADMIN_STATE_MACHINE_EVENTS).handler(handleErrors(this::postStateEvent));
router.route(GET, ADMIN_JOBS).handler(handleErrors(this::getJobs));
router.route(GET, ADMIN_JOB).handler(handleErrors(this::getJob));
router.route(DELETE, ADMIN_JOBS).handler(handleErrors(this::deleteJob));
router.route(POST, ADMIN_JOB_STEPS).handler(handleErrors(this::postStep));
router.route(GET, ADMIN_JOB_STEP).handler(handleErrors(this::getStep));
router.route(POST, ADMIN_STATE_MACHINE_EVENTS).handler(handleErrors(this::postStateEvent));
}

private void getJobs(RoutingContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package com.here.xyz.jobs.service;

import static com.here.xyz.jobs.RuntimeInfo.State.FAILED;
import static com.here.xyz.jobs.RuntimeInfo.State.NOT_READY;
import static com.here.xyz.jobs.RuntimeInfo.State.RUNNING;
import static com.here.xyz.jobs.RuntimeStatus.Action.CANCEL;
import static com.here.xyz.jobs.service.JobApiBase.ApiParam.Path.SPACE_ID;
import static com.here.xyz.jobs.service.JobApiBase.ApiParam.getPathParam;
import static com.here.xyz.jobs.steps.Step.InputSet.DEFAULT_INPUT_SET_NAME;
import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED;
import static io.netty.handler.codec.http.HttpResponseStatus.CREATED;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
Expand All @@ -48,7 +50,6 @@
import io.vertx.core.Future;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.openapi.router.RouterBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -67,6 +68,8 @@ public JobApi(RouterBuilder rb) {
rb.getRoute("deleteJob").setDoValidation(false).addHandler(handleErrors(this::deleteJob));
rb.getRoute("postJobInputs").setDoValidation(false).addHandler(handleErrors(this::postJobInput));
rb.getRoute("getJobInputs").setDoValidation(false).addHandler(handleErrors(this::getJobInputs));
rb.getRoute("postNamedJobInputs").setDoValidation(false).addHandler(handleErrors(this::postJobInput));
rb.getRoute("getNamedJobInputs").setDoValidation(false).addHandler(handleErrors(this::getJobInputs));
rb.getRoute("getJobOutputs").setDoValidation(false).addHandler(handleErrors(this::getJobOutputs));
rb.getRoute("patchJobStatus").setDoValidation(false).addHandler(handleErrors(this::patchJobStatus));
rb.getRoute("getJobStatus").setDoValidation(false).addHandler(handleErrors(this::getJobStatus));
Expand All @@ -79,6 +82,7 @@ protected void postJob(final RoutingContext context) throws HttpException {
protected Future<Job> createNewJob(RoutingContext context, Job job) {
logger.info(getMarker(context), "Received job creation request: {}", job.serialize(true));
return job.create().submit()
.compose(v -> applyInputReferences(job))
.map(res -> job)
.recover(t -> {
if (t instanceof CompilationError)
Expand All @@ -94,6 +98,34 @@ protected Future<Job> createNewJob(RoutingContext context, Job job) {
.onFailure(err -> sendErrorResponse(context, err));
}

protected Future<Void> applyInputReferences(Job job) {
if (job.getInputs() == null)
return Future.succeededFuture();

if (!job.getInputs().values().stream().allMatch(input -> input instanceof InputsFromS3))
return Future.failedFuture("Only inputs of type " + InputsFromS3.class.getSimpleName() + " are supported as inline inputs.");

//Continue with the input scanning *asynchronously* but fail the job if something goes wrong (User can check the status)
Future.all(job.getInputs().entrySet().stream()
.map(inputSet -> registerInput(job, (InputsFromS3) inputSet.getValue(), inputSet.getKey()))
.toList())
.onFailure(t -> {
logger.error("[{}] Error while scanning inputs for job.", job.getId(), t);
job.getStatus()
.withState(FAILED)
.withErrorMessage("Error while scanning inputs.")
.withErrorCause(t.getMessage());
job.store();
})
.compose(v -> job.submit());

/*
Return without waiting for the input scanning to complete.
The job will stay in state NOT_READY for some time but will proceed automatically afterwards.
*/
return Future.succeededFuture();
}

protected void getJobs(final RoutingContext context) {
getJobs(context, false);
}
Expand All @@ -116,67 +148,74 @@ protected void deleteJob(final RoutingContext context) {
protected void postJobInput(final RoutingContext context) throws HttpException {
String jobId = jobId(context);
Input input = getJobInputFromBody(context);
if (input instanceof UploadUrl uploadUrl) {
loadJob(context, jobId)
.compose(job -> job.getStatus().getState() == NOT_READY
? Future.succeededFuture(job)
: Future.failedFuture(new DetailedHttpException("E319004")))
.map(job -> job.createUploadUrl(uploadUrl.isCompressed()))
.onSuccess(res -> sendResponse(context, CREATED.code(), res))
.onFailure(err -> sendErrorResponse(context, err));
}
else if (input instanceof InputsFromS3 s3Inputs) {
loadJob(context, jobId)
.compose(job -> job.getStatus().getState() == NOT_READY
? Future.succeededFuture(job)
: Future.failedFuture(new DetailedHttpException("E319004")))
.compose(job -> {
s3Inputs.dereference(job.getId());
return Future.succeededFuture();
})
.onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null))
.onFailure(err -> sendErrorResponse(context, err));
}
else if (input instanceof ModelBasedInput modelBasedInput) {
loadJob(context, jobId)
.compose(job -> {
if (!job.isPipeline())
return Future.failedFuture(new DetailedHttpException("E319005", Map.of("allowedType", UploadUrl.class.getSimpleName())));
else if (job.getStatus().getState() != RUNNING)
return Future.failedFuture(new DetailedHttpException("E319006"));
else if (context.request().bytesRead() > 256 * 1024)
return Future.failedFuture(new DetailedHttpException("E319007"));
else
return job.consumeInput(modelBasedInput);
})
.onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null))
.onFailure(err -> sendErrorResponse(context, err));
}
else if (input instanceof InputsFromJob inputsReference) {
//NOTE: Both jobs have to be loaded to authorize the user for both
loadJob(context, jobId)
.compose(job -> loadJob(context, inputsReference.getJobId()).compose(referencedJob -> {
try {
if (!Objects.equals(referencedJob.getOwner(), job.getOwner()))
return Future.failedFuture(new DetailedHttpException("E319008", Map.of("referencedJob", inputsReference.getJobId(), "referencingJob", job.getId())));

inputsReference.dereference(job.getId());
return Future.succeededFuture();
}
catch (IOException e) {
return Future.failedFuture(e);
}
}))
.onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null))
.onFailure(err -> sendErrorResponse(context, err));
}
String inputSetName = inputSetName(context);

Future<Input> inputCreatedFuture = loadJob(context, jobId).compose(job -> registerInput(context, job, input, inputSetName));

inputCreatedFuture
.onSuccess(res -> sendResponse(context, CREATED.code(), res))
.onFailure(err -> sendErrorResponse(context, err));
}

private Future<Input> registerInput(RoutingContext context, Job job, Input input, String inputSetName) {
if (input instanceof UploadUrl uploadUrl)
return registerInput(job, inputSetName, uploadUrl);

if (input instanceof InputsFromS3 s3Inputs)
return registerInput(job, s3Inputs, inputSetName);

if (input instanceof ModelBasedInput modelBasedInput)
return registerPipelineInput(context, job, modelBasedInput);

if (input instanceof InputsFromJob inputsReference)
return registerInput(context, job, inputsReference);

throw new NotImplementedException("Input type " + input.getClass().getSimpleName() + " is not supported.");
}

private static Future<Input> registerInput(Job job, String inputSetName, UploadUrl uploadUrl) {
return job.getStatus().getState() == NOT_READY
? Future.succeededFuture(job.createUploadUrl(uploadUrl.isCompressed(), inputSetName))
: Future.failedFuture(new DetailedHttpException("E319004"));
}

private Future<Input> registerInput(RoutingContext context, Job job, InputsFromJob inputsReference) {
//NOTE: Both jobs have to be loaded to authorize the user for both ones
return loadJob(context, inputsReference.getJobId()).compose(referencedJob -> {
try {
if (!Objects.equals(referencedJob.getOwner(), job.getOwner()))
return Future.failedFuture(new DetailedHttpException("E319008", Map.of("referencedJob", inputsReference.getJobId(), "referencingJob", job.getId())));

inputsReference.dereference(job.getId());
return Future.succeededFuture();
}
catch (Exception e) {
return Future.failedFuture(e);
}
});
}

private static Future<Input> registerPipelineInput(RoutingContext context, Job job, ModelBasedInput modelBasedInput) {
if (!job.isPipeline())
return Future.failedFuture(new DetailedHttpException("E319005", Map.of("allowedType", UploadUrl.class.getSimpleName())));
else if (job.getStatus().getState() != RUNNING)
return Future.failedFuture(new DetailedHttpException("E319006"));
else if (context.request().bytesRead() > 256 * 1024)
return Future.failedFuture(new DetailedHttpException("E319007"));
else
throw new NotImplementedException("Input type " + input.getClass().getSimpleName() + " is not supported.");
return job.consumeInput(modelBasedInput).map(null);
}

private static Future<Input> registerInput(Job job, InputsFromS3 s3Inputs, String inputSetName) {
if (job.getStatus().getState() != NOT_READY)
return Future.failedFuture(new DetailedHttpException("E319004"));
s3Inputs.dereference(job.getId(), inputSetName);
return Future.succeededFuture(null);
}

protected void getJobInputs(final RoutingContext context) {
loadJob(context, jobId(context))
.compose(job -> job.loadInputs())
.compose(job -> job.loadInputs(inputSetName(context)))
.onSuccess(res -> sendResponse(context, OK.code(), res, new TypeReference<List<Input>>() {}))
.onFailure(err -> sendErrorResponse(context, err));
}
Expand Down Expand Up @@ -234,6 +273,11 @@ protected Future<Void> authorizeAccess(RoutingContext context, Job job) {
return Future.succeededFuture();
}

protected String inputSetName(RoutingContext context) {
String setName = context.pathParam("setName");
return setName == null ? DEFAULT_INPUT_SET_NAME : setName;
}

protected Input getJobInputFromBody(RoutingContext context) throws HttpException {
try {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017-2024 HERE Europe B.V.
* Copyright (C) 2017-2025 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading
Loading