Skip to content

Commit cf83036

Browse files
authored
Merge branch 'main' into impl-kafka-writer
2 parents 56d4fc5 + 1fc617f commit cf83036

File tree

292 files changed

+9275
-3480
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

292 files changed

+9275
-3480
lines changed

.github/workflows/interactive.yml

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ jobs:
141141
# download dataset
142142
git clone -b master --single-branch --depth=1 https://github.com/GraphScope/gstest.git ${GS_TEST_DIR}
143143
mkdir -p ${INTERACTIVE_WORKSPACE}/data/ldbc
144-
GRAPH_SCHEMA_YAML=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_graph_schema.yaml
145-
BUILD_LOAD_FILE=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_bulk_load.yaml
144+
GRAPH_SCHEMA_YAML=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_graph_schema_creationDate.yaml
145+
BUILD_LOAD_FILE=${GS_TEST_DIR}/flex/ldbc-sf01-long-date/audit_bulk_load_creationDate.yaml
146146
cp ${GRAPH_SCHEMA_YAML} ${INTERACTIVE_WORKSPACE}/data/ldbc/graph.yaml
147147
cp ${BUILD_LOAD_FILE} ${INTERACTIVE_WORKSPACE}/data/ldbc/import.yaml
148148
mkdir -p ${INTERACTIVE_WORKSPACE}/data/movies
@@ -295,14 +295,12 @@ jobs:
295295
# plus_one: (num: int64) -> (num: int64), CppEncoder
296296
# sample_app: (num: int64) -> (num: int64), kCypherJson
297297

298-
sed -i 's/default_graph: ldbc/default_graph: modern_graph/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
299298
sed -i 's/interactive_workspace/temp_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
300299
cd ${GITHUB_WORKSPACE}/flex/tests/interactive/
301300
bash test_plugin_loading.sh ${TMP_INTERACTIVE_WORKSPACE} modern_graph \
302301
${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml \
303302
./modern_graph_schema_v0_0.yaml ./modern_graph_schema_v0_1.yaml
304303
sed -i 's/temp_workspace/interactive_workspace/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
305-
sed -i 's/default_graph: modern_graph/default_graph: movies/g' ${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
306304

307305
- name: Let compiler use latest interactive java sdk
308306
env:
@@ -346,10 +344,8 @@ jobs:
346344
run: |
347345
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
348346
export ENGINE_TYPE=hiactor
349-
# change the default_graph config in ./interactive_config_test.yaml to ldbc
350-
sed -i 's/default_graph: movies/default_graph: ldbc/g' ./interactive_config_test.yaml
351-
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} ldbc \
352-
${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
347+
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} ldbc CBO
348+
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} ldbc RBO
353349
354350
- name: Run End-to-End cypher adhoc movie query test
355351
env:
@@ -359,10 +355,7 @@ jobs:
359355
run: |
360356
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
361357
export ENGINE_TYPE=hiactor
362-
# change the default_graph config in ./interactive_config_test.yaml to movies
363-
sed -i 's/default_graph: ldbc/default_graph: movies/g' ./interactive_config_test.yaml
364-
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} movies \
365-
${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
358+
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} movies RBO
366359
367360
- name: Run End-to-End cypher adhoc graph_algo query test
368361
env:
@@ -372,10 +365,7 @@ jobs:
372365
run: |
373366
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
374367
export ENGINE_TYPE=hiactor
375-
# change the default_graph config in ${GS_TEST_DIR}/flex/ldbc-sf01-long-date/interactive_config.yaml to graph_algo
376-
sed -i 's/default_graph: movies/default_graph: graph_algo/g' ./interactive_config_test.yaml
377-
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} graph_algo \
378-
${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml
368+
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} graph_algo RBO
379369
380370
- name: Run kafka wal test
381371
env:
@@ -402,9 +392,7 @@ jobs:
402392
run: |
403393
cd ${GITHUB_WORKSPACE}/flex/tests/hqps/
404394
export ENGINE_TYPE=hiactor
405-
sed -i 's/default_graph: graph_algo/default_graph: modern_graph/g' ./interactive_config_test.yaml
406-
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} modern_graph \
407-
${GITHUB_WORKSPACE}/flex/tests/hqps/interactive_config_test.yaml gremlin
395+
bash hqps_adhoc_test.sh ${INTERACTIVE_WORKSPACE} modern_graph RBO gremlin
408396
409397
test-build-flex:
410398
runs-on: ubuntu-22.04

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,6 @@
1313
[submodule "flex/third_party/parallel-hashmap"]
1414
path = flex/third_party/parallel-hashmap
1515
url = https://github.com/greg7mdp/parallel-hashmap.git
16+
[submodule "flex/third_party/aliyun-oss-cpp-sdk"]
17+
path = flex/third_party/aliyun-oss-cpp-sdk
18+
url = https://github.com/aliyun/aliyun-oss-cpp-sdk.git

