Skip to content

Rework rel group #5280

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.10.0.2 LANGUAGES CXX C)
project(Kuzu VERSION 0.10.0.3 LANGUAGES CXX C)

option(SINGLE_THREADED "Single-threaded mode" FALSE)
if(SINGLE_THREADED)
Expand Down
10 changes: 5 additions & 5 deletions dataset/rel-group/copy.cypher
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
COPY personA FROM "dataset/rel-group/node.csv" ;
COPY personB FROM "dataset/rel-group/node.csv" ;
COPY personC FROM "dataset/rel-group/node.csv" ;
COPY knows_personA_personB FROM "dataset/rel-group/edge.csv";
COPY knows_personA_personC FROM "dataset/rel-group/edge.csv";
COPY knows_personB_personC FROM "dataset/rel-group/edge.csv";
COPY likes_personA_personB FROM "dataset/rel-group/edge.csv";
COPY likes_personB_personA FROM "dataset/rel-group/edge.csv";
COPY knows FROM "dataset/rel-group/edge.csv" (from='personA', to='personB');
COPY knows FROM "dataset/rel-group/edge.csv" (from='personA', to='personC');
COPY knows FROM "dataset/rel-group/edge.csv" (from='personB', to='personC');
COPY likes FROM "dataset/rel-group/edge.csv" (from='personA', to='personB');
COPY likes FROM "dataset/rel-group/edge.csv" (from='personB', to='personA');
2 changes: 1 addition & 1 deletion dataset/tck-likes/schema.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ CREATE NODE TABLE C(ID SERIAL, name STRING, PRIMARY KEY(ID));
CREATE NODE TABLE D(ID SERIAL, name STRING, PRIMARY KEY(ID));
CREATE NODE TABLE E(ID SERIAL, name STRING, PRIMARY KEY(ID));
CREATE REL TABLE LIKES(FROM A TO B, FROM B TO C, FROM C TO D, FROM D TO E, FROM B TO A);
CREATE (n0:A {name: 'n0'}), (n00:B {name: 'n00'}), (n01:B {name: 'n01'}), (n000:C {name: 'n000'}), (n001:C {name: 'n001'}), (n010:C {name: 'n010'}), (n011:C {name: 'n011'}), (n0000:D {name: 'n0000'}), (n0001:D {name: 'n0001'}), (n0010:D {name: 'n0010'}), (n0011:D {name: 'n0011'}), (n0100:D {name: 'n0100'}), (n0101:D {name: 'n0101'}), (n0110:D {name: 'n0110'}), (n0111:D {name: 'n0111'}) CREATE (n0)-[:LIKES_A_B]->(n00), (n0)-[:LIKES_A_B]->(n01), (n00)-[:LIKES_B_C]->(n000), (n00)-[:LIKES_B_C]->(n001), (n01)-[:LIKES_B_C]->(n010), (n01)-[:LIKES_B_C]->(n011), (n000)-[:LIKES_C_D]->(n0000), (n000)-[:LIKES_C_D]->(n0001), (n001)-[:LIKES_C_D]->(n0010), (n001)-[:LIKES_C_D]->(n0011), (n010)-[:LIKES_C_D]->(n0100), (n010)-[:LIKES_C_D]->(n0101), (n011)-[:LIKES_C_D]->(n0110), (n011)-[:LIKES_C_D]->(n0111);
CREATE (n0:A {name: 'n0'}), (n00:B {name: 'n00'}), (n01:B {name: 'n01'}), (n000:C {name: 'n000'}), (n001:C {name: 'n001'}), (n010:C {name: 'n010'}), (n011:C {name: 'n011'}), (n0000:D {name: 'n0000'}), (n0001:D {name: 'n0001'}), (n0010:D {name: 'n0010'}), (n0011:D {name: 'n0011'}), (n0100:D {name: 'n0100'}), (n0101:D {name: 'n0101'}), (n0110:D {name: 'n0110'}), (n0111:D {name: 'n0111'}) CREATE (n0)-[:LIKES]->(n00), (n0)-[:LIKES]->(n01), (n00)-[:LIKES]->(n000), (n00)-[:LIKES]->(n001), (n01)-[:LIKES]->(n010), (n01)-[:LIKES]->(n011), (n000)-[:LIKES]->(n0000), (n000)-[:LIKES]->(n0001), (n001)-[:LIKES]->(n0010), (n001)-[:LIKES]->(n0011), (n010)-[:LIKES]->(n0100), (n010)-[:LIKES]->(n0101), (n011)-[:LIKES]->(n0110), (n011)-[:LIKES]->(n0111);
6 changes: 4 additions & 2 deletions extension/algo/src/function/louvain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,11 @@ class WriteResultsVC final : public GDSResultVertexCompute {

void initInMemoryGraph(const table_id_t tableId, const offset_t numNodes, Graph* graph,
PhaseState& state) {
const auto nbrTables = graph->getForwardNbrTableInfos(tableId);
const auto nbrTables = graph->getRelInfos(tableId);
const auto nbrInfo = nbrTables[0];
const auto scanState = graph->prepareRelScan(nbrInfo.relEntry, nbrInfo.nodeEntry, {});
KU_ASSERT(nbrInfo.srcTableID == nbrInfo.dstTableID);
const auto scanState =
graph->prepareRelScan(*nbrInfo.relGroupEntry, nbrInfo.relTableID, nbrInfo.dstTableID, {});

for (auto nodeId = 0u; nodeId < numNodes; ++nodeId) {
state.initNextNode(nodeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,12 @@ class Kosaraju {
: graph{graph}, visitedState{visitedState}, visitedOrder{visitedOrder} {
KU_ASSERT(graph->getNodeTableIDs().size() == 1);
nodeTableID = graph->getNodeTableIDs()[0];
auto nbrInfos = graph->getForwardNbrTableInfos(nodeTableID);
auto nbrInfos = graph->getRelInfos(nodeTableID);
KU_ASSERT(nbrInfos.size() == 1);
auto nbrInfo = nbrInfos[0];
scanState = graph->prepareRelScan(nbrInfo.relEntry, nbrInfo.nodeEntry, {});
KU_ASSERT(nbrInfo.srcTableID == nbrInfo.dstTableID);
scanState = graph->prepareRelScan(*nbrInfo.relGroupEntry, nbrInfo.relTableID,
nbrInfo.dstTableID, {});
}

void compute(const offset_t maxOffset, NodeOffsetMaskMap* map, ExecutionContext* context) {
Expand Down
1 change: 0 additions & 1 deletion extension/fts/src/function/query_fts_bind_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "catalog/fts_index_catalog_entry.h"
#include "common/exception/binder.h"
#include "common/string_utils.h"
#include "function/fts_utils.h"
#include "function/stem.h"
#include "libstemmer.h"
#include "re2.h"
Expand Down
9 changes: 6 additions & 3 deletions extension/fts/src/include/function/query_fts_bind_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ struct QueryFTSBindData final : function::GDSBindData {
const catalog::IndexCatalogEntry& entry, QueryFTSOptionalParams optionalParams)
: GDSBindData{std::move(columns), std::move(graphEntry), std::move(docs)},
query{std::move(query)}, entry{entry}, optionalParams{std::move(optionalParams)},
outputTableID{
nodeOutput->constCast<binder::NodeExpression>().getSingleEntry()->getTableID()} {}
outputTableID{nodeOutput->constCast<binder::NodeExpression>().getTableIDs()[0]} {
auto& nodeExpr = nodeOutput->constCast<binder::NodeExpression>();
KU_ASSERT(nodeExpr.getNumEntries() == 1);
outputTableID = nodeExpr.getEntry(0)->getTableID();
}
// Copy constructor: copy-constructs the GDSBindData base from `other` and copies query, entry,
// optionalParams and outputTableID member by member.
QueryFTSBindData(const QueryFTSBindData& other)
: GDSBindData{other}, query{other.query}, entry{other.entry},
optionalParams{other.optionalParams}, outputTableID{other.outputTableID} {}
Expand All @@ -39,7 +42,7 @@ struct QueryFTSBindData final : function::GDSBindData {

// Returns the QueryFTSConfig produced by optionalParams.getConfig().
QueryFTSConfig getConfig() const { return optionalParams.getConfig(); }

std::unique_ptr<function::TableFuncBindData> copy() const override {
std::unique_ptr<TableFuncBindData> copy() const override {
return std::make_unique<QueryFTSBindData>(*this);
}
};
Expand Down
6 changes: 3 additions & 3 deletions extension/fts/test/test_files/fts_small.test
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ Binder exception: Table 0_docIdx_docs does not exist.
0|doc|NODE|local(kuzu)|
1|city_stopwords|NODE|local(kuzu)|
2|name_stopwords|NODE|local(kuzu)|
3|likes|REL|local(kuzu)|
4|INVALID_STOPWORDS|NODE|local(kuzu)|
5|INVALID_STOPWORDS_COL_TYPE|NODE|local(kuzu)|
4|likes|REL|local(kuzu)|
5|INVALID_STOPWORDS|NODE|local(kuzu)|
6|INVALID_STOPWORDS_COL_TYPE|NODE|local(kuzu)|
-STATEMENT DROP TABLE `0_docIdx_docs`
---- error
Binder exception: Table 0_docIdx_docs does not exist.
Expand Down
10 changes: 5 additions & 5 deletions extension/httpfs/test/test_files/http.test
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ Guelph|75000
0|person|NODE|test(KUZU)|
1|organisation|NODE|test(KUZU)|
2|movies|NODE|test(KUZU)|
3|knows|REL|test(KUZU)|
4|studyAt|REL|test(KUZU)|
5|workAt|REL|test(KUZU)|
6|meets|REL|test(KUZU)|
7|marries|REL|test(KUZU)|
4|knows|REL|test(KUZU)|
6|studyAt|REL|test(KUZU)|
8|workAt|REL|test(KUZU)|
10|meets|REL|test(KUZU)|
12|marries|REL|test(KUZU)|

-LOG QueryRemoteNodeTable
-STATEMENT match (p:person) return p.*
Expand Down
30 changes: 18 additions & 12 deletions extension/vector/src/function/create_hnsw_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,19 @@ static std::unique_ptr<PhysicalOperator> getPhysicalPlan(PlanMapper* planMapper,
->ptrCast<storage::NodeTable>();
auto nodeTableID = nodeTable->getTableID();
auto upperRelTableEntry =
clientContext->getCatalog()->getTableCatalogEntry(clientContext->getTransaction(),
HNSWIndexUtils::getUpperGraphTableName(nodeTableID, indexName));
auto upperRelTable =
storageManager->getTable(upperRelTableEntry->getTableID())->ptrCast<storage::RelTable>();
clientContext->getCatalog()
->getTableCatalogEntry(clientContext->getTransaction(),
HNSWIndexUtils::getUpperGraphTableName(nodeTableID, indexName))
->ptrCast<catalog::RelGroupCatalogEntry>();
auto upperRelTable = storageManager->getTable(upperRelTableEntry->getSingleRelEntryInfo().oid)
->ptrCast<storage::RelTable>();
auto lowerRelTableEntry =
clientContext->getCatalog()->getTableCatalogEntry(clientContext->getTransaction(),
HNSWIndexUtils::getLowerGraphTableName(nodeTableID, indexName));
auto lowerRelTable =
storageManager->getTable(lowerRelTableEntry->getTableID())->ptrCast<storage::RelTable>();
clientContext->getCatalog()
->getTableCatalogEntry(clientContext->getTransaction(),
HNSWIndexUtils::getLowerGraphTableName(nodeTableID, indexName))
->ptrCast<catalog::RelGroupCatalogEntry>();
auto lowerRelTable = storageManager->getTable(lowerRelTableEntry->getSingleRelEntryInfo().oid)
->ptrCast<storage::RelTable>();
// Initialize partitioner shared state.
const auto partitionerSharedState = finalizeFuncSharedState->partitionerSharedState;
partitionerSharedState->setTables(nodeTable, upperRelTable);
Expand Down Expand Up @@ -181,17 +185,19 @@ static std::unique_ptr<PhysicalOperator> getPhysicalPlan(PlanMapper* planMapper,
fTable, &storageManager->getWAL(), clientContext->getMemoryManager());
auto copyRelUpper = planMapper->createRelBatchInsertOp(clientContext,
partitionerSharedState->upperPartitionerSharedState, upperBatchInsertSharedState,
upperCopyFromInfo, logicalOp->getSchema(), RelDataDirection::FWD, columnIDs,
LogicalType::copy(columnTypes), planMapper->getOperatorID());
upperCopyFromInfo, logicalOp->getSchema(), RelDataDirection::FWD,
upperRelTable->getTableID(), nodeTableID, columnIDs, LogicalType::copy(columnTypes),
planMapper->getOperatorID());
binder::BoundCopyFromInfo lowerCopyFromInfo(lowerRelTableEntry, nullptr, nullptr, {}, {},
nullptr);
lowerCopyFromInfo.tableEntry = lowerRelTableEntry;
const auto lowerBatchInsertSharedState = std::make_shared<BatchInsertSharedState>(lowerRelTable,
fTable, &storageManager->getWAL(), clientContext->getMemoryManager());
auto copyRelLower = planMapper->createRelBatchInsertOp(clientContext,
partitionerSharedState->lowerPartitionerSharedState, lowerBatchInsertSharedState,
lowerCopyFromInfo, logicalOp->getSchema(), RelDataDirection::FWD, columnIDs,
LogicalType::copy(columnTypes), planMapper->getOperatorID());
lowerCopyFromInfo, logicalOp->getSchema(), RelDataDirection::FWD,
lowerRelTable->getTableID(), nodeTableID, columnIDs, LogicalType::copy(columnTypes),
planMapper->getOperatorID());
physical_op_vector_t children;
children.push_back(std::move(copyRelUpper));
children.push_back(std::move(copyRelLower));
Expand Down
16 changes: 8 additions & 8 deletions extension/vector/src/function/query_hnsw_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ std::unique_ptr<TableFuncBindData> QueryHNSWIndexBindData::copy() const {
bindData->nodeTableEntry = nodeTableEntry;
bindData->indexEntry = indexEntry;
bindData->indexColumnID = indexColumnID;
bindData->upperHNSWRelTableEntry = upperHNSWRelTableEntry;
bindData->lowerHNSWRelTableEntry = lowerHNSWRelTableEntry;
bindData->upperRelTableEntry = upperRelTableEntry;
bindData->lowerRelTableEntry = lowerRelTableEntry;
bindData->config = config;
bindData->queryExpression = queryExpression;
bindData->kExpression = kExpression;
Expand Down Expand Up @@ -118,12 +118,12 @@ static std::unique_ptr<TableFuncBindData> bindFunc(main::ClientContext* context,
LogicalTypeID::ARRAY);
bindData->indexColumnID = nodeTableEntry->getColumnID(propertyID);
const auto& auxInfo = bindData->indexEntry->getAuxInfo().cast<HNSWIndexAuxInfo>();
bindData->upperHNSWRelTableEntry =
bindData->upperRelTableEntry =
catalog->getTableCatalogEntry(transaction, auxInfo.upperRelTableID)
->ptrCast<RelTableCatalogEntry>();
bindData->lowerHNSWRelTableEntry =
->ptrCast<RelGroupCatalogEntry>();
bindData->lowerRelTableEntry =
catalog->getTableCatalogEntry(transaction, auxInfo.lowerRelTableID)
->ptrCast<RelTableCatalogEntry>();
->ptrCast<RelGroupCatalogEntry>();
bindData->config = QueryHNSWConfig{input->optionalParams};
bindData->queryExpression = input->params[2];
bindData->kExpression = input->params[3];
Expand Down Expand Up @@ -179,8 +179,8 @@ static offset_t tableFunc(const TableFuncInput& input, TableFuncOutput& output)
// We start searching when there is no query result to output.
const auto& auxInfo = bindData->indexEntry->getAuxInfo().cast<HNSWIndexAuxInfo>();
const auto index = std::make_unique<OnDiskHNSWIndex>(input.context->clientContext,
bindData->nodeTableEntry, bindData->indexColumnID, bindData->upperHNSWRelTableEntry,
bindData->lowerHNSWRelTableEntry, auxInfo.config.copy());
bindData->nodeTableEntry, bindData->indexColumnID, bindData->upperRelTableEntry,
bindData->lowerRelTableEntry, auxInfo.config.copy());
index->setDefaultUpperEntryPoint(auxInfo.upperEntryPoint);
index->setDefaultLowerEntryPoint(auxInfo.lowerEntryPoint);
const auto dimension = ArrayType::getNumElements(
Expand Down
4 changes: 2 additions & 2 deletions extension/vector/src/include/function/hnsw_index_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ struct QueryHNSWIndexBindData final : function::TableFuncBindData {
catalog::NodeTableCatalogEntry* nodeTableEntry = nullptr;
catalog::IndexCatalogEntry* indexEntry = nullptr;
common::column_id_t indexColumnID = common::INVALID_COLUMN_ID;
catalog::RelTableCatalogEntry* upperHNSWRelTableEntry = nullptr;
catalog::RelTableCatalogEntry* lowerHNSWRelTableEntry = nullptr;
catalog::RelGroupCatalogEntry* upperRelTableEntry = nullptr;
catalog::RelGroupCatalogEntry* lowerRelTableEntry = nullptr;

QueryHNSWConfig config;

Expand Down
8 changes: 4 additions & 4 deletions extension/vector/src/include/index/hnsw_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ struct HNSWSearchState {
class OnDiskHNSWIndex final : public HNSWIndex {
public:
OnDiskHNSWIndex(main::ClientContext* context, catalog::NodeTableCatalogEntry* nodeTableEntry,
common::column_id_t columnID, catalog::RelTableCatalogEntry* upperRelTableEntry,
catalog::RelTableCatalogEntry* lowerRelTableEntry, HNSWIndexConfig config);
common::column_id_t columnID, catalog::RelGroupCatalogEntry* upperRelTableEntry,
catalog::RelGroupCatalogEntry* lowerRelTableEntry, HNSWIndexConfig config);

void setDefaultUpperEntryPoint(common::offset_t offset) {
defaultUpperEntryPoint.store(offset);
Expand Down Expand Up @@ -273,8 +273,8 @@ class OnDiskHNSWIndex final : public HNSWIndex {
static constexpr uint64_t FILTERED_SEARCH_INITIAL_CANDIDATES = 10;

common::table_id_t nodeTableID;
catalog::RelTableCatalogEntry* upperRelTableEntry;
catalog::RelTableCatalogEntry* lowerRelTableEntry;
catalog::RelGroupCatalogEntry* upperRelTableEntry;
catalog::RelGroupCatalogEntry* lowerRelTableEntry;

// The search starts in the upper layer to find the closest node, which serves as the entry
// point for the lower layer search. If the upper layer does not return a valid entry point,
Expand Down
15 changes: 10 additions & 5 deletions extension/vector/src/index/hnsw_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ void InMemHNSWIndex::finalize(MemoryManager& mm, common::node_group_idx_t nodeGr

OnDiskHNSWIndex::OnDiskHNSWIndex(main::ClientContext* context,
catalog::NodeTableCatalogEntry* nodeTableEntry, common::column_id_t columnID,
catalog::RelTableCatalogEntry* upperRelTableEntry,
catalog::RelTableCatalogEntry* lowerRelTableEntry, HNSWIndexConfig config)
catalog::RelGroupCatalogEntry* upperRelTableEntry,
catalog::RelGroupCatalogEntry* lowerRelTableEntry, HNSWIndexConfig config)
: HNSWIndex{std::move(config), getArrayTypeInfo(context->getStorageManager()
->getTable(nodeTableEntry->getTableID())
->cast<NodeTable>(),
Expand Down Expand Up @@ -314,7 +314,9 @@ common::offset_t OnDiskHNSWIndex::searchNNInUpperLayer(transaction::Transaction*
auto minDist = metricFunc(queryVector, currNodeVector, embeddings->getDimension());
KU_ASSERT(lastMinDist >= 0);
KU_ASSERT(minDist >= 0);
auto scanState = upperGraph->prepareRelScan(upperRelTableEntry);
auto relEntryInfo = upperRelTableEntry->getSingleRelEntryInfo();
auto scanState = upperGraph->prepareRelScan(*upperRelTableEntry, relEntryInfo.oid,
relEntryInfo.nodePair.dstTableID, {} /* relProperties */);
while (minDist < lastMinDist) {
lastMinDist = minDist;
auto neighborItr =
Expand All @@ -338,11 +340,14 @@ common::offset_t OnDiskHNSWIndex::searchNNInUpperLayer(transaction::Transaction*
// Prepares `searchState` for a lower-layer search: resets searchState.visited, sets
// searchState.nbrScanState from lowerGraph->prepareRelScan, and stores the result of
// getFilteredSearchType in searchState.searchType.
// NOTE(review): this span is taken from a rendered diff without +/- markers; the one-argument
// prepareRelScan(lowerRelTableEntry) calls look like the removed lines and the four-argument
// calls like their replacements — confirm against the actual file before relying on either.
void OnDiskHNSWIndex::initLowerLayerSearchState(transaction::Transaction* transaction,
HNSWSearchState& searchState) const {
searchState.visited.reset();
searchState.nbrScanState = lowerGraph->prepareRelScan(lowerRelTableEntry);
auto relEntryInfo = lowerRelTableEntry->getSingleRelEntryInfo();
searchState.nbrScanState = lowerGraph->prepareRelScan(*lowerRelTableEntry, relEntryInfo.oid,
relEntryInfo.nodePair.dstTableID, {} /* relProperties */);
searchState.searchType = getFilteredSearchType(transaction, searchState);
// Only the two-hop search types additionally get a second scan state (secondHopNbrScanState).
if (searchState.searchType == SearchType::BLIND_TWO_HOP ||
searchState.searchType == SearchType::DIRECTED_TWO_HOP) {
searchState.secondHopNbrScanState = lowerGraph->prepareRelScan(lowerRelTableEntry);
searchState.secondHopNbrScanState = lowerGraph->prepareRelScan(*lowerRelTableEntry,
relEntryInfo.oid, relEntryInfo.nodePair.dstTableID, {} /* relProperties */);
}
}

Expand Down
Loading
Loading