Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;

import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace;
import io.restassured.response.ValidatableResponse;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.After;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import com.here.xyz.jobs.steps.impl.maintenance.MarkForMaintenance;
import com.here.xyz.jobs.steps.impl.transport.CopySpace;
import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace;
import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace;
import com.here.xyz.jobs.steps.inputs.Input;
import com.here.xyz.jobs.steps.outputs.Output;
import com.here.xyz.models.geojson.coordinates.PointCoordinates;
Expand Down Expand Up @@ -117,7 +117,6 @@ public class JobPlayground {
private static Space targetSpace;
private static boolean simulateExecution = true;
private static boolean executeWholeJob = false;
private static ImportFilesToSpace.Format importFormat = ImportFilesToSpace.Format.GEOJSON;
private static int uploadFileCount = 2;
private static String jobServiceBaseUrl = "http://localhost:7070";

Expand Down Expand Up @@ -235,7 +234,7 @@ private static void startLambdaExecutions() throws IOException, WebClientExcepti

runDropIndexStep(sampleSpace.getId());

runImportFilesToSpaceStep(sampleSpace.getId(), importFormat);
runImportFilesToSpaceStep(sampleSpace.getId());

for (SystemIndex index : SystemIndex.values())
runCreateIndexStep(sampleSpace.getId(), index);
Expand All @@ -253,15 +252,15 @@ else if (playgroundUsecase == EXPORT)
private static void uploadFiles() throws IOException {
//Generate N Files with M features
for (int i = 0; i < uploadFileCount; i++)
uploadInputFile(generateContent(importFormat, 10));
uploadInputFile(generateContent( 10));
}

private static void uploadFilesToRealJob(String jobId) throws IOException, InterruptedException {
//Generate N Files with M features
for (int i = 0; i < uploadFileCount; i++) {
HttpResponse<byte[]> inputResponse = post("/jobs/" + jobId + "/inputs", Map.of("type", "UploadUrl"));
String uploadUrl = (String) XyzSerializable.deserialize(inputResponse.body(), Map.class).get("url");
uploadInputFile(generateContent(importFormat, 10), new URL(uploadUrl));
uploadInputFile(generateContent( 10), new URL(uploadUrl));
}

}
Expand Down Expand Up @@ -291,16 +290,16 @@ private static void pollRealJobStatus(String jobId) throws InterruptedException
}
}

private static byte[] generateContent(ImportFilesToSpace.Format format, int featureCnt) {
private static byte[] generateContent(int featureCnt) {
String output = "";

for (int i = 1; i <= featureCnt; i++) {
output += generateContentLine(format, i);
output += generateContentLine(i);
}
return output.getBytes();
}

private static void generateContentToFile(ImportFilesToSpace.Format format, int featureCnt, boolean beZipped) throws IOException {
private static void generateContentToFile(int featureCnt, boolean beZipped) throws IOException {
String outputFile = "/tmp/output.file" + (beZipped ? ".gz" : "");

BufferedWriter writer = null;
Expand All @@ -315,24 +314,18 @@ private static void generateContentToFile(ImportFilesToSpace.Format format, int
new OutputStreamWriter(zip, "UTF-8"));
}
for (int i = 1; i <= featureCnt; i++) {
writer.write(generateContentLine(format, i));
writer.write(generateContentLine(i));
}
}finally {
if (writer != null)
writer.close();
}
}

