Skip to content

Commit fcba74b

Browse files
(loader): enhanced and checked logics
1 parent d7955af commit fcba74b

File tree

3 files changed

+32
-32
lines changed

3 files changed

+32
-32
lines changed

hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public boolean load() {
226226
throw e;
227227
}
228228

229-
return this.context.noError();
229+
return true;
230230
}
231231

232232
public void shutdown() {
@@ -278,7 +278,9 @@ private void createSchema() {
278278
.ifNotExist()
279279
.dataType(config.getIdFieldType())
280280
.build();
281-
client.schema().addPropertyKey(propertyKey);
281+
if (client.schema().getPropertyKey(config.getIdFieldName()) == null) {
282+
client.schema().addPropertyKey(propertyKey);
283+
}
282284
}
283285
groovyExecutor.execute(script, client);
284286
List<VertexLabel> vertexLabels = client.schema().getVertexLabels();
@@ -646,7 +648,7 @@ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
646648
try {
647649
r.close();
648650
} catch (Exception ex) {
649-
LOG.warn("Failed to close reader: {}", ex.getMessage());
651+
LOG.warn("Failed to close reader", ex);
650652
}
651653
}
652654
}
@@ -678,29 +680,27 @@ private void loadStructs(List<InputStruct> structs) {
678680
this.context.options().endFile,
679681
scatter ? "scatter" : "sequential");
680682

681-
ExecutorService loadService = ExecutorUtil.newFixedThreadPool(parallelCount,
682-
"loader");
683-
684-
List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
685-
List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
683+
ExecutorService loadService = null;
684+
try {
685+
loadService = ExecutorUtil.newFixedThreadPool(parallelCount, "loader");
686+
List<InputTaskItem> taskItems = prepareTaskItems(structs, scatter);
687+
List<CompletableFuture<Void>> loadTasks = new ArrayList<>();
686688

687-
if (taskItems.isEmpty()) {
688-
LOG.info("No tasks to execute after filtering");
689-
return;
690-
}
689+
if (taskItems.isEmpty()) {
690+
LOG.info("No tasks to execute after filtering");
691+
return;
692+
}
691693

692-
for (InputTaskItem item : taskItems) {
693-
// Init reader
694-
item.reader.init(this.context, item.struct);
695-
// Load data from current input mapping
696-
loadTasks.add(
697-
this.asyncLoadStruct(item.struct, item.reader,
698-
loadService));
699-
}
694+
for (InputTaskItem item : taskItems) {
695+
// Init reader
696+
item.reader.init(this.context, item.struct);
697+
// Load data from current input mapping
698+
loadTasks.add(
699+
this.asyncLoadStruct(item.struct, item.reader,
700+
loadService));
701+
}
700702

701-
LOG.info("waiting for loading finish {}", loadTasks.size());
702-
// wait for finish
703-
try {
703+
LOG.info("waiting for loading finish {}", loadTasks.size());
704704
CompletableFuture.allOf(loadTasks.toArray(new CompletableFuture[0]))
705705
.join();
706706
} catch (CompletionException e) {
@@ -723,8 +723,10 @@ private void loadStructs(List<InputStruct> structs) {
723723
} finally {
724724
// Shutdown service
725725
cleanupEmptyProgress();
726-
loadService.shutdown();
727-
LOG.info("load end");
726+
if (loadService != null) {
727+
loadService.shutdownNow();
728+
}
729+
LOG.info("Load end");
728730
}
729731
}
730732

hugegraph-loader/src/main/java/org/apache/hugegraph/loader/reader/jdbc/Fetcher.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ private Connection connect() throws SQLException {
6868
password);
6969
}
7070

71-
;
72-
7371
abstract String[] readHeader() throws SQLException;
7472

7573
abstract void readPrimaryKey() throws SQLException;

hugegraph-loader/src/test/java/org/apache/hugegraph/loader/test/functional/FileLoadTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2033,15 +2033,15 @@ public void testLoadIncrementalModeAndLoadFailure()
20332033
throws IOException, InterruptedException {
20342034
ioUtil.write("vertex_person.csv",
20352035
"name,age,city",
2036-
"marko应该是数字Beijing",
2036+
"marko,应该是数字,Beijing",
20372037
"vadas,27,Hongkong",
20382038
"josh,32,Beijing",
20392039
"peter,35,Shanghai",
20402040
"\"li,nary\",26,\"Wu,han\"");
20412041
ioUtil.write("vertex_software.csv", GBK,
20422042
"name,lang,price",
20432043
"office,C#,999",
2044-
"lop,java应该是数字",
2044+
"lop,java,应该是数字",
20452045
"ripple,java,199");
20462046

20472047
// 1st time
@@ -2096,7 +2096,7 @@ public void testLoadIncrementalModeAndLoadFailure()
20962096
List<String> personFailureLines = FileUtils.readLines(personFailureFile,
20972097
Constants.CHARSET);
20982098
Assert.assertEquals(2, personFailureLines.size());
2099-
Assert.assertEquals("marko应该是数字Beijing",
2099+
Assert.assertEquals("marko,应该是数字,Beijing",
21002100
personFailureLines.get(1));
21012101

21022102
// 2nd time, incremental-mode
@@ -2159,14 +2159,14 @@ public void testLoadIncrementalModeAndLoadFailure()
21592159
personFailureLines = FileUtils.readLines(personFailureFile,
21602160
Constants.CHARSET);
21612161
Assert.assertEquals(2, personFailureLines.size());
2162-
Assert.assertEquals("marko应该是数字Beijing",
2162+
Assert.assertEquals("marko,应该是数字,Beijing",
21632163
personFailureLines.get(1));
21642164

21652165
File softwareFailureFile = files[2];
21662166
List<String> softwareFailureLines = FileUtils.readLines(
21672167
softwareFailureFile, GBK);
21682168
Assert.assertEquals(2, softwareFailureLines.size());
2169-
Assert.assertEquals("lop,java应该是数字", softwareFailureLines.get(1));
2169+
Assert.assertEquals("lop,java,应该是数字", softwareFailureLines.get(1));
21702170

21712171
// TODO: Change only one line first, and make the second line go wrong
21722172
// modify person and software failure file

0 commit comments

Comments
 (0)