Skip to content

fix(interactive): fix bug in degree fusion in Insight Runtime #4529

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions interactive_engine/compiler/ir_csr_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ cd ${base_dir}/../executor/ir/target/release &&
RUST_LOG=info CSR_PATH=/tmp/gstest/modern_graph_csr_bin PARTITION_ID=0 ./start_rpc_server_csr --config ${base_dir}/../executor/ir/integrated/config &
sleep 5s
# start compiler service
cd ${base_dir} && make run graph.store=rust-mcsr &
cd ${base_dir} && make run graph.store=rust-mcsr gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json &
sleep 5s
# run gremlin standard tests
cd ${base_dir} && make gremlin_test
cd ${base_dir} && make gremlin_calcite_test
exit_code=$?
# clean service
ps -ef | grep "com.alibaba.graphscope.GraphServer" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
ps -ef | grep "start_rpc_server" | grep -v grep | awk '{print $2}' | xargs kill -9
# report test result
if [ $exit_code -ne 0 ]; then
echo "ir integration test on experimental store fail"
echo "ir integration test on csr store fail"
exit 1
fi
34 changes: 18 additions & 16 deletions interactive_engine/compiler/ir_experimental_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,26 @@ base_dir=$(cd $(dirname $0); pwd)

# Test1: run gremlin standard tests on experimental store via ir-core
# start engine service and load modern graph
cd ${base_dir}/../executor/ir/target/release && RUST_LOG=info ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config &
sleep 5s
# start compiler service
cd ${base_dir} && make run &
sleep 5s
# run gremlin standard tests
cd ${base_dir} && make gremlin_test
exit_code=$?
# clean service
ps -ef | grep "com.alibaba.graphscope.GraphServer" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
# report test result
if [ $exit_code -ne 0 ]; then
echo "ir gremlin integration test on experimental store fail"
exit 1
fi
# cd ${base_dir}/../executor/ir/target/release && RUST_LOG=info ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config &
# sleep 5s
# # start compiler service
# cd ${base_dir} && make run &
# sleep 5s
# # run gremlin standard tests
# cd ${base_dir} && make gremlin_test
# exit_code=$?
# # clean service
# ps -ef | grep "com.alibaba.graphscope.GraphServer" | grep -v grep | awk '{print $2}' | xargs kill -9 || true
# # report test result
# if [ $exit_code -ne 0 ]; then
# echo "ir gremlin integration test on experimental store fail"
# exit 1
# fi

# Test2: run gremlin standard tests on experimental store via calcite-based ir
# restart compiler service
# start engine service and load modern graph
cd ${base_dir}/../executor/ir/target/release && RUST_LOG=info ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config &
# start compiler service
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json &
sleep 5s
# run gremlin standard tests to test calcite-based IR layer
Expand Down
37 changes: 36 additions & 1 deletion interactive_engine/executor/ir/integrated/tests/expand_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,39 @@ mod test {
let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1);
let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0);
let mut expected_results = vec![(v1, 3), (v4, 2), (v6, 1)];
while let Some(Ok(record)) = pegasus_result.next() {
if let Some(v) = record.get(None).unwrap().as_vertex() {
if let Some(degree_obj) = record.get(Some(1)).unwrap().as_object() {
results.push((v.id() as DefaultId, degree_obj.as_u64().unwrap()));
}
}
}
results.sort();
expected_results.sort();

assert_eq!(results, expected_results)
}

// g.V().as(0).select(0).by(out().count().as(1))
#[test]
fn expand_optional_out_degree_test() {
let expand_opr_pb = pb::EdgeExpand {
v_tag: None,
direction: 0,
params: None,
expand_opt: 2,
alias: Some(1.into()),
is_optional: true,
};
let mut pegasus_result = expand_degree_opt_test(expand_opr_pb);
let mut results = vec![];
let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0);
let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0);
let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1);
let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1);
let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0);
let mut expected_results = vec![(v1, 3), (v2, 0), (v3, 0), (v4, 2), (v5, 0), (v6, 1)];
while let Some(Ok(record)) = pegasus_result.next() {
if let Some(v) = record.get(None).unwrap().as_vertex() {
Expand Down Expand Up @@ -637,7 +670,9 @@ mod test {
let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0);
let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1);
let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0);
let mut expected_results = vec![(v1, 0), (v2, 1), (v3, 3), (v4, 1), (v5, 1), (v6, 0)];
// the zero-degree vertex should not be included in the result
// (v1, 0), (v2, 1), (v3, 3), (v4, 1), (v5, 1), (v6, 0)
let mut expected_results = vec![(v2, 1), (v3, 3), (v4, 1), (v5, 1)];
while let Some(Ok(record)) = pegasus_result.next() {
if let Some(v) = record.get(None).unwrap().as_vertex() {
if let Some(degree_obj) = record.get(Some(1)).unwrap().as_object() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,12 @@ impl<E: Entry + 'static> FlatMapFunction<Record, Record> for EdgeExpandOperator<
// the case of get degree.
ExpandOpt::Degree => {
let degree = iter.count();
input.append(object!(degree), self.alias);
Ok(Box::new(vec![input].into_iter()))
if !self.is_optional && degree == 0 {
Ok(Box::new(vec![].into_iter()))
} else {
input.append(object!(degree), self.alias);
Ok(Box::new(vec![input].into_iter()))
}
}
}
}
Expand Down
Loading