Skip to content

Commit d8b0dbc

Browse files
committed
Add output
1 parent 8ab455e commit d8b0dbc

23 files changed

+1450
-1
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ build/
5858
*.pyc
5959

6060
# maven ignore
61-
output/
61+
6262
apache-hugegraph-*-incubating-*/
6363
*.war
6464
*.zip
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.core.output;
19+
20+
import org.apache.hugegraph.computer.core.config.ComputerOptions;
21+
import org.apache.hugegraph.computer.core.config.Config;
22+
import org.apache.hugegraph.computer.core.worker.Computation;
23+
import org.apache.hugegraph.util.Log;
24+
import org.slf4j.Logger;
25+
26+
public abstract class AbstractComputerOutput implements ComputerOutput {
27+
28+
private static final Logger LOG = Log.logger(ComputerOutput.class);
29+
30+
private String name;
31+
private int partition;
32+
33+
@Override
34+
public void init(Config config, int partition) {
35+
Computation<?> computation = config.createObject(
36+
ComputerOptions.WORKER_COMPUTATION_CLASS);
37+
this.name = computation.name();
38+
this.partition = partition;
39+
40+
LOG.info("Start write back partition {} for {}",
41+
this.partition(), this.name());
42+
}
43+
44+
@Override
45+
public String name() {
46+
return this.name;
47+
}
48+
49+
public int partition() {
50+
return this.partition;
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.core.output;
19+
20+
import org.apache.hugegraph.computer.core.config.Config;
21+
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
22+
import org.apache.hugegraph.computer.core.worker.Computation;
23+
24+
/**
25+
* Computer output is used to output computer results. There is an output object
26+
* for every partition.
27+
*/
28+
public interface ComputerOutput {
29+
30+
/**
31+
* Initialize the output. Create connection to target output system.
32+
*/
33+
void init(Config config, int partition);
34+
35+
/**
36+
* For each vertex in partition, this method is called regardless
37+
* vertex's status.
38+
*/
39+
void write(Vertex vertex);
40+
41+
/**
42+
* Write filter.
43+
* True to commit the computation result, otherwise not to commit.
44+
*/
45+
default boolean filter(Config config, Computation computation, Vertex vertex) {
46+
return true;
47+
}
48+
49+
/**
50+
* Merge output files of multiple partitions if applicable.
51+
*/
52+
default void mergePartitions(Config config) {
53+
// pass
54+
}
55+
56+
/**
57+
* Close the connection to target output system. Commit if target output
58+
* required.
59+
*/
60+
void close();
61+
62+
/**
63+
* The name of output property.
64+
*/
65+
String name();
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.core.output.hg;
19+
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
23+
import org.apache.hugegraph.computer.core.config.ComputerOptions;
24+
import org.apache.hugegraph.computer.core.config.Config;
25+
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
26+
import org.apache.hugegraph.computer.core.output.AbstractComputerOutput;
27+
import org.apache.hugegraph.computer.core.output.hg.task.TaskManager;
28+
import org.apache.hugegraph.driver.HugeClient;
29+
import org.apache.hugegraph.structure.constant.WriteType;
30+
import org.apache.hugegraph.util.Log;
31+
import org.slf4j.Logger;
32+
33+
public abstract class HugeGraphOutput<T> extends AbstractComputerOutput {
34+
35+
private static final Logger LOG = Log.logger(HugeGraphOutput.class);
36+
37+
private TaskManager taskManager;
38+
private List<org.apache.hugegraph.structure.graph.Vertex> localVertices;
39+
private int batchSize;
40+
private WriteType writeType;
41+
42+
@Override
43+
public void init(Config config, int partition) {
44+
super.init(config, partition);
45+
46+
this.taskManager = new TaskManager(config);
47+
this.batchSize = config.get(ComputerOptions.OUTPUT_BATCH_SIZE);
48+
this.localVertices = new ArrayList<>(this.batchSize);
49+
this.writeType = WriteType.valueOf(
50+
config.get(ComputerOptions.OUTPUT_RESULT_WRITE_TYPE));
51+
52+
this.prepareSchema();
53+
}
54+
55+
public HugeClient client() {
56+
return this.taskManager.client();
57+
}
58+
59+
@Override
60+
public void write(Vertex vertex) {
61+
this.localVertices.add(this.constructHugeVertex(vertex));
62+
if (this.localVertices.size() >= this.batchSize) {
63+
this.commit();
64+
}
65+
}
66+
67+
@Override
68+
public void close() {
69+
if (!this.localVertices.isEmpty()) {
70+
this.commit();
71+
}
72+
this.taskManager.waitFinished();
73+
this.taskManager.shutdown();
74+
LOG.info("End write back partition {}", this.partition());
75+
}
76+
77+
protected void commit() {
78+
this.taskManager.submitBatch(this.localVertices);
79+
LOG.info("Write back {} vertices", this.localVertices.size());
80+
81+
this.localVertices = new ArrayList<>(this.batchSize);
82+
}
83+
84+
protected org.apache.hugegraph.structure.graph.Vertex constructHugeVertex(
85+
Vertex vertex) {
86+
org.apache.hugegraph.structure.graph.Vertex hugeVertex =
87+
new org.apache.hugegraph.structure.graph.Vertex(null);
88+
hugeVertex.id(vertex.id().asObject());
89+
hugeVertex.property(this.name(), this.value(vertex));
90+
return hugeVertex;
91+
}
92+
93+
protected T value(Vertex vertex) {
94+
@SuppressWarnings("unchecked")
95+
T value = (T) vertex.value().value();
96+
return value;
97+
}
98+
99+
protected WriteType writeType() {
100+
return this.writeType;
101+
}
102+
103+
protected abstract void prepareSchema();
104+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.core.output.hg.exceptions;
19+
20+
public class WriteBackException extends RuntimeException {
21+
22+
private static final long serialVersionUID = 5504623124963497613L;
23+
24+
public WriteBackException(String message) {
25+
super(message);
26+
}
27+
28+
public WriteBackException(String message, Throwable cause) {
29+
super(message, cause);
30+
}
31+
32+
public WriteBackException(String message, Object... args) {
33+
super(String.format(message, args));
34+
}
35+
36+
public WriteBackException(String message, Throwable cause, Object... args) {
37+
super(String.format(message, args), cause);
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.core.output.hg.metrics;
19+
20+
import java.util.concurrent.atomic.LongAdder;
21+
22+
public final class LoadMetrics {
23+
24+
private final LongAdder insertSuccess;
25+
private final LongAdder insertFailure;
26+
27+
public LoadMetrics() {
28+
this.insertSuccess = new LongAdder();
29+
this.insertFailure = new LongAdder();
30+
}
31+
32+
public long insertSuccess() {
33+
return this.insertSuccess.longValue();
34+
}
35+
36+
public void plusInsertSuccess(long count) {
37+
this.insertSuccess.add(count);
38+
}
39+
40+
public long insertFailure() {
41+
return this.insertFailure.longValue();
42+
}
43+
44+
public void increaseInsertFailure() {
45+
this.insertFailure.increment();
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with this
4+
* work for additional information regarding copyright ownership. The ASF
5+
* licenses this file to You under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14+
* License for the specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package org.apache.hugegraph.computer.core.output.hg.metrics;
19+
20+
public final class LoadReport {
21+
22+
private long vertexInsertSuccess;
23+
private long vertexInsertFailure;
24+
25+
public long vertexInsertSuccess() {
26+
return this.vertexInsertSuccess;
27+
}
28+
29+
public long vertexInsertFailure() {
30+
return this.vertexInsertFailure;
31+
}
32+
33+
public static LoadReport collect(LoadSummary summary) {
34+
LoadReport report = new LoadReport();
35+
LoadMetrics metrics = summary.metrics();
36+
report.vertexInsertSuccess += metrics.insertSuccess();
37+
report.vertexInsertFailure += metrics.insertFailure();
38+
return report;
39+
}
40+
}

0 commit comments

Comments
 (0)