private static String generateContentLine(ImportFilesToSpace.Format format, int i){
private static String generateContentLine(int i){
Random rd = new Random();
String lineSeparator = "\n";

if(format.equals(ImportFilesToSpace.Format.CSV_JSON_WKB))
return "\"{'\"properties'\": {'\"test'\": "+i+"}}\",01010000A0E61000007DAD4B8DD0AF07C0BD19355F25B74A400000000000000000"+lineSeparator;
else if(format.equals(ImportFilesToSpace.Format.CSV_GEOJSON))
return "\"{'\"type'\":'\"Feature'\",'\"geometry'\":{'\"type'\":'\"Point'\",'\"coordinates'\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},'\"properties'\":{'\"test'\":"+i+"}}\""+lineSeparator;
else
return "{\"type\":\"Feature\",\"geometry\":{\"type\":\"Point\",\"coordinates\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},\"properties\":{\"te\\\"st\":"+i+"}}"+lineSeparator;
return "{\"type\":\"Feature\",\"geometry\":{\"type\":\"Point\",\"coordinates\":["+(rd.nextInt(179))+"."+(rd.nextInt(100))+","+(rd.nextInt(79))+"."+(rd.nextInt(100))+"]},\"properties\":{\"te\\\"st\":"+i+"}}"+lineSeparator;
}

private static void startMockJob() {
Expand Down Expand Up @@ -479,8 +472,8 @@ public static void runDropIndexStep(String spaceId) throws IOException {
runStep(new DropIndexes().withSpaceId(spaceId));
}

public static void runImportFilesToSpaceStep(String spaceId, ImportFilesToSpace.Format format) throws IOException {
runStep(new ImportFilesToSpace().withSpaceId(spaceId).withFormat(format).withUpdateStrategy(UpdateStrategy.DEFAULT_UPDATE_STRATEGY));
public static void runImportFilesToSpaceStep(String spaceId) throws IOException {
runStep(new TaskedImportFilesToSpace().withSpaceId(spaceId).withUpdateStrategy(UpdateStrategy.DEFAULT_UPDATE_STRATEGY));
}

public static void runCreateIndexStep(String spaceId, SystemIndex index) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2017-2025 HERE Europe B.V.
* Copyright (C) 2017-2026 HERE Europe B.V.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,14 +19,6 @@

package com.here.xyz.jobs.steps.compiler;

import static com.here.xyz.jobs.steps.Step.InputSet.USER_INPUTS;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_GEOJSON;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.CSV_JSON_WKB;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.GEOJSON;
import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.NEXT_VERSION;
import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.OPERATION;
import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.VERSION_ID;

import com.google.common.collect.Lists;
import com.here.xyz.jobs.Job;
import com.here.xyz.jobs.datasets.DatasetDescription.Space;
Expand All @@ -42,35 +34,28 @@
import com.here.xyz.jobs.steps.impl.AnalyzeSpaceTable;
import com.here.xyz.jobs.steps.impl.CreateIndex;
import com.here.xyz.jobs.steps.impl.DropIndexes;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.EntityPerLine;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format;
import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace;
import com.here.xyz.models.hub.Ref;
import com.here.xyz.util.db.pg.IndexHelper.Index;
import com.here.xyz.util.db.pg.IndexHelper.SystemIndex;
import com.here.xyz.util.web.HubWebClient;
import com.here.xyz.util.web.XyzWebClient.ErrorResponseException;
import com.here.xyz.util.web.XyzWebClient.WebClientException;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ImportFromFiles implements JobCompilationInterceptor {
public boolean useNewTaskedImportStep = true;
import static com.here.xyz.jobs.steps.Step.InputSet.USER_INPUTS;
import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.NEXT_VERSION;
import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.OPERATION;
import static com.here.xyz.util.db.pg.IndexHelper.SystemIndex.VERSION_ID;

public class ImportFromFiles implements JobCompilationInterceptor {
public static Set<Class<? extends Space>> allowedTargetTypes = new HashSet<>(Set.of(Space.class));

public void setUseNewTaskedImportStep(boolean useNewTaskedImportStep) {
this.useNewTaskedImportStep = useNewTaskedImportStep;
}
public ImportFromFiles withUseNewTaskedImportStep(boolean useNewTaskedImportStep) {
setUseNewTaskedImportStep(useNewTaskedImportStep);
return this;
}

@Override
public boolean chooseMe(Job job) {
return job.getProcess() == null && job.getSource() instanceof Files files && isSupportedFormat(files)
Expand All @@ -87,52 +72,31 @@ public CompilationStepGraph compile(Job job) {
String spaceId = target.getId();

final FileFormat sourceFormat = ((Files) job.getSource()).getInputSettings().getFormat();
Format importStepFormat;
if (sourceFormat instanceof GeoJson)
importStepFormat = GEOJSON;
else if (sourceFormat instanceof Csv csvFormat)
importStepFormat = csvFormat.isGeometryAsExtraWkbColumn() ? CSV_JSON_WKB : CSV_GEOJSON;
else
throw new CompilationError("Unsupported import file format: " + sourceFormat.getClass().getSimpleName());

EntityPerLine entityPerLine = getEntityPerLine(sourceFormat);
if (!(sourceFormat instanceof GeoJson))
throw new CompilationError("Unsupported import file format: " + sourceFormat.getClass().getSimpleName());

//This validation check is necessary to deliver a constructive error to the user - otherwise keepIndices will throw a runtime error.
checkIfSpaceIsAccessible(spaceId);

if(useNewTaskedImportStep) {
if(!entityPerLine.equals(EntityPerLine.Feature))
throw new CompilationError("TaskedImportStep - Unsupported entityPerLine configuration: " + entityPerLine.name());
if(!(sourceFormat instanceof GeoJson))
throw new CompilationError("TaskedImportStep - Unsupported format configuration: " + sourceFormat.getClass().getSimpleName());

TaskedImportFilesToSpace importFilesStep = new TaskedImportFilesToSpace() //Perform import
.withSpaceId(spaceId)
.withVersionRef(new Ref(Ref.HEAD))
.withJobId(job.getId())
.withInputSets(List.of(USER_INPUTS.get()));
if (importFilesStep.keepIndices())
//Perform only the import Step
return (CompilationStepGraph) new CompilationStepGraph()
.addExecution(importFilesStep);
TaskedImportFilesToSpace importFilesStep = new TaskedImportFilesToSpace() //Perform import
.withEntityPerLine(getEntityPerLine(sourceFormat))
.withSpaceId(spaceId)
.withVersionRef(new Ref(Ref.HEAD))
.withJobId(job.getId())
.withInputSets(List.of(USER_INPUTS.get()));

//perform full Import with all 11 Steps (IDX deletion/creation..)
return compileTaskedImportSteps(importFilesStep);
}else{
ImportFilesToSpace importFilesStep = new ImportFilesToSpace() //Perform import
.withSpaceId(spaceId)
.withFormat(importStepFormat)
.withEntityPerLine(entityPerLine)
.withJobId(job.getId())
.withInputSets(List.of(USER_INPUTS.get()));
if (importFilesStep.keepIndices())
try {
if (importFilesStep.useFeatureWriter())
//Perform only the import Step
return (CompilationStepGraph) new CompilationStepGraph()
.addExecution(importFilesStep);

//perform full Import with all 11 Steps (IDX deletion/creation..)
return compileImportSteps(importFilesStep);
} catch (WebClientException e) {
throw new CompilationError("Error retrieving statistics for target resource during compilation!", e);
}

//perform full Import with all 11 Steps (IDX deletion/creation..)
return compileTaskedImportSteps(importFilesStep);
}

public static CompilationStepGraph compileWrapWithDropRecreateIndices(String spaceId, StepExecution stepExecution) {
Expand Down Expand Up @@ -179,20 +143,10 @@ public static CompilationStepGraph compileTaskedImportSteps(TaskedImportFilesToS
}
}

public static CompilationStepGraph compileImportSteps(ImportFilesToSpace importFilesStep) {
try {
//Keep these indices if FeatureWriter is used
List<Index> whiteListIndex = importFilesStep.useFeatureWriter() ? List.of(VERSION_ID, NEXT_VERSION, OPERATION) : null;
return compileWrapWithDropRecreateIndices(importFilesStep.getSpaceId(), importFilesStep, whiteListIndex);
} catch (WebClientException e) {
throw new CompilationError("Unexpected error occurred during compilation", e);
}
}

private EntityPerLine getEntityPerLine(FileFormat format) {
return EntityPerLine.valueOf((format instanceof GeoJson geoJson
? geoJson.getEntityPerLine()
: ((Csv) format).getEntityPerLine()).toString());
private TaskedImportFilesToSpace.EntityPerLine getEntityPerLine(FileFormat format) {
return TaskedImportFilesToSpace.EntityPerLine.valueOf((format instanceof GeoJson geoJson
? geoJson.getEntityPerLine()
: ((Csv) format).getEntityPerLine()).toString());
}

private static List<StepExecution> toSequentialSteps(String spaceId, List<SystemIndex> indices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.here.xyz.jobs.datasets.files.GeoJson;
import com.here.xyz.jobs.steps.JobCompiler;
import com.here.xyz.jobs.steps.compiler.ImportFromFiles;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace;
import com.here.xyz.jobs.util.test.ContentCreator;
import com.here.xyz.models.hub.Space;
import org.junit.jupiter.api.Assertions;
Expand All @@ -46,12 +45,13 @@
public class ImportJobTestIT extends JobTest {
@BeforeEach
public void setUp() {
createSpace(new Space().withId(SPACE_ID).withSearchableProperties(Map.of(
createSpace(new Space().withId(SPACE_ID).withVersionsToKeep(10000).withSearchableProperties(Map.of(
"foo1", true,
"foo2.nested", true,
"foo3.nested.array::array", true
)
), false);
putRandomFeatureCollectionToSpace(SPACE_ID, 10);
}

@Test
Expand All @@ -60,7 +60,7 @@ public void testSimpleImport() throws Exception {
new FileInputSettings()
.withFormat(new GeoJson().withEntityPerLine(Feature)))
);
createAndStartJob(importJob, ContentCreator.generateImportFileContent(ImportFilesToSpace.Format.GEOJSON, 50));
createAndStartJob(importJob, ContentCreator.generateImportFileContent(50));
}

@Test
Expand All @@ -71,7 +71,6 @@ public void testInvalidEntity() throws Exception {
);
Assertions.assertThrows(JobCompiler.CompilationError.class, () ->
new ImportFromFiles()
.withUseNewTaskedImportStep(true)
.compile(importJob));
}

Expand All @@ -82,7 +81,6 @@ public void testInvalidFormat() throws Exception {
.withFormat(new Csv().withEntityPerLine(Feature))));
Assertions.assertThrows(JobCompiler.CompilationError.class, () ->
new ImportFromFiles()
.withUseNewTaskedImportStep(true)
.compile(importJob));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM;
import static com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles.EXPORTED_DATA;
import static com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace.Format.GEOJSON;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -34,7 +33,6 @@
import com.here.xyz.jobs.steps.StepExecution;
import com.here.xyz.jobs.steps.StepGraph;
import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand All @@ -43,6 +41,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -119,7 +119,7 @@ public void testReuseSequentialGraphWithInputIds() {
StepGraph graph1 = new CompilationStepGraph()
.withExecutions(List.of(
exportStep1,
stepGenerator(ImportFilesToSpace.class, JOB_ID1, SOURCE_ID2, Set.of(exportStep1.getId()))
stepGenerator(TaskedImportFilesToSpace.class, JOB_ID1, SOURCE_ID2, Set.of(exportStep1.getId()))
))
.withParallel(false);
((CompilationStepGraph) graph1).enrich(JOB_ID1);
Expand All @@ -129,7 +129,7 @@ public void testReuseSequentialGraphWithInputIds() {
StepGraph graph2 = new CompilationStepGraph()
.withExecutions(List.of(
exportStep2,
stepGenerator(ImportFilesToSpace.class, JOB_ID2, SOURCE_ID2, Set.of(exportStep2.getId())) //not reusable
stepGenerator(TaskedImportFilesToSpace.class, JOB_ID2, SOURCE_ID2, Set.of(exportStep2.getId())) //not reusable
))
.withParallel(false);
((CompilationStepGraph) graph2).enrich(JOB_ID1);
Expand All @@ -147,7 +147,7 @@ public void testReuseSequentialGraphWithInputIds() {
Step execution2 = (Step) graph.getExecutions().get(1);

assertInstanceOf(DelegateStep.class, execution1);
assertInstanceOf(ImportFilesToSpace.class, execution2);
assertInstanceOf(TaskedImportFilesToSpace.class, execution2);

//Check if previousStepIds are set correct
assertEquals(Set.of(execution1.getId()), execution2.getPreviousStepIds());
Expand Down Expand Up @@ -417,9 +417,8 @@ public StepExecution stepGenerator(Class stepType, String jobId, String sourceId
.withJobId(jobId)
.withOutputSetVisibility(EXPORTED_DATA, SYSTEM);

case "ImportFilesToSpace" -> {
ImportFilesToSpace importFilesToSpace = new ImportFilesToSpace()
.withFormat(GEOJSON)
case "TaskedImportFilesToSpace" -> {
TaskedImportFilesToSpace importFilesToSpace = new TaskedImportFilesToSpace()
.withSpaceId(sourceId)
.withJobId(jobId);
if (inputStepIds != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.here.xyz.jobs.steps.impl.transport.ExportChangedTiles;
import com.here.xyz.jobs.steps.impl.transport.ExportSpaceToFiles;
import com.here.xyz.jobs.steps.impl.transport.GetNextSpaceVersion;
import com.here.xyz.jobs.steps.impl.transport.ImportFilesToSpace;
import com.here.xyz.jobs.steps.impl.transport.TaskedImportFilesToSpace;
import com.here.xyz.models.hub.Branch;
import com.here.xyz.models.hub.Connector;
Expand All @@ -67,7 +66,6 @@
@JsonSubTypes.Type(value = CreateIndex.class),
@JsonSubTypes.Type(value = ExportSpaceToFiles.class),
@JsonSubTypes.Type(value = ExportChangedTiles.class),
@JsonSubTypes.Type(value = ImportFilesToSpace.class),
@JsonSubTypes.Type(value = TaskedImportFilesToSpace.class),
@JsonSubTypes.Type(value = DropIndexes.class),
@JsonSubTypes.Type(value = AnalyzeSpaceTable.class),
Expand Down
Loading
Loading