Skip to content

support index serialization self-description #743

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions examples/cpp/304_feature_enhance_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include <vsag/vsag.h>

#include <fstream>
#include <iostream>

int
Expand Down Expand Up @@ -129,6 +130,35 @@ main(int argc, char** argv) {
std::cout << "Fixed queries num: " << error_fixed << std::endl;
}

auto& index = hnsw;
vsag::Resource resource(vsag::Engine::CreateDefaultAllocator(), nullptr);
vsag::Engine engine(&resource);
/******************* Save Index to OStream *****************/
vsag::Options::Instance().set_new_version(true);
std::ofstream out_stream("/tmp/vsag-persistent-streaming.index");
auto serialize_result = index->Serialize(out_stream);
out_stream.close();
if (not serialize_result.has_value()) {
std::cerr << serialize_result.error().message << std::endl;
abort();
}

/******************* Load Index from IStream *****************/
index = nullptr;
if (auto create_index = engine.CreateIndex("hnsw", hnsw_build_paramesters);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: hnsw_build_paramesters -> hnsw_build_parameters

not create_index.has_value()) {
std::cout << "create index failed: " << create_index.error().message << std::endl;
abort();
} else {
index = *create_index;
}

std::ifstream in_stream("/tmp/vsag-persistent-streaming.index");
if (auto deserialize = index->Deserialize(in_stream); not deserialize.has_value()) {
std::cerr << "load index failed: " << deserialize.error().message << std::endl;
abort();
}

