diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/Job.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/Job.java index 06d64d08d1..c058e0abb6 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/Job.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/Job.java @@ -29,6 +29,7 @@ import static com.here.xyz.jobs.RuntimeInfo.State.RUNNING; import static com.here.xyz.jobs.RuntimeInfo.State.SUBMITTED; import static com.here.xyz.jobs.RuntimeInfo.State.SUCCEEDED; +import static com.here.xyz.jobs.steps.Step.InputSet.DEFAULT_INPUT_SET_NAME; import static com.here.xyz.jobs.steps.inputs.Input.inputS3Prefix; import static com.here.xyz.jobs.steps.resources.Load.addLoads; import static com.here.xyz.util.Random.randomAlpha; @@ -36,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonView; import com.here.xyz.XyzSerializable; import com.here.xyz.jobs.RuntimeInfo.State; @@ -71,6 +73,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -88,6 +91,8 @@ public class Job implements XyzSerializable { private long updatedAt; @JsonView({Public.class, Static.class}) private long keepUntil; + @JsonProperty(access = JsonProperty.Access.WRITE_ONLY) + private Map inputs; //Caller defined properties: @JsonView(Static.class) private String owner; @@ -212,7 +217,7 @@ private Future prepareStep(Step step) { protected Future validate() { logger.info("[{}] Validating job ...", getId()); //TODO: Collect exceptions and forward them accordingly as one exception object with (potentially) multiple error objects inside - return 
Future.all(Job.forEach(getSteps().stepStream().toList(), step -> validateStep(step))) + return Future.all(Job.forEach(nonFinalSteps().toList(), step -> validateStep(step))) .compose(cf -> Future.succeededFuture(cf.list().stream().allMatch(isReady -> (boolean) isReady))); } @@ -234,7 +239,7 @@ public Future start() { return Future.failedFuture(new IllegalStateException("Job can not be started as it's not in SUBMITTED state.")); getStatus().setState(PENDING); - getSteps().stepStream().forEach(step -> step.getStatus().setState(PENDING)); + nonFinalSteps().forEach(step -> step.getStatus().setState(PENDING)); long t1 = Core.currentTimeMillis(); return store() @@ -286,12 +291,16 @@ public Step getStepById(String stepId) { return getSteps().getStep(stepId); } + private Stream nonFinalSteps() { + return getSteps().stepStream().filter(step -> !step.getStatus().getState().isFinal()); + } + /** * Updates the status of a step at this job by replacing it with the specified one. * @param step * @return */ - public Future updateStep(Step step) { + public Future updateStep(Step step) { final Step existingStep = getStepById(step.getId()); if (existingStep == null) throw new IllegalArgumentException("The provided step with ID " + step.getGlobalStepId() + " was not found."); @@ -490,9 +499,13 @@ private Future> calculateResourceLoads(Step step) } public UploadUrl createUploadUrl(boolean compressed) { + return createUploadUrl(compressed, DEFAULT_INPUT_SET_NAME); + } + + public UploadUrl createUploadUrl(boolean compressed, String setName) { return new UploadUrl() .withCompressed(compressed) - .withS3Key(inputS3Prefix(getId()) + "/" + UUID.randomUUID() + (compressed ? ".gz" : "")); + .withS3Key(inputS3Prefix(getId(), setName) + "/" + UUID.randomUUID() + (compressed ? 
".gz" : "")); } public Future consumeInput(ModelBasedInput input) { @@ -510,8 +523,8 @@ private Future deleteInputs() { return Future.succeededFuture(); } - public Future> loadInputs() { - return ASYNC.run(() -> Input.loadInputs(getId())); + public Future> loadInputs(String setName) { + return ASYNC.run(() -> Input.loadInputs(getId(), setName)); } public Future> loadOutputs() { @@ -637,6 +650,19 @@ public Job withKeepUntil(long keepUntil) { return this; } + public Map getInputs() { + return inputs; + } + + public void setInputs(Map inputs) { + this.inputs = inputs; + } + + public Job withInputs(Map inputs) { + setInputs(inputs); + return this; + } + public String getOwner() { return owner; } diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobAdminApi.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobAdminApi.java index 631690e6c5..b875706b8d 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobAdminApi.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobAdminApi.java @@ -32,6 +32,9 @@ import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT; import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.vertx.core.http.HttpMethod.DELETE; +import static io.vertx.core.http.HttpMethod.GET; +import static io.vertx.core.http.HttpMethod.POST; import com.fasterxml.jackson.core.JsonProcessingException; import com.here.xyz.XyzSerializable; @@ -42,7 +45,6 @@ import com.here.xyz.jobs.steps.execution.JobExecutor; import com.here.xyz.util.service.HttpException; import io.vertx.core.Future; -import io.vertx.core.http.HttpMethod; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; @@ -60,12 +62,12 @@ public class JobAdminApi extends JobApiBase { private static final String ADMIN_STATE_MACHINE_EVENTS = 
"/admin/state/events"; public JobAdminApi(Router router) { - router.route(HttpMethod.GET, ADMIN_JOBS).handler(handleErrors(this::getJobs)); - router.route(HttpMethod.GET, ADMIN_JOB).handler(handleErrors(this::getJob)); - router.route(HttpMethod.DELETE, ADMIN_JOBS).handler(handleErrors(this::deleteJob)); - router.route(HttpMethod.POST, ADMIN_JOB_STEPS).handler(handleErrors(this::postStep)); - router.route(HttpMethod.GET, ADMIN_JOB_STEP).handler(handleErrors(this::getStep)); - router.route(HttpMethod.POST, ADMIN_STATE_MACHINE_EVENTS).handler(handleErrors(this::postStateEvent)); + router.route(GET, ADMIN_JOBS).handler(handleErrors(this::getJobs)); + router.route(GET, ADMIN_JOB).handler(handleErrors(this::getJob)); + router.route(DELETE, ADMIN_JOBS).handler(handleErrors(this::deleteJob)); + router.route(POST, ADMIN_JOB_STEPS).handler(handleErrors(this::postStep)); + router.route(GET, ADMIN_JOB_STEP).handler(handleErrors(this::getStep)); + router.route(POST, ADMIN_STATE_MACHINE_EVENTS).handler(handleErrors(this::postStateEvent)); } private void getJobs(RoutingContext context) { diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobApi.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobApi.java index 0ce741af7a..6faaccc33c 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobApi.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/service/JobApi.java @@ -19,11 +19,13 @@ package com.here.xyz.jobs.service; +import static com.here.xyz.jobs.RuntimeInfo.State.FAILED; import static com.here.xyz.jobs.RuntimeInfo.State.NOT_READY; import static com.here.xyz.jobs.RuntimeInfo.State.RUNNING; import static com.here.xyz.jobs.RuntimeStatus.Action.CANCEL; import static com.here.xyz.jobs.service.JobApiBase.ApiParam.Path.SPACE_ID; import static com.here.xyz.jobs.service.JobApiBase.ApiParam.getPathParam; +import static com.here.xyz.jobs.steps.Step.InputSet.DEFAULT_INPUT_SET_NAME; import static 
io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; import static io.netty.handler.codec.http.HttpResponseStatus.CREATED; import static io.netty.handler.codec.http.HttpResponseStatus.OK; @@ -48,7 +50,6 @@ import io.vertx.core.Future; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.openapi.router.RouterBuilder; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Objects; @@ -67,6 +68,8 @@ public JobApi(RouterBuilder rb) { rb.getRoute("deleteJob").setDoValidation(false).addHandler(handleErrors(this::deleteJob)); rb.getRoute("postJobInputs").setDoValidation(false).addHandler(handleErrors(this::postJobInput)); rb.getRoute("getJobInputs").setDoValidation(false).addHandler(handleErrors(this::getJobInputs)); + rb.getRoute("postNamedJobInputs").setDoValidation(false).addHandler(handleErrors(this::postJobInput)); + rb.getRoute("getNamedJobInputs").setDoValidation(false).addHandler(handleErrors(this::getJobInputs)); rb.getRoute("getJobOutputs").setDoValidation(false).addHandler(handleErrors(this::getJobOutputs)); rb.getRoute("patchJobStatus").setDoValidation(false).addHandler(handleErrors(this::patchJobStatus)); rb.getRoute("getJobStatus").setDoValidation(false).addHandler(handleErrors(this::getJobStatus)); @@ -79,6 +82,7 @@ protected void postJob(final RoutingContext context) throws HttpException { protected Future createNewJob(RoutingContext context, Job job) { logger.info(getMarker(context), "Received job creation request: {}", job.serialize(true)); return job.create().submit() + .compose(v -> applyInputReferences(job)) .map(res -> job) .recover(t -> { if (t instanceof CompilationError) @@ -94,6 +98,34 @@ protected Future createNewJob(RoutingContext context, Job job) { .onFailure(err -> sendErrorResponse(context, err)); } + protected Future applyInputReferences(Job job) { + if (job.getInputs() == null) + return Future.succeededFuture(); + + if (!job.getInputs().values().stream().allMatch(input -> input instanceof 
InputsFromS3)) + return Future.failedFuture("Only inputs of type " + InputsFromS3.class.getSimpleName() + " are supported as inline inputs."); + + //Continue with the input scanning *asynchronously* but fail the job if something goes wrong (User can check the status) + Future.all(job.getInputs().entrySet().stream() + .map(inputSet -> registerInput(job, (InputsFromS3) inputSet.getValue(), inputSet.getKey())) + .toList()) + .onFailure(t -> { + logger.error("[{}] Error while scanning inputs for job.", job.getId(), t); + job.getStatus() + .withState(FAILED) + .withErrorMessage("Error while scanning inputs.") + .withErrorCause(t.getMessage()); + job.store(); + }) + .compose(v -> job.submit()); + + /* + Return without waiting for the input scanning to complete. + The job will stay in state NOT_READY for some time but will proceed automatically afterwards. + */ + return Future.succeededFuture(); + } + protected void getJobs(final RoutingContext context) { getJobs(context, false); } @@ -116,67 +148,74 @@ protected void deleteJob(final RoutingContext context) { protected void postJobInput(final RoutingContext context) throws HttpException { String jobId = jobId(context); Input input = getJobInputFromBody(context); - if (input instanceof UploadUrl uploadUrl) { - loadJob(context, jobId) - .compose(job -> job.getStatus().getState() == NOT_READY - ? Future.succeededFuture(job) - : Future.failedFuture(new DetailedHttpException("E319004"))) - .map(job -> job.createUploadUrl(uploadUrl.isCompressed())) - .onSuccess(res -> sendResponse(context, CREATED.code(), res)) - .onFailure(err -> sendErrorResponse(context, err)); - } - else if (input instanceof InputsFromS3 s3Inputs) { - loadJob(context, jobId) - .compose(job -> job.getStatus().getState() == NOT_READY - ? 
Future.succeededFuture(job) - : Future.failedFuture(new DetailedHttpException("E319004"))) - .compose(job -> { - s3Inputs.dereference(job.getId()); - return Future.succeededFuture(); - }) - .onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null)) - .onFailure(err -> sendErrorResponse(context, err)); - } - else if (input instanceof ModelBasedInput modelBasedInput) { - loadJob(context, jobId) - .compose(job -> { - if (!job.isPipeline()) - return Future.failedFuture(new DetailedHttpException("E319005", Map.of("allowedType", UploadUrl.class.getSimpleName()))); - else if (job.getStatus().getState() != RUNNING) - return Future.failedFuture(new DetailedHttpException("E319006")); - else if (context.request().bytesRead() > 256 * 1024) - return Future.failedFuture(new DetailedHttpException("E319007")); - else - return job.consumeInput(modelBasedInput); - }) - .onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null)) - .onFailure(err -> sendErrorResponse(context, err)); - } - else if (input instanceof InputsFromJob inputsReference) { - //NOTE: Both jobs have to be loaded to authorize the user for both - loadJob(context, jobId) - .compose(job -> loadJob(context, inputsReference.getJobId()).compose(referencedJob -> { - try { - if (!Objects.equals(referencedJob.getOwner(), job.getOwner())) - return Future.failedFuture(new DetailedHttpException("E319008", Map.of("referencedJob", inputsReference.getJobId(), "referencingJob", job.getId()))); - - inputsReference.dereference(job.getId()); - return Future.succeededFuture(); - } - catch (IOException e) { - return Future.failedFuture(e); - } - })) - .onSuccess(v -> sendResponse(context, OK.code(), (XyzSerializable) null)) - .onFailure(err -> sendErrorResponse(context, err)); - } + String inputSetName = inputSetName(context); + + Future inputCreatedFuture = loadJob(context, jobId).compose(job -> registerInput(context, job, input, inputSetName)); + + inputCreatedFuture + .onSuccess(res -> 
sendResponse(context, CREATED.code(), res)) + .onFailure(err -> sendErrorResponse(context, err)); + } + + private Future registerInput(RoutingContext context, Job job, Input input, String inputSetName) { + if (input instanceof UploadUrl uploadUrl) + return registerInput(job, inputSetName, uploadUrl); + + if (input instanceof InputsFromS3 s3Inputs) + return registerInput(job, s3Inputs, inputSetName); + + if (input instanceof ModelBasedInput modelBasedInput) + return registerPipelineInput(context, job, modelBasedInput); + + if (input instanceof InputsFromJob inputsReference) + return registerInput(context, job, inputsReference); + + throw new NotImplementedException("Input type " + input.getClass().getSimpleName() + " is not supported."); + } + + private static Future registerInput(Job job, String inputSetName, UploadUrl uploadUrl) { + return job.getStatus().getState() == NOT_READY + ? Future.succeededFuture(job.createUploadUrl(uploadUrl.isCompressed(), inputSetName)) + : Future.failedFuture(new DetailedHttpException("E319004")); + } + + private Future registerInput(RoutingContext context, Job job, InputsFromJob inputsReference) { + //NOTE: Both jobs have to be loaded to authorize the user for both ones + return loadJob(context, inputsReference.getJobId()).compose(referencedJob -> { + try { + if (!Objects.equals(referencedJob.getOwner(), job.getOwner())) + return Future.failedFuture(new DetailedHttpException("E319008", Map.of("referencedJob", inputsReference.getJobId(), "referencingJob", job.getId()))); + + inputsReference.dereference(job.getId()); + return Future.succeededFuture(); + } + catch (Exception e) { + return Future.failedFuture(e); + } + }); + } + + private static Future registerPipelineInput(RoutingContext context, Job job, ModelBasedInput modelBasedInput) { + if (!job.isPipeline()) + return Future.failedFuture(new DetailedHttpException("E319005", Map.of("allowedType", UploadUrl.class.getSimpleName()))); + else if (job.getStatus().getState() != RUNNING) + 
return Future.failedFuture(new DetailedHttpException("E319006")); + else if (context.request().bytesRead() > 256 * 1024) + return Future.failedFuture(new DetailedHttpException("E319007")); else - throw new NotImplementedException("Input type " + input.getClass().getSimpleName() + " is not supported."); + return job.consumeInput(modelBasedInput).map(null); + } + + private static Future registerInput(Job job, InputsFromS3 s3Inputs, String inputSetName) { + if (job.getStatus().getState() != NOT_READY) + return Future.failedFuture(new DetailedHttpException("E319004")); + s3Inputs.dereference(job.getId(), inputSetName); + return Future.succeededFuture(null); } protected void getJobInputs(final RoutingContext context) { loadJob(context, jobId(context)) - .compose(job -> job.loadInputs()) + .compose(job -> job.loadInputs(inputSetName(context))) .onSuccess(res -> sendResponse(context, OK.code(), res, new TypeReference>() {})) .onFailure(err -> sendErrorResponse(context, err)); } @@ -234,6 +273,11 @@ protected Future authorizeAccess(RoutingContext context, Job job) { return Future.succeededFuture(); } + protected String inputSetName(RoutingContext context) { + String setName = context.pathParam("setName"); + return setName == null ? DEFAULT_INPUT_SET_NAME : setName; + } + protected Input getJobInputFromBody(RoutingContext context) throws HttpException { try { try { diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java index 7726bbc28c..1e0b72b7a5 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/StepGraph.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/GraphFusionTool.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/GraphFusionTool.java index 213548c884..36d0991bd5 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/GraphFusionTool.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/execution/GraphFusionTool.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,11 +25,15 @@ import com.here.xyz.jobs.steps.CompilationStepGraph; import com.here.xyz.jobs.steps.Step; import com.here.xyz.jobs.steps.Step.InputSet; +import com.here.xyz.jobs.steps.Step.OutputSet; import com.here.xyz.jobs.steps.StepExecution; import com.here.xyz.jobs.steps.StepGraph; import com.here.xyz.jobs.steps.execution.RunEmrJob.ReferenceIdentifier; import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; public class GraphFusionTool { @@ -48,6 +52,8 @@ protected static StepGraph fuseGraphs(Job newJob, StepGraph oldGraph) { } protected static StepGraph fuseGraphs(String newJobId, StepGraph newGraph, StepGraph oldGraph) { + newGraph = canonicalize(newGraph); + oldGraph = canonicalize(oldGraph); CompilationStepGraph fusedGraph = replaceByDelegations(newGraph, oldGraph); //Replace previous step relations (previousStepIds) @@ -58,6 +64,61 @@ protected static StepGraph fuseGraphs(String newJobId, StepGraph newGraph, StepG return fusedGraph; } + protected static StepGraph canonicalize(StepGraph graph) { + /* + 1.) Remove all steps that are flagged as being "notReusable" (these should be basically hidden from the reusability process) + 2.) 
Then, remove empty sub-graphs (NOTE: The traversal is done in "bottom-up" manner so sub-graphs + that became empty due to the removal of "notReusable" steps will be removed as well + */ + traverse(graph, execution -> { + if (execution instanceof Step step && step.isNotReusable()) + return null; + if (execution instanceof StepGraph subGraph) { + if (subGraph.isEmpty()) + return null; + if (subGraph.getExecutions().size() == 1) + return unwrap(subGraph); + } + return execution; + }); + //Unwrap the top-level graph if it only contains one subgraph + if (graph.getExecutions().size() == 1 && graph.getExecutions().get(0) instanceof StepGraph subGraph) + graph = subGraph; + + return graph; + } + + /** + * Traverses all executions of the specified graph in a bottom-up manner (leaves first). + * @param graph The graph to be traversed recursively + * @param processor The action to be performed on the execution-node and its containing graph + */ + private static void traverse(StepGraph graph, UnaryOperator processor) { + List nodes = new LinkedList<>(graph.getExecutions()); + Iterator nodeIterator = nodes.iterator(); + int index = 0; + while (nodeIterator.hasNext()) { + StepExecution execution = nodeIterator.next(); + if (execution instanceof Step step) + execution = processor.apply(step); + else if (execution instanceof StepGraph subGraph) { + //First traverse, then call the processor (==> "bottom-up") + traverse(subGraph, processor); + execution = processor.apply(subGraph); + } + + //Update executions accordingly to the return value of the processor (null means removal) + if (execution == null) { + nodeIterator.remove(); + index--; + } + else + nodes.set(index, execution); + index++; + } + graph.setExecutions(nodes); + } + /** * Replaces all steps of the new step graph that can be re-used from the old step graph by a {@link DelegateStep} that * points to the according equivalent step of the old step graph being re-used. 
@@ -222,7 +283,7 @@ private static int matchCount(StepExecution branch) { * * @param graph */ - static void resolveReusedInputs(StepGraph graph) { + private static void resolveReusedInputs(StepGraph graph) { graph.stepStream().forEach(step -> resolveReusedInputs(step, graph)); } @@ -235,13 +296,15 @@ static void resolveReusedInputs(StepGraph graph) { private static void resolveReusedInputs(Step step, StepGraph containingStepGraph) { List newInputSets = new ArrayList<>(); for (InputSet compiledInputSet : (List) step.getInputSets()) { - if (compiledInputSet.stepId() == null || !(containingStepGraph.getStep(compiledInputSet.stepId()) instanceof DelegateStep replacementStep)) + if (compiledInputSet.providerId() == null || !(containingStepGraph.getStep(compiledInputSet.providerId()) instanceof DelegateStep replacementStep)) //NOTE: stepId == null on an InputSet refers to the USER-inputs newInputSets.add(compiledInputSet); else //Now we know that inputSet is one that should be replaced by one that is pointing to the outputs of the old graph - newInputSets.add(new InputSet(replacementStep.getDelegate().getJobId(), replacementStep.getDelegate().getId(), compiledInputSet.name(), - compiledInputSet.modelBased(), replacementStep.getDelegate().getOutputMetadata())); + //Note that stepId in the OutputSet of DelegateStep could be different from the stepId of the DelegateStep + newInputSets.add(new InputSet(replacementStep.getDelegate().getJobId(), + replacementStep.getOutputSet(compiledInputSet.name()).getStepId(), compiledInputSet.name(), compiledInputSet.modelBased(), + replacementStep.getDelegate().getOutputMetadata())); } step.setInputSets(newInputSets); @@ -279,7 +342,8 @@ private static String updateEmrScriptParamReferences(RunEmrJob runEmrJob, StepGr Here the reference will be kept as it is, and the later execution will handle this issue by throwing the correct exception. 
*/ return null; - return toInputSetReference(runEmrJob.getInputSet(referencedDelegateStep.getDelegate().getId(), ref.name())); + OutputSet referencedOutputsSet = referencedDelegateStep.getOutputSet(ref.name()); + return toInputSetReference(runEmrJob.getInputSet(referencedOutputsSet.getStepId(), ref.name())); } } } diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/test/JobTestBase.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/test/JobTestBase.java index 81da662b5e..7663c9a048 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/test/JobTestBase.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/test/JobTestBase.java @@ -26,6 +26,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import io.vertx.core.json.JsonObject; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,6 +37,9 @@ public class JobTestBase extends StepTestBase { protected Set createdSpaces = new HashSet<>(); //Job-Api related + + /** Use {@link JobTestBase#createJob(JsonObject)} instead*/ + @Deprecated public static String createJob(Job job) throws IOException, InterruptedException { logger.info("Creating job ..."); HttpResponse jobResponse = post("/jobs", job); @@ -49,6 +53,59 @@ public static String createJob(Job job) throws IOException, InterruptedException return createdJob.getId(); } + protected static String createJob(JsonObject job) throws IOException, InterruptedException { + logger.info("Creating job ..."); + HttpResponse jobResponse = post("/jobs", job.mapTo(Map.class)); + + logger.info("Got response:\n{}", toPrettyJson(jobResponse.body())); + + String jobId = new JsonObject(new String(jobResponse.body())).getString("id"); + + logger.info("Internal Job config:\n{}", toPrettyJson(get("/admin/jobs/" + jobId).body())); + + return jobId; + } + + protected static JsonObject 
createTypedJsonObject(String type) { return new JsonObject().put("type", type);} + + protected String createJobAndPollStatus(JsonObject job) throws Exception { + return createJobAndPollStatus(job, null); + } + + protected String createJobAndPollStatus(JsonObject job, byte[] fileContent) throws Exception { + //Create Job + String jobId = createJob(job); + createdJobs.add(jobId); + + //Upload content if provided + if(fileContent != null) { + uploadFileToJob(jobId, fileContent); + //Start Job execution + startJob(jobId); + } + + //Wait till Job reached final state + pollJobStatus(jobId); + + return jobId; + } + + protected JsonObject buildJob(JsonObject source, JsonObject target) { + return buildJob(source, target, null); + } + + protected JsonObject buildJob(JsonObject source, JsonObject target, JsonObject process) { + return buildJob(source, target, process, null); + } + + protected JsonObject buildJob(JsonObject source, JsonObject target, JsonObject process, JsonObject inputs) { + return new JsonObject() + .put("source", source) + .put("target", target) + .put("process", process) + .put("inputs", inputs); + } + public static String toPrettyJson(byte[] json) throws JsonProcessingException { return XyzSerializable.serialize(XyzSerializable.deserialize(json, Map.class), true); } @@ -183,6 +240,8 @@ protected Space createSpace(Space space, boolean force) { return super.createSpace(space, force); } + /** Use {@link JobTestBase#createJobAndPollStatus(JsonObject)} instead */ + @Deprecated protected void createSelfRunningJob(Job job) throws Exception { //Create Job - expect autostart createJob(job); @@ -192,6 +251,8 @@ protected void createSelfRunningJob(Job job) throws Exception { pollJobStatus(job.getId()); } + /** Use {@link JobTestBase#createJobAndPollStatus(JsonObject, byte[])} instead */ + @Deprecated protected void createAndStartJob(Job job, byte[] fileContent) throws Exception { //Create Job createJob(job); diff --git 
a/xyz-jobs/xyz-job-service/src/main/resources/openapi.yaml b/xyz-jobs/xyz-job-service/src/main/resources/openapi.yaml index dfe48c108c..03074a20e2 100644 --- a/xyz-jobs/xyz-job-service/src/main/resources/openapi.yaml +++ b/xyz-jobs/xyz-job-service/src/main/resources/openapi.yaml @@ -77,7 +77,7 @@ paths: post: tags: - "Manage Jobs" - summary: "Create job input" + summary: "Create a job input" description: "Creates a new input for a job" operationId: "postJobInputs" parameters: @@ -106,6 +106,41 @@ paths: $ref: "#/components/responses/ErrorResponse400" "404": $ref: "#/components/responses/ErrorResponse404" + /jobs/{jobId}/inputs/{setName}: + post: + tags: + - "Manage Jobs" + summary: "Create a named job input" + description: "Creates a new input for a job" + operationId: "postNamedJobInputs" + parameters: + - $ref: "#/components/parameters/JobId" + - $ref: "#/components/parameters/PayloadSetName" + requestBody: + $ref: "#/components/requestBodies/JobInputRequest" + responses: + "201": + $ref: "#/components/responses/JobInputResponse" + "400": + $ref: "#/components/responses/ErrorResponse400" + "404": + $ref: "#/components/responses/ErrorResponse404" + get: + tags: + - "Manage Jobs" + summary: "List job inputs of a named set" + description: "Lists the available inputs of a named set of a job" + operationId: "getNamedJobInputs" + parameters: + - $ref: "#/components/parameters/JobId" + - $ref: "#/components/parameters/PayloadSetName" + responses: + "200": + $ref: "#/components/responses/JobInputsResponse" + "400": + $ref: "#/components/responses/ErrorResponse400" + "404": + $ref: "#/components/responses/ErrorResponse404" /jobs/{jobId}/outputs: get: tags: @@ -180,6 +215,13 @@ components: required: true schema: type: string + PayloadSetName: + name: setName + in: path + description: The name of a set of payloads. 
+ required: true + schema: + type: string JobStatus: name: status in: query diff --git a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/GraphFusionTests.java b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/GraphFusionTests.java index cd873f31f0..95be60414f 100644 --- a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/GraphFusionTests.java +++ b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/GraphFusionTests.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ package com.here.xyz.jobs.steps.execution; import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM; +import static com.here.xyz.jobs.steps.execution.GraphFusionTool.canonicalize; import static com.here.xyz.jobs.steps.execution.GraphFusionTool.fuseGraphs; import static com.here.xyz.jobs.steps.execution.fusion.SimpleTestStepWithOutput.SOME_OUTPUT; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -97,6 +98,29 @@ public void singletonGraphNotReusable(@Values(booleans = {true, false}) boolean checkOutputs(fusedGraph, OLD_JOB_ID); } + @Test + public void reuseSingletonGraphTransitively() { + Step oldProducer = new SimpleTestStepWithOutput(SOME_EXPORT); + StepGraph oldGraph = sequential(OLD_JOB_ID, oldProducer); + + SimpleTestStepWithOutput newProducer = new SimpleTestStepWithOutput(SOME_EXPORT); + StepGraph newGraph = sequential(NEW_JOB_ID, newProducer); + + StepGraph firstFusedGraph = fuse(newGraph, oldGraph); + + SimpleTestStepWithOutput anotherNewProducer = new SimpleTestStepWithOutput(SOME_EXPORT); + StepGraph anotherNewGraph = sequential(NEW_JOB_ID, anotherNewProducer); + + StepGraph fusedGraph = fuse(anotherNewGraph, firstFusedGraph); + + assertEquals(1, fusedGraph.size()); + 
assertTrue(fusedGraph.stepStream().allMatch(step -> step instanceof DelegateStep delegateStep && delegateStep.getDelegate() instanceof SimpleTestStepWithOutput)); + assertTrue(oldGraph.isEquivalentTo(anotherNewGraph)); + assertTrue(fusedGraph.isEquivalentTo(oldGraph)); + checkInputs(fusedGraph, newGraph); + checkOutputs(fusedGraph, OLD_JOB_ID); + } + @Test public void simpleSequentialGraphFullyReusable() { Step oldProducer = new SimpleTestStepWithOutput(SOME_EXPORT); @@ -123,13 +147,11 @@ public void simpleSequentialGraphFullyReusable() { public void simpleSequentialGraphPartiallyReusable() { SimpleTestStepWithOutput oldProducer = new SimpleTestStepWithOutput(SOME_EXPORT); SimpleTestStep oldConsumer = new SimpleTestStep(SOME_CONSUMER); - StepGraph oldGraph = sequential(OLD_JOB_ID, - oldProducer, step(oldConsumer, inputsOf(oldProducer))); + StepGraph oldGraph = sequential(OLD_JOB_ID, oldProducer, step(oldConsumer, inputsOf(oldProducer))); SimpleTestStepWithOutput newProducer = new SimpleTestStepWithOutput(SOME_EXPORT); SimpleTestStep newConsumer = new SimpleTestStep("SomeOtherStep"); - StepGraph newGraph = sequential(NEW_JOB_ID, - newProducer, step(newConsumer, inputsOf(newProducer))); + StepGraph newGraph = sequential(NEW_JOB_ID, newProducer, step(newConsumer, inputsOf(newProducer))); StepGraph fusedGraph = fuse(newGraph, oldGraph); @@ -146,13 +168,11 @@ public void simpleSequentialGraphPartiallyReusable() { public void simpleSequentialGraphNotReusable() { SimpleTestStepWithOutput oldProducer = new SimpleTestStepWithOutput(SOME_EXPORT); SimpleTestStep oldConsumer = new SimpleTestStep(SOME_CONSUMER); - StepGraph oldGraph = sequential(OLD_JOB_ID, - oldProducer, step(oldConsumer, inputsOf(oldProducer))); + StepGraph oldGraph = sequential(OLD_JOB_ID, oldProducer, step(oldConsumer, inputsOf(oldProducer))); SimpleTestStepWithOutput newProducer = new SimpleTestStepWithOutput("SomeOtherExport"); SimpleTestStep newConsumer = new SimpleTestStep("SomeOtherStep"); - StepGraph 
newGraph = sequential(NEW_JOB_ID, - newProducer, step(newConsumer, inputsOf(newProducer))); + StepGraph newGraph = sequential(NEW_JOB_ID, newProducer, step(newConsumer, inputsOf(newProducer))); StepGraph fusedGraph = fuse(newGraph, oldGraph); @@ -361,6 +381,70 @@ public void newComplexGraphPartiallyReusable(@Values(booleans = {true, false}) b checkOutputs(fusedGraph, OLD_JOB_ID); } + @Test + public void canonicalizeParallelyWrappedSequentialGraphs() { + StepGraph graph = parallel( + sequential( + new SimpleTestStep<>(SOME_EXPORT).withNotReusable(true), + new SimpleTestStepWithOutput(SOME_EXPORT) + ), + sequential( + new SimpleTestStep<>(SOME_OTHER_EXPORT).withNotReusable(true), + new SimpleTestStepWithOutput(SOME_OTHER_EXPORT) + ) + ); + + StepGraph canonicalGraph = canonicalize(graph); + + assertTrue(canonicalGraph.isParallel()); + assertEquals(2, canonicalGraph.getExecutions().size()); + } + + @Test + public void canonicalizeWrappedParallelGraph() { + StepGraph graph = sequential( + parallel( + new SimpleTestStepWithOutput(SOME_EXPORT), + new SimpleTestStepWithOutput(SOME_OTHER_EXPORT) + ) + ); + + StepGraph canonicalGraph = canonicalize(graph); + + assertTrue(canonicalGraph.isParallel()); + assertEquals(2, canonicalGraph.getExecutions().size()); + } + + @Test + public void canonicalizeGraph() { + StepGraph graph = parallel( + sequential( + sequential( + sequential( + new SimpleTestStep(SOME_EXPORT).withNotReusable(true), + new SimpleTestStep(SOME_CONSUMER), + new SimpleTestStep(SOME_CONSUMER) + ) + ) + ), + sequential( + new SimpleTestStep(SOME_CONSUMER), + new SimpleTestStep(SOME_EXPORT).withNotReusable(true), + new SimpleTestStep(SOME_CONSUMER) + ), + sequential( + new SimpleTestStep(SOME_EXPORT).withNotReusable(true) + ) + ); + + StepGraph canonicalGraph = canonicalize(graph); + assertEquals(2, canonicalGraph.getExecutions().size()); + assertEquals(2, ((StepGraph) canonicalGraph.getExecutions().get(0)).getExecutions().size()); + assertEquals(2, ((StepGraph) 
canonicalGraph.getExecutions().get(1)).getExecutions().size()); + assertTrue(((StepGraph) canonicalGraph.getExecutions().get(0)).stepStream().noneMatch(step -> ((SimpleTestStep) step).paramA == SOME_EXPORT)); + assertTrue(((StepGraph) canonicalGraph.getExecutions().get(1)).stepStream().noneMatch(step -> ((SimpleTestStep) step).paramA == SOME_EXPORT)); + } + /* TODO: Add edge case tests: @@ -392,8 +476,8 @@ private static void checkInputs(StepGraph fusedGraph, StepGraph newGraph) { //CHECK THAT ALL INPUTS OF THE FUSED GRAPH THAT *SHOULD* BE DELEGATED ACTUALLY *ARE* DELEGATED //For every input-set of all steps in the fused graph it must not be the case that it references a DelegateStep, because that would mean that the input-set was not delegated correctly to the old output ((List) fusedStep.getInputSets()).forEach(inputSet -> { - if (inputSet.stepId() != null) { - Step referencedStep = fusedGraph.getStep(inputSet.stepId()); + if (inputSet.providerId() != null) { + Step referencedStep = fusedGraph.getStep(inputSet.providerId()); if (referencedStep != null) //NOTE: In case referencedStep == null that would mean that the step is not part of the fusedGraph, thus an old step would be referenced assertFalse(referencedStep instanceof DelegateStep, !(referencedStep instanceof DelegateStep) ? 
null : "The input-set \"" + inputSet.name() + "\" of step \"" + fusedStep.getId() + "\" must be delegated to the old output-set of step \"" + ((DelegateStep) referencedStep).getDelegate().getGlobalStepId() diff --git a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java index 45e923fb07..459cc6b56a 100644 --- a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java +++ b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -158,7 +158,7 @@ public void testReuseSequentialGraphWithInputIds() { .findAny() .get(); assertEquals(reusedInputSet.jobId(), JOB_ID1); - assertEquals(reusedInputSet.stepId(), exportStep2.getId()); + assertEquals(reusedInputSet.providerId(), exportStep2.getId()); } @Test diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java index 137b5fe8a3..38b991715d 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,7 +19,7 @@ package com.here.xyz.jobs.steps; -import static com.here.xyz.jobs.steps.Step.InputSet.USER_INPUTS; +import static com.here.xyz.jobs.steps.Step.InputSet.USER_PROVIDER; import static com.here.xyz.jobs.steps.Step.Visibility.USER; import static com.here.xyz.jobs.steps.inputs.Input.defaultBucket; import static com.here.xyz.jobs.steps.resources.Load.addLoad; @@ -98,6 +98,8 @@ public abstract class Step implements Typed, StepExecution { private List inputSets = List.of(); @JsonView({Internal.class, Static.class}) private Map outputMetadata; + @JsonView({Internal.class, Static.class}) + private boolean notReusable = false; /** * Provides a list of the resource loads which will be consumed by this step during its execution. @@ -276,15 +278,15 @@ protected List loadInputs(InputSet inputSet, Class... in * @return All inputs from the specified InputSet */ private List loadInputs(InputSet inputSet) { - if (inputSet.stepId == null && inputSet.name == null) - return Input.loadInputs(getJobId()); - else if (inputSet.stepId == null) - throw new IllegalArgumentException("Incorrect input was provided: Missing source step ID"); - else if (inputSet.name == null) - throw new IllegalArgumentException("Incorrect input was provided: Missing referenced output name"); - else { + if (inputSet.providerId == null) + throw new IllegalArgumentException("Incorrect input was provided: Missing source input provider"); + if (inputSet.name == null) + throw new IllegalArgumentException("Incorrect input was provided: Missing referenced set name"); + + if (USER_PROVIDER.equals(inputSet.providerId)) + return Input.loadInputs(getJobId(), inputSet.name); + else return loadOutputsFor(inputSet).stream().map(output -> (Input) transformToInput(output).withMetadata(inputSet.metadata())).toList(); - } } /** @@ -322,11 +324,19 @@ private static Input transformToInput(Output output) { } protected int currentInputsCount(Class inputType) { - return Input.currentInputsCount(jobId, inputType); + 
return getInputSets().stream() + .filter(inputSet -> USER_PROVIDER.equals(inputSet.providerId)) + .mapToInt(userInputSet -> Input.currentInputsCount(jobId, inputType, userInputSet.name)) + .sum(); } protected List loadInputsSample(int maxSampleSize, Class inputType) { - return Input.loadInputsSample(jobId, maxSampleSize, inputType); + return getInputSets().stream() + .filter(inputSet -> USER_PROVIDER.equals(inputSet.providerId)) + .flatMap(userInputSet -> Input.loadInputsSample(jobId, userInputSet.name, maxSampleSize, inputType).stream()) + .unordered() + .limit(maxSampleSize) + .toList(); } @JsonIgnore @@ -545,7 +555,7 @@ public T withPipeline(boolean pipeline) { * @return Whether this step depends on user outputs or not. */ public boolean usesUserInput() { - return inputSets.stream().anyMatch(inputSet -> inputSet.stepId == null); + return inputSets.stream().anyMatch(inputSet -> inputSet.providerId == null); } public List getOutputSets() { @@ -590,9 +600,22 @@ public T withOutputMetadata(Map metadata) { return (T) this; } + public boolean isNotReusable() { + return notReusable; + } + + public void setNotReusable(boolean notReusable) { + this.notReusable = notReusable; + } + + public T withNotReusable(boolean notReusable) { + setNotReusable(notReusable); + return (T) this; + } + @JsonIgnore protected boolean isUserInputsExpected() { - return getInputSets().stream().anyMatch(inputSet -> USER_INPUTS.get().equals(inputSet)); + return getInputSets().stream().anyMatch(inputSet -> USER_PROVIDER.equals(inputSet.providerId)); } @JsonIgnore @@ -603,23 +626,25 @@ protected boolean isUserInputsPresent(Class inputType) { /** * Use this constructor to reference the outputs of a step belonging to a different job than the one the consuming step belongs to. 
* @param jobId The other job's id - * @param stepId The step ID of the step producing the outputs + * @param providerId The ID of the entity that provided the inputs (e.g., a step ID or "USER") * @param name The name for the set of outputs to be produced */ - public record InputSet(String jobId, String stepId, String name, boolean modelBased, Map metadata) { + public record InputSet(String jobId, String providerId, String name, boolean modelBased, Map metadata) { + public static final String DEFAULT_INPUT_SET_NAME = "inputs"; //Depicts the input set used if no set name is defined + public static final String USER_PROVIDER = "USER"; public static final Supplier USER_INPUTS = () -> new InputSet(); - public InputSet(String jobId, String stepId, String name, boolean modelBased) { - this(jobId, stepId, name, modelBased, null); + public InputSet(String jobId, String providerId, String name, boolean modelBased) { + this(jobId, providerId, name, modelBased, null); } /** * Use this constructor to reference the outputs of a step belonging to the same job as the consuming step. - * @param stepId + * @param providerId * @param name */ - public InputSet(String stepId, String name, boolean modelBased) { - this(null, stepId, name, modelBased); + public InputSet(String providerId, String name, boolean modelBased) { + this(null, providerId, name, modelBased); } public InputSet(OutputSet outputSetOfOtherStep) { @@ -636,7 +661,7 @@ public InputSet(OutputSet outputSetOfOtherStep, Map metadata) { */ public InputSet() { //TODO: Currently only non-modelbased user inputs are supported - this(null, null, null, false); + this(null, USER_PROVIDER, DEFAULT_INPUT_SET_NAME, false); } public String toS3Path(String consumerJobId) { @@ -645,9 +670,9 @@ public String toS3Path(String consumerJobId) { public S3Uri toS3Uri(String consumerJobId) { String jobId = this.jobId != null ? 
this.jobId : consumerJobId; - if (stepId == null) - return Input.loadResolvedUserInputPrefixUri(jobId); - return new S3Uri(defaultBucket(), Output.stepOutputS3Prefix(jobId, stepId, name)); + if (USER_PROVIDER.equals(providerId)) + return Input.loadResolvedUserInputPrefixUri(jobId, name); + return new S3Uri(defaultBucket(), Output.stepOutputS3Prefix(jobId, providerId, name)); } } @@ -678,11 +703,11 @@ public OutputSet(String name, Visibility visibility, boolean modelBased) { this.modelBased = modelBased; } - public OutputSet(OutputSet other, String jobId, String stepId, Visibility visibility) { + public OutputSet(OutputSet other, String jobId, Visibility visibility) { this(other.name, visibility, other.fileSuffix); this.modelBased = other.modelBased; this.jobId = jobId; - this.stepId = stepId; + this.stepId = other.stepId; } public String getJobId() { diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/DelegateStep.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/DelegateStep.java index ad3633cada..dfa494c7a5 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/DelegateStep.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/DelegateStep.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -22,6 +22,7 @@ import static com.here.xyz.jobs.RuntimeInfo.State.SUCCEEDED; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonView; import com.here.xyz.jobs.RuntimeInfo; import com.here.xyz.jobs.steps.Step; @@ -30,6 +31,9 @@ import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException; import java.util.List; +@JsonSubTypes({ + @JsonSubTypes.Type(value = JobInternalDelegateStep.class) +}) public class DelegateStep extends Step { @JsonView({Internal.class, Static.class}) private final Step delegate; @@ -38,29 +42,38 @@ public class DelegateStep extends Step { private RuntimeInfo status = new RuntimeInfo(); //Only needed for deserialization purposes - private DelegateStep() { + protected DelegateStep() { this.delegator = null; this.delegate = null; } - public DelegateStep(Step delegate, Step delegator) { - if (delegate instanceof DelegateStep transitiveDelegate) + public DelegateStep(Step delegate, Step delegator) { + this(delegate, delegator, null); + } + + protected DelegateStep(Step delegate, Step delegator, List outputSets) { + if (delegate instanceof DelegateStep transitiveDelegate && !(delegate instanceof JobInternalDelegateStep)) delegate = unwrapDelegate(transitiveDelegate); this.delegator = delegator; this.delegate = delegate; setInputSets(delegator.getInputSets()); - setOutputMetadata(delegate.getOutputMetadata()); //TODO: Change this to delegator.getOutputMetadata()? 
- - //Create the delegating output-sets by copying them from the delegate step but keep the visibility of each counterpart of the compiled (new) step - outputSets = delegate.getOutputSets().stream().map(delegateOutputSet -> { - OutputSet compiledOutputSet = delegator.getOutputSets().stream().filter(outputSet -> outputSet.name.equals(delegateOutputSet.name)).findFirst().get(); - return new OutputSet(delegateOutputSet, this.delegate.getJobId(), this.delegate.getId(), compiledOutputSet.visibility); + setOutputMetadata(delegator.getOutputMetadata()); + + /* + Create the delegating output-sets by copying them from the delegate step but keep the visibility + of each counterpart of the compiled (new) step. + NOTE: Only output-sets that are present on the compiled (new) step will be copied from the old one. + The old step might contain further output-sets that won't be referenced. + */ + this.outputSets = outputSets != null ? outputSets : delegator.getOutputSets().stream().map(compiledOutputSet -> { + OutputSet delegateOutputSet = this.delegate.getOutputSets().stream().filter(outputSet -> outputSet.name.equals(compiledOutputSet.name)).findFirst().get(); + return new OutputSet(delegateOutputSet, this.delegate.getJobId(), compiledOutputSet.visibility); }).toList(); } private Step unwrapDelegate(DelegateStep delegate) { - return delegate.getDelegate() instanceof DelegateStep transitiveDelegate + return delegate.getDelegate() instanceof DelegateStep transitiveDelegate && !(delegate.getDelegate() instanceof JobInternalDelegateStep) ? 
unwrapDelegate(transitiveDelegate) : delegate.getDelegate(); } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/JobInternalDelegateStep.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/JobInternalDelegateStep.java new file mode 100644 index 0000000000..852a6b7cde --- /dev/null +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/JobInternalDelegateStep.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2017-2025 HERE Europe B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ + +package com.here.xyz.jobs.steps.execution; + +import com.here.xyz.jobs.steps.Step; +import java.util.List; + +public class JobInternalDelegateStep extends DelegateStep { + + //Only needed for deserialization purposes + private JobInternalDelegateStep() { + super(); + } + + public JobInternalDelegateStep(Step delegate, List outputSets) { + super(delegate, delegate, outputSets); + } +} diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java index 984ac41812..0a596be22f 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java @@ -53,8 +53,6 @@ import org.apache.logging.log4j.Logger; public class RunEmrJob extends LambdaBasedStep { - - public static final String USER_REF = "USER"; public static final String EMR_JOB_NAME_PREFIX = "step:"; private static final Logger logger = LogManager.getLogger(); private static final String INPUT_SET_REF_PREFIX = "${inputSet:"; @@ -477,28 +475,23 @@ public static String toInputSetReference(InputSet inputSet) { } private static String toReferenceIdentifier(InputSet inputSet) { - return inputSet.name() == null ? USER_REF : (inputSet.stepId() + "." + inputSet.name()); + return inputSet.providerId() + "." 
+ inputSet.name(); } InputSet fromReferenceIdentifier(String referenceIdentifier) { - if (USER_REF.equals(referenceIdentifier)) - return getInputSet(null, null); - else { - ReferenceIdentifier ref = ReferenceIdentifier.fromString(referenceIdentifier); - return getInputSet(ref.stepId(), ref.name()); - } + ReferenceIdentifier ref = ReferenceIdentifier.fromString(referenceIdentifier); + return getInputSet(ref.stepId(), ref.name()); } - protected InputSet getInputSet(String stepId, String name) { + protected InputSet getInputSet(String providerId, String name) { try { return getInputSets().stream() - .filter(inputSet -> Objects.equals(inputSet.name(), name) && Objects.equals(inputSet.stepId(), stepId)) + .filter(inputSet -> Objects.equals(inputSet.name(), name) && Objects.equals(inputSet.providerId(), providerId)) .findFirst() .get(); } catch (NoSuchElementException e) { - throw new IllegalArgumentException("No input set \"" + (name == null ? "" : stepId + "." + name) + "\" exists in step \"" - + getId() + "\""); + throw new IllegalArgumentException("No input set \"" + providerId + "." + name + "\" exists in step \"" + getId() + "\""); } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CountSpace.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CountSpace.java index de0fd3fa32..2b5c1d4294 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CountSpace.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/CountSpace.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,15 +19,12 @@ package com.here.xyz.jobs.steps.impl.transport; -import static com.here.xyz.events.ContextAwareEvent.SpaceContext.EXTENSION; +import static com.here.xyz.jobs.steps.Step.Visibility.USER; import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_ON_ASYNC_SUCCESS; -import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog; -import static com.here.xyz.jobs.steps.impl.transport.TransportTools.getTemporaryJobTableName; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.buildTemporaryJobTableDropStatement; - -import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM; -import static com.here.xyz.jobs.steps.Step.Visibility.USER; +import static com.here.xyz.jobs.steps.impl.transport.TransportTools.getTemporaryJobTableName; +import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog; import com.fasterxml.jackson.annotation.JsonView; import com.here.xyz.events.PropertiesQuery; @@ -36,14 +33,12 @@ import com.here.xyz.jobs.steps.outputs.FeatureStatistics; import com.here.xyz.jobs.steps.resources.Load; import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed; -import com.here.xyz.models.hub.Ref; import com.here.xyz.psql.query.GetFeaturesByGeometryBuilder; import com.here.xyz.psql.query.GetFeaturesByGeometryBuilder.GetFeaturesByGeometryInput; import com.here.xyz.psql.query.QueryBuilder.QueryBuildingException; import com.here.xyz.util.db.SQLQuery; import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException; import com.here.xyz.util.web.XyzWebClient.WebClientException; - import java.sql.SQLException; import java.util.List; import org.apache.logging.log4j.LogManager; @@ -56,7 +51,7 @@ public class CountSpace extends TaskedSpaceBasedStep { private static final Logger logger = LogManager.getLogger(); - + public static final String FEATURECOUNT = "featurecount"; { setOutputSets(List.of(new 
OutputSet(FEATURECOUNT, USER, true))); @@ -133,14 +128,13 @@ protected void onAsyncSuccess() throws Exception { FeatureStatistics featureStatistics = new FeatureStatistics() .withFeatureCount(count) - .withByteSize(0) - .withFileCount(0); - + .withByteSize(0); + registerOutputs(List.of( featureStatistics ), FEATURECOUNT); infoLog(STEP_ON_ASYNC_SUCCESS, this, "Cleanup temporary table"); runWriteQuerySync(buildTemporaryJobTableDropStatement(schema, getTemporaryJobTableName(getId())), db(WRITER), 0); - + //super.onAsyncSuccess(); } @@ -215,7 +209,7 @@ private SQLQuery buildCountContentQuery() throws WebClientException, QueryBuildi @Override protected int setInitialThreadCount(String schema) throws WebClientException, SQLException, TooManyResourcesClaimed { - return 1; + return 1; } @Override diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportChangedTiles.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportChangedTiles.java index f55e97ea5a..5e028def26 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportChangedTiles.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportChangedTiles.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -144,7 +144,6 @@ public enum QuadType { { setOutputSets(List.of( new OutputSet(STATISTICS, USER, true), - new OutputSet(INTERNAL_STATISTICS, SYSTEM, true), new OutputSet(EXPORTED_DATA, USER, false), new OutputSet(TILE_INVALIDATIONS, SYSTEM, true) )); diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java index ed4a326bfe..427c79ed82 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ExportSpaceToFiles.java @@ -21,10 +21,7 @@ import static com.here.xyz.events.ContextAwareEvent.SpaceContext.DEFAULT; import static com.here.xyz.events.ContextAwareEvent.SpaceContext.SUPER; -import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM; import static com.here.xyz.jobs.steps.Step.Visibility.USER; -import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.ASYNC; -import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC; import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_EXECUTOR; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_VALIDATE; @@ -87,7 +84,6 @@ */ public class ExportSpaceToFiles extends TaskedSpaceBasedStep { public static final String STATISTICS = "statistics"; - public static final String INTERNAL_STATISTICS = "internalStatistics"; public static final String EXPORTED_DATA = "exportedData"; //Defines how large the area of a defined spatialFilter can be //If a point is defined - the maximum radius can be 17898 meters @@ -110,12 +106,6 @@ public class ExportSpaceToFiles extends TaskedSpaceBasedStep private long minI = -1; @JsonView({Internal.class, Static.class}) private long maxI = 
-1; - /** - * Setting this to 'true' will skip the step execution - */ - @JsonView({Internal.class, Static.class}) - private boolean passthrough; - @JsonView({Internal.class, Static.class}) protected boolean restrictExtendOfSpatialFilter = true; @@ -146,7 +136,6 @@ public ExportSpaceToFiles withContext(SpaceContext context) { { setOutputSets(List.of( new OutputSet(STATISTICS, USER, true), - new OutputSet(INTERNAL_STATISTICS, SYSTEM, true), new OutputSet(EXPORTED_DATA, USER, false) )); } @@ -177,19 +166,6 @@ public ExportSpaceToFiles withPropertyFilter(PropertiesQuery propertyFilter){ return this; } - public boolean isPassthrough() { - return passthrough; - } - - public void setPassthrough(boolean passthrough) { - this.passthrough = passthrough; - } - - public ExportSpaceToFiles withPassthrough(boolean passthrough) { - setPassthrough(passthrough); - return this; - } - /** * Determines whether this {@code ExportSpaceToFiles} step execution is equivalent to another step execution. * @@ -231,17 +207,6 @@ public boolean isEquivalentTo(StepExecution other) { } } - @Override - public ExecutionMode getExecutionMode() { - return passthrough ? SYNC : ASYNC; - } - - @Override - public void execute(boolean resume) throws Exception { - if(passthrough) return; - super.execute(resume); - } - @Override public List getNeededResources() { try { @@ -352,17 +317,15 @@ public boolean validate() throws ValidationException { protected void onAsyncSuccess() throws Exception { String schema = getSchema(db()); - Statistics statistics = runReadQuerySync(retrieveStatisticFromTaskAndStatisticTable(schema), db(WRITER), + TransportStatistics stepStatistics = runReadQuerySync(retrieveStatisticFromTaskAndStatisticTable(schema), db(WRITER), 0, rs -> rs.next() - ? createStatistics(rs.getLong("rows_uploaded"), rs.getLong("bytes_uploaded"), - rs.getInt("files_uploaded")) - : createStatistics(0, 0, 0)); + ? 
new TransportStatistics(rs.getLong("rows_uploaded"), rs.getLong("bytes_uploaded"), rs.getInt("files_uploaded")) + : new TransportStatistics(0, 0, 0)); - infoLog(STEP_ON_ASYNC_SUCCESS, this, "Job Statistics: bytes=" + statistics.published().getByteSize() - + " files=" + statistics.internal().getFileCount()); + infoLog(STEP_ON_ASYNC_SUCCESS, this, "Job Statistics: bytes=" + stepStatistics.byteSize + " files=" + stepStatistics.fileCount); - registerOutputs(List.of(statistics.published()), STATISTICS); - registerOutputs(List.of(statistics.internal()), INTERNAL_STATISTICS); + registerOutputs(List.of(new FeatureStatistics().withFeatureCount(stepStatistics.rowCount).withByteSize(stepStatistics.byteSize)), + STATISTICS); infoLog(STEP_ON_ASYNC_SUCCESS, this, "Cleanup temporary table"); runWriteQuerySync(buildTemporaryJobTableDropStatement(schema, getTemporaryJobTableName(getId())), db(WRITER), 0); @@ -431,14 +394,6 @@ protected SQLQuery buildTaskQuery(String schema, Integer taskId, TaskData taskDa return buildExportToS3PluginQuery(schema, taskId, generateContentQueryForExportPlugin(taskData)); } - private static Statistics createStatistics(long featureCount, long byteSize, int fileCount) { - return new Statistics( - //NOTE: Do not publish the file count for the user facing statistics, as it could be confusing when it comes to invisible intermediate outputs - new FeatureStatistics().withByteSize(byteSize).withFeatureCount(featureCount), - new FeatureStatistics().withFileCount(fileCount) - ); - } - protected GetFeaturesByGeometryInput createGetFeaturesByGeometryInput(SpaceContext context, SpatialFilter spatialFilter, Ref versionRef) throws WebClientException { Space space = context == SUPER ? superSpace() : space(); @@ -509,6 +464,7 @@ private long loadMaxI() { return maxI; } + /** * Generates a content query for the export plugin based on the task data and context. This method * can get overridden easily from other ExportProcesses. 
@@ -547,5 +503,5 @@ protected String generateContentQueryForExportPlugin(TaskData taskData) throws W .toExecutableQueryString(); } - private record Statistics(FeatureStatistics published, FeatureStatistics internal) {} + private record TransportStatistics(long rowCount, long byteSize, int fileCount) {} } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java index 69f80e2a45..fdb3ccb755 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java @@ -48,6 +48,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.here.xyz.events.UpdateStrategy; import com.here.xyz.jobs.steps.S3DataFile; +import com.here.xyz.jobs.steps.execution.StepException; import com.here.xyz.jobs.steps.impl.SpaceBasedStep; import com.here.xyz.jobs.steps.impl.tools.ResourceAndTimeCalculator; import com.here.xyz.jobs.steps.impl.transport.tools.ImportFilesQuickValidator; @@ -351,12 +352,12 @@ private void syncExecution() throws WebClientException, SQLException, TooManyRes List> resultFutures = new ArrayList<>(); //Execute the sync for each import file in parallel - for (Input input : loadInputs(UploadUrl.class)) { - resultFutures.add(exec.submit(() -> { - long writtenFeatureCount = syncWriteFileToSpace((UploadUrl) input, newVersion);return new FeatureStatistics().withFeatureCount(writtenFeatureCount).withByteSize(input.getByteSize()); - })); - } + for (Input input : loadInputs(UploadUrl.class)) + resultFutures.add(exec.submit(() -> new FeatureStatistics() + .withFeatureCount(syncWriteFileToSpace((UploadUrl) input, newVersion)) + .withByteSize(input.getByteSize()))); + //TODO: Use CompletableFuture.allOf() instead of the following //Wait for the futures and aggregate 
the result statistics into one FeatureStatistics object FeatureStatistics resultOutput = new FeatureStatistics(); for (Future future : resultFutures) { @@ -366,7 +367,7 @@ private void syncExecution() throws WebClientException, SQLException, TooManyRes .withByteSize(resultOutput.getByteSize() + future.get().getByteSize()); } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); + throw new StepException("Error during sync write to target space", e); } } @@ -554,10 +555,11 @@ private SQLQuery buildCreateImportTriggerForEmptyLayer(String targetAuthor, long triggerFunction += entityPerLine == FeatureCollection ? "_geojsonfc" : ""; return new SQLQuery("CREATE OR REPLACE TRIGGER insertTrigger BEFORE INSERT ON ${schema}.${table} " - + "FOR EACH ROW EXECUTE PROCEDURE ${triggerFunction}('${{author}}', ${{spaceVersion}}, '${{targetTable}}');") + + "FOR EACH ROW EXECUTE PROCEDURE ${triggerFunction}('${{author}}', ${{spaceVersion}}, '${{targetTable}}', ${{retainMetadata}});") .withQueryFragment("spaceVersion", "" + targetSpaceVersion) .withQueryFragment("author", targetAuthor) .withQueryFragment("targetTable", getRootTableName(space())) + .withQueryFragment("retainMetadata", "" + isRetainMetadata()) .withVariable("triggerFunction", triggerFunction) .withVariable("schema", getSchema(db())) .withVariable("table", TransportTools.getTemporaryTriggerTableName(getId())); diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java index 6a7e530d42..9605c5bb02 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ import java.util.Optional; import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; @@ -62,16 +63,24 @@ public abstract class Input extends StepPayload { private String s3Bucket; @JsonIgnore private String s3Key; - private static Map metadataCache = new WeakHashMap<>(); - private static Map> inputsCache = new WeakHashMap<>(); //TODO: Expire keys after <24h + private static Map> metadataCache = new WeakHashMap<>(); + private static Map>> inputsCache = new WeakHashMap<>(); //TODO: Expire keys after <24h private static Set inputsCacheActive = new HashSet<>(); public static String inputS3Prefix(String jobId) { return jobId + "/inputs"; } - private static String inputMetaS3Key(String jobId) { - return jobId + "/meta/inputs.json"; + public static String inputS3Prefix(String jobId, String setName) { + return jobId + "/inputs/" + setName; + } + + private static String inputMetaS3Prefix(String jobId) { + return jobId + "/meta"; + } + + private static String inputMetaS3Key(String jobId, String setName) { + return inputMetaS3Prefix(jobId) + "/" + setName + ".json"; } public static String defaultBucket() { @@ -106,22 +115,48 @@ public T withS3Key(String s3Key) { return (T) this; } - public static List loadInputs(String jobId) { + public static List loadInputs(String jobId, String setName) { //Only cache inputs of jobs which are submitted already if (inputsCacheActive.contains(jobId)) { - List inputs = inputsCache.get(jobId); + List inputs = getFromInputCache(jobId, setName); if (inputs == null) { - inputs = loadInputsAndWriteMetadata(jobId, -1, Input.class); - inputsCache.put(jobId, inputs); + inputs = loadInputsAndWriteMetadata(jobId, setName, -1, 
Input.class); + putToInputCache(jobId, setName, inputs); } return inputs; } - return loadInputsAndWriteMetadata(jobId, -1, Input.class); + return loadInputsAndWriteMetadata(jobId, setName, -1, Input.class); + } + + private synchronized static void putToInputCache(String jobId, String setName, List inputs) { + Map> cachedInputs = inputsCache.get(jobId); + if (cachedInputs == null) + cachedInputs = new ConcurrentHashMap<>(); + cachedInputs.put(setName, inputs); + inputsCache.put(jobId, cachedInputs); + } + + private static List getFromInputCache(String jobId, String setName) { + Map> inputs = inputsCache.get(jobId); + return inputs == null ? null: inputs.get(setName); + } + + private synchronized static void putToMetadataCache(String jobId, String setName, InputsMetadata metadata) { + Map cachedMetadata = metadataCache.get(jobId); + if (cachedMetadata == null) + cachedMetadata = new ConcurrentHashMap<>(); + cachedMetadata.put(setName, metadata); + metadataCache.put(jobId, cachedMetadata); + } + + private static InputsMetadata getFromMetadataCache(String jobId, String setName) { + Map metadata = metadataCache.get(jobId); + return metadata == null ? 
null : metadata.get(setName); } - private static List loadInputsAndWriteMetadata(String jobId, int maxReturnSize, Class inputType) { + private static List loadInputsAndWriteMetadata(String jobId, String setName, int maxReturnSize, Class inputType) { try { - InputsMetadata metadata = loadMetadata(jobId); + InputsMetadata metadata = loadMetadata(jobId, setName); Stream inputs = metadata.inputs.entrySet().stream() .filter(input -> input.getValue().byteSize > 0) .map(metaEntry -> { @@ -139,51 +174,65 @@ private static List loadInputsAndWriteMetadata(String jobId } catch (IOException | AmazonS3Exception ignore) {} - final List inputs = loadInputsInParallel(defaultBucket(), inputS3Prefix(jobId), maxReturnSize, inputType); + final List inputs = loadInputsInParallel(defaultBucket(), inputS3Prefix(jobId, setName), maxReturnSize, inputType); //Only write metadata of jobs which are submitted already if (inputs != null && inputs.size() > 0 && inputsCacheActive.contains(jobId)) - storeMetadata(jobId, (List) inputs); + storeMetadata(jobId, (List) inputs, setName); return inputs; } - public static final S3Uri loadResolvedUserInputPrefixUri(String jobId) { - Optional userInputsMetadata = loadMetadataIfExists(jobId); + public static final S3Uri loadResolvedUserInputPrefixUri(String jobId, String setName) { + Optional userInputsMetadata = loadMetadataIfExists(jobId, setName); if (userInputsMetadata.isPresent()) return userInputsMetadata.get().scannedFrom; - return new S3Uri(defaultBucket(), inputS3Prefix(jobId)); + return new S3Uri(defaultBucket(), inputS3Prefix(jobId, setName)); + } + + static List loadAllInputSetNames(String jobId) { + return S3Client.getInstance().scanFolder(inputMetaS3Prefix(jobId)).stream() + .map(s3ObjectSummary -> s3ObjectSummary.getKey().substring(0, s3ObjectSummary.getKey().lastIndexOf(".json"))) + .toList(); } - static final Optional loadMetadataIfExists(String jobId) { + private static Optional loadMetadataIfExists(String jobId, String setName) { try { - 
return Optional.of(loadMetadata(jobId)); + return Optional.of(loadMetadata(jobId, setName)); } catch (IOException | AmazonS3Exception e) { return Optional.empty(); } } - static final InputsMetadata loadMetadata(String jobId) throws IOException, AmazonS3Exception { - InputsMetadata metadata = metadataCache.get(jobId); + static final InputsMetadata loadMetadata(String jobId, String setName) throws IOException, AmazonS3Exception { + InputsMetadata metadata = getFromMetadataCache(jobId, setName); if (metadata != null) return metadata; logger.info("Loading metadata from S3 for job {} ...", jobId); long t1 = Core.currentTimeMillis(); - metadata = XyzSerializable.deserialize(S3Client.getInstance().loadObjectContent(inputMetaS3Key(jobId)), + metadata = XyzSerializable.deserialize(S3Client.getInstance().loadObjectContent(inputMetaS3Key(jobId, setName)), InputsMetadata.class); logger.info("Loaded metadata for job {}. Took {}ms ...", jobId, Core.currentTimeMillis() - t1); if (inputsCacheActive.contains(jobId)) - metadataCache.put(jobId, metadata); + putToMetadataCache(jobId, setName, metadata); return metadata; } - static final void storeMetadata(String jobId, InputsMetadata metadata) { + static final void addInputReferences(String referencedJobId, String referencingJobId, String setName) throws IOException, + AmazonS3Exception { + InputsMetadata referencedMetadata = loadMetadata(referencedJobId, setName); + //Add the referencing job to the list of jobs referencing the metadata + referencedMetadata.referencingJobs().add(referencingJobId); + storeMetadata(referencedJobId, referencedMetadata, setName); + } + + static final void storeMetadata(String jobId, InputsMetadata metadata, String setName) { try { if (inputsCacheActive.contains(jobId)) - metadataCache.put(jobId, metadata); - S3Client.getInstance().putObject(inputMetaS3Key(jobId), "application/json", metadata.serialize()); + putToMetadataCache(jobId, setName, metadata); + 
S3Client.getInstance().putObject(inputMetaS3Key(jobId, setName), "application/json", metadata.serialize()); } catch (IOException e) { logger.error("Error writing inputs metadata file for job {}.", jobId, e); @@ -191,20 +240,20 @@ static final void storeMetadata(String jobId, InputsMetadata metadata) { } } - private static void storeMetadata(String jobId, List inputs) { - storeMetadata(jobId, inputs, null); + private static void storeMetadata(String jobId, List inputs, String setName) { + storeMetadata(jobId, inputs, null, setName); } - static final void storeMetadata(String jobId, List inputs, String referencedJobId) { - storeMetadata(jobId, inputs, referencedJobId, new S3Uri(defaultBucket(), inputS3Prefix(jobId))); + static final void storeMetadata(String jobId, List inputs, String referencedJobId, String setName) { + storeMetadata(jobId, inputs, referencedJobId, new S3Uri(defaultBucket(), inputS3Prefix(jobId, setName)), setName); } - static final void storeMetadata(String jobId, List inputs, String referencedJobId, S3Uri scannedFrom) { + static final void storeMetadata(String jobId, List inputs, String referencedJobId, S3Uri scannedFrom, String setName) { logger.info("Storing inputs metadata for job {} ...", jobId); Map metadata = inputs.stream() .collect(Collectors.toMap(input -> (input.s3Bucket == null ? 
"" : "s3://" + input.s3Bucket + "/") + input.s3Key, input -> new InputMetadata(input.byteSize, input.compressed))); - storeMetadata(jobId, new InputsMetadata(metadata, new HashSet<>(Set.of(jobId)), referencedJobId, scannedFrom)); + storeMetadata(jobId, new InputsMetadata(metadata, new HashSet<>(Set.of(jobId)), referencedJobId, scannedFrom), setName); } static final List loadInputsInParallel(String bucketName, String inputS3Prefix) { @@ -233,12 +282,12 @@ static final List loadInputsInParallel(String bucketName, S return inputs; } - public static int currentInputsCount(String jobId, Class inputType) { - return (int) loadInputs(jobId).stream().filter(input -> inputType.isAssignableFrom(input.getClass())).count(); + public static int currentInputsCount(String jobId, Class inputType, String setName) { + return (int) loadInputs(jobId, setName).stream().filter(input -> inputType.isAssignableFrom(input.getClass())).count(); } - public static List loadInputsSample(String jobId, int maxSampleSize, Class inputType) { - return loadInputsAndWriteMetadata(jobId, maxSampleSize, inputType); + public static List loadInputsSample(String jobId, String setName, int maxSampleSize, Class inputType) { + return loadInputsAndWriteMetadata(jobId, setName, maxSampleSize, inputType); } private static List loadAndTransformInputs(String bucketName, String inputS3Prefix, int maxReturnSize, Class inputType) { @@ -266,9 +315,14 @@ public static void deleteInputs(String jobId) { } private static void deleteInputs(String owningJobId, String referencingJob) { + //TODO: Parallelize + loadAllInputSetNames(owningJobId).forEach(setName -> deleteInputs(owningJobId, referencingJob, setName)); + } + + private static void deleteInputs(String owningJobId, String referencingJob, String setName) { InputsMetadata metadata = null; try { - metadata = loadMetadata(owningJobId); + metadata = loadMetadata(owningJobId, setName); metadata.referencingJobs().remove(referencingJob); } catch (AmazonS3Exception | 
IOException ignore) {} @@ -282,10 +336,10 @@ private static void deleteInputs(String owningJobId, String referencingJob) { */ deleteInputs(metadata.referencedJob(), owningJobId); - S3Client.getInstance().deleteFolder(inputS3Prefix(owningJobId)); + S3Client.getInstance().deleteFolder(inputS3Prefix(owningJobId, setName)); } else if (metadata != null) - storeMetadata(owningJobId, metadata); + storeMetadata(owningJobId, metadata, setName); } private static Input createInput(String s3Bucket, String s3Key, long byteSize, boolean compressed) { diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromJob.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromJob.java index 87c6e71194..79d3cefce6 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromJob.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromJob.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,6 +19,7 @@ package com.here.xyz.jobs.steps.inputs; +import com.amazonaws.services.s3.model.AmazonS3Exception; import java.io.IOException; import java.util.List; @@ -48,18 +49,22 @@ public InputsFromJob withJobId(String jobId) { * @param referencingJobId The job that owns this delegator object * @throws IOException when the metadata for the referenced job could not be updated */ - public void dereference(String referencingJobId) throws IOException { - //First load the inputs of the other job to ensure the other job's metadata actually have been written - List inputs = Input.loadInputs(getJobId()); - updateInputMetaReferences(referencingJobId); - //Store the metadata of the job that references the other job's metadata - storeMetadata(referencingJobId, inputs, getJobId()); - } + public void dereference(String referencingJobId) { + String referencedJobId = getJobId(); - private void updateInputMetaReferences(String referencingJobId) throws IOException { - InputsMetadata referencedMetadata = loadMetadata(getJobId()); - //Add the referencing job to the list of jobs referencing the metadata - referencedMetadata.referencingJobs().add(referencingJobId); - storeMetadata(getJobId(), referencedMetadata); + //TODO: Parallelize + loadAllInputSetNames(referencedJobId).forEach(setName -> { + try { + //First load the inputs of the other job to ensure the other job's metadata actually have been written + List inputs = Input.loadInputs(referencedJobId, setName); + //Update the other job's metadata to be referenced by + Input.addInputReferences(referencedJobId, referencingJobId, setName); + //Store the metadata of the job that references the other job's metadata + storeMetadata(referencingJobId, inputs, getJobId(), setName); + } + catch (IOException | AmazonS3Exception e) { + throw new RuntimeException("Error dereferencing inputs of job " + referencedJobId, e); + } + }); } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromS3.java 
b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromS3.java index ffc9f56eee..67f8748617 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromS3.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/InputsFromS3.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,11 +54,11 @@ public InputsFromS3 withBucket(String bucket) { return this; } - public void dereference(String forJob) { + public void dereference(String forJob, String setName) { //First load the inputs from the (foreign) bucket List inputs = loadInputsInParallel(getBucket(), getPrefix()); inputs.forEach(input -> input.setS3Bucket(getBucket())); //Store the metadata for the job that accesses the bucket - storeMetadata(forJob, inputs, null, new S3Uri(getBucket(), getPrefix())); + storeMetadata(forJob, inputs, null, new S3Uri(getBucket(), getPrefix()), setName); } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/FeatureStatistics.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/FeatureStatistics.java index 94a56f997b..bbeac87908 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/FeatureStatistics.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/outputs/FeatureStatistics.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -33,7 +33,6 @@ public class FeatureStatistics extends ModelBasedOutput { private long featureCount; @JsonInclude(ALWAYS) private long byteSize; - private int fileCount; public long getFeatureCount() { return featureCount; @@ -60,17 +59,4 @@ public FeatureStatistics withByteSize(long byteSize) { setByteSize(byteSize); return this; } - - public int getFileCount() { - return fileCount; - } - - public void setFileCount(int fileCount) { - this.fileCount = fileCount; - } - - public FeatureStatistics withFileCount(int fileCount) { - setFileCount(fileCount); - return this; - } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java index 06acfc2fe5..7072bf160e 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -20,6 +20,7 @@ package com.here.xyz.jobs.util.test; import static com.google.common.net.HttpHeaders.CONTENT_TYPE; +import static com.here.xyz.jobs.steps.Step.InputSet.DEFAULT_INPUT_SET_NAME; import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.LambdaStepRequest.RequestType.START_EXECUTION; import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.LambdaStepRequest.RequestType.SUCCESS_CALLBACK; import static com.here.xyz.jobs.steps.impl.transport.TransportTools.getTemporaryJobTableName; @@ -84,8 +85,6 @@ import java.util.zip.ZipInputStream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.checkerframework.checker.units.qual.C; - import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.SdkBytes; @@ -345,7 +344,7 @@ protected void sendLambdaStepRequestBlock(LambdaBasedStep step, boolean simulate DataSourceProvider dsp = getDataSourceProvider(); - if( step instanceof ExportSpaceToFiles + if( step instanceof ExportSpaceToFiles || step instanceof CountSpace ){ waitTillTaskItemsAreFinalized(step); }else{ @@ -428,7 +427,7 @@ public void uploadFiles(String jobId, int uploadFileCount, int featureCountPerFi } public void uploadInputFile(String jobId, byte[] bytes, S3ContentType contentType) throws IOException { - uploadFileToS3(inputS3Prefix(jobId) + "/" + UUID.randomUUID(), contentType, bytes, false); + uploadFileToS3(inputS3Prefix(jobId, DEFAULT_INPUT_SET_NAME) + "/" + UUID.randomUUID(), contentType, bytes, false); } protected void uploadFileToS3(String s3Key, S3ContentType contentType, byte[] data, boolean gzip) throws IOException { diff --git a/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql b/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql index bc18251a39..ef22df6117 100644 --- a/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql +++ 
b/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql @@ -365,11 +365,12 @@ DECLARE author TEXT := TG_ARGV[0]; curVersion BIGINT := TG_ARGV[1]; target_table TEXT := TG_ARGV[2]; + retain_meta BOOLEAN := TG_ARGV[3]::BOOLEAN; feature RECORD; updated_rows INT; BEGIN SELECT new_jsondata, new_geo, new_operation, new_id - from import_from_s3_enrich_feature(NEW.jsondata::JSONB, NEW.geo) + from import_from_s3_enrich_feature(NEW.jsondata::JSONB, NEW.geo, retain_meta) INTO feature; EXECUTE format('INSERT INTO "%1$s"."%2$s" (id, version, operation, author, jsondata, geo) @@ -395,6 +396,7 @@ DECLARE author TEXT := TG_ARGV[0]; curVersion BIGINT := TG_ARGV[1]; target_table TEXT := TG_ARGV[2]; + retain_meta BOOLEAN := TG_ARGV[3]::BOOLEAN; elem JSONB; feature RECORD; updated_rows INT; @@ -409,7 +411,7 @@ BEGIN END IF; SELECT new_jsondata, new_geo, new_operation, new_id - from import_from_s3_enrich_feature(elem, null) + from import_from_s3_enrich_feature(elem, null, retain_meta) INTO feature; EXECUTE format('INSERT INTO "%1$s"."%2$s" (id, version, operation, author, jsondata, geo) @@ -565,7 +567,7 @@ $BODY$; /** * Enriches Feature - uses in plain trigger function */ -CREATE OR REPLACE FUNCTION import_from_s3_enrich_feature(IN jsondata JSONB, geo geometry(GeometryZ,4326)) +CREATE OR REPLACE FUNCTION import_from_s3_enrich_feature(IN jsondata JSONB, geo geometry(GeometryZ,4326), retain_meta BOOLEAN DEFAULT FALSE) RETURNS TABLE(new_jsondata JSONB, new_geo geometry(GeometryZ,4326), new_operation character, new_id TEXT) AS $BODY$ DECLARE @@ -591,6 +593,10 @@ BEGIN jsondata := jsonb_set(jsondata, '{type}', '"Feature"'); -- Inject meta + IF retain_meta THEN + meta := coalesce(jsonb_set(meta, '{createdAt}', (jsondata->'properties'->'@ns:com:here:xyz'->>'createdAt')::JSONB), meta); + meta := coalesce(jsonb_set(meta, '{updatedAt}', (jsondata->'properties'->'@ns:com:here:xyz'->>'updatedAt')::JSONB), meta); + END IF; jsondata := jsonb_set(jsondata, '{properties,@ns:com:here:xyz}', 
meta); IF jsondata->'geometry' IS NOT NULL THEN diff --git a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportStepValidationTest.java b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportStepValidationTest.java index f9dfc9d098..a753e6ee0d 100644 --- a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportStepValidationTest.java +++ b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportStepValidationTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -147,12 +147,6 @@ else if (output instanceof FeatureStatistics statistics) Assertions.assertEquals(expectedFeatures.getFeatures().size(), statistics.getFeatureCount()); } - for (Output output : systemOutputs) { - //if we have one Feature - we expect at least one file - if (output instanceof FeatureStatistics statistics && expectedFeatures.getFeatures().size() > 1) - Assertions.assertTrue(statistics.getFileCount() > 0); - } - List existingFeaturesIdList = expectedFeatures.getFeatures().stream().map(Feature::getId).collect(Collectors.toList()); List exportedFeaturesFeaturesIdList = exportedFeatures.stream().map(Feature::getId).collect(Collectors.toList()); diff --git a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportTestBase.java b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportTestBase.java index 169db55d59..61546bbc24 100644 --- a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportTestBase.java +++ b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/export/ExportTestBase.java @@ -1,3 +1,22 @@ +/* + * Copyright (C) 2017-2025 HERE Europe B.V. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ + package com.here.xyz.jobs.steps.impl.export; import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM; @@ -19,7 +38,6 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Assertions; @@ -57,7 +75,6 @@ protected void executeExportStepAndCheckResults(String spaceId, ContextAwareEven protected void checkOutputs(FeatureCollection expectedFeatures, List userOutputs, List systemOutputs) throws IOException { Assertions.assertNotEquals(0, userOutputs.size()); - Assertions.assertNotEquals(0, systemOutputs.size()); List exportedFeatures = new ArrayList<>(); @@ -69,13 +86,6 @@ else if (output instanceof FeatureStatistics statistics) Assertions.assertEquals(expectedFeatures.getFeatures().size(), statistics.getFeatureCount()); } - //TODO: FeatureStatistics could get only checked if we also support during simulation "UPDATE_CALLBACK" - for (Output output : systemOutputs) { - //if we have one Feature - we expect at least one file - if (output instanceof FeatureStatistics statistics && expectedFeatures.getFeatures().size() > 1) - Assertions.assertTrue(statistics.getFileCount() > 0); - } - List existingFeaturesIdList = 
expectedFeatures.getFeatures().stream().map(Feature::getId).collect(Collectors.toList()); List exportedFeaturesFeaturesIdList = exportedFeatures.stream().map(Feature::getId).collect(Collectors.toList()); diff --git a/xyz-util/src/main/java/com/here/xyz/util/db/pg/Script.java b/xyz-util/src/main/java/com/here/xyz/util/db/pg/Script.java index c40a148cfb..fd96ba1577 100644 --- a/xyz-util/src/main/java/com/here/xyz/util/db/pg/Script.java +++ b/xyz-util/src/main/java/com/here/xyz/util/db/pg/Script.java @@ -222,7 +222,7 @@ private SQLQuery buildCreateSchemaQuery(String schemaName) { } private SQLQuery buildDeleteSchemaQuery(String schemaName) throws SQLException { - return new SQLQuery("DROP SCHEMA IF EXISTS ${schema}") //TODO: Re-add "CASCADE" into query, once PG bug was fixed + return new SQLQuery("DROP SCHEMA IF EXISTS ${schema} CASCADE") .withVariable("schema", schemaName); } @@ -383,7 +383,7 @@ private static List buildDeleteFunctionQueries(List functionSi private List loadSchemaFunctions(String schema) throws SQLException { return new SQLQuery(""" SELECT proc.oid::REGPROCEDURE as signature FROM pg_proc proc LEFT JOIN pg_namespace ns ON proc.pronamespace = ns.oid - WHERE ns.nspname = #{schema} + WHERE ns.nspname = #{schema} AND proc.prokind = 'f' """) .withNamedParameter("schema", schema) .run(dataSourceProvider, rs -> {