diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 93314e22fd..3decbabea2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,11 +3,12 @@ name: release on: schedule: - cron: '0 13 * * *' # This schedule runs every 13:00:00Z(21:00:00+08:00) - # The "create tags" trigger is specifically focused on the creation of new tags, while the "push tags" trigger is activated when tags are pushed, including both new tag creations and updates to existing tags. - create: + # Use push: tags to trigger only when the mutable tag 'slow-test' is pushed/updated. + # 'create' does not support tag name filters and would fire for any tag creation. + push: tags: - - "v*.*.*" # normal release, use release.sh instead - - "nightly" # the only one mutable tag + - 'v*.*.*' # normal release, use release.sh instead + - 'nightly' # mutable tag # https://docs.github.com/en/actions/using-jobs/using-concurrency concurrency: diff --git a/.github/workflows/slow_tests.yml b/.github/workflows/slow_tests.yml index 3df8a0f66c..1e727ca027 100644 --- a/.github/workflows/slow_tests.yml +++ b/.github/workflows/slow_tests.yml @@ -5,10 +5,11 @@ on: # https://docs.github.com/zh/actions/writing-workflows/choosing-when-your-workflow-runs/events-that-trigger-workflows#schedule schedule: - cron: '30 16 * * *' # utc-8: 16:30 - # The "create tags" trigger is specifically focused on the creation of new tags, while the "push tags" trigger is activated when tags are pushed, including both new tag creations and updates to existing tags. - create: + # Use push: tags to trigger only when the mutable tag 'slow-test' is pushed/updated. + # 'create' does not support tag name filters and would fire for any tag creation. + push: tags: - - "slow-test" # mutable tag + - 'slow-test' # mutable tag # The only difference between pull_request and pull_request_target is the context in which the workflow runs: # — pull_request_target workflows use the workflow files from the default branch, and secrets are available. # — pull_request workflows use the workflow files from the pull request branch, and secrets are unavailable. diff --git a/src/storage/new_txn/new_txn_impl.cpp b/src/storage/new_txn/new_txn_impl.cpp index 7871d8d65e..da57d0ba48 100644 --- a/src/storage/new_txn/new_txn_impl.cpp +++ b/src/storage/new_txn/new_txn_impl.cpp @@ -4539,7 +4539,6 @@ Status NewTxn::Rollback() { } Status NewTxn::Cleanup() { - if (base_txn_store_ != nullptr) { return Status::UnexpectedError("txn store is not null"); } @@ -4575,30 +4574,38 @@ Status NewTxn::Cleanup() { return status; } - for (auto &key : dropped_keys) { - status = kv_instance->Delete(key); + if (metas.empty()) { + LOG_TRACE("Cleanup: No data need to clean."); + } else { + status = CleanupInner(metas); if (!status.ok()) { return status; } - } - if (metas.empty()) { - LOG_TRACE("Cleanup: No data need to clean. Try to remove all empty directories..."); - BufferManager *buffer_mgr = InfinityContext::instance().storage()->buffer_manager(); - auto data_dir_str = buffer_mgr->GetFullDataDir(); - auto data_dir = static_cast(*data_dir_str); - // Delete empty dir - VirtualStore::RecursiveCleanupAllEmptyDir(data_dir); - return Status::OK(); - } + for (auto &key : dropped_keys) { + status = kv_instance->Delete(key); + if (!status.ok()) { + return status; + } + } - status = CleanupInner(metas); - if (!status.ok()) { - return status; + txn_store->dropped_keys_ = dropped_keys; + txn_store->metas_ = metas; } - txn_store->dropped_keys_ = dropped_keys; - txn_store->metas_ = metas; + // Delete empty dir + BufferManager *buffer_mgr = InfinityContext::instance().storage()->buffer_manager(); + auto data_dir_str = buffer_mgr->GetFullDataDir(); + auto data_dir = static_cast(*data_dir_str); + VirtualStore::RecursiveCleanupAllEmptyDir(data_dir); + + // Clean up stale object data that has no corresponding file path + PersistenceManager *pm = InfinityContext::instance().persistence_manager(); + if (pm != nullptr) { + PersistResultHandler handler(pm); + PersistWriteResult result = pm->CleanupStaleObjectData(); + handler.HandleWriteResult(result); + } return Status::OK(); } diff --git a/src/storage/persistence/persistence_manager.cppm b/src/storage/persistence/persistence_manager.cppm index 0f05e53888..b08faf2344 100644 --- a/src/storage/persistence/persistence_manager.cppm +++ b/src/storage/persistence/persistence_manager.cppm @@ -38,6 +38,9 @@ export struct ObjAddr { size_t part_offset_{}; size_t part_size_{}; + ObjAddr() {} + ObjAddr(const std::string &obj_key, size_t part_offset, size_t part_size) : obj_key_(obj_key), part_offset_(part_offset), part_size_(part_size) {} + bool Valid() const { return !obj_key_.empty(); } nlohmann::json Serialize() const; @@ -98,6 +101,9 @@ public: [[nodiscard]] PersistWriteResult Cleanup(const std::string &file_path); + // Clean up stale object data that has no corresponding file path + PersistWriteResult CleanupStaleObjectData(); + /** * Utils */ diff --git a/src/storage/persistence/persistence_manager_impl.cpp b/src/storage/persistence/persistence_manager_impl.cpp index c2ef8eca3b..0b1f3475e9 100644 --- a/src/storage/persistence/persistence_manager_impl.cpp +++ b/src/storage/persistence/persistence_manager_impl.cpp @@ -652,17 +652,120 @@ std::unordered_map PersistenceManager::GetAllFiles() const const std::string &obj_prefix = KeyEncode::PMObjectPrefix(); size_t obj_prefix_len = obj_prefix.size(); - std::unique_ptr kv_instance = kv_store_->GetInstance(); - auto iter = kv_instance->GetIterator(); - iter->Seek(obj_prefix); - while (iter->Valid() && iter->Key().starts_with(obj_prefix)) { - std::string path = iter->Key().ToString().substr(obj_prefix_len); - ObjAddr obj_addr; - obj_addr.Deserialize(iter->Value().ToString()); - local_path_obj.emplace(path, obj_addr); - iter->Next(); + std::vector> all_key_values = kv_store_->GetAllKeyValue(); + for (const auto &[key, value] : all_key_values) { + if (key.starts_with(obj_prefix)) { + std::string path = key.substr(obj_prefix_len); + ObjAddr obj_addr; + obj_addr.Deserialize(value); + local_path_obj.emplace(path, obj_addr); + } } return local_path_obj; } +PersistWriteResult PersistenceManager::CleanupStaleObjectData() { + PersistWriteResult result; + std::unordered_map> object_range_map; + std::unordered_map> object_objstat_map; + std::unordered_map file_objaddr_map; + { + std::lock_guard lock(mtx_); + object_objstat_map = GetAllObjects(); + file_objaddr_map = GetAllFiles(); + } + + // Initialize range map with deleted ranges from each object. + for (const auto &[obj_key, obj_stat_ptr] : object_objstat_map) { + object_range_map[obj_key] = obj_stat_ptr->deleted_ranges_; + } + + // Process each file to merge and update ranges of each object. + for (const auto &[file_path, object_addr] : file_objaddr_map) { + std::string obj_key = object_addr.obj_key_; + if (obj_key == "KEY_EMPTY") { + continue; + } + + auto range_it = object_range_map.find(obj_key); + if (range_it == object_range_map.end()) { + LOG_WARN(fmt::format("Failed to find range info for object {}", obj_key)); + continue; + } + auto &range_set = range_it->second; + size_t range_end = object_addr.part_offset_ + object_addr.part_size_; + range_end = (range_end + ObjAlignment - 1) & ~(ObjAlignment - 1); + Range obj_range(object_addr.part_offset_, range_end); + Range range(obj_range); + + std::vector::iterator> to_erase; + auto it = range_set.lower_bound(range); + if (it != range_set.begin()) { + auto prev_it = std::prev(it); + if (prev_it->end_ >= range.start_) { + range.start_ = std::min(range.start_, prev_it->start_); + range.end_ = std::max(range.end_, prev_it->end_); + to_erase.push_back(prev_it); + } + } + + while (it != range_set.end() && it->start_ <= range.end_) { + range.end_ = std::max(range.end_, it->end_); + to_erase.push_back(it); + ++it; + } + + for (auto &erase_it : to_erase) { + range_set.erase(erase_it); + } + range_set.insert(range); + } + + // Find and clean up missing ranges for each object. + for (auto &obj_range : object_range_map) { + auto &obj_stat = object_objstat_map[obj_range.first]; + size_t obj_size = (obj_stat->obj_size_ + ObjAlignment - 1) & ~(ObjAlignment - 1); + auto &ranges = obj_range.second; + if (ranges.size() == 1 && ranges.begin()->start_ == 0 && ranges.begin()->end_ == obj_size) { + continue; + } + + std::vector missing_ranges; + if (ranges.empty()) { + if (obj_size > 0) { + missing_ranges.push_back({0, obj_size}); + } + } else { + auto first_it = ranges.begin(); + if (first_it->start_ > 0) { + missing_ranges.push_back({0, first_it->start_}); + } + + auto it = ranges.begin(); + auto next_it = std::next(it); + while (next_it != ranges.end()) { + if (it->end_ < next_it->start_) { + missing_ranges.push_back({it->end_, next_it->start_}); + } + ++it; + ++next_it; + } + + auto last_it = ranges.rbegin(); + if (last_it->end_ < obj_size) { + missing_ranges.push_back({last_it->end_, obj_size}); + } + } + + if (!missing_ranges.empty()) { + LOG_INFO("Cleanup stale object data"); + for (auto &missing_range : missing_ranges) { + ObjAddr obj_addr(obj_range.first, missing_range.start_, missing_range.end_ - missing_range.start_); + CleanupNoLock(obj_addr, result.persist_keys_, result.drop_from_remote_keys_, true); + } + } + } + return result; +} + } // namespace infinity \ No newline at end of file diff --git a/src/unit_test/storage/new_catalog/kv_store_ut.cpp b/src/unit_test/storage/new_catalog/kv_store_ut.cpp index 1bf8cd8fdb..dec4a6a7cd 100644 --- a/src/unit_test/storage/new_catalog/kv_store_ut.cpp +++ b/src/unit_test/storage/new_catalog/kv_store_ut.cpp @@ -513,3 +513,41 @@ TEST_F(TestTxnKVStoreTest, wal) { status = kv_store->Destroy(rocksdb_tmp_path); EXPECT_TRUE(status.ok()); } + +TEST_F(TestTxnKVStoreTest, ImmediateVisibility) { + using namespace infinity; + const auto rocksdb_tmp_path = fmt::format("{}/rocksdb_visibility", GetFullTmpDir()); + + std::unique_ptr kv_store = std::make_unique(); + Status status = kv_store->Init(rocksdb_tmp_path); + EXPECT_TRUE(status.ok()); + + const int num_iterations = 1000; + + for (int i = 0; i < num_iterations; ++i) { + std::string key = "key_" + std::to_string(i); + std::string value = "value_" + std::to_string(i); + + status = kv_store->Put(key, value, false); // 启用WAL + EXPECT_TRUE(status.ok()) << "Put failed at iteration " << i; + + std::string retrieved_value; + status = kv_store->Get(key, retrieved_value); + EXPECT_TRUE(status.ok()) << "Get failed at iteration " << i; + EXPECT_EQ(retrieved_value, value) << "Value mismatch at iteration " << i << ", expected: " << value << ", got: " << retrieved_value; + + if (i % 10 == 0) { + std::cout << "Iteration " << i << " passed" << std::endl; + } + } + + auto final_data = kv_store->GetAllKeyValue(); + EXPECT_EQ(final_data.size(), num_iterations) << "Expected " << num_iterations << " items, got " << final_data.size(); + + status = kv_store->Uninit(); + EXPECT_TRUE(status.ok()); + status = kv_store->Destroy(rocksdb_tmp_path); + EXPECT_TRUE(status.ok()); + + std::cout << "Immediate visibility test completed successfully!" << std::endl; +}