/******************* Search Hnsw Index with Conjugate Graph *****************/
auto after_enhance_parameters = R"(
{
Expand Down
54 changes: 23 additions & 31 deletions examples/cpp/401_persistent_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <dirent.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <vsag/vsag.h>
Expand All @@ -28,11 +29,11 @@

class LocalKvStore {
public:
LocalKvStore(const std::string& path) : path_(path), meta_filename_(path + "/" + "_meta") {
LocalKvStore(const std::string& path) : path_(path), keys_filename_(path + "/" + "_keys") {
struct stat info;
if (stat(path.c_str(), &info) != 0) {
if (mkdir(path.c_str(), 0755) != 0) {
std::cerr << "create example directory failed" << std::endl;
std::cerr << "create index directory failed" << std::endl;
abort();
}
}
Expand All @@ -50,19 +51,6 @@ class LocalKvStore {
}
value_file.write(value.c_str(), value.length());
value_file.close();

// update metadata if it's a new key
auto keys = GetKeys();
if (not keys.count(key)) {
keys.insert(key);
std::ofstream new_meta_file(meta_filename_);
while (not keys.empty()) {
auto key = *keys.begin();
new_meta_file << key << std::endl;
keys.erase(key);
}
new_meta_file.close();
}
}

std::string
Expand All @@ -87,23 +75,27 @@ class LocalKvStore {
return content;
}

// get all keys via scanning with a prefix
std::unordered_set<std::string>
GetKeys() {
std::ifstream meta_file(meta_filename_);
if (not meta_file.is_open()) {
return {};
}
std::unordered_set<std::string> keys;
std::string line;
while (std::getline(meta_file, line)) {
keys.insert(line);

// the path belong to the index
DIR* dir = opendir(path_.c_str());
if (dir != nullptr) {
struct dirent* ent;
while ((ent = readdir(dir)) != nullptr) {
if (ent->d_type == DT_REG) {
keys.insert(ent->d_name);
}
}
closedir(dir);
}
meta_file.close();
return keys;
}

private:
const std::string meta_filename_;
const std::string keys_filename_;
const std::string path_;
std::mutex mutex_;
};
Expand Down Expand Up @@ -134,14 +126,14 @@ main(int32_t argc, char** argv) {
"dtype": "float32",
"metric_type": "l2",
"dim": 128,
"hnsw": {
"max_degree": 16,
"ef_construction": 100
"index_param": {
"buckets_count": 50,
"base_quantization_type": "fp32"
}
}
)";
vsag::IndexPtr index = nullptr;
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
if (auto create_index = engine.CreateIndex("ivf", index_paramesters);
not create_index.has_value()) {
std::cout << "create index failed: " << create_index.error().message << std::endl;
abort();
Expand Down Expand Up @@ -175,7 +167,7 @@ main(int32_t argc, char** argv) {

/******************* Load Index from KVStore *****************/
index = nullptr;
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
if (auto create_index = engine.CreateIndex("ivf", index_paramesters);
not create_index.has_value()) {
std::cout << "create index failed: " << create_index.error().message << std::endl;
abort();
Expand Down Expand Up @@ -211,8 +203,8 @@ main(int32_t argc, char** argv) {
query->NumElements(1)->Dim(dim)->Float32Vectors(query_vector)->Owner(false);
auto search_parameters = R"(
{
"hnsw": {
"ef_search": 100
"ivf": {
"scan_buckets_count": 10
}
}
)";
Expand Down
17 changes: 10 additions & 7 deletions examples/cpp/402_persistent_streaming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <unordered_set>

#include "vsag/binaryset.h"
#include "vsag/options.h"

int
main(int32_t argc, char** argv) {
Expand Down Expand Up @@ -52,14 +53,14 @@ main(int32_t argc, char** argv) {
"dtype": "float32",
"metric_type": "l2",
"dim": 128,
"hnsw": {
"max_degree": 16,
"ef_construction": 100
"index_param": {
"buckets_count": 50,
"base_quantization_type": "fp32"
}
}
)";
vsag::IndexPtr index = nullptr;
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
if (auto create_index = engine.CreateIndex("ivf", index_paramesters);
not create_index.has_value()) {
std::cout << "create index failed: " << create_index.error().message << std::endl;
abort();
Expand All @@ -76,6 +77,7 @@ main(int32_t argc, char** argv) {
std::cout << "index contains vectors: " << index->GetNumElements() << std::endl;

/******************* Save Index to OStream *****************/
vsag::Options::Instance().set_new_version(true);
std::ofstream out_stream("/tmp/vsag-persistent-streaming.index");
auto serialize_result = index->Serialize(out_stream);
out_stream.close();
Expand All @@ -86,13 +88,14 @@ main(int32_t argc, char** argv) {

/******************* Load Index from IStream *****************/
index = nullptr;
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
if (auto create_index = engine.CreateIndex("ivf", index_paramesters);
not create_index.has_value()) {
std::cout << "create index failed: " << create_index.error().message << std::endl;
abort();
} else {
index = *create_index;
}

std::ifstream in_stream("/tmp/vsag-persistent-streaming.index");
if (auto deserialize = index->Deserialize(in_stream); not deserialize.has_value()) {
std::cerr << "load index failed: " << deserialize.error().message << std::endl;
Expand All @@ -109,8 +112,8 @@ main(int32_t argc, char** argv) {
query->NumElements(1)->Dim(dim)->Float32Vectors(query_vector)->Owner(false);
auto search_parameters = R"(
{
"hnsw": {
"ef_search": 100
"ivf": {
"scan_buckets_count": 10
}
}
)";
Expand Down
10 changes: 5 additions & 5 deletions extern/diskann/DiskANN/src/pq_flash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1433,16 +1433,16 @@ int PQFlashIndex<T, LabelT>::load_from_separate_paths(std::stringstream &pivots_
<< ". Will not output it at search time." << std::endl;
}

// diskann::cout << "Disk-Index File Meta-data: ";
// diskann::cout << "# nodes per sector: " << nnodes_per_sector;
// diskann::cout << ", max node len (bytes): " << max_node_len;
// diskann::cout << ", max node degree: " << max_degree << std::endl;
diskann::cout << "Disk-Index File Meta-data: ";
diskann::cout << "# nodes per sector: " << nnodes_per_sector;
diskann::cout << ", max node len (bytes): " << max_node_len;
diskann::cout << ", max node degree: " << max_degree << std::endl;
num_medoids = 1;
medoids = new uint32_t[1];
medoids[0] = (uint32_t)(medoid_id_on_file);

use_medoids_data_as_centroids();
// diskann::cout << "done.." << std::endl;
diskann::cout << "done.." << std::endl;
return 0;
}

Expand Down
5 changes: 5 additions & 0 deletions include/vsag/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,9 @@ extern const char* const IVF_PRECISE_QUANTIZATION_TYPE;
extern const char* const IVF_PRECISE_IO_TYPE;
extern const char* const IVF_PRECISE_FILE_PATH;

// serialization
extern const char* const SERIAL_MAGIC_BEGIN;
extern const char* const SERIAL_MAGIC_END;
extern const char* const SERIAL_META_KEY;

} // namespace vsag
15 changes: 15 additions & 0 deletions include/vsag/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <mutex>
#include <string>

#include "utils/function_exists_check.h"
#include "vsag/allocator.h"
#include "vsag/logger.h"

Expand All @@ -43,6 +44,20 @@ class Options {
static Options&
Instance();

public:
inline bool
new_version() const {
return new_version_;
}

inline void
set_new_version(bool new_version) {
new_version_ = new_version;
}

private:
bool new_version_ = true;

public:
/**
* @brief Gets the number of threads for IO operations.
Expand Down
47 changes: 41 additions & 6 deletions src/algorithm/brute_force.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "data_cell/flatten_datacell.h"
#include "inner_string_params.h"
#include "serialization.h"
#include "utils/slow_task_timer.h"
#include "utils/standard_heap.h"
#include "utils/util_functions.h"
Expand Down Expand Up @@ -187,17 +188,47 @@ BruteForce::CalcDistanceById(const float* vector, int64_t id) const {

void
BruteForce::Serialize(StreamWriter& writer) const {
StreamWriter::WriteObj(writer, dim_);
StreamWriter::WriteObj(writer, total_count_);
// basic info moved to metadata since version 0.15
// only for test
if (Options::Instance().new_version()) {
// do nothing
} else {
StreamWriter::WriteObj(writer, dim_);
StreamWriter::WriteObj(writer, total_count_);
}

this->inner_codes_->Serialize(writer);
this->label_table_->Serialize(writer);

// serialize footer (introduce since v0.15)
if (Options::Instance().new_version()) {
auto metadata = std::make_shared<Metadata>();
JsonType basic_info;
basic_info["dim"] = dim_;
basic_info["total_count"] = total_count_;
metadata->Set("basic_info", basic_info);
auto footer = std::make_shared<Footer>(metadata);
footer->Write(writer);
}
}

void
BruteForce::Deserialize(StreamReader& reader) {
StreamReader::ReadObj(reader, dim_);
StreamReader::ReadObj(reader, total_count_);
// try to deserialize footer (only in new version)
auto footer = Footer::Parse(reader);
if (footer != nullptr) {
logger::debug("parse with new version format");
auto metadata = footer->GetMetadata();
auto basic_info = metadata->Get("basic_info");
dim_ = basic_info["dim"];
total_count_ = basic_info["total_count"];
} else {
logger::debug("parse with v0.13 version format");

StreamReader::ReadObj(reader, dim_);
StreamReader::ReadObj(reader, total_count_);
}

this->inner_codes_->Deserialize(reader);
this->label_table_->Deserialize(reader);
}
Expand All @@ -216,16 +247,19 @@ BruteForce::InitFeatures() {
IndexFeature::SUPPORT_RANGE_SEARCH_WITH_ID_FILTER,
});
}
// Add & Build

// add & build
this->index_feature_list_->SetFeatures({
IndexFeature::SUPPORT_BUILD,
IndexFeature::SUPPORT_ADD_AFTER_BUILD,
});
// Search

// search
this->index_feature_list_->SetFeatures({
IndexFeature::SUPPORT_KNN_SEARCH,
IndexFeature::SUPPORT_KNN_SEARCH_WITH_ID_FILTER,
});

// concurrency
this->index_feature_list_->SetFeatures({
IndexFeature::SUPPORT_SEARCH_CONCURRENT,
Expand All @@ -240,6 +274,7 @@ BruteForce::InitFeatures() {
IndexFeature::SUPPORT_SERIALIZE_BINARY_SET,
IndexFeature::SUPPORT_SERIALIZE_FILE,
});

// others
this->index_feature_list_->SetFeatures({
IndexFeature::SUPPORT_ESTIMATE_MEMORY,
Expand Down
2 changes: 2 additions & 0 deletions src/algorithm/brute_force.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "typing.h"

namespace vsag {

// introduce since v0.13
class BruteForce : public InnerIndexInterface {
public:
static ParamPtr
Expand Down
Loading
Loading