Skip to content

Commit 3b72dc8

Browse files
committed
support index serialization self-description
Signed-off-by: Xiangyu Wang <[email protected]>
1 parent 74309d9 commit 3b72dc8

File tree

14 files changed

+518
-51
lines changed

14 files changed

+518
-51
lines changed

examples/cpp/402_persistent_streaming.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <unordered_set>
2626

2727
#include "vsag/binaryset.h"
28+
#include "vsag/options.h"
2829

2930
int
3031
main(int32_t argc, char** argv) {
@@ -52,14 +53,15 @@ main(int32_t argc, char** argv) {
5253
"dtype": "float32",
5354
"metric_type": "l2",
5455
"dim": 128,
55-
"hnsw": {
56+
"index_param": {
57+
"base_quantization_type": "fp32",
5658
"max_degree": 16,
5759
"ef_construction": 100
5860
}
5961
}
6062
)";
6163
vsag::IndexPtr index = nullptr;
62-
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
64+
if (auto create_index = engine.CreateIndex("hgraph", index_paramesters);
6365
not create_index.has_value()) {
6466
std::cout << "create index failed: " << create_index.error().message << std::endl;
6567
abort();
@@ -76,6 +78,7 @@ main(int32_t argc, char** argv) {
7678
std::cout << "index contains vectors: " << index->GetNumElements() << std::endl;
7779

7880
/******************* Save Index to OStream *****************/
81+
vsag::Options::Instance().set_new_version(true);
7982
std::ofstream out_stream("/tmp/vsag-persistent-streaming.index");
8083
auto serialize_result = index->Serialize(out_stream);
8184
out_stream.close();
@@ -86,13 +89,14 @@ main(int32_t argc, char** argv) {
8689

8790
/******************* Load Index from IStream *****************/
8891
index = nullptr;
89-
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
92+
if (auto create_index = engine.CreateIndex("hgraph", index_paramesters);
9093
not create_index.has_value()) {
9194
std::cout << "create index failed: " << create_index.error().message << std::endl;
9295
abort();
9396
} else {
9497
index = *create_index;
9598
}
99+
96100
std::ifstream in_stream("/tmp/vsag-persistent-streaming.index");
97101
if (auto deserialize = index->Deserialize(in_stream); not deserialize.has_value()) {
98102
std::cerr << "load index failed: " << deserialize.error().message << std::endl;
@@ -109,7 +113,7 @@ main(int32_t argc, char** argv) {
109113
query->NumElements(1)->Dim(dim)->Float32Vectors(query_vector)->Owner(false);
110114
auto search_parameters = R"(
111115
{
112-
"hnsw": {
116+
"hgraph": {
113117
"ef_search": 100
114118
}
115119
}

include/vsag/constants.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,8 @@ extern const char* const IVF_PRECISE_QUANTIZATION_TYPE;
165165
extern const char* const IVF_PRECISE_IO_TYPE;
166166
extern const char* const IVF_PRECISE_FILE_PATH;
167167

168+
// serialization
169+
extern const char* const SERIAL_MAGIC_BEGIN;
170+
extern const char* const SERIAL_MAGIC_END;
171+
168172
} // namespace vsag

include/vsag/options.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <mutex>
2020
#include <string>
2121

22+
#include "utils/function_exists_check.h"
2223
#include "vsag/allocator.h"
2324
#include "vsag/logger.h"
2425

@@ -43,6 +44,20 @@ class Options {
4344
static Options&
4445
Instance();
4546

47+
public:
48+
inline bool
49+
new_version() const {
50+
return new_version_;
51+
}
52+
53+
inline void
54+
set_new_version(bool new_version) {
55+
new_version_ = new_version;
56+
}
57+
58+
private:
59+
bool new_version_ = true;
60+
4661
public:
4762
/**
4863
* @brief Gets the number of threads for IO operations.

src/algorithm/hgraph.cpp

Lines changed: 137 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@
2727
#include "impl/odescent_graph_builder.h"
2828
#include "impl/pruning_strategy.h"
2929
#include "index/iterator_filter.h"
30+
#include "logger.h"
31+
#include "serialization.h"
32+
#include "stream_reader.h"
33+
#include "typing.h"
3034
#include "utils/standard_heap.h"
3135
#include "utils/util_functions.h"
36+
#include "vsag/options.h"
3237

3338
namespace vsag {
3439

@@ -607,7 +612,7 @@ HGraph::RangeSearch(const DatasetPtr& query,
607612
}
608613

609614
void
610-
HGraph::serialize_basic_info(StreamWriter& writer) const {
615+
HGraph::serialize_basic_info_v0_14(StreamWriter& writer) const {
611616
StreamWriter::WriteObj(writer, this->use_reorder_);
612617
StreamWriter::WriteObj(writer, this->dim_);
613618
StreamWriter::WriteObj(writer, this->metric_);
@@ -629,12 +634,120 @@ HGraph::serialize_basic_info(StreamWriter& writer) const {
629634
}
630635
}
631636

637+
void
638+
HGraph::deserialize_basic_info_v0_14(StreamReader& reader) {
639+
StreamReader::ReadObj(reader, this->use_reorder_);
640+
StreamReader::ReadObj(reader, this->dim_);
641+
StreamReader::ReadObj(reader, this->metric_);
642+
uint64_t max_level;
643+
StreamReader::ReadObj(reader, max_level);
644+
for (uint64_t i = 0; i < max_level; ++i) {
645+
this->route_graphs_.emplace_back(this->generate_one_route_graph());
646+
}
647+
StreamReader::ReadObj(reader, this->entry_point_id_);
648+
StreamReader::ReadObj(reader, this->ef_construct_);
649+
StreamReader::ReadObj(reader, this->mult_);
650+
InnerIdType capacity;
651+
StreamReader::ReadObj(reader, capacity);
652+
this->max_capacity_.store(capacity);
653+
StreamReader::ReadVector(reader, this->label_table_->label_table_);
654+
655+
uint64_t size;
656+
StreamReader::ReadObj(reader, size);
657+
for (uint64_t i = 0; i < size; ++i) {
658+
LabelType key;
659+
StreamReader::ReadObj(reader, key);
660+
InnerIdType value;
661+
StreamReader::ReadObj(reader, value);
662+
this->label_table_->label_remap_.emplace(key, value);
663+
}
664+
}
665+
666+
#define TO_JSON(json_obj, var) json_obj[#var] = this->var##_;
667+
668+
#define TO_JSON_BASE64(json_obj, var) json_obj[#var] = base64_encode_obj(this->var##_);
669+
670+
#define TO_JSON_ATOMIC(json_obj, var) json_obj[#var] = this->var##_.load();
671+
672+
JsonType
673+
HGraph::serialize_basic_info() const {
674+
JsonType jsonify_basic_info;
675+
TO_JSON(jsonify_basic_info, use_reorder);
676+
TO_JSON(jsonify_basic_info, dim);
677+
TO_JSON(jsonify_basic_info, metric);
678+
TO_JSON(jsonify_basic_info, entry_point_id);
679+
TO_JSON(jsonify_basic_info, ef_construct);
680+
// logger::debug("mult: {}", this->mult_);
681+
TO_JSON_BASE64(jsonify_basic_info, mult);
682+
TO_JSON_ATOMIC(jsonify_basic_info, max_capacity);
683+
jsonify_basic_info["max_level"] = this->route_graphs_.size();
684+
685+
return jsonify_basic_info;
686+
}
687+
688+
#define FROM_JSON(json_obj, var) this->var##_ = (json_obj)[#var];
689+
690+
#define FROM_JSON_BASE64(json_obj, var) base64_decode_obj((json_obj)[#var], this->var##_);
691+
692+
#define FROM_JSON_ATOMIC(json_obj, var) this->var##_.store((json_obj)[#var]);
693+
694+
void
695+
HGraph::deserialize_basic_info(JsonType jsonify_basic_info) {
696+
FROM_JSON(jsonify_basic_info, use_reorder);
697+
FROM_JSON(jsonify_basic_info, dim);
698+
FROM_JSON(jsonify_basic_info, metric);
699+
FROM_JSON(jsonify_basic_info, entry_point_id);
700+
FROM_JSON(jsonify_basic_info, ef_construct);
701+
FROM_JSON_BASE64(jsonify_basic_info, mult);
702+
// logger::debug("mult: {}", this->mult_);
703+
FROM_JSON_ATOMIC(jsonify_basic_info, max_capacity);
704+
705+
uint64_t max_level = jsonify_basic_info["max_level"];
706+
for (uint64_t i = 0; i < max_level; ++i) {
707+
this->route_graphs_.emplace_back(this->generate_one_route_graph());
708+
}
709+
}
710+
711+
void
712+
HGraph::serialize_label_info(StreamWriter& writer) const {
713+
StreamWriter::WriteVector(writer, this->label_table_->label_table_);
714+
uint64_t size = this->label_table_->label_remap_.size();
715+
StreamWriter::WriteObj(writer, size);
716+
for (const auto& pair : this->label_table_->label_remap_) {
717+
auto key = pair.first;
718+
StreamWriter::WriteObj(writer, key);
719+
StreamWriter::WriteObj(writer, pair.second);
720+
}
721+
}
722+
723+
void
724+
HGraph::deserialize_label_info(StreamReader& reader) const {
725+
StreamReader::ReadVector(reader, this->label_table_->label_table_);
726+
uint64_t size;
727+
StreamReader::ReadObj(reader, size);
728+
for (uint64_t i = 0; i < size; ++i) {
729+
LabelType key;
730+
StreamReader::ReadObj(reader, key);
731+
InnerIdType value;
732+
StreamReader::ReadObj(reader, value);
733+
this->label_table_->label_remap_.emplace(key, value);
734+
}
735+
}
736+
632737
void
633738
HGraph::Serialize(StreamWriter& writer) const {
634739
if (this->ignore_reorder_) {
635740
this->use_reorder_ = false;
636741
}
637-
this->serialize_basic_info(writer);
742+
743+
// basic info moved to metadata since version 0.15
744+
// only for test
745+
if (Options::Instance().new_version()) {
746+
this->serialize_label_info(writer);
747+
} else {
748+
this->serialize_basic_info_v0_14(writer);
749+
}
750+
638751
this->basic_flatten_codes_->Serialize(writer);
639752
this->bottom_graph_->Serialize(writer);
640753
if (this->use_reorder_) {
@@ -646,11 +759,32 @@ HGraph::Serialize(StreamWriter& writer) const {
646759
if (this->extra_info_size_ > 0 && this->extra_infos_ != nullptr) {
647760
this->extra_infos_->Serialize(writer);
648761
}
762+
763+
// serialize footer (introduce since v0.15)
764+
if (Options::Instance().new_version()) {
765+
auto metadata = std::make_shared<Metadata>();
766+
auto jsonify_basic_info = this->serialize_basic_info();
767+
metadata->Set("basic_info", jsonify_basic_info);
768+
logger::debug(jsonify_basic_info.dump());
769+
auto footer = std::make_shared<Footer>(metadata);
770+
footer->Write(writer);
771+
}
649772
}
650773

651774
void
652775
HGraph::Deserialize(StreamReader& reader) {
653-
this->deserialize_basic_info(reader);
776+
// try to deserialize footer (only in new version)
777+
auto footer = Footer::Parse(reader);
778+
if (footer != nullptr) {
779+
logger::debug("parse with new version format");
780+
auto metadata = footer->GetMetadata();
781+
this->deserialize_basic_info(metadata->Get("basic_info"));
782+
this->deserialize_label_info(reader);
783+
} else {
784+
logger::debug("parse with v0.14 version format");
785+
this->deserialize_basic_info_v0_14(reader);
786+
}
787+
654788
this->basic_flatten_codes_->Deserialize(reader);
655789
this->bottom_graph_->Deserialize(reader);
656790
if (this->use_reorder_) {
@@ -676,35 +810,6 @@ HGraph::Deserialize(StreamReader& reader) {
676810
}
677811
}
678812

679-
void
680-
HGraph::deserialize_basic_info(StreamReader& reader) {
681-
StreamReader::ReadObj(reader, this->use_reorder_);
682-
StreamReader::ReadObj(reader, this->dim_);
683-
StreamReader::ReadObj(reader, this->metric_);
684-
uint64_t max_level;
685-
StreamReader::ReadObj(reader, max_level);
686-
for (uint64_t i = 0; i < max_level; ++i) {
687-
this->route_graphs_.emplace_back(this->generate_one_route_graph());
688-
}
689-
StreamReader::ReadObj(reader, this->entry_point_id_);
690-
StreamReader::ReadObj(reader, this->ef_construct_);
691-
StreamReader::ReadObj(reader, this->mult_);
692-
InnerIdType capacity;
693-
StreamReader::ReadObj(reader, capacity);
694-
this->max_capacity_.store(capacity);
695-
StreamReader::ReadVector(reader, this->label_table_->label_table_);
696-
697-
uint64_t size;
698-
StreamReader::ReadObj(reader, size);
699-
for (uint64_t i = 0; i < size; ++i) {
700-
LabelType key;
701-
StreamReader::ReadObj(reader, key);
702-
InnerIdType value;
703-
StreamReader::ReadObj(reader, value);
704-
this->label_table_->label_remap_.emplace(key, value);
705-
}
706-
}
707-
708813
float
709814
HGraph::CalcDistanceById(const float* query, int64_t id) const {
710815
auto flat = this->basic_flatten_codes_;

src/algorithm/hgraph.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,25 @@ class HGraph : public InnerIndexInterface {
198198
InnerSearchParam& inner_search_param,
199199
IteratorFilterContext* iter_ctx) const;
200200

201+
// since v0.12
201202
void
202-
serialize_basic_info(StreamWriter& writer) const;
203+
serialize_basic_info_v0_14(StreamWriter& writer) const;
203204

204205
void
205-
deserialize_basic_info(StreamReader& reader);
206+
deserialize_basic_info_v0_14(StreamReader& reader);
207+
208+
// since v0.15
209+
JsonType
210+
serialize_basic_info() const;
211+
212+
void
213+
deserialize_basic_info(JsonType jsonify_basic_info);
214+
215+
void
216+
serialize_label_info(StreamWriter& writer) const;
217+
218+
void
219+
deserialize_label_info(StreamReader& reader) const;
206220

207221
void
208222
reorder(const void* query,

src/algorithm/inner_index_interface.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ InnerIndexInterface::Deserialize(const BinarySet& binary_set) {
140140

141141
try {
142142
uint64_t cursor = 0;
143-
auto reader = ReadFuncStreamReader(func, cursor);
143+
auto reader = ReadFuncStreamReader(func, cursor, b.size);
144144
this->Deserialize(reader);
145145
} catch (const std::runtime_error& e) {
146146
throw VsagException(ErrorType::READ_ERROR, "failed to Deserialize: ", e.what());
@@ -161,7 +161,7 @@ InnerIndexInterface::Deserialize(const ReaderSet& reader_set) {
161161
index_reader->Read(offset, len, dest);
162162
};
163163
uint64_t cursor = 0;
164-
auto reader = ReadFuncStreamReader(func, cursor);
164+
auto reader = ReadFuncStreamReader(func, cursor, index_reader->Size());
165165
this->Deserialize(reader);
166166
return;
167167
} catch (const std::bad_alloc& e) {

src/constants.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,4 +167,7 @@ const char* const IVF_BASE_FILE_PATH = "base_file_path";
167167
const char* const IVF_PRECISE_QUANTIZATION_TYPE = "precise_quantization_type";
168168
const char* const IVF_PRECISE_IO_TYPE = "precise_io_type";
169169
const char* const IVF_PRECISE_FILE_PATH = "precise_file_path";
170+
171+
const char* const SERIAL_MAGIC_BEGIN = "vsag0000";
172+
const char* const SERIAL_MAGIC_END = "0000gasv";
170173
}; // namespace vsag

src/impl/conjugate_graph.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ ConjugateGraph::Deserialize(const Binary& binary) {
183183
};
184184

185185
int64_t cursor = 0;
186-
ReadFuncStreamReader reader(func, cursor);
186+
ReadFuncStreamReader reader(func, cursor, binary.size);
187187
BufferStreamReader buffer_reader(&reader, binary.size, allocator_);
188188
return this->Deserialize(buffer_reader);
189189
}

src/index/hnsw.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ HNSW::deserialize(const BinarySet& binary_set) {
563563
try {
564564
std::unique_lock lock(rw_mutex_);
565565
int64_t cursor = 0;
566-
ReadFuncStreamReader reader(func, cursor);
566+
ReadFuncStreamReader reader(func, cursor, b.size);
567567
BufferStreamReader buffer_reader(&reader, b.size, allocator_.get());
568568
alg_hnsw_->loadIndex(buffer_reader, this->space_.get());
569569
if (use_conjugate_graph_) {
@@ -617,7 +617,7 @@ HNSW::deserialize(const ReaderSet& reader_set) {
617617
std::unique_lock lock(rw_mutex_);
618618

619619
int64_t cursor = 0;
620-
ReadFuncStreamReader reader(func, cursor);
620+
ReadFuncStreamReader reader(func, cursor, hnsw_data->Size());
621621
BufferStreamReader buffer_reader(&reader, hnsw_data->Size(), allocator_.get());
622622
alg_hnsw_->loadIndex(buffer_reader, this->space_.get());
623623
} catch (const std::runtime_error& e) {

0 commit comments

Comments
 (0)