Skip to content

Commit

Permalink
introduce simple resource_pool
Browse files Browse the repository at this point in the history
- for visited_list, give an implement of visited_list_pool
- we will have some other object like aio_context...
- currently hgraph use visitlist from hnswlib, now make it global

Signed-off-by: LHT129 <[email protected]>
  • Loading branch information
LHT129 committed Dec 7, 2024
1 parent 0f7e95c commit b11eea9
Show file tree
Hide file tree
Showing 5 changed files with 346 additions and 1 deletion.
5 changes: 4 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ list (FILTER CPP_INDEX_SRCS EXCLUDE REGEX "_test.cpp")
list (FILTER CPP_QUANTIZATION_SRCS EXCLUDE REGEX "_test.cpp")
list (FILTER CPP_DATA_CELL_SRCS EXCLUDE REGEX "_test.cpp")

file (GLOB CPP_UTILS_SRCS "*.cpp")
list (FILTER CPP_UTILS_SRCS EXCLUDE REGEX "_test.cpp")

set (VSAG_SRCS ${CPP_SRCS} ${CPP_FACTORY_SRCS} ${CPP_INDEX_SRCS}
${CPP_CONJUGATE_GRAPH_SRCS} ${CPP_HNSWLIB_SRCS} ${CPP_QUANTIZATION_SRCS} ${CPP_DATA_CELL_SRCS})
${CPP_CONJUGATE_GRAPH_SRCS} ${CPP_HNSWLIB_SRCS} ${CPP_QUANTIZATION_SRCS} ${CPP_DATA_CELL_SRCS} ${CPP_UTILS_SRCS})
add_library (vsag SHARED ${VSAG_SRCS})
add_library (vsag_static STATIC ${VSAG_SRCS})

Expand Down
30 changes: 30 additions & 0 deletions src/utils/resource_object.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

namespace vsag {

class ResourceObject {
public:
ResourceObject() = default;

virtual ~ResourceObject() = default;

virtual void
Reset() = 0;
};

} // namespace vsag
99 changes: 99 additions & 0 deletions src/utils/resource_object_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <type_traits>

#include "resource_object.h"
namespace vsag {

template <typename T,
typename = typename std::enable_if<std::is_base_of<ResourceObject, T>::value>::type>
class ResourceObjectPool {
public:
using ConstructFuncType = std::function<std::shared_ptr<T>()>;

public:
template <typename... Args>
explicit ResourceObjectPool(uint64_t init_size, Args... args) {
this->pool_size_ = init_size;
this->constructor_ = [=]() -> std::shared_ptr<T> { return std::make_shared<T>(args...); };
this->resize(pool_size_);
}

void
SetConstructor(ConstructFuncType func) {
this->constructor_ = func;
{
std::unique_lock<std::mutex> lock(mutex_);
while (not pool_.empty()) {
pool_.pop_back();
}
}
this->resize(pool_size_);
}

std::shared_ptr<T>
TakeOne() {
std::unique_lock<std::mutex> lock(mutex_);
if (pool_.empty()) {
return this->constructor_();
}
std::shared_ptr<T> obj = pool_.back();
pool_.pop_back();
pool_size_--;
obj->Reset();
return obj;
}

void
ReturnOne(std::shared_ptr<T>& obj) {
std::unique_lock<std::mutex> lock(mutex_);
pool_.push_back(obj);
pool_size_++;
}

[[nodiscard]] inline uint64_t
GetSize() const {
return this->pool_size_;
}

private:
inline void
resize(uint64_t size) {
std::unique_lock<std::mutex> lock(mutex_);
int count = size - pool_.size();
while (count > 0) {
pool_.emplace_back(this->constructor_());
--count;
}
while (count < 0) {
pool_.pop_back();
++count;
}
}

std::vector<std::shared_ptr<T>> pool_{};
size_t pool_size_{0};
ConstructFuncType constructor_{nullptr};
std::mutex mutex_;
};

} // namespace vsag
80 changes: 80 additions & 0 deletions src/utils/visited_list.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <cstring>
#include <limits>

#include "resource_object.h"
#include "resource_object_pool.h"
#include "typing.h"
#include "vsag/allocator.h"

