Skip to content

Commit 1210dfc

Browse files
committed
support index serialization self-description
Signed-off-by: Xiangyu Wang <[email protected]>
1 parent f1a22e8 commit 1210dfc

File tree

14 files changed

+523
-51
lines changed

14 files changed

+523
-51
lines changed

examples/cpp/402_persistent_streaming.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <unordered_set>
2626

2727
#include "vsag/binaryset.h"
28+
#include "vsag/options.h"
2829

2930
int
3031
main(int32_t argc, char** argv) {
@@ -52,14 +53,15 @@ main(int32_t argc, char** argv) {
5253
"dtype": "float32",
5354
"metric_type": "l2",
5455
"dim": 128,
55-
"hnsw": {
56+
"index_param": {
57+
"base_quantization_type": "fp32",
5658
"max_degree": 16,
5759
"ef_construction": 100
5860
}
5961
}
6062
)";
6163
vsag::IndexPtr index = nullptr;
62-
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
64+
if (auto create_index = engine.CreateIndex("hgraph", index_paramesters);
6365
not create_index.has_value()) {
6466
std::cout << "create index failed: " << create_index.error().message << std::endl;
6567
abort();
@@ -76,6 +78,7 @@ main(int32_t argc, char** argv) {
7678
std::cout << "index contains vectors: " << index->GetNumElements() << std::endl;
7779

7880
/******************* Save Index to OStream *****************/
81+
vsag::Options::Instance().set_new_version(true);
7982
std::ofstream out_stream("/tmp/vsag-persistent-streaming.index");
8083
auto serialize_result = index->Serialize(out_stream);
8184
out_stream.close();
@@ -86,13 +89,14 @@ main(int32_t argc, char** argv) {
8689

8790
/******************* Load Index from IStream *****************/
8891
index = nullptr;
89-
if (auto create_index = engine.CreateIndex("hnsw", index_paramesters);
92+
if (auto create_index = engine.CreateIndex("hgraph", index_paramesters);
9093
not create_index.has_value()) {
9194
std::cout << "create index failed: " << create_index.error().message << std::endl;
9295
abort();
9396
} else {
9497
index = *create_index;
9598
}
99+
96100
std::ifstream in_stream("/tmp/vsag-persistent-streaming.index");
97101
if (auto deserialize = index->Deserialize(in_stream); not deserialize.has_value()) {
98102
std::cerr << "load index failed: " << deserialize.error().message << std::endl;
@@ -109,7 +113,7 @@ main(int32_t argc, char** argv) {
109113
query->NumElements(1)->Dim(dim)->Float32Vectors(query_vector)->Owner(false);
110114
auto search_parameters = R"(
111115
{
112-
"hnsw": {
116+
"hgraph": {
113117
"ef_search": 100
114118
}
115119
}

include/vsag/constants.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,4 +163,8 @@ extern const char* const IVF_PRECISE_QUANTIZATION_TYPE;
163163
extern const char* const IVF_PRECISE_IO_TYPE;
164164
extern const char* const IVF_PRECISE_FILE_PATH;
165165

166+
// serialization
167+
extern const char* const SERIAL_MAGIC_BEGIN;
168+
extern const char* const SERIAL_MAGIC_END;
169+
166170
} // namespace vsag

include/vsag/options.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <mutex>
2020
#include <string>
2121

22+
#include "utils/function_exists_check.h"
2223
#include "vsag/allocator.h"
2324
#include "vsag/logger.h"
2425

@@ -43,6 +44,20 @@ class Options {
4344
static Options&
4445
Instance();
4546

47+
public:
48+
inline bool
49+
new_version() const {
50+
return new_version_;
51+
}
52+
53+
inline void
54+
set_new_version(bool new_version) {
55+
new_version_ = new_version;
56+
}
57+
58+
private:
59+
bool new_version_ = true;
60+
4661
public:
4762
/**
4863
* @brief Gets the number of threads for IO operations.

src/algorithm/hgraph.cpp

Lines changed: 143 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,13 @@
2727
#include "impl/odescent_graph_builder.h"
2828
#include "impl/pruning_strategy.h"
2929
#include "index/iterator_filter.h"
30+
#include "logger.h"
31+
#include "serialization.h"
32+
#include "stream_reader.h"
33+
#include "typing.h"
3034
#include "utils/standard_heap.h"
3135
#include "utils/util_functions.h"
36+
#include "vsag/options.h"
3237

3338
namespace vsag {
3439

@@ -605,7 +610,7 @@ HGraph::RangeSearch(const DatasetPtr& query,
605610
}
606611

607612
void
608-
HGraph::serialize_basic_info(StreamWriter& writer) const {
613+
HGraph::serialize_basic_info_v0_14(StreamWriter& writer) const {
609614
StreamWriter::WriteObj(writer, this->use_reorder_);
610615
StreamWriter::WriteObj(writer, this->dim_);
611616
StreamWriter::WriteObj(writer, this->metric_);
@@ -627,12 +632,126 @@ HGraph::serialize_basic_info(StreamWriter& writer) const {
627632
}
628633
}
629634

635+
void
636+
HGraph::deserialize_basic_info_v0_14(StreamReader& reader) {
637+
StreamReader::ReadObj(reader, this->use_reorder_);
638+
StreamReader::ReadObj(reader, this->dim_);
639+
StreamReader::ReadObj(reader, this->metric_);
640+
uint64_t max_level;
641+
StreamReader::ReadObj(reader, max_level);
642+
for (uint64_t i = 0; i < max_level; ++i) {
643+
this->route_graphs_.emplace_back(this->generate_one_route_graph());
644+
}
645+
StreamReader::ReadObj(reader, this->entry_point_id_);
646+
StreamReader::ReadObj(reader, this->ef_construct_);
647+
StreamReader::ReadObj(reader, this->mult_);
648+
InnerIdType capacity;
649+
StreamReader::ReadObj(reader, capacity);
650+
this->max_capacity_.store(capacity);
651+
StreamReader::ReadVector(reader, this->label_table_->label_table_);
652+
653+
uint64_t size;
654+
StreamReader::ReadObj(reader, size);
655+
for (uint64_t i = 0; i < size; ++i) {
656+
LabelType key;
657+
StreamReader::ReadObj(reader, key);
658+
InnerIdType value;
659+
StreamReader::ReadObj(reader, value);
660+
this->label_table_->label_remap_.emplace(key, value);
661+
}
662+
}
663+
664+
// Helpers for dumping members into a JSON-like object. Each macro writes the
// member `var_` of the enclosing object under the key "var" (the stringized
// macro argument), so it can only be used inside a non-static member function.
//
// The expansions intentionally do NOT end with ';': the semicolon written at
// the call site terminates the statement. That keeps
//     if (cond) TO_JSON(j, x); else ...
// well-formed and avoids stray empty statements.

// Stores the member as-is.
#define TO_JSON(json_obj, var) \
    ((json_obj)[#var] = this->var##_)

// Stores whatever base64_encode_obj() returns for the member.
#define TO_JSON_BASE64(json_obj, var) \
    ((json_obj)[#var] = base64_encode_obj(this->var##_))

// For members that expose load(): stores the loaded value.
#define TO_JSON_ATOMIC(json_obj, var) \
    ((json_obj)[#var] = this->var##_.load())
672+
673+
/**
 * Builds the JSON form of the basic index settings.
 *
 * Keys written: "use_reorder", "dim", "metric", "entry_point_id",
 * "ef_construct", "mult", "max_capacity" and "max_level".
 *
 * @return a JSON object holding the values listed above.
 */
JsonType
HGraph::serialize_basic_info() const {
    JsonType basic_info;

    TO_JSON(basic_info, use_reorder);
    TO_JSON(basic_info, dim);
    TO_JSON(basic_info, metric);
    TO_JSON(basic_info, entry_point_id);
    TO_JSON(basic_info, ef_construct);

    // mult_ is passed through base64_encode_obj instead of being stored as a
    // JSON number -- presumably so the value round-trips bit-exactly (confirm).
    TO_JSON_BASE64(basic_info, mult);

    // max_capacity_ is read through load().
    TO_JSON_ATOMIC(basic_info, max_capacity);

    // Only the number of route graphs is recorded, not their contents.
    basic_info["max_level"] = this->route_graphs_.size();

    return basic_info;
}
688+
689+
// Helpers for restoring members from a JSON-like object. Each macro looks up
// the key "var" (the stringized macro argument) and writes the result into the
// member `var_` of the enclosing object, so it can only be used inside a
// non-static member function.
//
// The expansions intentionally do NOT end with ';': the semicolon written at
// the call site terminates the statement. That keeps
//     if (cond) FROM_JSON(j, x); else ...
// well-formed and avoids stray empty statements.

// Assigns the looked-up value directly to the member.
#define FROM_JSON(json_obj, var) \
    (this->var##_ = (json_obj)[#var])

// Hands the looked-up value and the member to base64_decode_obj().
#define FROM_JSON_BASE64(json_obj, var) \
    (base64_decode_obj((json_obj)[#var], this->var##_))

// For members that expose store(): stores the looked-up value.
#define FROM_JSON_ATOMIC(json_obj, var) \
    (this->var##_.store((json_obj)[#var]))
697+
698+
void
699+
HGraph::deserialize_basic_info(JsonType jsonify_basic_info) {
700+
FROM_JSON(jsonify_basic_info, use_reorder);
701+
FROM_JSON(jsonify_basic_info, dim);
702+
FROM_JSON(jsonify_basic_info, metric);
703+
FROM_JSON(jsonify_basic_info, entry_point_id);
704+
FROM_JSON(jsonify_basic_info, ef_construct);
705+
FROM_JSON_BASE64(jsonify_basic_info, mult);
706+
// logger::debug("mult: {}", this->mult_);
707+
FROM_JSON_ATOMIC(jsonify_basic_info, max_capacity);
708+
709+
uint64_t max_level = jsonify_basic_info["max_level"];
710+
for (uint64_t i = 0; i < max_level; ++i) {
711+
this->route_graphs_.emplace_back(this->generate_one_route_graph());
712+
}
713+
}
714+
715+
void
716+
HGraph::serialize_label_info(StreamWriter& writer) const {
717+
StreamWriter::WriteVector(writer, this->label_table_->label_table_);
718+
uint64_t size = this->label_table_->label_remap_.size();
719+
StreamWriter::WriteObj(writer, size);
720+
for (const auto& pair : this->label_table_->label_remap_) {
721+
auto key = pair.first;
722+
StreamWriter::WriteObj(writer, key);
723+
StreamWriter::WriteObj(writer, pair.second);
724+
}
725+
}
726+
727+
void
728+
HGraph::deserialize_label_info(StreamReader& reader) const {
729+
StreamReader::ReadVector(reader, this->label_table_->label_table_);
730+
uint64_t size;
731+
StreamReader::ReadObj(reader, size);
732+
for (uint64_t i = 0; i < size; ++i) {
733+
LabelType key;
734+
StreamReader::ReadObj(reader, key);
735+
InnerIdType value;
736+
StreamReader::ReadObj(reader, value);
737+
this->label_table_->label_remap_.emplace(key, value);
738+
}
739+
}
740+
630741
void
631742
HGraph::Serialize(StreamWriter& writer) const {
632743
if (this->ignore_reorder_) {
633744
this->use_reorder_ = false;
634745
}
635-
this->serialize_basic_info(writer);
746+
747+
// basic info moved to metadata since version 0.15
748+
// only for test
749+
if (Options::Instance().new_version()) {
750+
this->serialize_label_info(writer);
751+
} else {
752+
this->serialize_basic_info_v0_14(writer);
753+
}
754+
636755
this->basic_flatten_codes_->Serialize(writer);
637756
this->bottom_graph_->Serialize(writer);
638757
if (this->use_reorder_) {
@@ -644,11 +763,32 @@ HGraph::Serialize(StreamWriter& writer) const {
644763
if (this->extra_info_size_ > 0 && this->extra_infos_ != nullptr) {
645764
this->extra_infos_->Serialize(writer);
646765
}
766+
767+
// serialize footer (introduce since v0.15)
768+
if (Options::Instance().new_version()) {
769+
auto metadata = std::make_shared<Metadata>();
770+
auto jsonify_basic_info = this->serialize_basic_info();
771+
metadata->Set("basic_info", jsonify_basic_info);
772+
logger::debug(jsonify_basic_info.dump());
773+
auto footer = std::make_shared<Footer>(metadata);
774+
footer->Write(writer);
775+
}
647776
}
648777

649778
void
650779
HGraph::Deserialize(StreamReader& reader) {
651-
this->deserialize_basic_info(reader);
780+
// try to deserialize footer (only in new version)
781+
auto footer = Footer::Parse(reader);
782+
if (footer != nullptr) {
783+
logger::debug("parse with new version format");
784+
auto metadata = footer->GetMetadata();
785+
this->deserialize_basic_info(metadata->Get("basic_info"));
786+
this->deserialize_label_info(reader);
787+
} else {
788+
logger::debug("parse with v0.14 version format");
789+
this->deserialize_basic_info_v0_14(reader);
790+
}
791+
652792
this->basic_flatten_codes_->Deserialize(reader);
653793
this->bottom_graph_->Deserialize(reader);
654794
if (this->use_reorder_) {
@@ -674,35 +814,6 @@ HGraph::Deserialize(StreamReader& reader) {
674814
}
675815
}
676816

677-
void
678-
HGraph::deserialize_basic_info(StreamReader& reader) {
679-
StreamReader::ReadObj(reader, this->use_reorder_);
680-
StreamReader::ReadObj(reader, this->dim_);
681-
StreamReader::ReadObj(reader, this->metric_);
682-
uint64_t max_level;
683-
StreamReader::ReadObj(reader, max_level);
684-
for (uint64_t i = 0; i < max_level; ++i) {
685-
this->route_graphs_.emplace_back(this->generate_one_route_graph());
686-
}
687-
StreamReader::ReadObj(reader, this->entry_point_id_);
688-
StreamReader::ReadObj(reader, this->ef_construct_);
689-
StreamReader::ReadObj(reader, this->mult_);
690-
InnerIdType capacity;
691-
StreamReader::ReadObj(reader, capacity);
692-
this->max_capacity_.store(capacity);
693-
StreamReader::ReadVector(reader, this->label_table_->label_table_);
694-
695-
uint64_t size;
696-
StreamReader::ReadObj(reader, size);
697-
for (uint64_t i = 0; i < size; ++i) {
698-
LabelType key;
699-
StreamReader::ReadObj(reader, key);
700-
InnerIdType value;
701-
StreamReader::ReadObj(reader, value);
702-
this->label_table_->label_remap_.emplace(key, value);
703-
}
704-
}
705-
706817
float
707818
HGraph::CalcDistanceById(const float* query, int64_t id) const {
708819
auto flat = this->basic_flatten_codes_;

src/algorithm/hgraph.h

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,25 @@ class HGraph : public InnerIndexInterface {
198198
InnerSearchParam& inner_search_param,
199199
IteratorFilterContext* iter_ctx) const;
200200

201+
// since v0.12
201202
void
202-
serialize_basic_info(StreamWriter& writer) const;
203+
serialize_basic_info_v0_14(StreamWriter& writer) const;
203204

204205
void
205-
deserialize_basic_info(StreamReader& reader);
206+
deserialize_basic_info_v0_14(StreamReader& reader);
207+
208+
// since v0.15
209+
JsonType
210+
serialize_basic_info() const;
211+
212+
void
213+
deserialize_basic_info(JsonType jsonify_basic_info);
214+
215+
void
216+
serialize_label_info(StreamWriter& writer) const;
217+
218+
void
219+
deserialize_label_info(StreamReader& reader) const;
206220

207221
void
208222
reorder(const void* query,

src/algorithm/inner_index_interface.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ InnerIndexInterface::Deserialize(const BinarySet& binary_set) {
137137

138138
try {
139139
uint64_t cursor = 0;
140-
auto reader = ReadFuncStreamReader(func, cursor);
140+
auto reader = ReadFuncStreamReader(func, cursor, b.size);
141141
this->Deserialize(reader);
142142
} catch (const std::runtime_error& e) {
143143
throw VsagException(ErrorType::READ_ERROR, "failed to Deserialize: ", e.what());
@@ -158,7 +158,7 @@ InnerIndexInterface::Deserialize(const ReaderSet& reader_set) {
158158
index_reader->Read(offset, len, dest);
159159
};
160160
uint64_t cursor = 0;
161-
auto reader = ReadFuncStreamReader(func, cursor);
161+
auto reader = ReadFuncStreamReader(func, cursor, index_reader->Size());
162162
this->Deserialize(reader);
163163
return;
164164
} catch (const std::bad_alloc& e) {

src/constants.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,7 @@ const char* const IVF_BASE_FILE_PATH = "base_file_path";
165165
const char* const IVF_PRECISE_QUANTIZATION_TYPE = "precise_quantization_type";
166166
const char* const IVF_PRECISE_IO_TYPE = "precise_io_type";
167167
const char* const IVF_PRECISE_FILE_PATH = "precise_file_path";
168+
169+
const char* const SERIAL_MAGIC_BEGIN = "vsag0000";
170+
const char* const SERIAL_MAGIC_END = "0000gasv";
168171
}; // namespace vsag

src/impl/conjugate_graph.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ ConjugateGraph::Deserialize(const Binary& binary) {
183183
};
184184

185185
int64_t cursor = 0;
186-
ReadFuncStreamReader reader(func, cursor);
186+
ReadFuncStreamReader reader(func, cursor, binary.size);
187187
BufferStreamReader buffer_reader(&reader, binary.size, allocator_);
188188
return this->Deserialize(buffer_reader);
189189
}

src/index/hnsw.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ HNSW::deserialize(const BinarySet& binary_set) {
563563
try {
564564
std::unique_lock lock(rw_mutex_);
565565
int64_t cursor = 0;
566-
ReadFuncStreamReader reader(func, cursor);
566+
ReadFuncStreamReader reader(func, cursor, b.size);
567567
BufferStreamReader buffer_reader(&reader, b.size, allocator_.get());
568568
alg_hnsw_->loadIndex(buffer_reader, this->space_.get());
569569
if (use_conjugate_graph_) {
@@ -617,7 +617,7 @@ HNSW::deserialize(const ReaderSet& reader_set) {
617617
std::unique_lock lock(rw_mutex_);
618618

619619
int64_t cursor = 0;
620-
ReadFuncStreamReader reader(func, cursor);
620+
ReadFuncStreamReader reader(func, cursor, hnsw_data->Size());
621621
BufferStreamReader buffer_reader(&reader, hnsw_data->Size(), allocator_.get());
622622
alg_hnsw_->loadIndex(buffer_reader, this->space_.get());
623623
} catch (const std::runtime_error& e) {

0 commit comments

Comments
 (0)