diff --git a/xyz-hub-test/src/test/java/com/here/xyz/hub/rest/LimitsTestIT.java b/xyz-hub-test/src/test/java/com/here/xyz/hub/rest/LimitsTestIT.java index d4c39c6272..ab0be64645 100644 --- a/xyz-hub-test/src/test/java/com/here/xyz/hub/rest/LimitsTestIT.java +++ b/xyz-hub-test/src/test/java/com/here/xyz/hub/rest/LimitsTestIT.java @@ -26,7 +26,6 @@ import static io.restassured.RestAssured.given; import static org.hamcrest.Matchers.equalTo; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; import io.restassured.response.ValidatableResponse; import org.apache.commons.lang3.RandomStringUtils; import org.junit.After; diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/JobPlayground.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/JobPlayground.java index e27d1359d5..57444dfa31 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/JobPlayground.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/JobPlayground.java @@ -51,7 +51,7 @@ import com.here.xyz.jobs.steps.impl.maintenance.MarkForMaintenance; import com.here.xyz.jobs.steps.impl.transport.CopySpace; import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; +import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace; import com.here.xyz.jobs.steps.inputs.Input; import com.here.xyz.jobs.steps.outputs.Output; import com.here.xyz.models.geojson.coordinates.PointCoordinates; @@ -117,7 +117,6 @@ public class JobPlayground { private static Space targetSpace; private static boolean simulateExecution = true; private static boolean executeWholeJob = false; - private static ImportFilesToSpace.Format importFormat = ImportFilesToSpace.Format.GEOJSON; private static int uploadFileCount = 2; private static String jobServiceBaseUrl = "http://localhost:7070"; @@ -235,7 +234,7 @@ private static void startLambdaExecutions() throws IOException, WebClientExcepti runDropIndexStep(sampleSpace.getId()); - runImportFilesToSpaceStep(sampleSpace.getId(), importFormat); + runImportFilesToSpaceStep(sampleSpace.getId()); for (SystemIndex index : SystemIndex.values()) runCreateIndexStep(sampleSpace.getId(), index); @@ -253,7 +252,7 @@ else if (playgroundUsecase == EXPORT) private static void uploadFiles() throws IOException { //Generate N Files with M features for (int i = 0; i < uploadFileCount; i++) - uploadInputFile(generateContent(importFormat, 10)); + uploadInputFile(generateContent( 10)); } private static void uploadFilesToRealJob(String jobId) throws IOException, InterruptedException { @@ -261,7 +260,7 @@ private static void uploadFilesToRealJob(String jobId) throws IOException, Inter for (int i = 0; i < uploadFileCount; i++) { HttpResponse inputResponse = post("/jobs/" + jobId + "/inputs", Map.of("type", "UploadUrl")); String uploadUrl = (String) XyzSerializable.deserialize(inputResponse.body(), Map.class).get("url"); - uploadInputFile(generateContent(importFormat, 10), new URL(uploadUrl)); + uploadInputFile(generateContent( 10), new URL(uploadUrl)); } } @@ -291,16 +290,16 @@ private static void pollRealJobStatus(String jobId) throws InterruptedException } } - private static byte[] generateContent(ImportFilesToSpace.Format format, int featureCnt) { + private static byte[] generateContent(int featureCnt) { String output = ""; for (int i = 1; i <= featureCnt; i++) { - output += generateContentLine(format, i); + output += generateContentLine(i); } return output.getBytes(); } - private static void 
generateContentToFile(ImportFilesToSpace.Format format, int featureCnt, boolean beZipped) throws IOException { + private static void generateContentToFile(int featureCnt, boolean beZipped) throws IOException { String outputFile = "/tmp/output.file" + (beZipped ? ".gz" : ""); BufferedWriter writer = null; @@ -315,7 +314,7 @@ private static void generateContentToFile(ImportFilesToSpace.Format format, int new OutputStreamWriter(zip, "UTF-8")); } for (int i = 1; i <= featureCnt; i++) { - writer.write(generateContentLine(format, i)); + writer.write(generateContentLine(i)); } }finally { if (writer != null) @@ -323,16 +322,10 @@ private static void generateContentToFile(ImportFilesToSpace.Format format, int } } - private static String generateContentLine(ImportFilesToSpace.Format format, int i){ + private static String generateContentLine(int i){ Random rd = new Random(); String lineSeparator = "\n"; - - if(format.equals(ImportFilesToSpace.Format.CSV_JSON_WKB)) - return "\"{'\"properties'\": {'\"test'\": "+i+"}}\",01010000A0E61000007DAD4B8DD0AF07C0BD19355F25B74A400000000000000000"+lineSeparator; - else if(format.equals(ImportFilesToSpace.Format.CSV_GEOJSON)) - return "\"{'\"type'\":'\"Feature'\",'\"geometry'\":{'\"type'\":'\"Point'\",'\"coordinates'\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},'\"properties'\":{'\"test'\":"+i+"}}\""+lineSeparator; - else - return "{\"type\":\"Feature\",\"geometry\":{\"type\":\"Point\",\"coordinates\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},\"properties\":{\"te\\\"st\":"+i+"}}"+lineSeparator; + return "{\"type\":\"Feature\",\"geometry\":{\"type\":\"Point\",\"coordinates\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},\"properties\":{\"te\\\"st\":"+i+"}}"+lineSeparator; } private static void startMockJob() { @@ -479,8 +472,8 @@ public static void runDropIndexStep(String spaceId) throws IOException { runStep(new DropIndexes().withSpaceId(spaceId)); } - public static void runImportFilesToSpaceStep(String spaceId, ImportFilesToSpace.Format format) throws IOException { - runStep(new ImportFilesToSpace().withSpaceId(spaceId).withFormat(format).withUpdateStrategy(UpdateStrategy.DEFAULT_UPDATE_STRATEGY)); + public static void runImportFilesToSpaceStep(String spaceId) throws IOException { + runStep(new TaskedImportFilesToSpace().withSpaceId(spaceId).withUpdateStrategy(UpdateStrategy.DEFAULT_UPDATE_STRATEGY)); } public static void runCreateIndexStep(String spaceId, SystemIndex index) throws IOException { diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ImportFromFiles.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ImportFromFiles.java index 3180ff77cf..28291a1328 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ImportFromFiles.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/steps/compiler/ImportFromFiles.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2025 HERE Europe B.V. + * Copyright (C) 2017-2026 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,14 +19,6 @@ package com.here.xyz.jobs.steps.compiler; -import static com.here.xyz.jobs.steps.Step.InputSet.USER_INPUTS; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_GEOJSON; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_JSON_WKB; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.GEOJSON; -import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.NEXT_VERSION; -import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.OPERATION; -import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.VERSION_ID; - import com.google.common.collect.Lists; import com.here.xyz.jobs.Job; import com.here.xyz.jobs.datasets.DatasetDescription.Space; @@ -42,9 +34,6 @@ import com.here.xyz.jobs.steps.impl.AnalyzeSpaceTable; import com.here.xyz.jobs.steps.impl.CreateIndex; import com.here.xyz.jobs.steps.impl.DropIndexes; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format; import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace; import com.here.xyz.models.hub.Ref; import com.here.xyz.util.db.pg.IndexHelper.Index; @@ -52,25 +41,21 @@ import com.here.xyz.util.web.HubWebClient; import com.here.xyz.util.web.XyzWebClient.ErrorResponseException; import com.here.xyz.util.web.XyzWebClient.WebClientException; + import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -public class ImportFromFiles implements JobCompilationInterceptor { - public boolean useNewTaskedImportStep = true; +import static com.here.xyz.jobs.steps.Step.InputSet.USER_INPUTS; +import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.NEXT_VERSION; +import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.OPERATION; +import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.VERSION_ID; +public class ImportFromFiles implements JobCompilationInterceptor { public static Set<Class<? extends DatasetDescription>> allowedTargetTypes = new HashSet<>(Set.of(Space.class)); - public void setUseNewTaskedImportStep(boolean useNewTaskedImportStep) { - this.useNewTaskedImportStep = useNewTaskedImportStep; - } - public ImportFromFiles withUseNewTaskedImportStep(boolean useNewTaskedImportStep) { - setUseNewTaskedImportStep(useNewTaskedImportStep); - return this; - } - @Override public boolean chooseMe(Job job) { return job.getProcess() == null && job.getSource() instanceof Files files && isSupportedFormat(files) @@ -87,52 +72,31 @@ public CompilationStepGraph compile(Job job) { String spaceId = target.getId(); final FileFormat sourceFormat = ((Files) job.getSource()).getInputSettings().getFormat(); - Format importStepFormat; - if (sourceFormat instanceof GeoJson) - importStepFormat = GEOJSON; - else if (sourceFormat instanceof Csv csvFormat) - importStepFormat = csvFormat.isGeometryAsExtraWkbColumn() ? CSV_JSON_WKB : CSV_GEOJSON; - else - throw new CompilationError("Unsupported import file format: " + sourceFormat.getClass().getSimpleName()); - EntityPerLine entityPerLine = getEntityPerLine(sourceFormat); + if (!(sourceFormat instanceof GeoJson)) + throw new CompilationError("Unsupported import file format: " + sourceFormat.getClass().getSimpleName()); //This validation check is necessary to deliver a constructive error to the user - otherwise keepIndices will throw a runtime error. 
checkIfSpaceIsAccessible(spaceId); - if(useNewTaskedImportStep) { - if(!entityPerLine.equals(EntityPerLine.Feature)) - throw new CompilationError("TaskedImportStep - Unsupported entityPerLine configuration: " + entityPerLine.name()); - if(!(sourceFormat instanceof GeoJson)) - throw new CompilationError("TaskedImportStep - Unsupported format configuration: " + sourceFormat.getClass().getSimpleName()); - - TaskedImportFilesToSpace importFilesStep = new TaskedImportFilesToSpace() //Perform import - .withSpaceId(spaceId) - .withVersionRef(new Ref(Ref.HEAD)) - .withJobId(job.getId()) - .withInputSets(List.of(USER_INPUTS.get())); - if (importFilesStep.keepIndices()) - //Perform only the import Step - return (CompilationStepGraph) new CompilationStepGraph() - .addExecution(importFilesStep); + TaskedImportFilesToSpace importFilesStep = new TaskedImportFilesToSpace() //Perform import + .withEntityPerLine(getEntityPerLine(sourceFormat)) + .withSpaceId(spaceId) + .withVersionRef(new Ref(Ref.HEAD)) + .withJobId(job.getId()) + .withInputSets(List.of(USER_INPUTS.get())); - //perform full Import with all 11 Steps (IDX deletion/creation..) - return compileTaskedImportSteps(importFilesStep); - }else{ - ImportFilesToSpace importFilesStep = new ImportFilesToSpace() //Perform import - .withSpaceId(spaceId) - .withFormat(importStepFormat) - .withEntityPerLine(entityPerLine) - .withJobId(job.getId()) - .withInputSets(List.of(USER_INPUTS.get())); - if (importFilesStep.keepIndices()) + try { + if (importFilesStep.useFeatureWriter()) //Perform only the import Step return (CompilationStepGraph) new CompilationStepGraph() .addExecution(importFilesStep); - - //perform full Import with all 11 Steps (IDX deletion/creation..) - return compileImportSteps(importFilesStep); + } catch (WebClientException e) { + throw new CompilationError("Error retrieving statistics for target resource during compilation!", e); } + + //perform full Import with all 11 Steps (IDX deletion/creation..) + return compileTaskedImportSteps(importFilesStep); } public static CompilationStepGraph compileWrapWithDropRecreateIndices(String spaceId, StepExecution stepExecution) { @@ -179,20 +143,10 @@ public static CompilationStepGraph compileTaskedImportSteps(TaskedImportFilesToS } } - public static CompilationStepGraph compileImportSteps(ImportFilesToSpace importFilesStep) { - try { - //Keep these indices if FeatureWriter is used - List whiteListIndex = importFilesStep.useFeatureWriter() ? List.of(VERSION_ID, NEXT_VERSION, OPERATION) : null; - return compileWrapWithDropRecreateIndices(importFilesStep.getSpaceId(), importFilesStep, whiteListIndex); - } catch (WebClientException e) { - throw new CompilationError("Unexpected error occurred during compilation", e); - } - } - - private EntityPerLine getEntityPerLine(FileFormat format) { - return EntityPerLine.valueOf((format instanceof GeoJson geoJson - ? geoJson.getEntityPerLine() - : ((Csv) format).getEntityPerLine()).toString()); + private TaskedImportFilesToSpace.EntityPerLine getEntityPerLine(FileFormat format) { + return TaskedImportFilesToSpace.EntityPerLine.valueOf((format instanceof GeoJson geoJson + ? 
geoJson.getEntityPerLine() + : ((Csv) format).getEntityPerLine()).toString()); } private static List toSequentialSteps(String spaceId, List indices) { diff --git a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/ImportJobTestIT.java b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/ImportJobTestIT.java index f25e23d9a0..cb58b62fbb 100644 --- a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/ImportJobTestIT.java +++ b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/ImportJobTestIT.java @@ -25,7 +25,6 @@ import com.here.xyz.jobs.datasets.files.GeoJson; import com.here.xyz.jobs.steps.JobCompiler; import com.here.xyz.jobs.steps.compiler.ImportFromFiles; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; import com.here.xyz.jobs.util.test.ContentCreator; import com.here.xyz.models.hub.Space; import org.junit.jupiter.api.Assertions; @@ -46,12 +45,13 @@ public class ImportJobTestIT extends JobTest { @BeforeEach public void setUp() { - createSpace(new Space().withId(SPACE_ID).withSearchableProperties(Map.of( + createSpace(new Space().withId(SPACE_ID).withVersionsToKeep(10000).withSearchableProperties(Map.of( "foo1", true, "foo2.nested", true, "foo3.nested.array::array", true ) ), false); + putRandomFeatureCollectionToSpace(SPACE_ID, 10); } @Test @@ -60,7 +60,7 @@ public void testSimpleImport() throws Exception { new FileInputSettings() .withFormat(new GeoJson().withEntityPerLine(Feature))) ); - createAndStartJob(importJob, ContentCreator.generateImportFileContent(ImportFilesToSpace.Format.GEOJSON, 50)); + createAndStartJob(importJob, ContentCreator.generateImportFileContent(50)); } @Test @@ -71,7 +71,6 @@ public void testInvalidEntity() throws Exception { ); Assertions.assertThrows(JobCompiler.CompilationError.class, () -> new ImportFromFiles() - .withUseNewTaskedImportStep(true) .compile(importJob)); } @@ -82,7 +81,6 @@ public void testInvalidFormat() throws Exception { .withFormat(new Csv().withEntityPerLine(Feature)))); Assertions.assertThrows(JobCompiler.CompilationError.class, () -> new ImportFromFiles() - .withUseNewTaskedImportStep(true) .compile(importJob)); } diff --git a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java index 459cc6b56a..2bb2f7a5df 100644 --- a/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java +++ b/xyz-jobs/xyz-job-service/src/test/java/com/here/xyz/jobs/steps/execution/JobExecutorTests.java @@ -21,7 +21,6 @@ import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM; import static com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.EXPORTED_DATA; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.GEOJSON; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -34,7 +33,6 @@ import com.here.xyz.jobs.steps.StepExecution; import com.here.xyz.jobs.steps.StepGraph; import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -43,6 +41,8 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; + +import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace; import 
org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -119,7 +119,7 @@ public void testReuseSequentialGraphWithInputIds() { StepGraph graph1 = new CompilationStepGraph() .withExecutions(List.of( exportStep1, - stepGenerator(ImportFilesToSpace.class, JOB_ID1, SOURCE_ID2, Set.of(exportStep1.getId())) + stepGenerator(TaskedImportFilesToSpace.class, JOB_ID1, SOURCE_ID2, Set.of(exportStep1.getId())) )) .withParallel(false); ((CompilationStepGraph) graph1).enrich(JOB_ID1); @@ -129,7 +129,7 @@ public void testReuseSequentialGraphWithInputIds() { StepGraph graph2 = new CompilationStepGraph() .withExecutions(List.of( exportStep2, - stepGenerator(ImportFilesToSpace.class, JOB_ID2, SOURCE_ID2, Set.of(exportStep2.getId())) //not reusable + stepGenerator(TaskedImportFilesToSpace.class, JOB_ID2, SOURCE_ID2, Set.of(exportStep2.getId())) //not reusable )) .withParallel(false); ((CompilationStepGraph) graph2).enrich(JOB_ID1); @@ -147,7 +147,7 @@ public void testReuseSequentialGraphWithInputIds() { Step execution2 = (Step) graph.getExecutions().get(1); assertInstanceOf(DelegateStep.class, execution1); - assertInstanceOf(ImportFilesToSpace.class, execution2); + assertInstanceOf(TaskedImportFilesToSpace.class, execution2); //Check if previousStepIds are set correct assertEquals(Set.of(execution1.getId()), execution2.getPreviousStepIds()); @@ -417,9 +417,8 @@ public StepExecution stepGenerator(Class stepType, String jobId, String sourceId .withJobId(jobId) .withOutputSetVisibility(EXPORTED_DATA, SYSTEM); - case "ImportFilesToSpace" -> { - ImportFilesToSpace importFilesToSpace = new ImportFilesToSpace() - .withFormat(GEOJSON) + case "TaskedImportFilesToSpace" -> { + TaskedImportFilesToSpace importFilesToSpace = new TaskedImportFilesToSpace() .withSpaceId(sourceId) .withJobId(jobId); if (inputStepIds != null) diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/SpaceBasedStep.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/SpaceBasedStep.java index ccf801f4c2..219c4e6981 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/SpaceBasedStep.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/SpaceBasedStep.java @@ -44,7 +44,6 @@ import com.here.xyz.jobs.steps.impl.transport.ExportChangedTiles; import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles; import com.here.xyz.jobs.steps.impl.transport.GetNextSpaceVersion; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace; import com.here.xyz.models.hub.Branch; import com.here.xyz.models.hub.Connector; @@ -67,7 +66,6 @@ @JsonSubTypes.Type(value = CreateIndex.class), @JsonSubTypes.Type(value = ExportSpaceToFiles.class), @JsonSubTypes.Type(value = ExportChangedTiles.class), - @JsonSubTypes.Type(value = ImportFilesToSpace.class), @JsonSubTypes.Type(value = TaskedImportFilesToSpace.class), @JsonSubTypes.Type(value = DropIndexes.class), @JsonSubTypes.Type(value = AnalyzeSpaceTable.class), diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java deleted file mode 100644 index 699e951db3..0000000000 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/ImportFilesToSpace.java +++ /dev/null @@ -1,856 +0,0 @@ -/* - * Copyright (C) 2017-2025 HERE 
Europe B.V. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * License-Filename: LICENSE - */ - -package com.here.xyz.jobs.steps.impl.transport; - -import static com.here.xyz.events.ContextAwareEvent.SpaceContext.DEFAULT; -import static com.here.xyz.events.ContextAwareEvent.SpaceContext.EXTENSION; -import static com.here.xyz.events.UpdateStrategy.DEFAULT_UPDATE_STRATEGY; -import static com.here.xyz.jobs.steps.Step.Visibility.USER; -import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.ASYNC; -import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine.Feature; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine.FeatureCollection; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_JSON_WKB; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.GEOJSON; -import static com.here.xyz.jobs.steps.impl.SpaceBasedStep.LogPhase.JOB_EXECUTOR; -import static com.here.xyz.jobs.steps.impl.SpaceBasedStep.LogPhase.JOB_VALIDATE; -import static com.here.xyz.jobs.steps.impl.SpaceBasedStep.LogPhase.STEP_EXECUTE; -import static com.here.xyz.jobs.steps.impl.SpaceBasedStep.LogPhase.STEP_ON_ASYNC_SUCCESS; -import static com.here.xyz.jobs.steps.impl.SpaceBasedStep.LogPhase.STEP_ON_STATE_CHECK; -import static com.here.xyz.jobs.steps.impl.transport.TaskedSpaceBasedStep.buildTemporaryJobTableDropStatement; -import static com.here.xyz.jobs.steps.impl.transport.TaskedSpaceBasedStep.getTemporaryJobTableName; -import static com.here.xyz.util.web.XyzWebClient.WebClientException; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonView; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.here.xyz.events.UpdateStrategy; -import com.here.xyz.jobs.steps.S3DataFile; -import com.here.xyz.jobs.steps.execution.StepException; -import com.here.xyz.jobs.steps.impl.SpaceBasedStep; -import com.here.xyz.jobs.steps.impl.tools.ResourceAndTimeCalculator; -import com.here.xyz.jobs.steps.impl.transport.tools.ImportFilesQuickValidator; -import com.here.xyz.jobs.steps.impl.transport.tools.ImportQueryBuilder; -import com.here.xyz.jobs.steps.inputs.Input; -import com.here.xyz.jobs.steps.inputs.InputFromOutput; -import com.here.xyz.jobs.steps.inputs.UploadUrl; -import com.here.xyz.jobs.steps.outputs.CreatedVersion; -import com.here.xyz.jobs.steps.outputs.DownloadUrl; -import com.here.xyz.jobs.steps.outputs.FeatureStatistics; -import com.here.xyz.jobs.steps.resources.IOResource; -import com.here.xyz.jobs.steps.resources.Load; -import com.here.xyz.jobs.steps.resources.TooManyResourcesClaimed; -import com.here.xyz.jobs.util.S3Client; -import com.here.xyz.models.hub.Space; -import com.here.xyz.responses.StatisticsResponse; -import com.here.xyz.util.db.SQLQuery; -import 
com.here.xyz.util.db.pg.FeatureWriterQueryBuilder; -import com.here.xyz.util.db.pg.FeatureWriterQueryBuilder.FeatureWriterQueryContextBuilder; -import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException; -import com.here.xyz.util.service.Core; -import io.vertx.core.json.JsonObject; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.zip.GZIPInputStream; -import org.locationtech.jts.io.ParseException; - - -/** - * This step imports a set of user provided inputs and imports their data into a specified space. This step produces exactly one output of - * type {@link FeatureStatistics}. - */ -@Deprecated -public class ImportFilesToSpace extends SpaceBasedStep { - - private static final long MAX_INPUT_BYTES_FOR_SYNC_IMPORT = 100l * 1024 * 1024; - private static final long MAX_INPUT_BYTES_FOR_KEEP_INDICES = 1l * 1024 * 1024 * 1024; - private static final int MIN_FEATURE_COUNT_IN_TARGET_TABLE_FOR_KEEP_INDICES = 5_000_000; - private static final int MAX_DB_THREAD_COUNT = 15; - public static final String STATISTICS = "statistics"; - private static final String TRIGGER_TABLE_SUFFIX = "_trigger_tbl"; - - private Format format = GEOJSON; - - @JsonView({Internal.class, Static.class}) - private double overallNeededAcus = -1; - - @JsonView({Internal.class, Static.class}) - private long targetTableFeatureCount = -1; - - @JsonView({Internal.class, Static.class}) - private int fileCount = -1; - - @JsonView({Internal.class, Static.class}) - private int calculatedThreadCount = -1; - - @JsonView({Internal.class, Static.class}) - private int estimatedSeconds = -1; - - @JsonView({Internal.class, Static.class}) - private UpdateStrategy updateStrategy = DEFAULT_UPDATE_STRATEGY; - - @JsonView({Internal.class, Static.class}) - private EntityPerLine entityPerLine = Feature; - - @JsonView({Internal.class, Static.class}) - private boolean retainMetadata = false; - - @JsonView({Internal.class, Static.class}) - private boolean enableQuickValidation = true; - - //Compilers can decide max allowed import size. 
Set default to 200G for normal use-case - @JsonIgnore - private long maxInputBytesForNonEmptyImport = 200l * 1024 * 1024 * 1024; - - { - setOutputSets(List.of(new OutputSet(STATISTICS, USER, true))); - } - - public Format getFormat() { - return format; - } - - public void setFormat(Format format) { - this.format = format; - } - - public ImportFilesToSpace withFormat(Format format) { - setFormat(format); - return this; - } - - public UpdateStrategy getUpdateStrategy() { - return updateStrategy; - } - - public void setUpdateStrategy(UpdateStrategy updateStrategy) { - this.updateStrategy = updateStrategy; - } - - public ImportFilesToSpace withUpdateStrategy(UpdateStrategy updateStrategy) { - setUpdateStrategy(updateStrategy); - return this; - } - - public ImportFilesToSpace withCalculatedThreadCount(int calculatedThreadCount) { - setCalculatedThreadCount(calculatedThreadCount); - return this; - } - - public int getCalculatedThreadCount() { - return calculatedThreadCount; - } - - public void setCalculatedThreadCount(int calculatedThreadCount) { - this.calculatedThreadCount = calculatedThreadCount; - } - - public EntityPerLine getEntityPerLine() { - return entityPerLine; - } - - public void setEntityPerLine(EntityPerLine entityPerLine) { - this.entityPerLine = entityPerLine; - } - - public ImportFilesToSpace withEntityPerLine(EntityPerLine entityPerLine) { - setEntityPerLine(entityPerLine); - return this; - } - - public boolean isRetainMetadata() { - return retainMetadata; - } - - public void setRetainMetadata(boolean retainMetadata) { - this.retainMetadata = retainMetadata; - } - - public ImportFilesToSpace withRetainMetadata(boolean retainMetadata) { - setRetainMetadata(retainMetadata); - return this; - } - - public void setEnableQuickValidation(boolean enableQuickValidation) { - this.enableQuickValidation = enableQuickValidation; - } - - public ImportFilesToSpace withEnableQuickValidation(boolean enableQuickValidation) { - setEnableQuickValidation(enableQuickValidation); - return this; - } - - public boolean isEnableQuickValidation() { - return enableQuickValidation; - } - - public long getMaxInputBytesForNonEmptyImport() { - return maxInputBytesForNonEmptyImport; - } - - public void setMaxInputBytesForNonEmptyImport(long maxInputBytesForNonEmptyImport) { - this.maxInputBytesForNonEmptyImport = maxInputBytesForNonEmptyImport; - } - - public ImportFilesToSpace withMaxInputBytesForNonEmptyImport(long maxInputBytesForNonEmptyImport) { - setMaxInputBytesForNonEmptyImport(maxInputBytesForNonEmptyImport); - return this; - } - - public boolean keepIndices() { - /* - * The targetSpace needs to have more than MIN_FEATURE_COUNT_IN_TARGET_TABLE_FOR_KEEP_INDICES features - * Reason: For tables with not that many records it's always faster to remove and recreate indices - * + - * Incoming bytes have to be smaller than MAX_INPUT_BYTES_FOR_KEEP_INDICES - * Reason: if we don't write that much, it's fast enough even with indices - */ - return loadTargetSpaceFeatureCount() > MIN_FEATURE_COUNT_IN_TARGET_TABLE_FOR_KEEP_INDICES - || getUncompressedUploadBytesEstimation() < MAX_INPUT_BYTES_FOR_KEEP_INDICES; - } - - /* - * Use FeatureWriter if either is true - * - the target space is not empty - * - space is composite - */ - public boolean useFeatureWriter() throws WebClientException { - return loadTargetSpaceFeatureCount() > 0 || space().getExtension() != null; - } - - private long loadTargetSpaceFeatureCount() { - if (targetTableFeatureCount == -1 && getSpaceId() != null) { - StatisticsResponse statistics; - try 
{ - statistics = loadSpaceStatistics(getSpaceId(), EXTENSION, true); - targetTableFeatureCount = statistics.getCount().getValue(); - } - catch (WebClientException e) { - throw new RuntimeException(e); - } - } - return targetTableFeatureCount; - } - - @Override - public List getNeededResources() { - try { - //Calculate estimation for ACUs for all parallel running threads - overallNeededAcus = overallNeededAcus != -1 ? overallNeededAcus : calculateNeededAcus(calculateThreadCount()); - return List.of(new Load().withResource(db()).withEstimatedVirtualUnits(overallNeededAcus), - new Load().withResource(IOResource.getInstance()).withEstimatedVirtualUnits(getUncompressedUploadBytesEstimation())); - } - catch (WebClientException e) { - throw new RuntimeException(e); - } - } - - private int countFiles() { - return fileCount = fileCount != -1 ? fileCount : currentInputsCount(UploadUrl.class); - } - - private int calculateThreadCount() { - return calculatedThreadCount = calculatedThreadCount != -1 ? calculatedThreadCount : - ResourceAndTimeCalculator.getInstance().calculateNeededImportDBThreadCount(getUncompressedUploadBytesEstimation(), countFiles(), - MAX_DB_THREAD_COUNT); - } - - @Override - public int getTimeoutSeconds() { - //return ResourceAndTimeCalculator.getInstance().calculateImportTimeoutSeconds(getSpaceId(), getUncompressedUploadBytesEstimation(), getExecutionMode()); - return 3 * 24 * 3600; - } - - @Override - public int getEstimatedExecutionSeconds() { - if (estimatedSeconds == -1 && getSpaceId() != null) { - estimatedSeconds = ResourceAndTimeCalculator.getInstance() - .calculateImportTimeInSeconds(getSpaceId(), getUncompressedUploadBytesEstimation(), getExecutionMode()); - infoLog(JOB_EXECUTOR, "Calculated estimatedSeconds: "+estimatedSeconds ); - } - return estimatedSeconds; - } - - //TODO: Cache the execution-mode once it was calculated - @Override - public ExecutionMode getExecutionMode() { - //CSV is not supported in SYNC mode -// if (format == CSV_JSON_WKB || format == CSV_GEOJSON) -// return ASYNC; -// return getUncompressedUploadBytesEstimation() > MAX_INPUT_BYTES_FOR_SYNC_IMPORT ? ASYNC : SYNC; - //TODO: Fix ConnectionPool issue caused from threading in syncExecution() - return ASYNC; - } - - @Override - public String getDescription() { - return "Imports the data to space " + getSpaceId(); - } - - @Override - public void deleteOutputs() { - super.deleteOutputs(); - - /** @TODO: - * Currently we only have non-retryable import jobs. If we introduce some, we need to implement the resource - * cleanup properly. One possibility could be to add the restriction that steps only could have temporary - * resources during their execution. To not lose the temporary information, which is relevant for a retry, - * it would be required to write those into a system output. 
- */ - } - - @Override - public boolean validate() throws ValidationException { - super.validate(); - try { - infoLog(JOB_VALIDATE); - //Check if the space is actually existing - Space space = space(); - if (!space.isActive()) - throw new ValidationException("Data can not be written to target " + space.getId() + " as it is inactive."); - - if (space.isReadOnly()) - throw new ValidationException("Data can not be written to target " + space.getId() + " as it is in read-only mode."); - - if (entityPerLine == FeatureCollection && format == CSV_JSON_WKB) - throw new ValidationException("Combination of entityPerLine 'FeatureCollection' and type 'Csv' is not supported!"); - - if (loadTargetSpaceFeatureCount() > 0 && getUncompressedUploadBytesEstimation() > getMaxInputBytesForNonEmptyImport()) - throw new ValidationException("An import into a non empty space is not possible. " - + "The uncompressed size of the provided files exceeds the limit of " + getMaxInputBytesForNonEmptyImport() + " bytes."); - } - catch (WebClientException e) { - throw new ValidationException("Error loading resource " + getSpaceId(), e); - } - - if (isUserInputsExpected()) { - if (!isUserInputsPresent(UploadUrl.class)) - return false; - //Quick-validate the first UploadUrl that is found in the inputs - if(enableQuickValidation) - ImportFilesQuickValidator.validate(loadInputsSample(1, UploadUrl.class).get(0), format, entityPerLine); - } - - return true; - } - - @Override - public void execute(boolean resume) throws WebClientException, SQLException, TooManyResourcesClaimed, IOException, ParseException, - InterruptedException { - if (getExecutionMode() == SYNC) - syncExecution(); - else { - if (!resume) { - infoLog(STEP_EXECUTE, "Retrieve new version"); - long newVersion = getOrIncreaseVersionSequence(); - - infoLog(STEP_EXECUTE, "Create TriggerTable and Trigger"); - //Create Temp-ImportTable to avoid deserialization of JSON and fix missing row count - //FIXME: Use job owner as author - runBatchWriteQuerySync(buildTemporaryTriggerTableBlock(space().getOwner(), newVersion), db(), 0); - } - - if(!createAndFillTemporaryJobTable()) { - infoLog(STEP_EXECUTE, "No files available - nothing to do!"); - //Report Success with a new invocation. - runReadQueryAsync(buildSuccessReportQuery(), db(), 0, true); - //no Files to process simply return successfully! 
- return; - } - - double neededAcusForOneThread = calculateNeededAcus(1); - for (int i = 1; i <= calculateThreadCount(); i++) { - infoLog(STEP_EXECUTE, "Start Import Thread number " + i); - runReadQueryAsync(buildImportQuery(), db(), neededAcusForOneThread, false); - } - } - } - - private void syncExecution() throws WebClientException, SQLException, TooManyResourcesClaimed, IOException { - //TODO: Support resume - infoLog(STEP_EXECUTE, "Retrieve new version"); - long newVersion = getOrIncreaseVersionSequence(); - - ExecutorService exec = Executors.newFixedThreadPool(5); - List<Future<FeatureStatistics>> resultFutures = new ArrayList<>(); - - //Execute the sync for each import file in parallel - for (Input input : loadInputs(UploadUrl.class)) - resultFutures.add(exec.submit(() -> new FeatureStatistics() - .withFeatureCount(syncWriteFileToSpace((UploadUrl) input, newVersion)) - .withByteSize(input.getByteSize()))); - - //TODO: Use CompletableFuture.allOf() instead of the following - //Wait for the futures and aggregate the result statistics into one FeatureStatistics object - FeatureStatistics resultOutput = new FeatureStatistics(); - for (Future<FeatureStatistics> future : resultFutures) { - try { - resultOutput - .withFeatureCount(resultOutput.getFeatureCount() + future.get().getFeatureCount()) - .withByteSize(resultOutput.getByteSize() + future.get().getByteSize()); - } - catch (InterruptedException | ExecutionException e) { - throw new StepException("Error during sync write to target space", e); - } - } - - exec.shutdown(); - - registerOutputs(List.of(resultOutput), STATISTICS); - infoLog(STEP_EXECUTE, "Set contentUpdatedAt on target space"); - hubWebClient().patchSpace(getSpaceId(), Map.of("contentUpdatedAt", Core.currentTimeMillis())); - } - - /** - * Writes one input file into the target space. - * - * @param input The input file - * @param newVersion The new space version being created by this import - * @return The number of features that have been written - */ - private int syncWriteFileToSpace(UploadUrl input, long newVersion) throws IOException, WebClientException, SQLException, - TooManyResourcesClaimed { - infoLog(STEP_EXECUTE, "Start sync write of file " + input.getS3Key() + " ..."); - - InputStream inputStream = S3Client.getInstance(input.getS3Bucket()).streamObjectContent(input.getS3Key()); - if (input.isCompressed()) - inputStream = new GZIPInputStream(inputStream); - - try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) { - StringBuilder fileContent = new StringBuilder(); - fileContent.append("["); - String line; - while ((line = reader.readLine()) != null) { - line = line.replace("\\","\\\\"); - fileContent.append(line).append(","); - } - //Cut the trailing comma if the file was not empty - if (fileContent.length() > 1) { - fileContent.setLength(fileContent.length() - 1); - } - fileContent.append("]"); - - int writtenFeatureCount = runReadQuerySync(buildFeatureWriterQuery(fileContent.toString(), newVersion), db(), 0, rs -> { - rs.next(); - return rs.getInt("count"); - }); - - infoLog(STEP_EXECUTE, "Completed sync write of file " + input.getS3Key() + ". Written features: " - + writtenFeatureCount + ", input bytes: " + input.getByteSize()); - - return writtenFeatureCount; - } - } - - /** - * This method returns the next space version in either of two ways: - * 1. By fetching the next version from a {@link CreatedVersion} provided as step input - * 2. By incrementing the space version sequence
- */ - private long getOrIncreaseVersionSequence() throws SQLException, TooManyResourcesClaimed, WebClientException { - - Optional versionInput = loadInputs(InputFromOutput.class) - .stream() - .filter(input -> ((InputFromOutput) input).getDelegate() instanceof CreatedVersion) - .findFirst(); - if(versionInput.isPresent()) { - CreatedVersion version = (CreatedVersion) ((InputFromOutput) versionInput.get()).getDelegate(); - return version.getVersion(); - } - - return runReadQuerySync(buildVersionSequenceIncrement(), db(), 0, rs -> { - rs.next(); - return rs.getLong(1); - }); - } - - private boolean createAndFillTemporaryJobTable() throws SQLException, TooManyResourcesClaimed, WebClientException { - if (isResume()) { - infoLog(STEP_EXECUTE, "Reset SuccessMarker"); - runWriteQuerySync(buildResetJobTableItemsForResumeStatement(), db(), 0); - return true; - } - else { - infoLog(STEP_EXECUTE, "Create temporary job table"); - runWriteQuerySync(buildTemporaryJobTableCreateStatement(), db(), 0); - - infoLog(STEP_EXECUTE, "Fill temporary job table"); - - List inputs = loadInputs(UploadUrl.class); - runBatchWriteQuerySync(SQLQuery.batchOf(buildTemporaryJobTableInsertStatements((List) inputs)), db(), 0); - - //If no Inputs are present return 0 - return inputs.size() != 0; - } - } - - @Override - protected void onStateCheck() { - try { - runReadQuerySync(buildProgressQuery(getSchema(db()), this), db(), 0, - rs -> { - rs.next(); - - float progress = rs.getFloat("progress"); - long processedBytes = rs.getLong("processed_bytes"); - int finishedCnt = rs.getInt("finished_cnt"); - int failedCnt = rs.getInt("failed_cnt"); - - getStatus().setEstimatedProgress(progress); - - infoLog(STEP_ON_STATE_CHECK,"Progress[" + progress + "] => " + " processedBytes:" - + processedBytes + " ,finishedCnt:" + finishedCnt + " ,failedCnt:" + failedCnt); - return progress; - }); - } - catch (Exception e) { - //TODO: What to do? Only log? Report Status is not that important. Further Ignore "table does not exists error" - report 0 in this case. - errorLog(STEP_ON_STATE_CHECK, e); - } - } - - @Override - protected void onAsyncSuccess() throws WebClientException, - SQLException, TooManyResourcesClaimed, IOException { - try { - FeatureStatistics statistics = runReadQuerySync(buildStatisticDataOfTemporaryTableQuery(), db(), - 0, rs -> rs.next() - ? 
new FeatureStatistics().withFeatureCount(rs.getLong("imported_rows")).withByteSize(rs.getLong("imported_bytes")) - : new FeatureStatistics()); - - infoLog(STEP_ON_ASYNC_SUCCESS, "Job Statistics: bytes=" + statistics.getByteSize() + " rows=" + statistics.getFeatureCount()); - registerOutputs(List.of(statistics), STATISTICS); - - cleanUpDbRelatedResources(); - - infoLog(STEP_ON_ASYNC_SUCCESS, "Set contentUpdatedAt on target space"); - hubWebClient().patchSpace(getSpaceId(), Map.of("contentUpdatedAt", Core.currentTimeMillis())); - } - catch (SQLException e) { - //relation "*_job_data" does not exist - can happen when we have received twice a SUCCESS_CALLBACK - //TODO: Find out the cases in which that could happen and prevent it from happening - if (e.getSQLState() != null && e.getSQLState().equals("42P01")) { - errorLog(STEP_ON_ASYNC_SUCCESS, e, "_job_data table got already deleted!"); - return; - } - throw e; - } - } - - private void cleanUpDbRelatedResources() throws TooManyResourcesClaimed, SQLException, WebClientException { - infoLog(STEP_ON_ASYNC_SUCCESS, "Clean up database resources"); - runBatchWriteQuerySync(SQLQuery.batchOf( - buildTemporaryJobTableDropStatement(getSchema(db()), getTemporaryJobTableName(getId())), - buildTemporaryJobTableDropStatement(getSchema(db()), getTemporaryTriggerTableName(getId())) - ), db(), 0); - } - - @Override - protected boolean onAsyncFailure() { - try { - //TODO: Inspect the error provided in the status and decide whether it is retryable (return-value) - boolean isRetryable = false; - - if (!isRetryable) - cleanUpDbRelatedResources(); - - return isRetryable; - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - private SQLQuery buildTemporaryTriggerTableForImportQuery() throws WebClientException { - String tableFields = - "jsondata TEXT, " - + "geo geometry(GeometryZ, 4326), " - + "count INT "; - return new SQLQuery("CREATE TABLE IF NOT EXISTS ${schema}.${table} (${{tableFields}} )") - .withQueryFragment("tableFields", tableFields) - .withVariable("schema", getSchema(db())) - .withVariable("table", getTemporaryTriggerTableName(getId())); - } - - private SQLQuery buildCreateImportTrigger(String targetAuthor, long newVersion) throws WebClientException { - if(useFeatureWriter()) - return buildCreateImportTriggerWithFeatureWriter(targetAuthor, newVersion); - return buildCreateImportTriggerForInsertsOnly(targetAuthor, newVersion); - } - - private SQLQuery buildTemporaryTriggerTableBlock(String targetAuthor, long newVersion) throws WebClientException { - return SQLQuery.batchOf( - buildTemporaryTriggerTableForImportQuery(), - buildCreateImportTrigger(targetAuthor, newVersion) - ); - } - - private SQLQuery buildCreateImportTriggerForInsertsOnly(String targetAuthor, long targetSpaceVersion) throws WebClientException { - String triggerFunction = "import_from_s3_trigger_for_empty_layer"; - triggerFunction += entityPerLine == FeatureCollection ? 
"_geojsonfc" : ""; - - return new SQLQuery("CREATE OR REPLACE TRIGGER insertTrigger BEFORE INSERT ON ${schema}.${table} " - + "FOR EACH ROW EXECUTE PROCEDURE ${triggerFunction}('${{author}}', ${{spaceVersion}}, '${{targetTable}}', ${{retainMetadata}});") - .withQueryFragment("spaceVersion", "" + targetSpaceVersion) - .withQueryFragment("author", targetAuthor) - .withQueryFragment("targetTable", getRootTableName(space())) - .withQueryFragment("retainMetadata", "" + isRetainMetadata()) - .withVariable("triggerFunction", triggerFunction) - .withVariable("schema", getSchema(db())) - .withVariable("table", getTemporaryTriggerTableName(getId())); - } - - private SQLQuery buildCreateImportTriggerWithFeatureWriter(String author, long newVersion) throws WebClientException { - String superRootTable = space().getExtension() != null ? getRootTableName(superSpace()) : null; - return new ImportQueryBuilder(getId(), getSchema(db()), getRootTableName(space()), space().getVersionsToKeep()) - .buildCreateFeatureWriterImportTrigger(author, newVersion, superRootTable, updateStrategy, - entityPerLine.name()); - } - - //TODO: Move to XyzSpaceTableHelper or so (it's the nth time we have that implemented somewhere) - private SQLQuery buildVersionSequenceIncrement() throws WebClientException { - return new SQLQuery("SELECT nextval('${schema}.${sequence}')") - .withVariable("schema", getSchema(db())) - .withVariable("sequence", getRootTableName(space()) + "_version_seq"); - } - - private SQLQuery buildStatisticDataOfTemporaryTableQuery() throws WebClientException { - return new SQLQuery(""" - SELECT sum((data->'filesize')::bigint) as imported_bytes, - count(1) as imported_files, - (SELECT sum(count) FROM ${schema}.${triggerTable} ) as imported_rows - FROM ${schema}.${tmpTable} - WHERE POSITION('SUCCESS_MARKER' in state) = 0; - """) - .withVariable("schema", getSchema(db())) - .withVariable("tmpTable", getTemporaryJobTableName(getId())) - .withVariable("triggerTable", getTemporaryTriggerTableName(getId())); - } - - private SQLQuery buildImportQuery() throws WebClientException { - SQLQuery successQuery = buildSuccessCallbackQuery(); - SQLQuery failureQuery = buildFailureCallbackQuery(); - - return new SQLQuery( - "CALL execute_transfer(#{format}, '${{successQuery}}', '${{failureQuery}}');") - .withContext(getQueryContext()) - .withAsyncProcedure(true) - .withNamedParameter("format", format.toString()) - .withQueryFragment("successQuery", successQuery.substitute().text().replaceAll("'", "''")) - .withQueryFragment("failureQuery", failureQuery.substitute().text().replaceAll("'", "''")); - } - - private SQLQuery buildSuccessReportQuery() throws WebClientException { - //Wait 5 seconds before report success to ensure event rule is successfully created before. 
- return new SQLQuery("PERFORM pg_sleep(5)"); - } - - @JsonIgnore - private Map getQueryContext() throws WebClientException { - Space superSpace = superSpace(); - List tables = new ArrayList<>(); - if (superSpace != null) - tables.add(getRootTableName(superSpace)); - tables.add(getRootTableName(space())); - - return new FeatureWriterQueryContextBuilder() - .withSchema(getSchema(db())) - .withTables(tables) - .withSpaceContext(DEFAULT) - .withHistoryEnabled(space().getVersionsToKeep() > 1) - .withBatchMode(true) - .with("stepId", getId()) - .build(); - } - - private SQLQuery buildFeatureWriterQuery(String featureList, long targetVersion) throws WebClientException, JsonProcessingException { - Map queryContext = getQueryContext(); - return new SQLQuery("SELECT (write_features::JSONB->>'count')::INT AS count FROM ${{writeFeaturesQuery}};") - .withQueryFragment("writeFeaturesQuery", new FeatureWriterQueryBuilder() - .withInput(featureList, "Features") - .withAuthor(space().getOwner()) - .withReturnResult(false) - .withVersion(targetVersion) - .withUpdateStrategy(updateStrategy) - .withIsPartial(false) - .withQueryContext(queryContext) - .build()) - .withContext(queryContext); //TODO: That is a temporary workaround for a bug with ignored query contexts of nested queries - } - - private SQLQuery buildProgressQuery(String schema, ImportFilesToSpace step) { - return new SQLQuery(""" - SELECT - COALESCE(processed_bytes/overall_bytes, 0) as progress, - COALESCE(processed_bytes,0) as processed_bytes, - COALESCE(finished_cnt,0) as finished_cnt, - COALESCE(failed_cnt,0) as failed_cnt - FROM( - SELECT - (SELECT sum((data->'filesize')::bigint ) FROM ${schema}.${table}) as overall_bytes, - sum((data->'filesize')::bigint ) as processed_bytes, - sum((state = 'FINISHED')::int) as finished_cnt, - sum((state = 'FAILED')::int) as failed_cnt - FROM ${schema}.${table} - WHERE POSITION('SUCCESS_MARKER' in state) = 0 - AND state IN ('FINISHED','FAILED') - )A - """) - .withVariable("schema", schema) - .withVariable("table", getTemporaryJobTableName(step.getId())); - } - - protected String bucketRegion(String bucketName) { - return S3Client.getInstance(bucketName).region(); - } - - protected List buildTemporaryJobTableInsertStatements(List fileList) throws WebClientException { - String schema = getSchema(db()); - List queryList = new ArrayList<>(); - for (S3DataFile input : fileList) { - if (input instanceof UploadUrl || input instanceof DownloadUrl) { - JsonObject data = new JsonObject() - .put("compressed", input.isCompressed()); - - if (input instanceof UploadUrl) - data.put("filesize", input.getByteSize()); - - queryList.add( - new SQLQuery(""" - INSERT INTO ${schema}.${table} (s3_bucket, s3_path, s3_region, state, data) - VALUES (#{bucketName}, #{s3Key}, #{bucketRegion}, #{state}, #{data}::jsonb) - ON CONFLICT (s3_path) DO NOTHING; - """) //TODO: Why would we ever have a conflict here? Why to fill the table again on resume()? 
- .withVariable("schema", schema) - .withVariable("table", getTemporaryJobTableName(getId())) - .withNamedParameter("s3Key", input.getS3Key()) - .withNamedParameter("bucketName", input.getS3Bucket()) - .withNamedParameter("bucketRegion", bucketRegion(input.getS3Bucket())) - .withNamedParameter("state", "SUBMITTED") - .withNamedParameter("data", data.toString()) - ); - } - } - - //Add final entry - queryList.add( - new SQLQuery(""" - INSERT INTO ${schema}.${table} (s3_bucket, s3_path, s3_region, state, data) - VALUES (#{bucketName}, #{s3Key}, #{bucketRegion}, #{state}, #{data}::jsonb) - ON CONFLICT (s3_path) DO NOTHING; - """) //TODO: Why would we ever have a conflict here? Why to fill the table again on resume()? - .withVariable("schema", schema) - .withVariable("table", getTemporaryJobTableName(getId())) - .withNamedParameter("s3Key", "SUCCESS_MARKER") - .withNamedParameter("bucketName", "SUCCESS_MARKER") - .withNamedParameter("state", "SUCCESS_MARKER") - .withNamedParameter("bucketRegion", "SUCCESS_MARKER") - .withNamedParameter("data", "{}")); - return queryList; - } - - protected SQLQuery buildResetJobTableItemsForResumeStatement() throws WebClientException { - String schema = getSchema(db()); - return new SQLQuery(""" - UPDATE ${schema}.${table} - SET state = - CASE - WHEN state = 'SUCCESS_MARKER_RUNNING' THEN 'SUCCESS_MARKER' - WHEN state = 'RUNNING' THEN 'SUBMITTED' - WHEN state = 'FAILED' THEN 'SUBMITTED' - END, - execution_count = - CASE - WHEN execution_count = 2 THEN 1 - ELSE execution_count - END - WHERE state IN ('SUCCESS_MARKER_RUNNING', 'RUNNING', 'FAILED'); - """) - .withVariable("schema", schema) - .withVariable("table", getTemporaryJobTableName(getId())); - } - - protected SQLQuery buildTemporaryJobTableCreateStatement() throws WebClientException { - String schema = getSchema(db()); - return new SQLQuery(""" - CREATE TABLE IF NOT EXISTS ${schema}.${table} - ( - s3_bucket text NOT NULL, - s3_path text NOT NULL, - s3_region text NOT NULL, - state text NOT NULL, --jobtype - execution_count int DEFAULT 0, --amount of retries - data jsonb COMPRESSION lz4, --statistic data - i SERIAL, - CONSTRAINT ${primaryKey} PRIMARY KEY (s3_path) - ); - """) - .withVariable("table", getTemporaryJobTableName(getId())) - .withVariable("schema", schema) - .withVariable("primaryKey", getTemporaryJobTableName(getId()) + "_primKey"); - } - - public static String getTemporaryTriggerTableName(String stepId) { - return getTemporaryJobTableName(stepId) + TRIGGER_TABLE_SUFFIX; - } - - private double calculateNeededAcus(int threadCount) { - double neededACUs; - - neededACUs = ResourceAndTimeCalculator.getInstance().calculateNeededImportAcus( - getUncompressedUploadBytesEstimation(), countFiles(), threadCount); - neededACUs /= 4d; //TODO: Remove workaround once GraphSequentializer was implemented - - infoLog(JOB_EXECUTOR, "Calculated ACUS: expectedMemoryConsumption: " - + getUncompressedUploadBytesEstimation() + " => neededACUs:" + neededACUs); - return neededACUs; - } - - public int getFileCount() { - return fileCount; - } - - public void setFileCount(int fileCount) { - this.fileCount = fileCount; - } - - //TODO: De-duplicate once CSV was removed (see GeoJson format class) - public enum EntityPerLine { - Feature, - FeatureCollection - } - - public enum Format { - CSV_GEOJSON, - CSV_JSON_WKB, - GEOJSON; - } -} diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/TaskedImportFilesToSpace.java 
b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/TaskedImportFilesToSpace.java index 464e9dd806..ab2ca38aba 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/TaskedImportFilesToSpace.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/TaskedImportFilesToSpace.java @@ -82,6 +82,9 @@ public enum Format { GEOJSON, FAST_IMPORT_INTO_EMPTY } + @JsonView({Internal.class, Static.class}) + private EntityPerLine entityPerLine = EntityPerLine.Feature; + private ImportQueryBuilder importQueryBuilder; @JsonView({Internal.class, Static.class}) @@ -172,6 +175,19 @@ public TaskedImportFilesToSpace withMaxInputBytesForNonEmptyImport(long maxInput return this; } + public EntityPerLine getEntityPerLine() { + return entityPerLine; + } + + public void setEntityPerLine(EntityPerLine entityPerLine) { + this.entityPerLine = entityPerLine; + } + + public TaskedImportFilesToSpace withEntityPerLine(EntityPerLine entityPerLine) { + setEntityPerLine(entityPerLine); + return this; + } + @Override protected boolean queryRunsOnWriter(){ return true; @@ -189,7 +205,7 @@ protected void initialSetup() throws SQLException, TooManyResourcesClaimed, WebC String superRootTable = space().getExtension() != null ? getRootTableName(superSpace()) : null; runBatchWriteQuerySync(getQueryBuilder().buildTemporaryTriggerTableBlockForImportWithFW(space().getOwner(), - newVersion, superRootTable, updateStrategy, "Feature"), db(), 0); + newVersion, superRootTable, updateStrategy, entityPerLine.name()), db(), 0); }else{ infoLog(STEP_EXECUTE, "initialSetup - Import into empty layer detected!"); @@ -402,4 +418,9 @@ private ImportQueryBuilder getQueryBuilder() throws WebClientException { } return importQueryBuilder; } + + public enum EntityPerLine { + Feature, + FeatureCollection + } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportFilesQuickValidator.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportFilesQuickValidator.java index cf3167281c..b07cda359b 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportFilesQuickValidator.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportFilesQuickValidator.java @@ -19,16 +19,13 @@ package com.here.xyz.jobs.steps.impl.transport.tools; -import static com.here.xyz.XyzSerializable.Mappers.DEFAULT_MAPPER; - import com.amazonaws.AmazonServiceException; import com.amazonaws.util.CountingInputStream; import com.fasterxml.jackson.core.JacksonException; import com.here.xyz.Typed; import com.here.xyz.XyzSerializable; import com.here.xyz.jobs.steps.S3DataFile; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format; +import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.EntityPerLine; import com.here.xyz.jobs.util.S3Client; import com.here.xyz.models.geojson.implementation.Feature; import com.here.xyz.models.geojson.implementation.FeatureCollection; @@ -42,7 +39,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.locationtech.jts.io.ParseException; -import org.locationtech.jts.io.WKBReader; import software.amazon.awssdk.core.ResponseInputStream; public class ImportFilesQuickValidator { @@ -50,19 +46,19 @@ public class ImportFilesQuickValidator { private static final 
int VALIDATE_LINE_MAX_LINE_SIZE_BYTES = 4 * 1024 * 1024; private static final String RE_UPLOAD_HINT = "\nPlease re-upload the input files in the correct format using the already provided upload-urls!"; - public static void validate(S3DataFile s3File, Format format, EntityPerLine entityPerLine) throws ValidationException { + public static void validate(S3DataFile s3File, EntityPerLine entityPerLine) throws ValidationException { try { - validateFirstCSVLine(s3File, format, entityPerLine); + validateFirstCSVLine(s3File, entityPerLine); } catch (IOException e) { throw new ValidationException("Input could not be read.", e); } } - private static void validateFirstCSVLine(S3DataFile s3File, Format format, EntityPerLine entityPerLine) + private static void validateFirstCSVLine(S3DataFile s3File, EntityPerLine entityPerLine) throws IOException, ValidationException { - logger.info("Validating first line of file {} in format {}", s3File.getS3Key(), format); + logger.info("Validating first line of file {} ", s3File.getS3Key()); S3Client client = S3Client.getInstance(s3File.getS3Bucket()); StringBuilder line = new StringBuilder(); @@ -77,21 +73,21 @@ private static void validateFirstCSVLine(S3DataFile s3File, Format format, Entit line.append((char) ch); if (ch == '\n' || ch == '\r') { - ImportFilesQuickValidator.validateCSVLine(line.toString(), format, entityPerLine); - logger.info("Validation finished {} in format {}", s3File.getS3Key(), format); + ImportFilesQuickValidator.validateCSVLine(line.toString(), entityPerLine); + logger.info("Validation finished {} ", s3File.getS3Key()); S3Client.abortS3Streaming(s3InputStream); return; } if (countingStream.getByteCount() >= VALIDATE_LINE_MAX_LINE_SIZE_BYTES) { - logger.info("Validation finished {} in format {}", s3File.getS3Key(), format); + logger.info("Validation finished {} ", s3File.getS3Key()); S3Client.abortS3Streaming(s3InputStream); throw new IllegalStateException("No newline found within 4MB decompressed limit."); } } if (line.length() > 0) { - ImportFilesQuickValidator.validateCSVLine(line.toString(), format, entityPerLine); + ImportFilesQuickValidator.validateCSVLine(line.toString(), entityPerLine); return; } @@ -100,28 +96,20 @@ private static void validateFirstCSVLine(S3DataFile s3File, Format format, Entit catch (AmazonServiceException e) { if (e.getErrorCode().equalsIgnoreCase("InvalidRange")) { // The file might be smaller than the requested range - ImportFilesQuickValidator.validateCSVLine(line.toString(), format, entityPerLine); + ImportFilesQuickValidator.validateCSVLine(line.toString(), entityPerLine); return; } throw e; } } - private static void validateCSVLine(String csvLine, Format format, EntityPerLine entityPerLine) throws ValidationException { + private static void validateCSVLine(String csvLine, EntityPerLine entityPerLine) throws ValidationException { if (csvLine != null && csvLine.endsWith("\r\n")) csvLine = csvLine.substring(0, csvLine.length() - 3); else if (csvLine != null && (csvLine.endsWith("\n") || csvLine.endsWith("\r"))) csvLine = csvLine.substring(0, csvLine.length() - 1); - if (!format.equals(Format.GEOJSON) && csvLine.endsWith(",")) - throw new ValidationException("Empty Column detected!" + RE_UPLOAD_HINT); - - switch (format) { - case CSV_GEOJSON -> validateCsvGeoJSON(csvLine, entityPerLine); - case CSV_JSON_WKB -> validateCsvJSON_WKB(csvLine); - case GEOJSON -> validateGeoJSON(csvLine, entityPerLine); - default -> throw new ValidationException("Format is not supported! 
" + format + RE_UPLOAD_HINT); - } + validateGeoJSON(csvLine, entityPerLine); } private static void validateGeoJSON(String geoJson, EntityPerLine entityPerLine) throws ValidationException { @@ -134,28 +122,6 @@ private static void validateGeoJSON(String geoJson, EntityPerLine entityPerLine) } } - private static void validateCsvGeoJSON(String csvLine, EntityPerLine entityPerLine) throws ValidationException { - //Try to deserialize JSON - csvLine = csvLine.substring(1, csvLine.length()).replaceAll("'\"", "\""); - validateGeoJSON(csvLine, entityPerLine); - } - - private static void validateCsvJSON_WKB(String csvLine) throws ValidationException { - try { - String json = csvLine.substring(1, csvLine.lastIndexOf(",") - 1).replaceAll("'\"", "\""); - String wkb = csvLine.substring(csvLine.lastIndexOf(",") + 1); - - byte[] aux = WKBReader.hexToBytes(wkb); - //Try to read WKB - new WKBReader().read(aux); - //Try to serialize JSON - DEFAULT_MAPPER.get().readTree(json); - } - catch (Exception e) { - transformException(e); - } - } - private static void transformException(Exception e) throws ValidationException { Throwable cause = e.getCause(); if (e instanceof ParseException || e instanceof IllegalArgumentException) diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportQueryBuilder.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportQueryBuilder.java index 386381df08..a886223c0e 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportQueryBuilder.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/impl/transport/tools/ImportQueryBuilder.java @@ -21,11 +21,16 @@ public class ImportQueryBuilder { public static void main(String[] args) {} + public ImportQueryBuilder(String stepId, String schema){ + this.schema = schema; + this.temporaryImportTable = getTemporaryJobTableName(stepId) + TRIGGER_TABLE_SUFFIX;; + } + public ImportQueryBuilder(String stepId, String schema, String rootTable, long versionsToKeep){ this.schema = schema; this.rootTable = rootTable; this.versionsToKeep = versionsToKeep; - this.temporaryImportTable = getTemporaryTriggerTableName(stepId); + this.temporaryImportTable = getTemporaryJobTableName(stepId) + TRIGGER_TABLE_SUFFIX;; } public SQLQuery buildCleanUpStatement(){ @@ -93,8 +98,8 @@ private SQLQuery buildTemporaryTriggerTableForImportQuery(){ .withVariable("table", temporaryImportTable); } - private String getTemporaryTriggerTableName(String stepId) { - return getTemporaryJobTableName(stepId) + TRIGGER_TABLE_SUFFIX; + public String getTemporaryTriggerTableName() { + return this.temporaryImportTable; } private SQLQuery buildCreateImportTriggerForEmptyLayers(String targetAuthor, long targetSpaceVersion, boolean retainMetadata){ diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/ContentCreator.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/ContentCreator.java index 4e6cb80b2d..1397704a55 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/ContentCreator.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/ContentCreator.java @@ -1,40 +1,31 @@ package com.here.xyz.jobs.util.test; import com.fasterxml.jackson.core.JsonProcessingException; -import com.here.xyz.XyzSerializable; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; + import com.here.xyz.models.geojson.coordinates.PointCoordinates; import 
com.here.xyz.models.geojson.implementation.Feature; import com.here.xyz.models.geojson.implementation.FeatureCollection; import com.here.xyz.models.geojson.implementation.Point; import com.here.xyz.models.geojson.implementation.Properties; -import com.here.xyz.models.geojson.implementation.Geometry; -import org.locationtech.jts.io.ParseException; -import org.locationtech.jts.io.WKBReader; import java.util.Random; public class ContentCreator { /** Generate content */ - public static byte[] generateImportFileContent(ImportFilesToSpace.Format format, int featureCnt) { + public static byte[] generateImportFileContent(int featureCnt) { String output = ""; for (int i = 1; i <= featureCnt; i++) { - output += generateContentLine(format, i); + output += generateContentLine(i); } return output.getBytes(); } - public static String generateContentLine(ImportFilesToSpace.Format format, int i){ + public static String generateContentLine(int i){ Random rd = new Random(); String lineSeparator = "\n"; - if(format.equals(ImportFilesToSpace.Format.CSV_JSON_WKB)) - return "\"{'\"properties'\": {'\"test'\": "+i+"}}\",01010000A0E61000007DAD4B8DD0AF07C0BD19355F25B74A400000000000000000"+lineSeparator; - else if(format.equals(ImportFilesToSpace.Format.CSV_GEOJSON)) - return "\"{'\"type'\":'\"Feature'\",'\"geometry'\":{'\"type'\":'\"Point'\",'\"coordinates'\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},'\"properties'\":{'\"test'\":"+i+"}}\""+lineSeparator; - else - return "{\"type\":\"Feature\",\"geometry\":{\"type\":\"Point\",\"coordinates\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},\"properties\":{\"test\":"+i+"}}"+lineSeparator; + return "{\"type\":\"Feature\",\"geometry\":{\"type\":\"Point\",\"coordinates\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},\"properties\":{\"test\":"+i+"}}"+lineSeparator; } public static FeatureCollection generateRandomFeatureCollection(int featureCnt) { @@ -61,17 +52,4 @@ public static FeatureCollection generateRandomFeatureCollection(int featureCnt,f return fc; } - - - public static Feature getFeatureFromCSVLine(String csvLine) throws JsonProcessingException { - return XyzSerializable.deserialize( csvLine.substring(1, csvLine.lastIndexOf(",") -1 ).replaceAll("'\"","\""), Feature.class); - } - - public static Geometry getWKBFromCsvLine(String csvLine) throws ParseException { - String geomAsWKB = csvLine.substring(csvLine.lastIndexOf(",") + 1 ); - byte[] aux = WKBReader.hexToBytes(geomAsWKB); - /** Try to read WKB */ - org.locationtech.jts.geom.Geometry read = new WKBReader().read(aux); - return Geometry.convertJTSGeometry(read); - } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java index be537f0de9..9e36c739a8 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/test/StepTestBase.java @@ -43,8 +43,8 @@ import com.here.xyz.jobs.steps.impl.DropIndexes; import com.here.xyz.jobs.steps.impl.transport.CountSpace; import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace; +import com.here.xyz.jobs.steps.impl.transport.tools.ImportQueryBuilder; import 
com.here.xyz.jobs.steps.outputs.DownloadUrl;
 import com.here.xyz.jobs.steps.outputs.Output;
 import com.here.xyz.jobs.util.S3Client;
@@ -102,6 +102,8 @@
 import software.amazon.awssdk.services.lambda.LambdaClient;
 import software.amazon.awssdk.services.lambda.model.InvokeRequest;

+import static com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.Format;
+
 public class StepTestBase {
   public static final Config config = new Config();
@@ -383,7 +385,7 @@ protected void deleteAllJobTables(List<String> stepIds) throws SQLException {
       dropQueries.add(
           new SQLQuery("DROP TABLE IF EXISTS ${schema}.${table};")
               .withVariable("schema", SCHEMA)
-              .withVariable("table", ImportFilesToSpace.getTemporaryTriggerTableName(stepId))
+              .withVariable("table", new ImportQueryBuilder(stepId, SCHEMA).getTemporaryTriggerTableName())
       );
     }
     SQLQuery.join(dropQueries, ";").write(getDataSourceProvider());
@@ -489,11 +491,11 @@ private void invokeLambda(String lambdaArn, byte[] payload) {
   }

   //TODO: find a central place to avoid double implementation from JobPlayground
-  public void uploadFiles(String jobId, int uploadFileCount, int featureCountPerFile, ImportFilesToSpace.Format format)
+  public void uploadFiles(String jobId, int uploadFileCount, int featureCountPerFile, Format format)
       throws IOException {
     //Generate N Files with M features
     for (int i = 0; i < uploadFileCount; i++)
-      uploadInputFile(jobId, ContentCreator.generateImportFileContent(format, featureCountPerFile), S3ContentType.APPLICATION_JSON);
+      uploadInputFile(jobId, ContentCreator.generateImportFileContent(featureCountPerFile), S3ContentType.APPLICATION_JSON);
   }

   public void uploadInputFile(String jobId, byte[] bytes, S3ContentType contentType) throws IOException {
diff --git a/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql b/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql
index f78189c937..3fa63374fe 100644
--- a/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql
+++ b/xyz-jobs/xyz-job-steps/src/main/resources/jobs/transport.sql
@@ -801,17 +801,20 @@ RETURNS VOID
 LANGUAGE 'plpgsql' VOLATILE
 AS $BODY$
+DECLARE
+  lambda_response RECORD;
 BEGIN
-    PERFORM report_progress(
-        lambda_function_arn,
-        lambda_region,
-        step_payload,
-        json_build_object(
-            'type','SpaceBasedTaskUpdate',
-            'taskId', task_id,
-            'taskOutput', task_output
-        )
-    );
+    --TODO: Add error handling
+    SELECT aws_lambda.invoke(aws_commons.create_lambda_function_arn(lambda_function_arn, lambda_region),
+       json_build_object(
+            'type','UPDATE_CALLBACK',
+            'step', step_payload,
+            'processUpdate', json_build_object(
+                'type','SpaceBasedTaskUpdate',
+                'taskId', task_id,
+                'taskOutput', task_output
+            )
+       ), 'Event') INTO lambda_response;
 END
 $BODY$;
@@ -1078,403 +1081,4 @@ BEGIN
     $wrappedinner$ $wrappedouter$;
     EXECUTE sql_text;
 END;
-$BODY$;
-
--- ####################################################################################################################
--- Delete Functions below when movement to new tasked import is done --
-
-/**
- * Get work_item. Used for synchronizing threads.
- */ -CREATE OR REPLACE FUNCTION get_work_item(temporary_tbl REGCLASS) - RETURNS JSONB - LANGUAGE 'plpgsql' - VOLATILE -AS $BODY$ -DECLARE - work_items_left INT := 1; - success_marker TEXT := 'SUCCESS_MARKER'; - target_state TEXT := 'RUNNING'; - work_item RECORD; - result RECORD; - updated_rows INT; -BEGIN - -- Try to get a work item in SUBMITTED, RETRY, or FAILED state - EXECUTE format('SELECT state, s3_path, execution_count FROM %1$s ' - ||'WHERE state IN (''SUBMITTED'', ''RETRY'', ''FAILED'') ORDER BY random() LIMIT 1;', - temporary_tbl) INTO work_item; - - -- If a work item is found, try to update it - IF work_item.state IS NOT NULL THEN - EXECUTE format('UPDATE %1$s SET state = %2$L WHERE s3_path = %3$L AND state = %4$L RETURNING *', - temporary_tbl, target_state, work_item.s3_path, work_item.state) INTO result; - - GET DIAGNOSTICS updated_rows = ROW_COUNT; - - -- If updated, return the work item as JSONB - IF updated_rows = 1 THEN - RETURN jsonb_build_object( - 's3_bucket', result.s3_bucket, - 's3_path', result.s3_path, - 's3_region', result.s3_region, - 'filesize', result.data->'filesize', - 'state', result.state, - 'execution_count', result.execution_count, - 'data', result.data - ); - ELSE - RETURN jsonb_build_object('state', 'RETRY'); - END IF; - END IF; - - -- Check if there are any items left in the RUNNING or SUBMITTED state - EXECUTE format('SELECT count(1) FROM %1$s WHERE state IN (''RUNNING'',''SUBMITTED'');', - temporary_tbl) into work_items_left; - - IF work_items_left > 0 THEN - RETURN jsonb_build_object('state', 'LAST_ONES_RUNNING', 's3_path', success_marker); - END IF; - - -- Check if a SUCCESS_MARKER exists, and attempt to update it - EXECUTE format('SELECT s3_path, state FROM %1$s WHERE state = %2$L;', temporary_tbl, success_marker) INTO work_item; - - IF work_item.state IS NOT NULL THEN - EXECUTE format('UPDATE %1$s SET state = %2$L, execution_count = execution_count + 1 ' - ||'WHERE s3_path = %3$L AND state = %4$L RETURNING *', - temporary_tbl, work_item.state || '_' || target_state, work_item.s3_path, work_item.state) INTO result; - - GET DIAGNOSTICS updated_rows = ROW_COUNT; - - IF updated_rows = 1 THEN - RETURN jsonb_build_object( - 's3_bucket', result.s3_bucket, - 's3_path', result.s3_path, - 's3_region', result.s3_region, - 'filesize', result.data->'filesize', - 'state', result.state, - 'execution_count', result.execution_count, - 'data', result.data - ); - ELSE - RETURN jsonb_build_object('state', 'RETRY'); - END IF; - END IF; - - RETURN NULL; -END; -$BODY$; - -/** - *.. 
- */ -CREATE OR REPLACE FUNCTION perform_work_item(work_item JSONB, format TEXT, content_query TEXT) - RETURNS VOID - LANGUAGE 'plpgsql' - VOLATILE -AS $BODY$ -DECLARE - ctx JSONB; -BEGIN - SELECT context() into ctx; - IF content_query IS NULL OR content_query = '' THEN - PERFORM import_from_s3_perform(ctx->>'schema', - get_table_reference(ctx->>'schema', ctx->>'stepId' ,'JOB_TABLE'), - get_table_reference(ctx->>'schema', ctx->>'stepId', 'TRIGGER_TABLE'), - work_item ->> 's3_bucket', - work_item ->> 's3_path', - work_item ->> 's3_region', - format, - CASE WHEN (work_item -> 'filesize') = 'null'::jsonb THEN 0 ELSE (work_item -> 'filesize')::BIGINT END); - ELSE - PERFORM export_to_s3_perform(content_query, (work_item ->> 's3_bucket'), (work_item ->> 's3_path'), (work_item ->> 's3_region')); - END IF; -END; -$BODY$; - -/** - * Report Success - */ -CREATE OR REPLACE FUNCTION report_success(success_callback TEXT) - RETURNS void - LANGUAGE 'plpgsql' - VOLATILE PARALLEL SAFE -AS $BODY$ -DECLARE - ctx JSONB; - sql_text TEXT; -BEGIN - SELECT context() into ctx; - - sql_text = $wrappedouter$ DO - $wrappedinner$ - DECLARE - ctx JSONB := '$wrappedouter$||(ctx::TEXT)||$wrappedouter$'::JSONB; - job_results RECORD; - retry_count INT := 2; - BEGIN - EXECUTE format('SELECT ' - || ' COUNT(*) FILTER (WHERE state = %1$L) AS finished_count,' - || ' COUNT(*) FILTER (WHERE state = %2$L and execution_count=%3$L) AS failed_count,' - || ' COUNT(*) AS total_count ' - || 'FROM %4$s WHERE NOT starts_with(state,''SUCCESS_MARKER'');', - 'FINISHED', - 'FAILED', - retry_count, - get_table_reference(ctx->>'schema', ctx->>'stepId' ,'JOB_TABLE') - ) INTO job_results; - - IF (job_results.finished_count + job_results.failed_count) = job_results.total_count THEN - -- Will only be executed from last worker - IF job_results.total_count = job_results.failed_count THEN - -- All Job-Threads are failed - ELSEIF job_results.failed_count > 0 AND (job_results.total_count > job_results.failed_count) THEN - -- Job-Threads partially failed - ELSE - -- All done invoke lambda - $wrappedouter$ || success_callback || $wrappedouter$ - END IF; - ELSE - -- Job-Threads still in progress! - END IF; - END; - $wrappedinner$ $wrappedouter$; - EXECUTE sql_text; -END; -$BODY$; - -/** - *.. 
- */ -CREATE OR REPLACE PROCEDURE execute_transfer(format TEXT, success_callback TEXT, failure_callback TEXT, content_query TEXT = NULL) - LANGUAGE 'plpgsql' -AS -$BODY$ -DECLARE - ctx JSONB; - work_item JSONB; - sql_text TEXT; -BEGIN - SELECT context() into ctx; - SELECT * from get_work_item(get_table_reference(ctx->>'schema', ctx->>'stepId' ,'JOB_TABLE')) into work_item; - --PERFORM write_log(work_item::text,'execute_transfer'); - - COMMIT; - IF work_item -> 'state' IS NULL THEN - RETURN; - ELSE - PERFORM context(ctx); - IF work_item ->> 'state' = 'RETRY' THEN - -- Received a RETRY - PERFORM asyncify(format('CALL execute_transfer(%1$L, %2$L, %3$L, %4$L );', - format, - success_callback, - failure_callback, - content_query - ), false, true ); - RETURN; - ELSEIF work_item ->> 'state' = 'SUCCESS_MARKER_RUNNING' THEN - EXECUTE format('SELECT report_success(%1$L);', success_callback); - RETURN; - END IF; - END IF; - - sql_text = $wrappedouter$ DO - $wrappedinner$ - DECLARE - ctx JSONB := '$wrappedouter$||(ctx::TEXT)||$wrappedouter$'::JSONB; - work_item JSONB := '$wrappedouter$||(work_item::TEXT)||$wrappedouter$'::JSONB; - format TEXT := '$wrappedouter$||format||$wrappedouter$'::TEXT; - content_query TEXT := $x$$wrappedouter$||coalesce(content_query,'')||$wrappedouter$$x$::TEXT; - retry_count INT := 2; - BEGIN - BEGIN - PERFORM context(ctx); - IF (work_item -> 'execution_count')::INT >= retry_count THEN - --TODO: find a solution to read a given hint in the failure_callback. Remove than the duplication. - RAISE EXCEPTION 'Error on processing file ''%''. Maximum retries are reached %. Details: ''%''', - (work_item ->>'s3_path'), retry_count, (work_item -> 'data' -> 'error' ->> 'sqlstate') - --USING HINT = 'Details: ' || 'details' , - USING ERRCODE = 'XYZ50'; - END IF; - - IF work_item ->> 's3_bucket' != 'SUCCESS_MARKER' THEN - PERFORM perform_work_item(work_item, format, content_query); - END IF; - - EXCEPTION - WHEN OTHERS THEN - -- Transfer has failed - BEGIN - $wrappedouter$ || failure_callback || $wrappedouter$ - RETURN; - END; - END; - - PERFORM asyncify(format('CALL execute_transfer(%1$L, %2$L, %3$L, %4$L);', - format, - '$wrappedouter$||REPLACE(success_callback, '''', '''''')||$wrappedouter$'::TEXT, - '$wrappedouter$||REPLACE(failure_callback, '''', '''''')||$wrappedouter$'::TEXT, - content_query - ), false, true ); - END; - $wrappedinner$ $wrappedouter$; - EXECUTE sql_text; -END; -$BODY$; - -/** - * Report progress by invoking lambda function with UPDATE_CALLBACK payload - */ -CREATE OR REPLACE FUNCTION report_progress( - lambda_function_arn TEXT, - lambda_region TEXT, - step_payload JSON, - progress_data JSON -) -RETURNS VOID - LANGUAGE 'plpgsql' - VOLATILE -AS $BODY$ -DECLARE - lamda_response RECORD; -BEGIN - --TODO: Add error handling - SELECT aws_lambda.invoke(aws_commons.create_lambda_function_arn(lambda_function_arn, lambda_region), - json_build_object( - 'type','UPDATE_CALLBACK', - 'step', step_payload, - 'processUpdate', progress_data - ), 'Event') INTO lamda_response; -END -$BODY$; - -/** - * Perform single import from S3 - */ -CREATE OR REPLACE FUNCTION import_from_s3_perform(schem TEXT, temporary_tbl REGCLASS ,target_tbl REGCLASS, - s3_bucket TEXT, s3_path TEXT, s3_region TEXT, format TEXT, filesize BIGINT) - RETURNS void - LANGUAGE 'plpgsql' - VOLATILE -AS $BODY$ -DECLARE - import_statistics RECORD; - config RECORD; -BEGIN - select * from s3_plugin_config(format) into config; - - EXECUTE format( - 'SELECT/*lables({"type": "ImortFilesToSpace","bytes":%1$L})*/ 
aws_s3.table_import_from_s3( ' - ||' ''%2$s.%3$s'', ' - ||' %4$L, ' - ||' %5$L, ' - ||' aws_commons.create_s3_uri(%6$L,%7$L,%8$L)) ', - filesize, - schem, - target_tbl, - config.plugin_columns, - config.plugin_options, - s3_bucket, - s3_path, - s3_region - ) INTO import_statistics; - - -- Mark item as successfully imported. Store import_statistics. - EXECUTE format('UPDATE %1$s ' - ||'set state = %2$L, ' - ||'execution_count = execution_count + 1, ' - ||'data = data || %3$L ' - ||'WHERE s3_path = %4$L ', - temporary_tbl, - 'FINISHED', - json_build_object('import_statistics', import_statistics), - s3_path); -EXCEPTION - WHEN SQLSTATE '55P03' OR SQLSTATE '23505' OR SQLSTATE '22P02' OR SQLSTATE '22P04' THEN - /** Retryable errors: - 55P03 (lock_not_available) - 23505 (duplicate key value violates unique constraint) - 22P02 (invalid input syntax for type json) - 22P04 (extra data after last expected column) - 38000 Lambda not available - */ - EXECUTE format('UPDATE %1$s ' - ||'set state = %2$L, ' - ||'execution_count = execution_count +1, ' - ||'data = data || ''{"error" : {"sqlstate":"%3$s"}}'' ' - ||'WHERE s3_path = %4$L', - temporary_tbl, - 'FAILED', - SQLSTATE, - s3_path); -END; -$BODY$; - -/** - * Enriches Feature - uses in plain trigger function - */ -CREATE OR REPLACE FUNCTION import_from_s3_trigger_for_empty_layer() - RETURNS trigger -AS $BODY$ -DECLARE - author TEXT := TG_ARGV[0]; - curVersion BIGINT := TG_ARGV[1]; - target_table TEXT := TG_ARGV[2]; - retain_meta BOOLEAN := TG_ARGV[3]::BOOLEAN; - feature RECORD; - updated_rows INT; -BEGIN - SELECT new_jsondata, new_geo, new_operation, new_id - from import_from_s3_enrich_feature(NEW.jsondata::JSONB, NEW.geo, retain_meta) - INTO feature; - - EXECUTE format('INSERT INTO "%1$s"."%2$s" (id, version, operation, author, jsondata, geo) - values(%3$L, %4$L, %5$L, %6$L, %7$L, %8$L);', - TG_TABLE_SCHEMA, target_table, feature.new_id, curVersion, feature.new_operation, - author, feature.new_jsondata, xyz_reduce_precision(feature.new_geo, false)); - - NEW.jsondata = NULL; - NEW.geo = NULL; - NEW.count = 1; - RETURN NEW; -END; -$BODY$ - LANGUAGE plpgsql VOLATILE; - -CREATE OR REPLACE FUNCTION import_from_s3_trigger_for_empty_layer_geojsonfc() - RETURNS trigger -AS $BODY$ -DECLARE - author TEXT := TG_ARGV[0]; - curVersion BIGINT := TG_ARGV[1]; - target_table TEXT := TG_ARGV[2]; - retain_meta BOOLEAN := TG_ARGV[3]::BOOLEAN; - elem JSONB; - feature RECORD; - updated_rows INT; -BEGIN - - --TODO: Should we also allow "Features" - FOR elem IN SELECT * FROM jsonb_array_elements(((NEW.jsondata)::JSONB)->'features') - LOOP - IF NEW.geo IS NOT NULL THEN - RAISE EXCEPTION 'Combination of FeatureCollection and WKB is not allowed!' 
- USING ERRCODE = 'XYZ40'; - END IF; - - SELECT new_jsondata, new_geo, new_operation, new_id - from import_from_s3_enrich_feature(elem, null, retain_meta) - INTO feature; - - EXECUTE format('INSERT INTO "%1$s"."%2$s" (id, version, operation, author, jsondata, geo) - values(%3$L, %4$L, %5$L, %6$L, %7$L, %8$L )', - TG_TABLE_SCHEMA, target_table, feature.new_id, curVersion, feature.new_operation, author, feature.new_jsondata, xyz_reduce_precision(feature.new_geo, false)); - END LOOP; - - NEW.jsondata = NULL; - NEW.geo = NULL; - NEW.count = jsonb_array_length((NEW.jsondata)::JSONB->'features'); - RETURN NEW; -END; -$BODY$ - LANGUAGE plpgsql VOLATILE; +$BODY$; \ No newline at end of file diff --git a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/ImportStepTest.java b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/ImportStepTest.java deleted file mode 100644 index 45fc9029af..0000000000 --- a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/ImportStepTest.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Copyright (C) 2017-2025 HERE Europe B.V. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * License-Filename: LICENSE - */ - -package com.here.xyz.jobs.steps.impl; - -import static com.here.xyz.events.UpdateStrategy.DEFAULT_UPDATE_STRATEGY; -import static com.here.xyz.jobs.steps.Step.InputSet.USER_INPUTS; - -import com.google.common.io.ByteStreams; -import com.here.xyz.jobs.steps.execution.LambdaBasedStep; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format; -import com.here.xyz.jobs.steps.outputs.FeatureStatistics; -import com.here.xyz.jobs.steps.outputs.Output; -import com.here.xyz.responses.StatisticsResponse; -import com.here.xyz.util.service.BaseHttpServerVerticle; -import java.io.IOException; -import java.util.List; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; - -public class ImportStepTest extends StepTest { - /** - Test Format`s - * CSV_JSON_WKB - * CSV_GEOSJON - * GEOJSON - - Test EntityPerLine`s - * Feature - * FeatureCollection - * - Test UpdateStrategy`s - * UpdateStrategy.OnExists - * UpdateStrategy.OnNotExists - * UpdateStrategy.OnMergeConflict - * UpdateStrategy.OnVersionConflict - - Test ThreadCount-Settings - * Check ThreadCount Calculation - - Test START_EXECUTION - * Check Result - * Check Temp-Table and its content - * Check Temp-Trigger-Table - * Check Trigger-Creation - * Check set of ReadOnlyMode - - Test SUCCESS_CALLBACK - * Check Statistics - * Check cleanup of Temp-Table - * Check cleanup Temp-Trigger-Table - * Check release of ReadOnlyMode - - Test FAILURE_CALLBACK - * Retryable: Check persistence of temp-tables - * NonRetryable: Check clean-up of tables - - Test STATE_CHECK - * Check Statistics - * Check Result (running queries) 
- - Test ExecutionModes - * ASYCN - * SYNC - - Test Import in Empty Layer (all Formats / all execution modes ) - * Check Trigger-Type - * Check result (Operation, author, etc) - * Check if expected Features are present - - Test Import in NonEmpty Layer (all Formats / all execution modes ) - * Check Trigger-Type - * Check result (Operation, author, etc) - * Check if expected Features are present - * Check if existing id got updated - - Test Cancel - - Test Resume - */ - - /** Import in Empty Layer + Entity: Feature */ - @Test - public void testSyncImport_with_many_files() throws Exception { - executeImportStepWithManyFiles(Format.GEOJSON, 10, 2 , false); - } - - @Test - @Disabled("Takes extra 6 minutes of execution time, disabled by default") - public void testSyncImport_with_more_than_default_pagination_size_files() throws Exception { - executeImportStepWithManyFiles(Format.GEOJSON, 1010, 2 , false); - } - - //@Test //temporary deactivation - public void testAsyncSyncImport_with_many_files() throws Exception { - executeImportStepWithManyFiles(Format.GEOJSON, 10, 2 , true); - } - - @Test - public void testImport_inEmpty_GEOJSON_Entity_Feature() throws Exception { - //Gets executed SYNC - executeImportStep(Format.GEOJSON, 0, EntityPerLine.Feature, false); - } - - @Test - public void testImport_inEmpty_CSV_JSON_WKB_Entity_Feature() throws Exception { - //CSV_JSON_WKB gets always executed ASYNC - executeImportStep(Format.CSV_JSON_WKB, 0, EntityPerLine.Feature, true); - } - - @Test - public void testImport_inEmpty_CSV_GEOJSON_Entity_Feature() throws Exception { - //CSV_GEOJSON gets always executed ASYNC - executeImportStep(Format.CSV_GEOJSON, 0, EntityPerLine.Feature, true); - } - - /** Import in NON-Empty Layer + Entity: Feature */ - @Test - public void testImport_inNonEmpty_GEOJSON_Entity_Feature() throws Exception { - int existingFeatureCount = 10; - //CSV_JSON_WKB gets always executed ASYNC - putRandomFeatureCollectionToSpace(SPACE_ID, existingFeatureCount); - executeImportStep(Format.GEOJSON, existingFeatureCount, EntityPerLine.Feature, true); - } - - @Test - public void testImport_inNonEmpty_CSV_JSON_WKB_Entity_Feature() throws Exception { - int existingFeatureCount = 10; - //Gets executed ASYNC - putRandomFeatureCollectionToSpace(SPACE_ID, existingFeatureCount); - executeImportStep(Format.CSV_JSON_WKB, existingFeatureCount, EntityPerLine.Feature, true); - } - - /** Import in NON-Empty Layer + Entity: FeatureCollection */ - @Test - public void testImport_inNonEmpty_GEOJSON_Entity_FeatureCollection() throws Exception { - int existingFeatureCount = 10; - putRandomFeatureCollectionToSpace(SPACE_ID, existingFeatureCount); - executeImportStep(Format.GEOJSON, existingFeatureCount, EntityPerLine.FeatureCollection, true); - } - - @Test - public void testImport_inNonEmpty_CSV_GEOJSON_Entity_FeatureCollection() throws Exception { - Assertions.assertThrows(BaseHttpServerVerticle.ValidationException.class, () -> new ImportFilesToSpace() - .withFormat(Format.CSV_JSON_WKB) - .withEntityPerLine(EntityPerLine.FeatureCollection) - .withSpaceId(SPACE_ID) - .validate()); - } - - protected void executeImportStep(Format format, int featureCountSource, - ImportFilesToSpace.EntityPerLine entityPerLine, boolean runAsync) throws IOException, InterruptedException { - StatisticsResponse statsBefore = getStatistics(SPACE_ID); - if(featureCountSource == 0) - Assertions.assertEquals(0L, statsBefore.getCount().getValue()); - else - Assertions.assertEquals(Long.valueOf(featureCountSource), 
statsBefore.getCount().getValue()); - - String fileExtension = switch(format) { - case GEOJSON -> ".geojson"; - case CSV_JSON_WKB -> ".csvwkb"; - case CSV_GEOJSON -> ".csvgeojson"; - }; - - if (entityPerLine == EntityPerLine.FeatureCollection) - fileExtension += "fc"; - - S3ContentType contentType = format == Format.GEOJSON ? S3ContentType.APPLICATION_JSON : S3ContentType.TEXT_CSV; - uploadInputFile(JOB_ID, ByteStreams.toByteArray(this.getClass().getResourceAsStream("/testFiles/file1" + fileExtension)), contentType); - uploadInputFile(JOB_ID, ByteStreams.toByteArray(this.getClass().getResourceAsStream("/testFiles/file2" + fileExtension)), contentType); - - LambdaBasedStep step = new ImportFilesToSpace() - .withJobId(JOB_ID) - .withFormat(format) - .withEntityPerLine(entityPerLine) - .withUpdateStrategy(DEFAULT_UPDATE_STRATEGY) - .withSpaceId(SPACE_ID) - .withInputSets(List.of(USER_INPUTS.get())); - - if(runAsync) - step.setUncompressedUploadBytesEstimation(1024 * 1024 * 1024); - - sendLambdaStepRequestBlock(step, true); - - StatisticsResponse statsAfter = getStatistics(SPACE_ID); - - //We have 2 files with 20 features each. - Assertions.assertEquals(Long.valueOf(40 + featureCountSource), statsAfter.getCount().getValue()); - - //Statistics are now broken on deprecated ImportFilesToSpace implementation - if(this instanceof TaskedImportStepTest) - checkStatistics(40, step.loadUserOutputs()); - } - - private void executeImportStepWithManyFiles(Format format, int fileCount, int featureCountPerFile, boolean runAsync) throws IOException, InterruptedException { - uploadFiles(JOB_ID, fileCount, featureCountPerFile, format); - LambdaBasedStep step = new ImportFilesToSpace() - .withJobId(JOB_ID) - .withFormat(format) - .withSpaceId(SPACE_ID) - .withInputSets(List.of(USER_INPUTS.get())); - - if(runAsync) - //Triggers async execution with max threads - step.setUncompressedUploadBytesEstimation(1024 * 1024 * 1024); - - sendLambdaStepRequestBlock(step ,true); - - StatisticsResponse statsAfter = getStatistics(SPACE_ID); - Assertions.assertEquals(Long.valueOf(fileCount * featureCountPerFile), statsAfter.getCount().getValue()); - checkStatistics(fileCount * featureCountPerFile, step.loadUserOutputs()); - } - - protected void checkStatistics(int expectedFeatureCount, List outputs) throws IOException { - for (Object output : outputs) { - if(output instanceof FeatureStatistics statistics) { - Assertions.assertEquals(expectedFeatureCount, statistics.getFeatureCount()); - } - } - } -} diff --git a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/TaskedImportStepTest.java b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/TaskedImportStepTest.java index 8ad3cbd7ca..ffcef95ad6 100644 --- a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/TaskedImportStepTest.java +++ b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/TaskedImportStepTest.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2025 HERE Europe B.V. + * Copyright (C) 2017-2026 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,15 +21,19 @@ import com.google.common.io.ByteStreams; import com.here.xyz.jobs.steps.execution.LambdaBasedStep; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format; import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace; +import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.EntityPerLine; +import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.Format; +import com.here.xyz.jobs.steps.outputs.FeatureStatistics; +import com.here.xyz.jobs.steps.outputs.Output; +import com.here.xyz.models.geojson.coordinates.PointCoordinates; +import com.here.xyz.models.geojson.implementation.Feature; import com.here.xyz.models.geojson.implementation.FeatureCollection; +import com.here.xyz.models.geojson.implementation.Point; import com.here.xyz.models.hub.Ref; import com.here.xyz.models.hub.Space; +import com.here.xyz.models.hub.Tag; import com.here.xyz.responses.StatisticsResponse; -import com.here.xyz.util.service.BaseHttpServerVerticle; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -41,25 +45,40 @@ import static com.here.xyz.jobs.steps.Step.InputSet.USER_INPUTS; -@Disabled -public class TaskedImportStepTest extends ImportStepTest { +public class TaskedImportStepTest extends StepTest { @BeforeEach public void setup() throws SQLException { cleanup(); - createSpace(new Space().withId(SPACE_ID).withVersionsToKeep(100).withStorage(new Space.ConnectorRef().withId("psql")), false); + createSpace(new Space().withId(SPACE_ID).withVersionsToKeep(1000).withStorage(new Space.ConnectorRef().withId("psql")), false); + } + + //@Test + public void write(){ + createTag(SPACE_ID, new Tag().withId("tag").withVersion(0)); + for (int i = 0; i < 999; i++) { + + putFeatureCollectionToSpace(SPACE_ID, + new FeatureCollection().withFeatures(List.of( + new Feature().withGeometry(new Point().withCoordinates(new PointCoordinates(10, 10))))) + ); + } + System.out.println("done"); } @Disabled + @Test public void testNewFormat() throws Exception { executeImportStep(TaskedImportFilesToSpace.Format.FAST_IMPORT_INTO_EMPTY,0, EntityPerLine.Feature); } @Test public void testSyncImport_with_many_files() throws Exception { - executeImportStepWithManyFiles(Format.GEOJSON, 10, 2 , false); + executeImportStepWithManyFiles(Format.GEOJSON, 100, 2 , false); } + //TODO: fix versionRef -1 issue + @Disabled @Test public void testEmptyImportWithEmptyUserInput() throws Exception { TaskedImportFilesToSpace step = new TaskedImportFilesToSpace() @@ -71,6 +90,8 @@ public void testEmptyImportWithEmptyUserInput() throws Exception { Assertions.assertFalse(step.validate()); } + //TODO: fix versionRef -1 issue + @Disabled @Test public void testEmptyImportWithoutUserInput() throws Exception { TaskedImportFilesToSpace step = new TaskedImportFilesToSpace() @@ -93,16 +114,6 @@ public void testImport_inEmpty_GEOJSON_Entity_Feature() throws Exception { executeImportStep(TaskedImportFilesToSpace.Format.GEOJSON, 0, EntityPerLine.Feature); } - @Test - public void testImport_inEmpty_CSV_JSON_WKB_Entity_Feature() throws Exception { - //NOT IMPLEMENTED - } - - @Test - public void testImport_inEmpty_CSV_GEOJSON_Entity_Feature() throws Exception { - //NOT IMPLEMENTED - } - /** Import in NON-Empty Layer + Entity: Feature */ @Test public void 
testImport_inNonEmpty_GEOJSON_Entity_Feature() throws Exception {
@@ -112,23 +123,13 @@ public void testImport_inNonEmpty_GEOJSON_Entity_Feature() throws Exception {
     executeImportStep(TaskedImportFilesToSpace.Format.GEOJSON, existingFeatureCount, EntityPerLine.Feature);
   }

-  @Test
-  public void testImport_inNonEmpty_CSV_JSON_WKB_Entity_Feature() throws Exception {
-    //NOT IMPLEMENTED
-  }
-
+  //TODO: fix statistics
+  @Disabled
   @Test
   public void testImport_inNonEmpty_GEOJSON_Entity_FeatureCollection() throws Exception {
-    //NOT IMPLEMENTED
-  }
-
-  @Test
-  public void testImport_inNonEmpty_CSV_GEOJSON_Entity_FeatureCollection() throws Exception {
-    Assertions.assertThrows(BaseHttpServerVerticle.ValidationException.class, () -> new ImportFilesToSpace()
-        .withFormat(Format.CSV_JSON_WKB)
-        .withEntityPerLine(EntityPerLine.FeatureCollection)
-        .withSpaceId(SPACE_ID)
-        .validate());
+    int existingFeatureCount = 10;
+    putRandomFeatureCollectionToSpace(SPACE_ID, existingFeatureCount);
+    executeImportStep(Format.GEOJSON, existingFeatureCount, EntityPerLine.FeatureCollection);
   }

   private void executeImportStepWithManyFiles(Format format, int fileCount, int featureCountPerFile, boolean runAsync) throws IOException, InterruptedException {
@@ -227,8 +227,8 @@ public void testImport_inNonEmpty_GEOJSON_with_deleted_features() throws Excepti
   /****  */

-  protected void executeImportStep(TaskedImportFilesToSpace.Format format, int featureCountSource,
-      ImportFilesToSpace.EntityPerLine entityPerLine) throws IOException, InterruptedException {
+  protected void executeImportStep(Format format, int featureCountSource,
+      EntityPerLine entityPerLine) throws IOException, InterruptedException {
     StatisticsResponse statsBefore = getStatistics(SPACE_ID);

     if(featureCountSource == 0)
@@ -244,7 +244,7 @@ protected void executeImportStep(TaskedImportFilesToSpace.Format format, int fea
     if (entityPerLine == EntityPerLine.FeatureCollection)
       fileExtension += "fc";

-    S3ContentType contentType = format == TaskedImportFilesToSpace.Format.GEOJSON ? S3ContentType.APPLICATION_JSON : S3ContentType.TEXT_CSV;
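For reference, the builder chain this test exercises composes as in the sketch below; job and space ids are placeholders, and the update strategy is left unset, matching the test:

    // Sketch only: "job-123" and "my-space" are placeholder ids
    LambdaBasedStep step = new TaskedImportFilesToSpace()
        .withJobId("job-123")
        .withSpaceId("my-space")
        .withVersionRef(new Ref(Ref.HEAD))
        .withFormat(Format.GEOJSON)
        .withEntityPerLine(EntityPerLine.FeatureCollection) //One FeatureCollection per input line
        .withInputSets(List.of(USER_INPUTS.get()));

+    S3ContentType contentType = format == Format.GEOJSON ?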
S3ContentType.APPLICATION_JSON : S3ContentType.TEXT_CSV; //* Only files with enriched features are allowed */ uploadInputFile(JOB_ID, ByteStreams.toByteArray(this.getClass().getResourceAsStream("/testFiles/file1" + fileExtension)), contentType); @@ -254,6 +254,7 @@ protected void executeImportStep(TaskedImportFilesToSpace.Format format, int fea .withJobId(JOB_ID) .withVersionRef(new Ref(Ref.HEAD)) .withFormat(format) + .withEntityPerLine(entityPerLine) .withSpaceId(SPACE_ID) .withInputSets(List.of(USER_INPUTS.get())); @@ -265,4 +266,12 @@ protected void executeImportStep(TaskedImportFilesToSpace.Format format, int fea Assertions.assertEquals(Long.valueOf(40 + featureCountSource), statsAfter.getCount().getValue()); checkStatistics(40, step.loadUserOutputs()); } + + protected void checkStatistics(int expectedFeatureCount, List outputs) throws IOException { + for (Object output : outputs) { + if(output instanceof FeatureStatistics statistics) { + Assertions.assertEquals(expectedFeatureCount, statistics.getFeatureCount()); + } + } + } } diff --git a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/transport/QuickValidatorTest.java b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/transport/QuickValidatorTest.java index 2ce4f8d794..90bf465450 100644 --- a/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/transport/QuickValidatorTest.java +++ b/xyz-jobs/xyz-job-steps/src/test/java/com/here/xyz/jobs/steps/impl/transport/QuickValidatorTest.java @@ -21,11 +21,9 @@ import static com.here.xyz.jobs.util.test.StepTestBase.S3ContentType.APPLICATION_JSON; import static com.here.xyz.jobs.util.test.StepTestBase.S3ContentType.TEXT_CSV; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine.Feature; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine.FeatureCollection; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_GEOJSON; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_JSON_WKB; -import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.GEOJSON; +import static com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.EntityPerLine.Feature; +import static com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.EntityPerLine.FeatureCollection; +import static com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.Format.GEOJSON; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -33,8 +31,8 @@ import com.here.xyz.jobs.steps.Config; import com.here.xyz.jobs.steps.impl.transport.tools.ImportFilesQuickValidator; import com.here.xyz.jobs.util.test.StepTestBase; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine; -import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format; +import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.EntityPerLine; +import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace.Format; import com.here.xyz.jobs.steps.inputs.UploadUrl; import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException; import java.io.IOException; @@ -110,25 +108,21 @@ private void uploadAndValidateValidFiles(boolean gzip) throws IOException, Valid ); /** Should not fail - above are all valid */ - validate(generateTestS3Key("test_valid_1_jsonwkb.csv"), CSV_JSON_WKB, gzip, Feature); - 
validate(generateTestS3Key("test_valid_1_geojson.csv"), CSV_GEOJSON, gzip, Feature); - validate(generateTestS3Key("test_valid_1_geojson.geojson"), GEOJSON, gzip, Feature); - validate(generateTestS3Key("test_valid_1_geojsonfc.geojson"), GEOJSON, gzip, EntityPerLine.FeatureCollection); - validate(generateTestS3Key("test_valid_2_jsonwkb.csv"), CSV_JSON_WKB, gzip, Feature); - validate(generateTestS3Key("test_valid_2_geojson.csv"), CSV_GEOJSON, gzip, Feature); - validate(generateTestS3Key("test_valid_2_geojson.geojson"), GEOJSON, gzip, Feature); - validate(generateTestS3Key("test_valid_2_geojsonfc.geojson"), GEOJSON, gzip, EntityPerLine.FeatureCollection); + validate(generateTestS3Key("test_valid_1_geojson.geojson"), gzip, Feature); + validate(generateTestS3Key("test_valid_1_geojsonfc.geojson"), gzip, EntityPerLine.FeatureCollection); + validate(generateTestS3Key("test_valid_2_geojson.geojson"), gzip, Feature); + validate(generateTestS3Key("test_valid_2_geojsonfc.geojson"), gzip, EntityPerLine.FeatureCollection); } private String generateTestS3Key(String name) { return TEST_PREFIX + name; } - private void validate(String s3Key, Format format, boolean isCompressed, EntityPerLine entityPerLine) throws ValidationException { + private void validate(String s3Key, boolean isCompressed, EntityPerLine entityPerLine) throws ValidationException { ImportFilesQuickValidator.validate(new UploadUrl() .withS3Bucket(Config.instance.JOBS_S3_BUCKET) .withS3Key(s3Key) - .withCompressed(isCompressed), format, entityPerLine); + .withCompressed(isCompressed), entityPerLine); } @Test @@ -136,30 +130,6 @@ public void testQuickValidationGZipped() throws ValidationException, IOException uploadAndValidateValidFiles(true); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testInvalidJsonWkb(boolean gzip) throws IOException { - testInvalidJson( - "\"{'\"properties'\": {invalid}}\",01010000A0E61000007DAD4B8DD0AF07C0BD19355F25B74A400000000000000000", - TEXT_CSV, - CSV_JSON_WKB, - Feature, - gzip - ); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testInvalidJsonCsv(boolean gzip) throws IOException { - testInvalidJson( - "\"{'\"type'\":'\"Feature'\" invalid }}\"", - TEXT_CSV, - CSV_GEOJSON, - Feature, - gzip - ); - } - @ParameterizedTest @ValueSource(booleans = {true, false}) public void testInvalidGeoJsonFeature(boolean gzip) throws IOException { @@ -196,44 +166,6 @@ public void testValidGeoJsonFeatureCollectionWithoutFeatures(boolean gzip) throw ); } - @Test - public void testInvalidWKB() throws IOException { - uploadAndValidateFilesWithInvalidWKB(false); - } - - private void uploadAndValidateFilesWithInvalidWKB(boolean gzip) throws IOException { - /** Invalid WKB */ - uploadFileToS3(generateTestS3Key("test_invalid_1_jsonwkb.csv"), - TEXT_CSV, - "\"{'\"properties'\": {'\"test'\": 1}}\",01010000A0E61000007DAD4B8DD0AF07C0BD19355F25B74A40000000000000000".getBytes(), - gzip); - - uploadFileToS3(generateTestS3Key("test_invalid_2_jsonwkb.csv"), - TEXT_CSV, - "\"{'\"properties'\": {'\"test'\": 1}}\",invalid".getBytes(), - gzip); - try { - validate(generateTestS3Key("test_invalid_1_jsonwkb.csv"), CSV_JSON_WKB, gzip, Feature); - fail("Exception expected"); - } - catch (ValidationException e) { - checkValidationException(e, "Bad WKB encoding! "); - } - - try { - validate(generateTestS3Key("test_invalid_2_jsonwkb.csv"), CSV_JSON_WKB, gzip, Feature); - fail("Exception expected"); - } - catch (ValidationException e) { - checkValidationException(e, "Bad WKB encoding! 
"); - } - } - - @Test - public void testInvalidWKBGzipped() throws IOException { - uploadAndValidateFilesWithInvalidWKB(true); - } - @Test public void testValidateFilesWithEmptyColumn() throws IOException { uploadAndValidateFilesWithEmptyColumn(false); @@ -252,22 +184,6 @@ private void uploadAndValidateFilesWithEmptyColumn(boolean gzip) throws IOExcept "\"{'\"type'\":'\"Feature'\",'\"geometry'\":{'\"type'\":'\"Point'\",'\"coordinates'\":[8,50]},'\"properties'\":{'\"test'\":1}}\",".getBytes(), gzip ); - - try { - validate(generateTestS3Key("test_invalid_3_jsonwkb.csv"), CSV_JSON_WKB, gzip, Feature); - fail("Exception expected"); - } - catch (ValidationException e) { - checkValidationException(e, "Empty Column detected!"); - } - - try { - validate(generateTestS3Key("test_invalid_3_geojson.csv"), CSV_GEOJSON, gzip, Feature); - fail("Exception expected"); - } - catch (ValidationException e) { - checkValidationException(e, "Empty Column detected!"); - } } @Test @@ -293,7 +209,7 @@ private void testValidJson(String fileContent, S3ContentType contentType, Format fileContent.getBytes(), gzip ); - validate(generateTestS3Key("someFile"), format, gzip, entityPerLine); + validate(generateTestS3Key("someFile"), gzip, entityPerLine); } private static void checkValidationException(ValidationException e, String message) {