namespace vsag {

class VisitedList : public ResourceObject {
public:
using VisitedListType = uint16_t;

public:
explicit VisitedList(InnerIdType max_size, Allocator* allocator)
: max_size_(max_size), allocator_(allocator) {
this->list_ = reinterpret_cast<VisitedListType*>(
allocator_->Allocate((uint64_t)max_size * sizeof(VisitedListType)));
memset(list_, 0, max_size_ * sizeof(VisitedListType));
tag_ = 1;
}

~VisitedList() override {
allocator_->Deallocate(list_);
}

inline void
Set(const InnerIdType& id) {
this->list_[id] = this->tag_;
}

inline bool
Get(const InnerIdType& id) {
return this->list_[id] == this->tag_;
}

inline void
Prefetch(const InnerIdType& id) {
return; // TODO(LHT) implement
}

void
Reset() override {
if (tag_ == std::numeric_limits<VisitedListType>::max()) {
memset(list_, 0, max_size_ * sizeof(VisitedListType));
tag_ = 0;
}
++tag_;
}

private:
Allocator* const allocator_{nullptr};

VisitedListType* list_{nullptr};

VisitedListType tag_{1};

const InnerIdType max_size_{0};
};

using VisitedListPool = ResourceObjectPool<VisitedList>;

} // namespace vsag
133 changes: 133 additions & 0 deletions src/utils/visited_list_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@

// Copyright 2024-present the vsag project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "visited_list.h"

#include <thread>

#include "catch2/catch_test_macros.hpp"
#include "default_allocator.h"
using namespace vsag;

TEST_CASE("test visited_list basic", "[ut][visited_list]") {
auto allocator = std::make_shared<DefaultAllocator>();
auto size = 10000;
auto vl_ptr = std::make_shared<VisitedList>(size, allocator.get());

SECTION("test set & get normal") {
int count = 500;
std::unordered_set<InnerIdType> ids;
for (int i = 0; i < count; ++i) {
auto id = random() % size;
ids.insert(id);
vl_ptr->Set(id);
}
for (auto& id : ids) {
REQUIRE(vl_ptr->Get(id));
}

for (int i = 0; i < size; ++i) {
if (ids.count(i) == 0) {
REQUIRE(not vl_ptr->Get(i));
}
}
}

SECTION("test reset") {
int count = 500;
std::unordered_set<InnerIdType> ids;
for (int i = 0; i < count; ++i) {
auto id = random() % size;
ids.insert(id);
vl_ptr->Set(id);
}
vl_ptr->Reset();
for (auto& id : ids) {
REQUIRE(not vl_ptr->Get(id));
}
}
}

TEST_CASE("test visited_list_pool basic", "[ut][visited_list_pool]") {
auto allocator = std::make_shared<DefaultAllocator>();
auto init_size = 10;
auto vl_size = 1000;
auto pool = std::make_shared<VisitedListPool>(init_size, vl_size, allocator.get());

auto TestVL = [&](std::shared_ptr<VisitedList>& vl_ptr) {
int count = 500;
std::unordered_set<InnerIdType> ids;
for (int i = 0; i < count; ++i) {
auto id = random() % vl_size;
ids.insert(id);
vl_ptr->Set(id);
}
for (auto& id : ids) {
REQUIRE(vl_ptr->Get(id) == true);
}

for (InnerIdType i = 0; i < vl_size; ++i) {
if (ids.count(i) == 0) {
REQUIRE(vl_ptr->Get(i) == false);
}
}
};

SECTION("test basic") {
std::vector<std::shared_ptr<VisitedList>> lists;
REQUIRE(pool->GetSize() == init_size);
lists.reserve(init_size * 2);
for (auto i = 0; i < init_size * 2; ++i) {
lists.emplace_back(pool->TakeOne());
}
REQUIRE(pool->GetSize() == 0);
for (auto& ptr : lists) {
pool->ReturnOne(ptr);
}
REQUIRE(pool->GetSize() == init_size * 2);

auto ptr = pool->TakeOne();
REQUIRE(pool->GetSize() == init_size * 2 - 1);
TestVL(ptr);
}

SECTION("test concurrency") {
auto func = [&]() {
int count = 10;
int max_operators = 20;
std::vector<std::shared_ptr<VisitedList>> results;
for (int i = 0; i < count; ++i) {
auto opt = random() % max_operators + 1;
for (auto j = 0; j < opt; ++j) {
results.emplace_back(pool->TakeOne());
}
auto test = results[random() % opt];
TestVL(test);
for (auto& result : results) {
pool->ReturnOne(result);
}
results.clear();
}
};
std::vector<std::shared_ptr<std::thread>> ths;
auto thread_count = 2;
for (auto i = 0; i < thread_count; ++i) {
ths.emplace_back((std::make_shared<std::thread>(func)));
}
for (auto& thread : ths) {
thread->join();
}
}
}

0 comments on commit b11eea9

Please sign in to comment.