Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ name: release
on:
schedule:
- cron: '0 13 * * *' # This schedule runs every 13:00:00Z(21:00:00+08:00)
# The "create tags" trigger is specifically focused on the creation of new tags, while the "push tags" trigger is activated when tags are pushed, including both new tag creations and updates to existing tags.
create:
# Use push: tags to trigger only when a release tag ('v*.*.*') or the mutable tag 'nightly' is pushed/updated.
# 'create' does not support tag name filters and would fire for any tag creation.
push:
tags:
- "v*.*.*" # normal release, use release.sh instead
- "nightly" # the only one mutable tag
- 'v*.*.*' # normal release, use release.sh instead
- 'nightly' # mutable tag

# https://docs.github.com/en/actions/using-jobs/using-concurrency
concurrency:
Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/slow_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ on:
# https://docs.github.com/zh/actions/writing-workflows/choosing-when-your-workflow-runs/events-that-trigger-workflows#schedule
schedule:
- cron: '30 16 * * *' # 16:30 UTC (00:30 UTC+08:00); GitHub Actions evaluates cron schedules in UTC
# The "create tags" trigger is specifically focused on the creation of new tags, while the "push tags" trigger is activated when tags are pushed, including both new tag creations and updates to existing tags.
create:
# Use push: tags to trigger only when the mutable tag 'slow-test' is pushed/updated.
# 'create' does not support tag name filters and would fire for any tag creation.
push:
tags:
- "slow-test" # mutable tag
- 'slow-test' # mutable tag
# The only difference between pull_request and pull_request_target is the context in which the workflow runs:
# — pull_request_target workflows use the workflow files from the default branch, and secrets are available.
# — pull_request workflows use the workflow files from the pull request branch, and secrets are unavailable.
Expand Down
43 changes: 25 additions & 18 deletions src/storage/new_txn/new_txn_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4539,7 +4539,6 @@ Status NewTxn::Rollback() {
}