analytical_engine/core/context/context_protocols.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ enum class ContextDataType {
3636
};
3737

3838
/* N.B. These values should be the same as vineyard::TypeToInt::value. Because
39-
* theses values are used to decode in Python side. Refer:
39+
* these values are used to decode in Python side. Refer:
4040
* python.graphscope.framework.utils._to_numpy_dtype
4141
*/
4242
inline int ContextDataTypeToInt(ContextDataType type) {

analytical_engine/core/error.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ inline std::string formatEnumValue(const vineyard::ErrorCode& value) {
8585

8686
#ifndef __FRAME_CURRENT_EXCEPTION_TYPENAME
8787
#if defined(__GLIBCXX__) || defined(__GLIBCPP__)
88-
#define __FRAME_CURRENT_EXCEPTION_TYPENAME(var) \
89-
do { \
90-
std::exception_ptr __p = std::current_exception(); \
91-
var = __p ? __p.__cxa_exception_type()->name() : "unknow type"; \
88+
#define __FRAME_CURRENT_EXCEPTION_TYPENAME(var) \
89+
do { \
90+
std::exception_ptr __p = std::current_exception(); \
91+
var = __p ? __p.__cxa_exception_type()->name() : "unknown type"; \
9292
} while (0)
9393
#else
9494
#define __FRAME_CURRENT_EXCEPTION_TYPENAME(var) \

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/context/GiraphComputationAdaptorContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void writeBackVertexData() {
176176
for (long lid = 0; lid < innerVerticesNum; ++lid) {
177177
// Write the output of toString().
178178
outputStream.writeBytes(vertexDataManager.getVertexData(lid).toString());
179-
long cur = outputStream.bytesWriten();
179+
long cur = outputStream.bytesWritten();
180180
offsets[(int) lid] = cur - previous;
181181
maxOffset = Math.max(offsets[(int) lid], maxOffset);
182182
previous = cur;

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/GiraphVertexIdManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ private FFIByteVectorInputStream generateVertexIdStream() {
148148
outputStream.finishSetting();
149149
logger.info(
150150
"Vertex id stream size: "
151-
+ outputStream.bytesWriten()
151+
+ outputStream.bytesWritten()
152152
+ ", vertices: "
153153
+ vertexNum);
154154
} catch (IOException e) {

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/graph/impl/VertexDataManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ private void readVertexDataFromIFragment(FFIByteVectorOutputStream outputStream)
156156
outputStream.finishSetting();
157157
logger.info(
158158
"Vertex data stream size: "
159-
+ outputStream.bytesWriten()
159+
+ outputStream.bytesWritten()
160160
+ ", vertices: "
161161
+ vertexNum);
162162
} catch (IOException e) {

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/loader/impl/GraphDataBufferManagerImpl.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,14 @@ private void check() {
170170
@Override
171171
public synchronized void addVertex(int threadId, Writable id, Writable value)
172172
throws IOException {
173-
int bytes = (int) -vidOutputStream[threadId].bytesWriten();
173+
int bytes = (int) -vidOutputStream[threadId].bytesWritten();
174174
id.write(vidOutputStream[threadId]);
175-
bytes += vidOutputStream[threadId].bytesWriten();
175+
bytes += vidOutputStream[threadId].bytesWritten();
176176
idOffsetsArr[threadId].push_back(bytes);
177177

178-
int bytes2 = (int) -vdataOutputStream[threadId].bytesWriten();
178+
int bytes2 = (int) -vdataOutputStream[threadId].bytesWritten();
179179
value.write(vdataOutputStream[threadId]);
180-
bytes2 += vdataOutputStream[threadId].bytesWriten();
180+
bytes2 += vdataOutputStream[threadId].bytesWritten();
181181
vdataOffsetsArr[threadId].push_back(bytes2);
182182
}
183183

@@ -187,19 +187,19 @@ public synchronized void addEdges(int threadId, Writable id, Iterable<Edge> edge
187187
int bytesEdgeSrcOffset = 0, bytesEdgeDstOffset = 0, bytesDataOffsets = 0;
188188

189189
for (Edge edge : edges) {
190-
bytesEdgeSrcOffset = (int) -edgeSrcIdOutputStream[threadId].bytesWriten();
190+
bytesEdgeSrcOffset = (int) -edgeSrcIdOutputStream[threadId].bytesWritten();
191191
id.write(edgeSrcIdOutputStream[threadId]);
192-
bytesEdgeSrcOffset += edgeSrcIdOutputStream[threadId].bytesWriten();
192+
bytesEdgeSrcOffset += edgeSrcIdOutputStream[threadId].bytesWritten();
193193
edgeSrcIdOffsetArr[threadId].push_back(bytesEdgeSrcOffset);
194194

195-
bytesEdgeDstOffset = (int) -edgeDstOutputStream[threadId].bytesWriten();
195+
bytesEdgeDstOffset = (int) -edgeDstOutputStream[threadId].bytesWritten();
196196
edge.getTargetVertexId().write(edgeDstOutputStream[threadId]);
197-
bytesEdgeDstOffset += edgeDstOutputStream[threadId].bytesWriten();
197+
bytesEdgeDstOffset += edgeDstOutputStream[threadId].bytesWritten();
198198
edgeDstIdOffsetArr[threadId].push_back(bytesEdgeDstOffset);
199199

200-
bytesDataOffsets = (int) -edgeDataOutStream[threadId].bytesWriten();
200+
bytesDataOffsets = (int) -edgeDataOutStream[threadId].bytesWritten();
201201
edge.getValue().write(edgeDataOutStream[threadId]);
202-
bytesDataOffsets += edgeDataOutStream[threadId].bytesWriten();
202+
bytesDataOffsets += edgeDataOutStream[threadId].bytesWritten();
203203
edgeDataOffsetsArr[threadId].push_back(bytesDataOffsets);
204204
}
205205
}
@@ -210,19 +210,19 @@ public void addEdge(
210210
throws IOException {
211211
int bytesEdgeSrcOffset = 0, bytesEdgeDstOffset = 0, bytesDataOffsets = 0;
212212

213-
bytesEdgeSrcOffset = (int) -edgeSrcIdOutputStream[threadId].bytesWriten();
213+
bytesEdgeSrcOffset = (int) -edgeSrcIdOutputStream[threadId].bytesWritten();
214214
srcId.write(edgeSrcIdOutputStream[threadId]);
215-
bytesEdgeSrcOffset += edgeSrcIdOutputStream[threadId].bytesWriten();
215+
bytesEdgeSrcOffset += edgeSrcIdOutputStream[threadId].bytesWritten();
216216
edgeSrcIdOffsetArr[threadId].push_back(bytesEdgeSrcOffset);
217217

218-
bytesEdgeDstOffset = (int) -edgeDstOutputStream[threadId].bytesWriten();
218+
bytesEdgeDstOffset = (int) -edgeDstOutputStream[threadId].bytesWritten();
219219
dstId.write(edgeDstOutputStream[threadId]);
220-
bytesEdgeDstOffset += edgeDstOutputStream[threadId].bytesWriten();
220+
bytesEdgeDstOffset += edgeDstOutputStream[threadId].bytesWritten();
221221
edgeDstIdOffsetArr[threadId].push_back(bytesEdgeDstOffset);
222222

223-
bytesDataOffsets = (int) -edgeDataOutStream[threadId].bytesWriten();
223+
bytesDataOffsets = (int) -edgeDataOutStream[threadId].bytesWritten();
224224
value.write(edgeDataOutStream[threadId]);
225-
bytesDataOffsets += edgeDataOutStream[threadId].bytesWriten();
225+
bytesDataOffsets += edgeDataOutStream[threadId].bytesWritten();
226226
edgeDataOffsetsArr[threadId].push_back(bytesDataOffsets);
227227

228228
// logger.debug("worker [{}] adding edge [{}]->[{}], value {}", workerId, srcId,

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/mm/impl/GiraphMpiMessageManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,9 @@ public void sendMessage(OID_T dstOid, OUT_MSG_T message) {
135135

136136
private void sendMessage(com.alibaba.graphscope.ds.Vertex<GS_VID_T> vertex, OUT_MSG_T msg) {
137137
int dstfragId = fragment.getFragId(vertex);
138-
if (cacheOut[dstfragId].bytesWriten() >= THRESHOLD && dstfragId != fragId) {
138+
if (cacheOut[dstfragId].bytesWritten() >= THRESHOLD && dstfragId != fragId) {
139139
cacheOut[dstfragId].writeLong(
140-
0, cacheOut[dstfragId].bytesWriten() - 8); // minus size_of_long
140+
0, cacheOut[dstfragId].bytesWritten() - 8); // minus size_of_long
141141
cacheOut[dstfragId].finishSetting();
142142
// the vertex will be swapped. so this vector is empty;
143143
grapeMessager.sendToFragment(dstfragId, cacheOut[dstfragId].getVector());
@@ -194,31 +194,31 @@ public void sendMessageToAllEdges(Vertex<OID_T, VDATA_T, EDATA_T> vertex, OUT_MS
194194
@Override
195195
public void finishMessageSending() {
196196
for (int i = 0; i < fragNum; ++i) {
197-
long bytesWriten = cacheOut[i].bytesWriten();
197+
long bytesWritten = cacheOut[i].bytesWritten();
198198
cacheOut[i].finishSetting();
199-
cacheOut[i].writeLong(0, bytesWriten - SIZE_OF_LONG);
199+
cacheOut[i].writeLong(0, bytesWritten - SIZE_OF_LONG);
200200

201-
if (bytesWriten == SIZE_OF_LONG) {
201+
if (bytesWritten == SIZE_OF_LONG) {
202202
logger.debug(
203203
"[Finish msg] sending skip msg from {} -> {}, since msg size: {}",
204204
fragId,
205205
i,
206-
bytesWriten);
206+
bytesWritten);
207207
continue;
208208
}
209209
if (i == fragId) {
210210
nextIncomingMessageStore.digest(cacheOut[i].getVector());
211211
logger.info(
212212
"In final step, Frag [{}] digest msg to self of size: {}",
213213
fragId,
214-
bytesWriten);
214+
bytesWritten);
215215
} else {
216216
grapeMessager.sendToFragment(i, cacheOut[i].getVector());
217217
logger.info(
218218
"In final step, Frag [{}] send msg to [{}] of size: {}",
219219
fragId,
220220
i,
221-
bytesWriten);
221+
bytesWritten);
222222
}
223223
}
224224
// if (maxSuperStep > 0) {

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/netty/request/serialization/WritableRequestEncoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
106106
buf.setInt(0, buf.writerIndex() - SIZE_OF_INT);
107107
if (logger.isDebugEnabled()) {
108108
logger.debug(
109-
"Encode msg, type: [{}], writen bytes: [{}]",
109+
"Encode msg, type: [{}], written bytes: [{}]",
110110
request.getRequestType().getClazz().getName(),
111111
buf.readableBytes());
112112
}
@@ -161,7 +161,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
161161
//
162162
// out.setInt(0, out.writerIndex() - SIZE_OF_INT);
163163
// logger.info("Encode msg, type: " + request.getRequestType().getClazz().getName() + ",
164-
// writen bytes: " + out.readableBytes());
164+
// written bytes: " + out.readableBytes());
165165
// }
166166
// else {
167167
// logger.error("Encoder: got instance " + msg + ", expect a WritableRequest");

analytical_engine/java/grape-giraph/src/main/java/com/alibaba/graphscope/parallel/utils/Utils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ private static String[] actAsCoordinator(
7878
outputStream.writeUTF(res[i]);
7979
info(workerId, "from worker: " + i + ": " + res[i]);
8080
}
81-
outputStream.writeLong(0, outputStream.bytesWriten() - 8);
81+
outputStream.writeLong(0, outputStream.bytesWritten() - 8);
8282
outputStream.finishSetting();
8383

8484
// Distribute to others;
@@ -111,7 +111,7 @@ private static String[] actAsWorker(
111111
e.printStackTrace();
112112
return null;
113113
}
114-
outputStream.writeLong(0, outputStream.bytesWriten() - 8);
114+
outputStream.writeLong(0, outputStream.bytesWritten() - 8);
115115
outputStream.finishSetting();
116116
info(workerId, "now send to coordinator: " + selfIp);
117117
communicator.sendTo(0, outputStream.getVector());

analytical_engine/java/grape-giraph/src/test/java/com/alibaba/graphscope/serialization/FFIByteVectorStreamTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,11 @@ public void testOutPutStream() throws IOException {
113113
for (int i = 0; i < 25; ++i) {
114114
outputStream.writeInt(i);
115115
}
116-
Assert.assertTrue(outputStream.bytesWriten() == 100);
116+
Assert.assertTrue(outputStream.bytesWritten() == 100);
117117
FFIByteVector vector = outputStream.getVector();
118118
System.out.println("Buffer size: " + vector.size() + ", size: " + vector.size);
119119
outputStream.finishSetting();
120120
vector = outputStream.getVector();
121-
Assert.assertTrue(vector.size == outputStream.bytesWriten());
121+
Assert.assertTrue(vector.size == outputStream.bytesWritten());
122122
}
123123
}

analytical_engine/java/grape-graphx/src/main/java/com/alibaba/graphscope/utils/MPIUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public static <VD, ED> String[] loadFragment(
264264
String ipcSocket,
265265
Class<? extends VD> vdClass,
266266
Class<? extends ED> edClass) {
267-
logger.info("Try to load fragment from raw datas: {}", Arrays.toString(rawDataIds));
267+
logger.info("Try to load fragment from raw data: {}", Arrays.toString(rawDataIds));
268268
int numWorkers = rawDataIds.length;
269269
logger.info("running mpi with {} workers", numWorkers);
270270
String hostNameAndSlots = generateHostNameAndSlotsFromIDs(rawDataIds);

analytical_engine/java/grape-graphx/src/main/java/com/alibaba/graphscope/utils/VertexDataUtils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,23 @@ private static <VD> Tuple2<FFIByteVector, FFIIntVector> fillStringVertexArray(
5656
DoubleDouble dd = (DoubleDouble) array.get(i);
5757
ffiByteVectorOutput.writeDouble(dd.a());
5858
ffiByteVectorOutput.writeDouble(dd.b());
59-
ffiOffset.set(i, (int) (ffiByteVectorOutput.bytesWriten() - prevBytesWritten));
60-
prevBytesWritten = ffiByteVectorOutput.bytesWriten();
59+
ffiOffset.set(i, (int) (ffiByteVectorOutput.bytesWritten() - prevBytesWritten));
60+
prevBytesWritten = ffiByteVectorOutput.bytesWritten();
6161
}
6262
} else {
6363
ObjectOutputStream objectOutputStream = new ObjectOutputStream(ffiByteVectorOutput);
6464
for (int i = 0; i < size; ++i) {
6565
objectOutputStream.writeObject(array.get(i));
66-
ffiOffset.set(i, (int) (ffiByteVectorOutput.bytesWriten() - prevBytesWritten));
67-
prevBytesWritten = ffiByteVectorOutput.bytesWriten();
66+
ffiOffset.set(i, (int) (ffiByteVectorOutput.bytesWritten() - prevBytesWritten));
67+
prevBytesWritten = ffiByteVectorOutput.bytesWritten();
6868
}
6969
objectOutputStream.flush();
7070
}
7171

7272
ffiByteVectorOutput.finishSetting();
73-
long writenBytes = ffiByteVectorOutput.bytesWriten();
73+
long writenBytes = ffiByteVectorOutput.bytesWritten();
7474
logger.info(
75-
"write data array {} of type {}, writen bytes {}",
75+
"write data array {} of type {}, written bytes {}",
7676
size,
7777
clz.getName(),
7878
writenBytes);

0 commit comments

Comments
 (0)