Skip to content

Commit 613f122

Browse files
cleared review problems
1 parent b9124a3 commit 613f122

File tree

4 files changed

+24
-26
lines changed

4 files changed

+24
-26
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,7 @@ output/
8585
tree.txt
8686
*.versionsBackup
8787
.flattened-pom.xml
88+
hugegraph-loader/assembly/travis/conf/hugegraph.truststore
8889

8990
# eclipse ignore
9091
.settings/
-956 Bytes
Binary file not shown.

hugegraph-loader/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@
5252
<postgres.version>42.4.1</postgres.version>
5353
<mssql.jdbc.version>7.2.0.jre8</mssql.jdbc.version>
5454
<kafka.testcontainer.version>1.19.0</kafka.testcontainer.version>
55+
<parboiled.version>1.1.8</parboiled.version>
5556
</properties>
5657

5758
<dependencies>
@@ -545,7 +546,7 @@
545546
<dependency>
546547
<groupId>org.parboiled</groupId>
547548
<artifactId>parboiled-core</artifactId>
548-
<version>1.1.8</version> <!-- 稳定版本,与 Gremlin 3.4.x 等组件兼容 -->
549+
<version>${parboiled.version}</version>
549550
</dependency>
550551
</dependencies>
551552

hugegraph-loader/src/main/java/org/apache/hugegraph/loader/HugeGraphLoader.java

Lines changed: 21 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,6 @@ public final class HugeGraphLoader {
9090
private final TaskManager manager;
9191
private final LoadOptions options;
9292

93-
// load 任务执行线程池
9493
private ExecutorService loadService;
9594

9695
public static class InputTaskItem {
@@ -115,9 +114,15 @@ public static void main(String[] args) {
115114
loader = new HugeGraphLoader(args);
116115
} catch (Throwable e) {
117116
Printer.printError("Failed to start loading", e);
118-
return; // 不再抛出,直接返回
117+
return;
118+
}
119+
120+
try {
121+
loader.load();
122+
} finally {
123+
loader.shutdown();
124+
GlobalExecutorManager.shutdown(loader.options.shutdownTimeout);
119125
}
120-
loader.load();
121126
}
122127

123128
public HugeGraphLoader(String[] args) {
@@ -126,7 +131,7 @@ public HugeGraphLoader(String[] args) {
126131

127132
public HugeGraphLoader(LoadOptions options) {
128133
this(options, LoadMapping.of(options.file));
129-
// 设置并发度
134+
// Set concurrency
130135
GlobalExecutorManager.setBatchThreadCount(options.batchInsertThreads);
131136
GlobalExecutorManager.setSingleThreadCount(options.singleInsertThreads);
132137
}
@@ -168,8 +173,8 @@ private void checkGraphExists() {
168173
}
169174

170175
private void setGraphMode() {
171-
// 设置图的 Mode
172-
// 如果存在 Graph 数据源,则所有 Input 必须都是 Graph 数据源
176+
// Set graph mode
177+
// If there is a Graph data source, all Inputs must be Graph data sources
173178
Supplier<Stream<InputSource>> inputsSupplier =
174179
() -> this.mapping.structs().stream().filter(struct -> !struct.skip())
175180
.map(InputStruct::input);
@@ -218,13 +223,12 @@ public boolean load() {
218223

219224
RuntimeException e = LoadUtil.targetRuntimeException(t);
220225
Printer.printError("Failed to load", e);
221-
e.printStackTrace();
226+
LOG.error("Load failed with exception", e);
222227

223228
throw e;
224229
}
225230

226-
// 任务执行成功
227-
return true;
231+
return this.context.noError();
228232
}
229233

230234
public void shutdown() {
@@ -353,7 +357,7 @@ private void createGraphSourceVertexLabel(HugeClient sourceClient,
353357
sourceClient.assignGraph(graphSource.getGraphSpace(),
354358
graphSource.getGraph());
355359

356-
// 创建 Vertex Schema
360+
// Create Vertex Schema
357361
List<VertexLabel> vertexLabels = new ArrayList<>();
358362
if (graphSource.getSelectedVertices() != null) {
359363
List<String> selectedVertexLabels =
@@ -431,7 +435,7 @@ private void createGraphSourceVertexLabel(HugeClient sourceClient,
431435
private void createGraphSourceEdgeLabel(HugeClient sourceClient,
432436
HugeClient targetClient,
433437
GraphSource graphSource) {
434-
// 创建 Edge Schema
438+
// Create Edge Schema
435439
List<EdgeLabel> edgeLabels = new ArrayList<>();
436440
if (graphSource.getSelectedEdges() != null) {
437441
List<String> selectedEdgeLabels =
@@ -637,16 +641,8 @@ private List<InputTaskItem> prepareTaskItems(List<InputStruct> structs,
637641
}
638642
// sort by seqNumber to allow scatter loading from different sources
639643
if (scatter) {
640-
tasks.sort(new Comparator<InputTaskItem>() {
641-
@Override
642-
public int compare(InputTaskItem o1, InputTaskItem o2) {
643-
if (o1.structIndex == o2.structIndex) {
644-
return o1.seqNumber - o2.seqNumber;
645-
} else {
646-
return o1.structIndex - o2.structIndex;
647-
}
648-
}
649-
});
644+
tasks.sort(Comparator.comparingInt((InputTaskItem o) -> o.structIndex)
645+
.thenComparingInt(o -> o.seqNumber));
650646
}
651647

652648
return tasks;
@@ -666,7 +662,7 @@ private void loadStructs(List<InputStruct> structs) {
666662
LOG.info("{} threads for loading {} structs, from {} to {} in {} mode",
667663
parallelCount, structs.size(), this.context.options().startFile,
668664
this.context.options().endFile,
669-
scatter ? "scatter" : "sequencial");
665+
scatter ? "scatter" : "sequential");
670666

671667
this.loadService = ExecutorUtil.newFixedThreadPool(parallelCount,
672668
"loader");
@@ -707,7 +703,7 @@ private void loadStructs(List<InputStruct> structs) {
707703
} catch (Throwable t) {
708704
throw t;
709705
} finally {
710-
// 关闭 service
706+
// Shutdown service
711707
cleanupEmptyProgress();
712708
this.loadService.shutdown();
713709
LOG.info("load end");
@@ -749,7 +745,7 @@ private void loadStruct(InputStruct struct, InputReader reader) {
749745
// Read next line from data source
750746
if (reader.hasNext()) {
751747
Line next = reader.next();
752-
// 如果数据源为 kafka,存在获取数据为 null 的情况
748+
// If the data source is kafka, there may be cases where the fetched data is null
753749
if (next != null) {
754750
lines.add(next);
755751
metrics.increaseReadSuccess();
@@ -767,7 +763,7 @@ private void loadStruct(InputStruct struct, InputReader reader) {
767763
finished = true;
768764
}
769765
if (lines.size() >= batchSize ||
770-
// 5s 内强制提交,主要影响 kafka 数据源
766+
// Force commit within 5s, mainly affects kafka data source
771767
(lines.size() > 0 &&
772768
System.currentTimeMillis() > batchStartTime + 5000) ||
773769
finished) {

0 commit comments

Comments (0)