Status NewTxn::Cleanup() {

if (base_txn_store_ != nullptr) {
return Status::UnexpectedError("txn store is not null");
}
Expand Down Expand Up @@ -4575,30 +4574,38 @@ Status NewTxn::Cleanup() {
return status;
}

for (auto &key : dropped_keys) {
status = kv_instance->Delete(key);
if (metas.empty()) {
LOG_TRACE("Cleanup: No data need to clean.");
} else {
status = CleanupInner(metas);
if (!status.ok()) {
return status;
}
}

if (metas.empty()) {
LOG_TRACE("Cleanup: No data need to clean. Try to remove all empty directories...");
BufferManager *buffer_mgr = InfinityContext::instance().storage()->buffer_manager();
auto data_dir_str = buffer_mgr->GetFullDataDir();
auto data_dir = static_cast<std::filesystem::path>(*data_dir_str);
// Delete empty dir
VirtualStore::RecursiveCleanupAllEmptyDir(data_dir);
return Status::OK();
}
for (auto &key : dropped_keys) {
status = kv_instance->Delete(key);
if (!status.ok()) {
return status;
}
}

status = CleanupInner(metas);
if (!status.ok()) {
return status;
txn_store->dropped_keys_ = dropped_keys;
txn_store->metas_ = metas;
}

txn_store->dropped_keys_ = dropped_keys;
txn_store->metas_ = metas;
// Delete empty dir
BufferManager *buffer_mgr = InfinityContext::instance().storage()->buffer_manager();
auto data_dir_str = buffer_mgr->GetFullDataDir();
auto data_dir = static_cast<std::filesystem::path>(*data_dir_str);
VirtualStore::RecursiveCleanupAllEmptyDir(data_dir);

// Clean up stale object data that has no corresponding file path
PersistenceManager *pm = InfinityContext::instance().persistence_manager();
if (pm != nullptr) {
PersistResultHandler handler(pm);
PersistWriteResult result = pm->CleanupStaleObjectData();
handler.HandleWriteResult(result);
}

return Status::OK();
}
Expand Down
6 changes: 6 additions & 0 deletions src/storage/persistence/persistence_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ export struct ObjAddr {
size_t part_offset_{};
size_t part_size_{};

ObjAddr() {}
ObjAddr(const std::string &obj_key, size_t part_offset, size_t part_size) : obj_key_(obj_key), part_offset_(part_offset), part_size_(part_size) {}

bool Valid() const { return !obj_key_.empty(); }

nlohmann::json Serialize() const;
Expand Down Expand Up @@ -98,6 +101,9 @@ public:

[[nodiscard]] PersistWriteResult Cleanup(const std::string &file_path);

// Clean up stale object data that has no corresponding file path
PersistWriteResult CleanupStaleObjectData();

/**
* Utils
*/
Expand Down
121 changes: 112 additions & 9 deletions src/storage/persistence/persistence_manager_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,17 +652,120 @@ std::unordered_map<std::string, ObjAddr> PersistenceManager::GetAllFiles() const
const std::string &obj_prefix = KeyEncode::PMObjectPrefix();
size_t obj_prefix_len = obj_prefix.size();

std::unique_ptr<KVInstance> kv_instance = kv_store_->GetInstance();
auto iter = kv_instance->GetIterator();
iter->Seek(obj_prefix);
while (iter->Valid() && iter->Key().starts_with(obj_prefix)) {
std::string path = iter->Key().ToString().substr(obj_prefix_len);
ObjAddr obj_addr;
obj_addr.Deserialize(iter->Value().ToString());
local_path_obj.emplace(path, obj_addr);
iter->Next();
std::vector<std::pair<std::string, std::string>> all_key_values = kv_store_->GetAllKeyValue();
for (const auto &[key, value] : all_key_values) {
if (key.starts_with(obj_prefix)) {
std::string path = key.substr(obj_prefix_len);
ObjAddr obj_addr;
obj_addr.Deserialize(value);
local_path_obj.emplace(path, obj_addr);
}
}
return local_path_obj;
}

// Releases the parts of persisted objects that are neither referenced by any file mapping nor already recorded as
// deleted ("stale" data).
//
// Steps:
//   1. Snapshot all objects (GetAllObjects) and all file -> ObjAddr mappings (GetAllFiles).
//   2. For every object, start from its recorded deleted ranges and merge in the aligned range of every file that
//      points into that object, coalescing overlapping or adjacent ranges.
//   3. Every gap left inside [0, aligned object size) is handed to CleanupNoLock.
//
// Returns the PersistWriteResult whose persist_keys_ / drop_from_remote_keys_ were filled in by CleanupNoLock; the
// caller is responsible for handling it.
PersistWriteResult PersistenceManager::CleanupStaleObjectData() {
    PersistWriteResult result;
    std::unordered_map<std::string, std::set<Range>> object_range_map;
    std::unordered_map<std::string, std::shared_ptr<ObjStat>> object_objstat_map;
    std::unordered_map<std::string, ObjAddr> file_objaddr_map;

    // Hold mtx_ for the whole pass. The snapshot and the CleanupNoLock calls below must observe the same state, and
    // CleanupNoLock (per its name) is expected to be invoked with the lock already held by the caller.
    // NOTE(review): this assumes mtx_ is not re-acquired inside GetAllObjects/GetAllFiles/CleanupNoLock — the
    // original already called the first two under this lock.
    std::lock_guard<std::mutex> lock(mtx_);
    object_objstat_map = GetAllObjects();
    file_objaddr_map = GetAllFiles();

    // Seed each object's covered ranges with the ranges already marked as deleted.
    for (const auto &[obj_key, obj_stat_ptr] : object_objstat_map) {
        object_range_map[obj_key] = obj_stat_ptr->deleted_ranges_;
    }

    // Merge the range occupied by every live file into the range set of the object it lives in.
    for (const auto &[file_path, object_addr] : file_objaddr_map) {
        const std::string &obj_key = object_addr.obj_key_;
        // Files whose object key is the literal "KEY_EMPTY" are skipped (presumably a sentinel for files without
        // backing object data — confirm against ObjAddr).
        if (obj_key == "KEY_EMPTY") {
            continue;
        }

        auto range_it = object_range_map.find(obj_key);
        if (range_it == object_range_map.end()) {
            LOG_WARN(fmt::format("Failed to find range info for object {}", obj_key));
            continue;
        }
        auto &range_set = range_it->second;

        // Round the end of the file's range up to a multiple of ObjAlignment (the mask form assumes ObjAlignment
        // is a power of two).
        size_t range_end = object_addr.part_offset_ + object_addr.part_size_;
        range_end = (range_end + ObjAlignment - 1) & ~(ObjAlignment - 1);
        Range range(object_addr.part_offset_, range_end);

        // Collect every existing range that overlaps or touches the new one, growing `range` to their union.
        std::vector<std::set<Range>::iterator> to_erase;
        auto it = range_set.lower_bound(range);
        if (it != range_set.begin()) {
            auto prev_it = std::prev(it);
            if (prev_it->end_ >= range.start_) {
                range.start_ = std::min(range.start_, prev_it->start_);
                range.end_ = std::max(range.end_, prev_it->end_);
                to_erase.push_back(prev_it);
            }
        }

        while (it != range_set.end() && it->start_ <= range.end_) {
            range.end_ = std::max(range.end_, it->end_);
            to_erase.push_back(it);
            ++it;
        }

        // Erasing one std::set iterator does not invalidate the others, so deferred erasure is safe.
        for (auto &erase_it : to_erase) {
            range_set.erase(erase_it);
        }
        range_set.insert(range);
    }

    // Find the uncovered gaps of every object and clean them up.
    for (auto &[obj_key, ranges] : object_range_map) {
        // object_range_map was built from object_objstat_map, so the key is always present.
        const auto &obj_stat = object_objstat_map.at(obj_key);
        const size_t obj_size = (obj_stat->obj_size_ + ObjAlignment - 1) & ~(ObjAlignment - 1);

        // Fully covered object: a single range spanning [0, obj_size).
        if (ranges.size() == 1 && ranges.begin()->start_ == 0 && ranges.begin()->end_ == obj_size) {
            continue;
        }

        std::vector<Range> missing_ranges;
        if (ranges.empty()) {
            if (obj_size > 0) {
                missing_ranges.push_back({0, obj_size});
            }
        } else {
            // Gap before the first covered range.
            auto first_it = ranges.begin();
            if (first_it->start_ > 0) {
                missing_ranges.push_back({0, first_it->start_});
            }

            // Gaps between consecutive covered ranges.
            auto it = ranges.begin();
            auto next_it = std::next(it);
            while (next_it != ranges.end()) {
                if (it->end_ < next_it->start_) {
                    missing_ranges.push_back({it->end_, next_it->start_});
                }
                ++it;
                ++next_it;
            }

            // Gap after the last covered range.
            auto last_it = ranges.rbegin();
            if (last_it->end_ < obj_size) {
                missing_ranges.push_back({last_it->end_, obj_size});
            }
        }

        if (!missing_ranges.empty()) {
            LOG_INFO("Cleanup stale object data");
            for (const auto &missing_range : missing_ranges) {
                ObjAddr obj_addr(obj_key, missing_range.start_, missing_range.end_ - missing_range.start_);
                // The meaning of the trailing `true` flag is defined by CleanupNoLock and is not visible here.
                CleanupNoLock(obj_addr, result.persist_keys_, result.drop_from_remote_keys_, true);
            }
        }
    }
    return result;
}

} // namespace infinity
38 changes: 38 additions & 0 deletions src/unit_test/storage/new_catalog/kv_store_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,3 +513,41 @@ TEST_F(TestTxnKVStoreTest, wal) {
status = kv_store->Destroy(rocksdb_tmp_path);
EXPECT_TRUE(status.ok());
}

// Checks that every value written with KVStore::Put can be read back by the very next KVStore::Get, and that
// GetAllKeyValue afterwards reports exactly one entry per written key.
TEST_F(TestTxnKVStoreTest, ImmediateVisibility) {
    using namespace infinity;
    const auto rocksdb_tmp_path = fmt::format("{}/rocksdb_visibility", GetFullTmpDir());

    std::unique_ptr<KVStore> kv_store = std::make_unique<KVStore>();
    Status status = kv_store->Init(rocksdb_tmp_path);
    EXPECT_TRUE(status.ok());

    // size_t so the final comparison against final_data.size() does not mix signed and unsigned operands.
    const size_t num_iterations = 1000;

    for (size_t i = 0; i < num_iterations; ++i) {
        const std::string key = "key_" + std::to_string(i);
        const std::string value = "value_" + std::to_string(i);

        // Enable WAL (presumably the third argument is a "disable WAL" flag — confirm against KVStore::Put).
        status = kv_store->Put(key, value, false);
        EXPECT_TRUE(status.ok()) << "Put failed at iteration " << i;

        // Read the key back immediately; it must already be visible.
        std::string retrieved_value;
        status = kv_store->Get(key, retrieved_value);
        EXPECT_TRUE(status.ok()) << "Get failed at iteration " << i;
        EXPECT_EQ(retrieved_value, value) << "Value mismatch at iteration " << i << ", expected: " << value << ", got: " << retrieved_value;

        // Progress output every 10 iterations; '\n' avoids flushing the stream on each report.
        if (i % 10 == 0) {
            std::cout << "Iteration " << i << " passed" << '\n';
        }
    }

    const auto final_data = kv_store->GetAllKeyValue();
    EXPECT_EQ(final_data.size(), num_iterations) << "Expected " << num_iterations << " items, got " << final_data.size();

    status = kv_store->Uninit();
    EXPECT_TRUE(status.ok());
    status = kv_store->Destroy(rocksdb_tmp_path);
    EXPECT_TRUE(status.ok());

    std::cout << "Immediate visibility test completed successfully!" << std::endl;
}