diff --git a/CMakeLists.txt b/CMakeLists.txt index 256cbb20..42edf03d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,6 +145,7 @@ if (WITH_TITAN_TESTS AND (CMAKE_BUILD_TYPE STREQUAL "Debug")) blob_gc_job_test blob_gc_picker_test gc_stats_test + punch_hole_gc_test table_builder_test thread_safety_test titan_db_test diff --git a/include/titan/options.h b/include/titan/options.h index b02da2d9..5d28b9a6 100644 --- a/include/titan/options.h +++ b/include/titan/options.h @@ -172,7 +172,7 @@ struct TitanCFOptions : public ColumnFamilyOptions { // requirement for blob entries and Titan has to distinguish between real // data's 0s and 0s created by punch holes). uint64_t block_size{4096}; - bool enable_punch_hole_gc{false}; + uint64_t punch_hole_threshold{0}; TitanCFOptions() = default; explicit TitanCFOptions(const ColumnFamilyOptions& options) @@ -220,7 +220,6 @@ struct ImmutableTitanCFOptions { bool skip_value_in_compaction_filter; uint64_t block_size; - bool enable_punch_hole_gc; }; struct MutableTitanCFOptions { @@ -230,12 +229,14 @@ struct MutableTitanCFOptions { : blob_run_mode(opts.blob_run_mode), min_blob_size(opts.min_blob_size), blob_file_compression(opts.blob_file_compression), - blob_file_discardable_ratio(opts.blob_file_discardable_ratio) {} + blob_file_discardable_ratio(opts.blob_file_discardable_ratio), + punch_hole_threshold(opts.punch_hole_threshold) {} TitanBlobRunMode blob_run_mode; uint64_t min_blob_size; CompressionType blob_file_compression; double blob_file_discardable_ratio; + uint64_t punch_hole_threshold; }; struct TitanOptions : public TitanDBOptions, public TitanCFOptions { diff --git a/src/blob_file_builder.cc b/src/blob_file_builder.cc index 0514d75f..f96ffc99 100644 --- a/src/blob_file_builder.cc +++ b/src/blob_file_builder.cc @@ -33,7 +33,7 @@ BlobFileBuilder::BlobFileBuilder(const TitanDBOptions& db_options, return; #endif } - block_size_ = cf_options.enable_punch_hole_gc ? 
cf_options.block_size : 0; + block_size_ = cf_options.punch_hole_threshold > 0 ? cf_options.block_size : 0; WriteHeader(); } @@ -164,7 +164,11 @@ void BlobFileBuilder::FlushSampleRecords(OutContexts* out_ctx) { void BlobFileBuilder::WriteEncoderData(BlobHandle* handle) { handle->offset = file_->GetFileSize(); handle->size = encoder_.GetEncodedSize(); - live_data_size_ += handle->size; + if (block_size_ > 0) { + live_data_size_ += Roundup(handle->size, block_size_); + } else { + live_data_size_ += handle->size; + } status_ = file_->Append(encoder_.GetHeader()); if (ok()) { diff --git a/src/blob_file_builder.h b/src/blob_file_builder.h index ff4359da..abbefdef 100644 --- a/src/blob_file_builder.h +++ b/src/blob_file_builder.h @@ -108,6 +108,8 @@ class BlobFileBuilder { const std::string& GetSmallestKey() { return smallest_key_; } const std::string& GetLargestKey() { return largest_key_; } + uint64_t GetBlockSize() { return block_size_; } + uint64_t live_data_size() const { return live_data_size_; } private: diff --git a/src/blob_file_iterator.cc b/src/blob_file_iterator.cc index a31e701f..658a4754 100644 --- a/src/blob_file_iterator.cc +++ b/src/blob_file_iterator.cc @@ -75,23 +75,12 @@ bool BlobFileIterator::Init() { return true; } -uint64_t BlobFileIterator::AdjustOffsetToNextBlockHead() { - if (block_size_ == 0) return 0; - uint64_t block_offset = iterate_offset_ % block_size_; - if (block_offset != 0) { - uint64_t shift = block_size_ - block_offset; - iterate_offset_ += shift; - return shift; - } - return 0; -} - void BlobFileIterator::SeekToFirst() { if (!init_ && !Init()) return; status_ = Status::OK(); iterate_offset_ = header_size_; if (block_size_ != 0) { - AdjustOffsetToNextBlockHead(); + iterate_offset_ = Roundup(iterate_offset_, block_size_); } PrefetchAndGet(); } @@ -120,7 +109,8 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) { uint64_t total_length = 0; FixedSlice header_buffer; - iterate_offset_ = header_size_; + iterate_offset_ = + 
block_size_ > 0 ? Roundup(header_size_, block_size_) : header_size_; while (iterate_offset_ < offset) { IOOptions io_options; // Since BlobFileIterator is only used for GC, we always set IO priority to @@ -133,17 +123,17 @@ void BlobFileIterator::IterateForPrev(uint64_t offset) { status_ = decoder_.DecodeHeader(&header_buffer); if (!status_.ok()) return; total_length = kRecordHeaderSize + decoder_.GetRecordSize(); - iterate_offset_ += total_length; if (block_size_ != 0) { - total_length += AdjustOffsetToNextBlockHead(); + total_length = Roundup(total_length, block_size_); } + iterate_offset_ += total_length; } if (iterate_offset_ > offset) iterate_offset_ -= total_length; valid_ = false; } -void BlobFileIterator::GetBlobRecord() { +bool BlobFileIterator::GetBlobRecord() { FixedSlice header_buffer; // Since BlobFileIterator is only used for GC, we always set IO priority to // low. @@ -152,9 +142,18 @@ void BlobFileIterator::GetBlobRecord() { status_ = file_->Read(io_options, iterate_offset_, kRecordHeaderSize, &header_buffer, header_buffer.get(), nullptr /*aligned_buf*/); - if (!status_.ok()) return; + if (!status_.ok()) return false; + + // Check if the record is a hole-punch record by checking the size field in + // the header. + uint32_t* size = (uint32_t*)(header_buffer.get() + 4); + if (*size == 0) { + // This is a hole-punch record. 
+ return false; + } + status_ = decoder_.DecodeHeader(&header_buffer); - if (!status_.ok()) return; + if (!status_.ok()) return false; Slice record_slice; auto record_size = decoder_.GetRecordSize(); @@ -167,47 +166,53 @@ void BlobFileIterator::GetBlobRecord() { decoder_.DecodeRecord(&record_slice, &cur_blob_record_, &uncompressed_, titan_cf_options_.memory_allocator()); } - if (!status_.ok()) return; + if (!status_.ok()) return false; cur_record_offset_ = iterate_offset_; cur_record_size_ = kRecordHeaderSize + record_size; iterate_offset_ += cur_record_size_; - AdjustOffsetToNextBlockHead(); + if (block_size_ != 0) iterate_offset_ = Roundup(iterate_offset_, block_size_); valid_ = true; + return true; } void BlobFileIterator::PrefetchAndGet() { - if (iterate_offset_ >= end_of_blob_record_) { - valid_ = false; - return; - } + while (iterate_offset_ < end_of_blob_record_) { + if (readahead_begin_offset_ > iterate_offset_ || + readahead_end_offset_ < iterate_offset_) { + // alignment + readahead_begin_offset_ = + iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1)); + readahead_end_offset_ = readahead_begin_offset_; + readahead_size_ = kMinReadaheadSize; + } + auto min_blob_size = + iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size; + if (readahead_end_offset_ <= min_blob_size) { + IOOptions io_options; + io_options.rate_limiter_priority = Env::IOPriority::IO_LOW; + while (readahead_end_offset_ + readahead_size_ <= min_blob_size && + readahead_size_ < kMaxReadaheadSize) + readahead_size_ <<= 1; + file_->Prefetch(io_options, readahead_end_offset_, readahead_size_); + readahead_end_offset_ += readahead_size_; + readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1); + } - if (readahead_begin_offset_ > iterate_offset_ || - readahead_end_offset_ < iterate_offset_) { - // alignment - readahead_begin_offset_ = - iterate_offset_ - (iterate_offset_ & (kDefaultPageSize - 1)); - readahead_end_offset_ = readahead_begin_offset_; - 
readahead_size_ = kMinReadaheadSize; - } - auto min_blob_size = - iterate_offset_ + kRecordHeaderSize + titan_cf_options_.min_blob_size; - if (readahead_end_offset_ <= min_blob_size) { - while (readahead_end_offset_ + readahead_size_ <= min_blob_size && - readahead_size_ < kMaxReadaheadSize) - readahead_size_ <<= 1; - IOOptions io_options; - io_options.rate_limiter_priority = Env::IOPriority::IO_LOW; - file_->Prefetch(io_options, readahead_end_offset_, readahead_size_); - readahead_end_offset_ += readahead_size_; - readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ << 1); - } + bool live = GetBlobRecord(); - GetBlobRecord(); + if (readahead_end_offset_ < iterate_offset_) { + readahead_end_offset_ = iterate_offset_; + } - if (readahead_end_offset_ < iterate_offset_) { - readahead_end_offset_ = iterate_offset_; + // If the record is a hole-punch record, we should continue to the next + // record by adjusting iterate_offset_, otherwise (not a hole-punch record), + // we should break the loop and return the record, iterate_offset_ is + // already adjusted inside GetBlobRecord() in this case. + if (live || !status().ok()) return; + iterate_offset_ += block_size_; } + valid_ = false; } BlobFileMergeIterator::BlobFileMergeIterator( diff --git a/src/blob_file_iterator.h b/src/blob_file_iterator.h index 2a17e1f3..8bfc2595 100644 --- a/src/blob_file_iterator.h +++ b/src/blob_file_iterator.h @@ -77,8 +77,8 @@ class BlobFileIterator { uint64_t readahead_size_{kMinReadaheadSize}; void PrefetchAndGet(); - void GetBlobRecord(); - uint64_t AdjustOffsetToNextBlockHead(); + // Returns false if the record is invalid (punch-hole). 
+ bool GetBlobRecord(); }; class BlobFileMergeIterator { diff --git a/src/blob_file_iterator_test.cc b/src/blob_file_iterator_test.cc index 1d219dd1..9f7413d7 100644 --- a/src/blob_file_iterator_test.cc +++ b/src/blob_file_iterator_test.cc @@ -55,7 +55,7 @@ class BlobFileIteratorTest : public testing::Test { TitanDBOptions db_options(titan_options_); TitanCFOptions cf_options(titan_options_); if (with_blocks) { - cf_options.enable_punch_hole_gc = true; + cf_options.punch_hole_threshold = 4096; cf_options.block_size = 4096; } BlobFileCache cache(db_options, cf_options, {NewLRUCache(128)}, nullptr); diff --git a/src/blob_file_set.h b/src/blob_file_set.h index 6b76650d..cf6f3755 100644 --- a/src/blob_file_set.h +++ b/src/blob_file_set.h @@ -98,6 +98,22 @@ class BlobFileSet { bool IsOpened() { return opened_.load(std::memory_order_acquire); } + uint64_t GetBlockSize(uint32_t cf_id) { + MutexLock l(mutex_); + auto storage = GetBlobStorage(cf_id).lock(); + if (storage != nullptr && storage->cf_options().punch_hole_threshold > 0) { + return storage->cf_options().block_size; + } + return 0; + } + + std::unordered_map GetFileBlockSizes(uint32_t cf_id) { + MutexLock l(mutex_); + auto storage = GetBlobStorage(cf_id).lock(); + return storage ? 
storage->GetFileBlockSizes() + : std::unordered_map(); + } + private: struct ManifestWriter; diff --git a/src/blob_file_size_collector.cc b/src/blob_file_size_collector.cc index fa37897b..2b77e800 100644 --- a/src/blob_file_size_collector.cc +++ b/src/blob_file_size_collector.cc @@ -7,8 +7,13 @@ namespace titandb { TablePropertiesCollector* BlobFileSizeCollectorFactory::CreateTablePropertiesCollector( - rocksdb::TablePropertiesCollectorFactory::Context /* context */) { - return new BlobFileSizeCollector(); + rocksdb::TablePropertiesCollectorFactory::Context context) { + if (blob_file_set_ != nullptr) { + return new BlobFileSizeCollector( + blob_file_set_->GetBlockSize(context.column_family_id), + blob_file_set_->GetFileBlockSizes(context.column_family_id)); + } + return new BlobFileSizeCollector(0, {}); } const std::string BlobFileSizeCollector::kPropertiesName = @@ -57,11 +62,32 @@ Status BlobFileSizeCollector::AddUserKey(const Slice& /* key */, return s; } + auto size = index.blob_handle.size; + if (default_block_size_ > 0 || !file_block_sizes_.empty()) { + // If the blob file cannot be found in the block size map, it must be a + // newly created file that has not been added blob_file_set, in this case, + // we know the block size of the file is default_block_size_. + // If the blob file can be found in the block size map, it implies we are + // moving the reference only, while keeping the blob at the original file, + // in this case, we should use the block size of the original file. + uint64_t block_size = default_block_size_; + if (!file_block_sizes_.empty()) { + auto iter = file_block_sizes_.find(index.file_number); + if (iter != file_block_sizes_.end()) { + block_size = iter->second; + } + } + if (block_size > 0) { + // Align blob size with block size. 
+ size = Roundup(size, block_size); + } + } + auto iter = blob_files_size_.find(index.file_number); if (iter == blob_files_size_.end()) { - blob_files_size_[index.file_number] = index.blob_handle.size; + blob_files_size_[index.file_number] = size; } else { - iter->second += index.blob_handle.size; + iter->second += size; } return Status::OK(); diff --git a/src/blob_file_size_collector.h b/src/blob_file_size_collector.h index 4bd8a344..b94627a1 100644 --- a/src/blob_file_size_collector.h +++ b/src/blob_file_size_collector.h @@ -13,16 +13,31 @@ namespace titandb { class BlobFileSizeCollectorFactory final : public TablePropertiesCollectorFactory { public: + // If punch_hole_gc is enabled, then blob_file_set must be provided. + // If blob_file_set is not provided, then punch_hole_gc will be considered + // disabled, blob size will not align with block size. + BlobFileSizeCollectorFactory(BlobFileSet* blob_file_set = nullptr) + : blob_file_set_(blob_file_set) {} + BlobFileSizeCollectorFactory(const BlobFileSizeCollectorFactory&) = delete; + void operator=(const BlobFileSizeCollectorFactory&) = delete; TablePropertiesCollector* CreateTablePropertiesCollector( TablePropertiesCollectorFactory::Context context) override; const char* Name() const override { return "BlobFileSizeCollector"; } + + private: + BlobFileSet* blob_file_set_; }; class BlobFileSizeCollector final : public TablePropertiesCollector { public: const static std::string kPropertiesName; + BlobFileSizeCollector(uint64_t default_block_size, + std::unordered_map file_block_sizes) + : default_block_size_(default_block_size), + file_block_sizes_(file_block_sizes) {} + static bool Encode(const std::map& blob_files_size, std::string* result); static bool Decode(Slice* slice, @@ -38,6 +53,8 @@ class BlobFileSizeCollector final : public TablePropertiesCollector { private: std::map blob_files_size_; + uint64_t default_block_size_; + std::unordered_map file_block_sizes_; }; } // namespace titandb diff --git 
a/src/blob_format.h b/src/blob_format.h index 38ff6324..d148e227 100644 --- a/src/blob_format.h +++ b/src/blob_format.h @@ -248,10 +248,16 @@ class BlobFileMeta { uint64_t file_size() const { return file_size_; } uint64_t live_data_size() const { return live_data_size_; } uint32_t file_level() const { return file_level_; } + uint64_t block_size() const { return block_size_; } const std::string& smallest_key() const { return smallest_key_; } const std::string& largest_key() const { return largest_key_; } + int64_t effective_file_size() const { return effective_file_size_; } void set_live_data_size(int64_t size) { live_data_size_ = size; } + // This should be called with db mutex held. + void set_effective_file_size(int64_t size) { effective_file_size_ = size; } + void set_disk_uage(int64_t size) { disk_usage_ = size; } + uint64_t file_entries() const { return file_entries_; } FileState file_state() const { return state_; } bool is_obsolete() const { return state_ == FileState::kObsolete; } @@ -275,6 +281,9 @@ class BlobFileMeta { (file_size_ - kBlobMaxHeaderSize - kBlobFooterSize)); } TitanInternalStats::StatsType GetDiscardableRatioLevel() const; + uint64_t GetHolePunchableSize() const { + return effective_file_size_ - live_data_size_; + } void Dump(bool with_keys) const; private: @@ -303,7 +312,20 @@ class BlobFileMeta { // So when state_ == kPendingLSM, it uses this to record the delta as a // positive number if any later compaction is trigger before previous // `OnCompactionCompleted()` is called. + // The size is aligned with block size, when punch hole GC is enabled. std::atomic live_data_size_{0}; + // This is used to calculate the size of the punchable hole. i.e. + // effective_file_size_ - live_data_size_. + // The effective size of current file. This is different from `file_size_`, as + // `file_size_` is the original size of the file, and does not consider space + // reclaimed by punch hole GC. 
+ // This might be bigger than the actual effective size of the file, when Titan + // crashes or restarts. This is fine, as it will be corrected when the file is + // chosen for GC next time. + int64_t effective_file_size_{0}; + // Disk usage of the file. This is different from `effective_file_size_`, when + // block size does not align with file system block size. + int64_t disk_usage_{0}; std::atomic state_{FileState::kNone}; }; diff --git a/src/blob_gc.cc b/src/blob_gc.cc index 9fe6cd2d..9b31c81f 100644 --- a/src/blob_gc.cc +++ b/src/blob_gc.cc @@ -4,10 +4,12 @@ namespace rocksdb { namespace titandb { BlobGC::BlobGC(std::vector>&& blob_files, - TitanCFOptions&& _titan_cf_options, bool need_trigger_next) + TitanCFOptions&& _titan_cf_options, bool need_trigger_next, + bool punch_hole_gc) : inputs_(blob_files), titan_cf_options_(std::move(_titan_cf_options)), - trigger_next_(need_trigger_next) { + trigger_next_(need_trigger_next), + punch_hole_gc_(punch_hole_gc) { MarkFilesBeingGC(); } diff --git a/src/blob_gc.h b/src/blob_gc.h index 5ce1998f..20f3b042 100644 --- a/src/blob_gc.h +++ b/src/blob_gc.h @@ -14,7 +14,8 @@ namespace titandb { class BlobGC { public: BlobGC(std::vector>&& blob_files, - TitanCFOptions&& _titan_cf_options, bool need_trigger_next); + TitanCFOptions&& _titan_cf_options, bool need_trigger_next, + bool punch_hole_gc = false); // No copying allowed BlobGC(const BlobGC&) = delete; @@ -40,6 +41,8 @@ class BlobGC { bool trigger_next() { return trigger_next_; } + bool punch_hole_gc() { return punch_hole_gc_; } + private: std::vector> inputs_; std::vector outputs_; @@ -47,6 +50,7 @@ class BlobGC { ColumnFamilyHandle* cfh_{nullptr}; // Whether need to trigger gc after this gc or not const bool trigger_next_; + const bool punch_hole_gc_; }; struct GCScore { diff --git a/src/blob_gc_picker.cc b/src/blob_gc_picker.cc index 2b102ca1..7fc436a0 100644 --- a/src/blob_gc_picker.cc +++ b/src/blob_gc_picker.cc @@ -18,7 +18,19 @@ 
BasicBlobGCPicker::BasicBlobGCPicker(TitanDBOptions db_options, BasicBlobGCPicker::~BasicBlobGCPicker() {} -std::unique_ptr BasicBlobGCPicker::PickBlobGC( +std::unique_ptr BasicBlobGCPicker::PickBlobGC(BlobStorage* blob_storage, + bool allow_punch_hole) { + auto regular_gc = PickRegularBlobGC(blob_storage); + if (regular_gc) { + return regular_gc; + } + if (allow_punch_hole) { + return PickPunchHoleGC(blob_storage); + } + return nullptr; +} + +std::unique_ptr BasicBlobGCPicker::PickRegularBlobGC( BlobStorage* blob_storage) { Status s; std::vector> blob_files; @@ -26,7 +38,7 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( uint64_t batch_size = 0; uint64_t estimate_output_size = 0; bool stop_picking = false; - bool maybe_continue_next_time = false; + bool need_trigger_next = false; uint64_t next_gc_size = 0; bool in_fallback = cf_options_.blob_run_mode == TitanBlobRunMode::kFallback; @@ -59,14 +71,14 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( estimate_output_size += blob_file->live_data_size(); if (batch_size >= cf_options_.max_gc_batch_size || estimate_output_size >= cf_options_.blob_file_target_size) { - // Stop pick file for this gc, but still check file for whether need - // trigger gc after this + // Stop picking file for this gc, but still check file for whether + // another round of gc is needed. 
stop_picking = true; } } else { next_gc_size += blob_file->file_size(); if (next_gc_size > cf_options_.min_gc_batch_size || in_fallback) { - maybe_continue_next_time = true; + need_trigger_next = true; RecordTick(statistics(stats_), TITAN_GC_REMAIN, 1); TITAN_LOG_INFO(db_options_.info_log, "remain more than %" PRIu64 @@ -100,7 +112,48 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( } return std::unique_ptr(new BlobGC( - std::move(blob_files), std::move(cf_options_), maybe_continue_next_time)); + std::move(blob_files), std::move(cf_options_), need_trigger_next)); +} + +std::unique_ptr BasicBlobGCPicker::PickPunchHoleGC( + BlobStorage* blob_storage) { + Status s; + std::vector> blob_files; + + uint64_t batch_size = 0; + uint64_t estimate_output_size = 0; + bool stop_picking = false; + bool need_trigger_next = false; + uint64_t next_gc_size = 0; + + for (auto& gc_score : blob_storage->punch_hole_score()) { + auto blob_file = blob_storage->FindFile(gc_score.file_number).lock(); + if (!CheckBlobFile(blob_file.get())) { + // Skip this file if this file is being GCed + // or this file had been GCed + TITAN_LOG_INFO(db_options_.info_log, + "Blob file %" PRIu64 " no need punch hole gc", + blob_file->file_number()); + continue; + } + if (!stop_picking) { + blob_files.emplace_back(blob_file); + batch_size += blob_file->file_size(); + if (batch_size >= cf_options_.max_gc_batch_size) { + // Stop picking file for this gc, but still check file for whether need + // trigger gc after this + stop_picking = true; + } + } else { + // TODO: add a batch threshold for punch hole gc. 
+ need_trigger_next = true; + break; + } + } + if (blob_files.empty()) return nullptr; + return std::unique_ptr(new BlobGC( + std::move(blob_files), std::move(cf_options_), need_trigger_next, + /*punch_hole_gc=*/true)); } bool BasicBlobGCPicker::CheckBlobFile(BlobFileMeta* blob_file) const { diff --git a/src/blob_gc_picker.h b/src/blob_gc_picker.h index ca570872..33191734 100644 --- a/src/blob_gc_picker.h +++ b/src/blob_gc_picker.h @@ -24,7 +24,10 @@ class BlobGCPicker { // Returns nullptr if there is no gc to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the gc. Caller should delete the result. - virtual std::unique_ptr PickBlobGC(BlobStorage* blob_storage) = 0; + // If allow_punch_hole is true, picker may return a punch hole gc. + // Otherwise, picker will only return a regular gc. + virtual std::unique_ptr PickBlobGC(BlobStorage* blob_storage, + bool allow_punch_hole = false) = 0; }; class BasicBlobGCPicker final : public BlobGCPicker { @@ -32,9 +35,13 @@ class BasicBlobGCPicker final : public BlobGCPicker { BasicBlobGCPicker(TitanDBOptions, TitanCFOptions, TitanStats*); ~BasicBlobGCPicker(); - std::unique_ptr PickBlobGC(BlobStorage* blob_storage) override; + std::unique_ptr PickBlobGC(BlobStorage* blob_storage, + bool allow_punch_hole = false) override; private: + std::unique_ptr PickRegularBlobGC(BlobStorage* blob_storage); + std::unique_ptr PickPunchHoleGC(BlobStorage* blob_storage); + TitanDBOptions db_options_; TitanCFOptions cf_options_; TitanStats* stats_; diff --git a/src/blob_storage.cc b/src/blob_storage.cc index 3a2af47f..8c8cbe6c 100644 --- a/src/blob_storage.cc +++ b/src/blob_storage.cc @@ -1,5 +1,7 @@ #include "blob_storage.h" +#include + #include "blob_file_set.h" #include "titan_logging.h" @@ -122,6 +124,30 @@ Status BlobStorage::GetBlobFilesInRanges( return Status::OK(); } +Status BlobStorage::InitPunchHoleGCOnStart() { + MutexLock l(&mutex_); + for (auto& file : files_) { + struct stat file_stat; + if 
(stat(BlobFileName(db_options_.dirname, file.second->file_number()) .c_str(), &file_stat) != 0) { return Status::IOError("stat failed when init effective file size"); } // st_blocks is the number of blocks allocated in 512B units. int64_t physical_size = file_stat.st_blocks * 512; file.second->set_disk_uage(physical_size); // This is an over-approximation of the effective file size, because the // effective size of the file must be equal or smaller than the physical // size. They are equal only when the file's block size matches the file // system's block size, otherwise the real effective size is usually // smaller, considering the existence of fragmentation. But it's fine to use // the physical size as the initial value of the effective size, because the // effective size will be updated after the first GC. file.second->set_effective_file_size(physical_size); } return Status::OK(); } std::weak_ptr BlobStorage::FindFile(uint64_t file_number) const { MutexLock l(&mutex_); auto it = files_.find(file_number); @@ -182,8 +208,12 @@ bool BlobStorage::RemoveFile(uint64_t file_number) { break; } } + auto removed_size = file->second->file_size(); + if (file->second->block_size() > 0) { + removed_size = file->second->effective_file_size(); + } SubStats(stats_, cf_id_, TitanInternalStats::OBSOLETE_BLOB_FILE_SIZE, - file->second->file_size()); + removed_size); SubStats(stats_, cf_id_, TitanInternalStats::NUM_OBSOLETE_BLOB_FILE, 1); files_.erase(file_number); file_cache_->Evict(file_number); @@ -237,6 +267,7 @@ void BlobStorage::UpdateStats() { levels_file_count_.assign(cf_options_.num_levels, 0); uint64_t live_blob_file_size = 0, num_live_blob_file = 0; uint64_t obsolete_blob_file_size = 0, num_obsolete_blob_file = 0; + uint64_t pending_punch_hole_size = 0; std::unordered_map ratio_levels; // collect metrics @@ -244,6 +275,9 @@ void BlobStorage::UpdateStats() { if (file.second->is_obsolete()) { num_obsolete_blob_file += 1; 
obsolete_blob_file_size += file.second->file_size(); + if (file.second->block_size() > 0) { + obsolete_blob_file_size = file.second->effective_file_size(); + } continue; } num_live_blob_file += 1; @@ -252,6 +286,7 @@ void BlobStorage::UpdateStats() { // If the file is initialized yet, skip it if (file.second->file_state() != BlobFileMeta::FileState::kPendingInit) { live_blob_file_size += file.second->file_size(); + pending_punch_hole_size += file.second->GetHolePunchableSize(); ratio_levels[static_cast(file.second->GetDiscardableRatioLevel())] += 1; } @@ -271,6 +306,8 @@ void BlobStorage::UpdateStats() { SetStats(stats_, cf_id_, static_cast(i), ratio_levels[i]); } + SetStats(stats_, cf_id_, TitanInternalStats::PENDING_PUNCH_HOLE_SIZE, + pending_punch_hole_size); } void BlobStorage::ComputeGCScore() { UpdateStats(); @@ -280,32 +317,49 @@ void BlobStorage::ComputeGCScore() { MutexLock l(&mutex_); gc_score_.clear(); + punch_hole_score_.clear(); for (auto& file : files_) { if (file.second->is_obsolete()) { continue; } - double score; + double gc_score; if (file.second->file_size() < cf_options_.merge_small_file_threshold) { // for the small file or file with gc mark (usually the file that just // recovered) we want gc these file but more hope to gc other file with // more invalid data - score = std::max(cf_options_.blob_file_discardable_ratio, - file.second->GetDiscardableRatio()); + gc_score = std::max(cf_options_.blob_file_discardable_ratio, + file.second->GetDiscardableRatio()); + } else { + gc_score = file.second->GetDiscardableRatio(); + } + if (gc_score >= cf_options_.blob_file_discardable_ratio) { + gc_score_.emplace_back(GCScore{ + .file_number = file.first, + .score = gc_score, + }); } else { - score = file.second->GetDiscardableRatio(); + if (cf_options_.punch_hole_threshold > 0 && + file.second->GetHolePunchableSize() > + cf_options_.punch_hole_threshold) { + punch_hole_score_.emplace_back(GCScore{ + .file_number = file.first, + .score = 
(double)file.second->GetHolePunchableSize() / + file.second->file_size(), + }); + } } - gc_score_.emplace_back(GCScore{ - .file_number = file.first, - .score = score, - }); } std::sort(gc_score_.begin(), gc_score_.end(), [](const GCScore& first, const GCScore& second) { return first.score > second.score; }); + std::sort(punch_hole_score_.begin(), punch_hole_score_.end(), + [](const GCScore& first, const GCScore& second) { + return first.score > second.score; + }); } } // namespace titandb diff --git a/src/blob_storage.h b/src/blob_storage.h index 8ac9a26a..1ac9b604 100644 --- a/src/blob_storage.h +++ b/src/blob_storage.h @@ -55,6 +55,11 @@ class BlobStorage { return gc_score_; } + const std::vector punch_hole_score() { + MutexLock l(&mutex_); + return punch_hole_score_; + } + Cache* blob_cache() { return blob_cache_.get(); } // Gets the blob record pointed by the blob index. The provided @@ -93,6 +98,8 @@ class BlobStorage { } } + Status InitPunchHoleGCOnStart(); + // Must call before TitanDBImpl initialized. void InitializeAllFiles() { MutexLock l(&mutex_); @@ -167,6 +174,15 @@ class BlobStorage { mutable_cf_options_ = options; } + std::unordered_map GetFileBlockSizes() { + MutexLock l(&mutex_); + std::unordered_map file_block_sizes; + for (auto& file : files_) { + file_block_sizes[file.first] = file.second->block_size(); + } + return file_block_sizes; + } + private: friend class BlobFileSet; friend class VersionTest; @@ -214,6 +230,7 @@ class BlobStorage { std::shared_ptr file_cache_; std::vector gc_score_; + std::vector punch_hole_score_; std::list> obsolete_files_; // It is marked when the column family handle is destroyed, indicating the diff --git a/src/db_impl.cc b/src/db_impl.cc index 81791c7e..29650f46 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -283,7 +283,7 @@ Status TitanDBImpl::OpenImpl(const std::vector& descs, // Disable compactions before blob file set is initialized. 
cf_opts.disable_auto_compactions = true; cf_opts.table_properties_collector_factories.emplace_back( - std::make_shared()); + std::make_shared(blob_file_set_.get())); titan_table_factories.push_back(std::make_shared( db_options_, desc.options, blob_manager_, &mutex_, blob_file_set_.get(), stats_.get())); @@ -459,7 +459,7 @@ Status TitanDBImpl::CreateColumnFamilies( stats_.get())); options.table_factory = titan_table_factory.back(); options.table_properties_collector_factories.emplace_back( - std::make_shared()); + std::make_shared(blob_file_set_.get())); if (options.compaction_filter != nullptr || options.compaction_filter_factory != nullptr) { std::shared_ptr titan_cf_factory = @@ -818,6 +818,25 @@ void TitanDBImpl::ReleaseSnapshot(const Snapshot* snapshot) { // We can record here whether the oldest snapshot is released. // If not, we can just skip the next round of purging obsolete files. db_->ReleaseSnapshot(snapshot); + { + MutexLock l(&mutex_); + + if (pending_punch_hole_gc_ != nullptr && !punch_hole_gc_running_ && + pending_punch_hole_gc_->snapshot()->GetSequenceNumber() <= + GetOldestSnapshotSequence() && + bg_gc_scheduled_ < db_options_.max_background_gc) { + if (db_options_.disable_background_gc) return; + + if (!initialized_.load(std::memory_order_acquire)) return; + + if (shuting_down_.load(std::memory_order_acquire)) return; + + TITAN_LOG_DEBUG(db_options_.info_log, + "Titan schedule punch hole GC after releasing snapshot"); + bg_gc_scheduled_++; + thread_pool_->SubmitJob(std::bind(&TitanDBImpl::BGWorkGC, this)); + } + } } Status TitanDBImpl::DisableFileDeletions() { diff --git a/src/db_impl.h b/src/db_impl.h index bb0596ad..393d95d4 100644 --- a/src/db_impl.h +++ b/src/db_impl.h @@ -7,6 +7,7 @@ #include "blob_file_manager.h" #include "blob_file_set.h" +#include "punch_hole_gc_job.h" #include "table_factory.h" #include "titan/db.h" #include "titan_stats.h" @@ -224,6 +225,7 @@ class TitanDBImpl : public TitanDB { friend class FileManager; friend class 
BlobGCJobTest; friend class BaseDbListener; + friend class PunchHoleGCTest; friend class TitanDBTest; friend class TitanThreadSafetyTest; friend class TitanGCStatsTest; @@ -287,6 +289,7 @@ class TitanDBImpl : public TitanDB { static void BGWorkGC(void* db); void BackgroundCallGC(); + bool MaybeRunPendingPunchHoleGC(); Status BackgroundGC(LogBuffer* log_buffer, uint32_t column_family_id); void PurgeObsoleteFiles(); @@ -381,6 +384,14 @@ class TitanDBImpl : public TitanDB { std::deque gc_queue_; // REQUIRE: mutex_ held. + // This is not a queue, since punch hole GC is only runnable when its owned + // snapshot is the oldest one. So we can't really multi-thread it. + std::unique_ptr pending_punch_hole_gc_; + // REQUIRE: mutex_ held. + // Indicates whether the scheduled punch hole GC is running, in case multiple + // threads are trying to work on the same job at the same time. + bool punch_hole_gc_running_ = false; + int bg_gc_scheduled_ = 0; // REQUIRE: mutex_ held. int bg_gc_running_ = 0; diff --git a/src/db_impl_gc.cc b/src/db_impl_gc.cc index aeb97dfc..7a4814ad 100644 --- a/src/db_impl_gc.cc +++ b/src/db_impl_gc.cc @@ -6,6 +6,7 @@ #include "blob_gc_picker.h" #include "db/version_set.h" #include "db_impl.h" +#include "punch_hole_gc_job.h" #include "titan_logging.h" #include "util.h" @@ -135,6 +136,7 @@ Status TitanDBImpl::AsyncInitializeGC( file->UpdateLiveDataSize(file_size.second); } } + blob_storage->InitPunchHoleGCOnStart(); blob_storage->InitializeAllFiles(); TITAN_LOG_INFO(db_options_.info_log, "Titan finish async GC initialization on cf [%s]", @@ -194,19 +196,24 @@ void TitanDBImpl::BackgroundCallGC() { } bg_gc_running_++; - TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC"); - if (!gc_queue_.empty()) { - uint32_t column_family_id = PopFirstFromGCQueue(); - LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, - db_options_.info_log.get()); - BackgroundGC(&log_buffer, column_family_id); - { - mutex_.Unlock(); - log_buffer.FlushBufferToLog(); - 
LogFlush(db_options_.info_log.get()); - mutex_.Lock(); + // Try running pending (waiting for the snapshot to be the oldest) punch + // hole GC first. + if (!MaybeRunPendingPunchHoleGC()) { + TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC"); + if (!gc_queue_.empty()) { + uint32_t column_family_id = PopFirstFromGCQueue(); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + db_options_.info_log.get()); + BackgroundGC(&log_buffer, column_family_id); + { + mutex_.Unlock(); + log_buffer.FlushBufferToLog(); + LogFlush(db_options_.info_log.get()); + mutex_.Lock(); + } } } + TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:AfterGCRunning"); bg_gc_running_--; bg_gc_scheduled_--; @@ -225,6 +232,49 @@ void TitanDBImpl::BackgroundCallGC() { } } +bool TitanDBImpl::MaybeRunPendingPunchHoleGC() { + mutex_.AssertHeld(); + if (pending_punch_hole_gc_ == nullptr || punch_hole_gc_running_) { + return false; + } + if (blob_file_set_->IsColumnFamilyObsolete(pending_punch_hole_gc_->cf_id())) { + TITAN_LOG_INFO(db_options_.info_log, "GC skip dropped colum family [%s].", + cf_info_[pending_punch_hole_gc_->cf_id()].name.c_str()); + pending_punch_hole_gc_ = nullptr; + return false; + } + if (pending_punch_hole_gc_->snapshot()->GetSequenceNumber() == + GetOldestSnapshotSequence()) { + TITAN_LOG_DEBUG(db_options_.info_log, + "Titan start scheduled punch hole GC"); + punch_hole_gc_running_ = true; + mutex_.Unlock(); + Status s = pending_punch_hole_gc_->Run(); + mutex_.Lock(); + if (s.ok()) { + TITAN_LOG_DEBUG(db_options_.info_log, + "Titan finish scheduled punch hole GC"); + pending_punch_hole_gc_->Finish(); + } else { + SetBGError(s); + TITAN_LOG_ERROR(db_options_.info_log, + "Titan scheduled punch hole GC error: %s", + s.ToString().c_str()); + } + punch_hole_gc_running_ = false; + bool need_schedule_gc = pending_punch_hole_gc_->blob_gc()->trigger_next(); + pending_punch_hole_gc_ = nullptr; + TEST_SYNC_POINT( + "TitanDBImpl::MaybeRunPendingPunchHoleGC:" + 
"AfterRunPendingPunchHoleGC"); + if (need_schedule_gc) { + MaybeScheduleGC(); + } + return true; + } + return false; +} + Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, uint32_t column_family_id) { mutex_.AssertHeld(); @@ -246,22 +296,36 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer, std::shared_ptr blob_gc_picker = std::make_shared(db_options_, cf_options, stats_.get()); - blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get()); - - if (blob_gc) { - cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); - assert(column_family_id == cfh->GetID()); - blob_gc->SetColumnFamily(cfh.get()); - } + blob_gc = blob_gc_picker->PickBlobGC( + blob_storage.get(), + /*allow_punch_hole=*/cf_options.punch_hole_threshold > 0 && + pending_punch_hole_gc_ == nullptr); } Status s; // TODO(@DorianZheng) Make sure enough room for GC - if (UNLIKELY(!blob_gc)) { + if (UNLIKELY(blob_gc == nullptr)) { RecordTick(statistics(stats_.get()), TITAN_GC_NO_NEED, 1); // Nothing to do TITAN_LOG_BUFFER(log_buffer, "Titan GC nothing to do"); + TEST_SYNC_POINT("TitanDBImpl::BackgroundGC:NothingToDo"); + return s; + } + if (blob_gc->punch_hole_gc()) { + auto snapshot = db_impl_->GetSnapshot(); + pending_punch_hole_gc_ = std::unique_ptr(new PunchHoleGCJob( + column_family_id, std::move(blob_gc), db_impl_, db_options_, env_, + env_options_, snapshot, &shuting_down_)); + if (!MaybeRunPendingPunchHoleGC()) { + MaybeScheduleGC(); + } else { + TEST_SYNC_POINT("TitanDBImpl::BackgroundGC:RunPunchHoleGCRightAway"); + } } else { + cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id); + assert(column_family_id == cfh->GetID()); + blob_gc->SetColumnFamily(cfh.get()); + StopWatch gc_sw(env_->GetSystemClock().get(), statistics(stats_.get()), TITAN_GC_MICROS); BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_, diff --git a/src/options.cc b/src/options.cc index 22b7d241..0cb34132 100644 --- a/src/options.cc +++ b/src/options.cc @@ -45,7 +45,7 @@ 
TitanCFOptions::TitanCFOptions(const ColumnFamilyOptions& cf_opts, skip_value_in_compaction_filter( immutable_opts.skip_value_in_compaction_filter), block_size(immutable_opts.block_size), - enable_punch_hole_gc(immutable_opts.enable_punch_hole_gc) {} + punch_hole_threshold(mutable_opts.punch_hole_threshold) {} void TitanCFOptions::Dump(Logger* logger) const { TITAN_LOG_HEADER(logger, @@ -90,6 +90,12 @@ void TitanCFOptions::Dump(Logger* logger) const { TITAN_LOG_HEADER(logger, "TitanCFOptions.merge_small_file_threshold : %" PRIu64, merge_small_file_threshold); + TITAN_LOG_HEADER(logger, + "TtitanCFOptions.block_size : %" PRIu64, + block_size); + TITAN_LOG_HEADER(logger, + "TitanCFOptions.punch_hole_threshold : %" PRIu64, + punch_hole_threshold); std::string blob_run_mode_str = "unknown"; if (blob_run_mode_to_string.count(blob_run_mode) > 0) { blob_run_mode_str = blob_run_mode_to_string.at(blob_run_mode); @@ -104,6 +110,7 @@ void TitanCFOptions::UpdateMutableOptions( min_blob_size = new_options.min_blob_size; blob_file_compression = new_options.blob_file_compression; blob_file_discardable_ratio = new_options.blob_file_discardable_ratio; + punch_hole_threshold = new_options.punch_hole_threshold; } std::map diff --git a/src/punch_hole_gc_job.cc b/src/punch_hole_gc_job.cc new file mode 100644 index 00000000..672c3e95 --- /dev/null +++ b/src/punch_hole_gc_job.cc @@ -0,0 +1,169 @@ +#include "punch_hole_gc_job.h" + +#include +#include +#include + +#include "db/db_impl/db_impl.h" + +#include "blob_file_iterator.h" +#include "blob_file_reader.h" +#include "titan_logging.h" + +namespace rocksdb { +namespace titandb { + +Status PunchHoleGCJob::Run() { + Status s; + auto cfh = base_db_impl_->GetColumnFamilyHandleUnlocked(cf_id_); + assert(cf_id_ == cfh->GetID()); + blob_gc_->SetColumnFamily(cfh.get()); + s = HolePunchBlobFiles(); + if (!s.ok()) { + return s; + } + return Status::OK(); +} + +Status PunchHoleGCJob::HolePunchBlobFiles() { + for (const auto& file : 
blob_gc_->inputs()) { + if (IsShutingDown()) { + return Status::ShutdownInProgress(); + } + Status s = HolePunchSingleBlobFile(file); + if (!s.ok()) { + TITAN_LOG_ERROR(db_options_.info_log, + "Hole punch file %" PRIu64 " failed: %s", + file->file_number(), s.ToString().c_str()); + + return s; + } + } + return Status::OK(); +} + +Status PunchHoleGCJob::HolePunchSingleBlobFile( + std::shared_ptr file) { + Status s; + std::unique_ptr file_reader; + s = NewBlobFileReader(file->file_number(), 0, db_options_, env_options_, env_, + &file_reader); + if (!s.ok()) { + return s; + } + auto fd = open(BlobFileName(db_options_.dirname, file->file_number()).c_str(), + O_WRONLY); + uint64_t effective_file_size = 0; + uint64_t aligned_data_size = 0; + std::unique_ptr iter( + new BlobFileIterator(std::move(file_reader), file->file_number(), + file->file_size(), blob_gc_->titan_cf_options())); + iter->SeekToFirst(); + if (!iter->status().ok()) { + return iter->status(); + } + auto block_size = file->block_size(); + for (; iter->Valid(); iter->Next()) { + if (IsShutingDown()) { + return Status::ShutdownInProgress(); + } + BlobIndex blob_index = iter->GetBlobIndex(); + auto key = iter->key(); + bool discardable = false; + s = WhetherToPunchHole(key, blob_index, &discardable); + if (!s.ok()) { + return s; + } + + aligned_data_size = Roundup(blob_index.blob_handle.size, block_size); + + if (!discardable) { + effective_file_size += aligned_data_size; + continue; + } + +#if defined(FALLOC_FL_PUNCH_HOLE) && defined(FALLOC_FL_KEEP_SIZE) + // Hole punch the file at the blob_index.blob_handle.offset with + // blob_index.blob_handle.size aligned to alignment_size. 
+ auto err = fallocate(fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, + blob_index.blob_handle.offset, aligned_data_size); + if (err != 0) { + return Status::IOError("Hole punch failed", strerror(err)); + } +#else + return Status::NotSupported("Hole punch not supported"); +#endif + } + if (!iter->status().ok()) { + return iter->status(); + } + struct stat st; + if (fstat(fd, &st) != 0) { + // Do nothing, so far, this is only for stats. + } + close(fd); + disk_usage_map_[file->file_number()] = st.st_blocks * 512; + effective_file_size_map_[file->file_number()] = effective_file_size; + return Status::OK(); +} + +Status PunchHoleGCJob::WhetherToPunchHole(const Slice& key, + const BlobIndex& blob_index, + bool* discardable) { + // TitanStopWatch sw(env_, metrics_.gc_read_lsm_micros); + assert(discardable != nullptr); + PinnableSlice index_entry; + bool is_blob_index = false; + DBImpl::GetImplOptions gopts; + gopts.column_family = blob_gc_->column_family_handle(); + gopts.value = &index_entry; + gopts.is_blob_index = &is_blob_index; + auto read_opts = ReadOptions(); + read_opts.snapshot = snapshot_; + Status s = base_db_impl_->GetImpl(read_opts, key, gopts); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + // count read bytes for checking LSM entry + // metrics_.gc_bytes_read += key.size() + index_entry.size(); + if (s.IsNotFound() || !is_blob_index) { + // Either the key is deleted or updated with a newer version which is + // inlined in LSM. 
+ *discardable = true; + return Status::OK(); + } + + BlobIndex other_blob_index; + s = other_blob_index.DecodeFrom(&index_entry); + if (!s.ok()) { + return s; + } + + *discardable = !(blob_index == other_blob_index); + return Status::OK(); +} + +void PunchHoleGCJob::UpdateBlobFilesMeta() { + for (auto& file : blob_gc_->inputs()) { + if (file->is_obsolete()) { + continue; + } + auto it = effective_file_size_map_.find(file->file_number()); + if (it == effective_file_size_map_.end()) { + continue; + } + file->set_effective_file_size(it->second); + file->set_disk_uage(disk_usage_map_[file->file_number()]); + } +} + +Status PunchHoleGCJob::Cleanup() { + base_db_impl_->ReleaseSnapshot(snapshot_); + snapshot_ = nullptr; + // Release input files. + blob_gc_->ReleaseGcFiles(); + return Status::OK(); +} + +} // namespace titandb +} // namespace rocksdb \ No newline at end of file diff --git a/src/punch_hole_gc_job.h b/src/punch_hole_gc_job.h new file mode 100644 index 00000000..b9930db0 --- /dev/null +++ b/src/punch_hole_gc_job.h @@ -0,0 +1,68 @@ +#pragma once + +#include + +#include "rocksdb/status.h" + +#include "blob_file_manager.h" +#include "blob_format.h" +#include "blob_gc.h" + +namespace rocksdb { +namespace titandb { + +class PunchHoleGCJob { + public: + PunchHoleGCJob(uint32_t cf_id, std::unique_ptr blob_gc, + DBImpl* base_db_impl, const TitanDBOptions& db_options, + Env* env, const EnvOptions& env_options, + const Snapshot* snapshot, std::atomic_bool* shuting_down) + : cf_id_(cf_id), + blob_gc_(std::move(blob_gc)), + base_db_impl_(base_db_impl), + db_options_(db_options), + env_(env), + env_options_(env_options), + snapshot_(snapshot), + shuting_down_(shuting_down) {} + PunchHoleGCJob(const PunchHoleGCJob&) = delete; + void operator=(const PunchHoleGCJob&) = delete; + ~PunchHoleGCJob() { Cleanup(); }; + + Status Run(); + // REQUIRE: db mutex held + void Finish() { UpdateBlobFilesMeta(); }; + + uint32_t cf_id() const { return cf_id_; } + const Snapshot* snapshot() 
const { return snapshot_; } + BlobGC* blob_gc() const { return blob_gc_.get(); } + + private: + uint32_t cf_id_; + std::unique_ptr blob_gc_; + DBImpl* base_db_impl_; + TitanDBOptions db_options_; + Env* env_; + EnvOptions env_options_; + const Snapshot* snapshot_; + + std::atomic_bool* shuting_down_{nullptr}; + + std::unordered_map effective_file_size_map_; + std::unordered_map disk_usage_map_; + + // TODO: Add more stats + + Status HolePunchBlobFiles(); + Status HolePunchSingleBlobFile(std::shared_ptr file); + Status WhetherToPunchHole(const Slice& key, const BlobIndex& blob_index, + bool* discardable); + bool IsShutingDown() { + return (shuting_down_ && shuting_down_->load(std::memory_order_acquire)); + } + // REQUIRE: db mutex held + void UpdateBlobFilesMeta(); + Status Cleanup(); +}; +} // namespace titandb +} // namespace rocksdb \ No newline at end of file diff --git a/src/punch_hole_gc_test.cc b/src/punch_hole_gc_test.cc new file mode 100644 index 00000000..139607ee --- /dev/null +++ b/src/punch_hole_gc_test.cc @@ -0,0 +1,347 @@ +#include + +#include "test_util/testharness.h" + +#include "db_impl.h" + +namespace rocksdb { +namespace titandb { +std::string GenKey(int i) { + char buffer[32]; + snprintf(buffer, sizeof(buffer), "k-%08d", i); + return buffer; +} + +std::string GenValue(int i) { + char buffer[32]; + snprintf(buffer, sizeof(buffer), "v-%08d", i); + return buffer; +} + +class PunchHoleGCTest : public testing::Test { + public: + std::string dbname_; + TitanDB* db_; + DBImpl* base_db_; + TitanDBImpl* tdb_; + BlobFileSet* blob_file_set_; + TitanOptions options_; + port::Mutex* mutex_; + + PunchHoleGCTest() : dbname_(test::TmpDir()) { + options_.dirname = dbname_ + "/titandb"; + options_.create_if_missing = true; + options_.min_blob_size = 0; + options_.disable_background_gc = false; + options_.disable_auto_compactions = false; + options_.punch_hole_threshold = 4096; + options_.blob_file_discardable_ratio = 0.8; + 
options_.env->CreateDirIfMissing(dbname_); + options_.env->CreateDirIfMissing(options_.dirname); + } + ~PunchHoleGCTest() { ClearDir(); } + + void DisableMergeSmall() { options_.merge_small_file_threshold = 0; } + + std::weak_ptr GetBlobStorage(uint32_t cf_id) { + MutexLock l(mutex_); + return blob_file_set_->GetBlobStorage(cf_id); + } + + void ClearDir() { + std::vector filenames; + options_.env->GetChildren(options_.dirname, &filenames); + for (auto& fname : filenames) { + if (fname != "." && fname != "..") { + ASSERT_OK(options_.env->DeleteFile(options_.dirname + "/" + fname)); + } + } + options_.env->DeleteDir(options_.dirname); + filenames.clear(); + options_.env->GetChildren(dbname_, &filenames); + for (auto& fname : filenames) { + if (fname != "." && fname != "..") { + options_.env->DeleteFile(dbname_ + "/" + fname); + } + } + } + + void NewDB() { + ClearDir(); + Open(); + } + + void Open() { + ASSERT_OK(TitanDB::Open(options_, dbname_, &db_)); + tdb_ = reinterpret_cast(db_); + blob_file_set_ = tdb_->blob_file_set_.get(); + mutex_ = &tdb_->mutex_; + base_db_ = reinterpret_cast(tdb_->GetRootDB()); + } + + void Reopen() { + Close(); + Open(); + } + + void Flush() { + FlushOptions fopts; + fopts.wait = true; + ASSERT_OK(db_->Flush(fopts)); + } + + void CompactAll() { + auto opts = db_->GetOptions(); + auto compact_opts = CompactRangeOptions(); + compact_opts.change_level = true; + compact_opts.target_level = opts.num_levels - 1; + compact_opts.bottommost_level_compaction = BottommostLevelCompaction::kSkip; + ASSERT_OK(db_->CompactRange(compact_opts, nullptr, nullptr)); + } + + void Close() { + if (!db_) return; + ASSERT_OK(db_->Close()); + delete db_; + db_ = nullptr; + } +}; + +TEST_F(PunchHoleGCTest, Basic) { +#if defined(FALLOC_FL_PUNCH_HOLE) + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"PunchHoleGCTest::Basic:AfterCompact", + "TitanDBImpl::BackgroundCallGC:BeforeGCRunning"}, + {"TitanDBImpl::BackgroundCallGC:AfterGCRunning", + 
"PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsQueued"}, + {"TitanDBImpl::MaybeRunPendingPunchHoleGC:AfterRunPendingPunchHoleGC", + "PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsFinished"}, + {"TitanDBImpl::BackgroundGC:RunPunchHoleGCRightAway", + "PunchHoleGCTest::Basic:" + "BeforeCheckSecondRoundPunchHoleGCIsFinished"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + DisableMergeSmall(); + + NewDB(); + auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock(); + std::vector values(1000); + for (int i = 0; i < 1000; i++) { + values.push_back(GenValue(i)); + db_->Put(WriteOptions(), GenKey(i), values[i]); + } + Flush(); + std::map> files; + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + auto blob_file_number = files.begin()->first; + auto file_size = files.begin()->second.lock()->file_size(); + auto effective_file_size = + files.begin()->second.lock()->effective_file_size(); + for (int i = 0; i < 1000; i++) { + if (i % 3 == 0) { + db_->Delete(WriteOptions(), GenKey(i)); + } + } + Flush(); + CompactAll(); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 334 * 4096); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 1000 * 4096); + + auto snapshot = db_->GetSnapshot(); + db_->Put(WriteOptions(), GenKey(100000), GenValue(1)); + + TEST_SYNC_POINT("PunchHoleGCTest::Basic:AfterCompact"); + TEST_SYNC_POINT("PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsQueued"); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 334 * 4096); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 1000 * 4096); + + db_->ReleaseSnapshot(snapshot); + TEST_SYNC_POINT("PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsFinished"); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + auto post_punch_hole_file_size = 
files.begin()->second.lock()->file_size(); + ASSERT_EQ(post_punch_hole_file_size, file_size); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 666 * 4096); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 0); + for (int i = 0; i < 1000; i++) { + if (i % 3 != 0) { + std::string value; + db_->Get(ReadOptions(), GenKey(i), &value); + ASSERT_EQ(value, values[i]); + } + } + + for (int i = 0; i < 1000; i++) { + if (i % 3 == 1) { + db_->Delete(WriteOptions(), GenKey(i)); + } + } + Flush(); + CompactAll(); + + TEST_SYNC_POINT( + "PunchHoleGCTest::Basic:BeforeCheckSecondRoundPunchHoleGCIsFinished"); + + files.clear(); + b->ExportBlobFiles(files); + // One blob file for 1000 records inserted at the beginning, one blob file for + // the single record inserted for updating sequence number. + bool punch_hole_gc_finished = false; + ASSERT_EQ(files.size(), 2); + for (auto& file : files) { + if (file.first == blob_file_number) { + punch_hole_gc_finished = true; + ASSERT_EQ(file.second.lock()->GetHolePunchableSize(), 0); + ASSERT_EQ(file.second.lock()->effective_file_size(), 333 * 4096); + } + } + ASSERT_TRUE(punch_hole_gc_finished); + post_punch_hole_file_size = files.begin()->second.lock()->file_size(); + ASSERT_EQ(post_punch_hole_file_size, file_size); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 333 * 4096); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 0); + for (int i = 0; i < 1000; i++) { + if (i % 3 == 2) { + std::string value; + db_->Get(ReadOptions(), GenKey(i), &value); + ASSERT_EQ(value, values[i]); + } + } + Close(); +#endif +} + +TEST_F(PunchHoleGCTest, NonFSDefaultBlockSize) { +#if defined(FALLOC_FL_PUNCH_HOLE) + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"PunchHoleGCTest::Basic:AfterCompact", + "TitanDBImpl::BackgroundCallGC:BeforeGCRunning"}, + {"TitanDBImpl::BackgroundCallGC:AfterGCRunning", + "PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsQueued"}, + 
{"TitanDBImpl::MaybeRunPendingPunchHoleGC:AfterRunPendingPunchHoleGC", + "PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsFinished"}, + {"TitanDBImpl::BackgroundGC:RunPunchHoleGCRightAway", + "PunchHoleGCTest::Basic:" + "BeforeCheckSecondRoundPunchHoleGCIsFinished"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + options_.block_size = 1024; + + DisableMergeSmall(); + + NewDB(); + auto b = GetBlobStorage(base_db_->DefaultColumnFamily()->GetID()).lock(); + std::vector values(1000); + for (int i = 0; i < 1000; i++) { + values.push_back(GenValue(i)); + db_->Put(WriteOptions(), GenKey(i), values[i]); + } + Flush(); + std::map> files; + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + auto blob_file_number = files.begin()->first; + auto file_size = files.begin()->second.lock()->file_size(); + auto effective_file_size = + files.begin()->second.lock()->effective_file_size(); + for (int i = 0; i < 1000; i++) { + if (i % 3 == 0) { + db_->Delete(WriteOptions(), GenKey(i)); + } + } + Flush(); + CompactAll(); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 334 * 1024); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 1000 * 1024); + + auto snapshot = db_->GetSnapshot(); + db_->Put(WriteOptions(), GenKey(100000), GenValue(1)); + + TEST_SYNC_POINT("PunchHoleGCTest::Basic:AfterCompact"); + TEST_SYNC_POINT("PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsQueued"); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 334 * 1024); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 1000 * 1024); + + db_->ReleaseSnapshot(snapshot); + TEST_SYNC_POINT("PunchHoleGCTest::Basic:BeforeCheckPunchHoleGCIsFinished"); + + files.clear(); + b->ExportBlobFiles(files); + ASSERT_EQ(files.size(), 1); + auto post_punch_hole_file_size = 
files.begin()->second.lock()->file_size(); + ASSERT_EQ(post_punch_hole_file_size, file_size); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 666 * 1024); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 0); + for (int i = 0; i < 1000; i++) { + if (i % 3 != 0) { + std::string value; + db_->Get(ReadOptions(), GenKey(i), &value); + ASSERT_EQ(value, values[i]); + } + } + + for (int i = 0; i < 1000; i++) { + if (i % 3 == 1) { + db_->Delete(WriteOptions(), GenKey(i)); + } + } + Flush(); + CompactAll(); + + TEST_SYNC_POINT( + "PunchHoleGCTest::Basic:BeforeCheckSecondRoundPunchHoleGCIsFinished"); + + files.clear(); + b->ExportBlobFiles(files); + // One blob file for 1000 records inserted at the beginning, one blob file for + // the single record inserted for updating sequence number. + bool punch_hole_gc_finished = false; + ASSERT_EQ(files.size(), 2); + for (auto& file : files) { + if (file.first == blob_file_number) { + punch_hole_gc_finished = true; + ASSERT_EQ(file.second.lock()->GetHolePunchableSize(), 0); + ASSERT_EQ(file.second.lock()->effective_file_size(), 333 * 1024); + } + } + ASSERT_TRUE(punch_hole_gc_finished); + post_punch_hole_file_size = files.begin()->second.lock()->file_size(); + ASSERT_EQ(post_punch_hole_file_size, file_size); + ASSERT_EQ(files.begin()->second.lock()->effective_file_size(), 333 * 1024); + ASSERT_EQ(files.begin()->second.lock()->GetHolePunchableSize(), 0); + for (int i = 0; i < 1000; i++) { + if (i % 3 == 2) { + std::string value; + db_->Get(ReadOptions(), GenKey(i), &value); + ASSERT_EQ(value, values[i]); + } + } + Close(); +#endif +} + +} // namespace titandb +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/src/table_builder.cc b/src/table_builder.cc index acf8d642..4a083b58 100644 --- a/src/table_builder.cc +++ b/src/table_builder.cc @@ -239,8 +239,9 @@ void 
TitanTableBuilder::FinishBlobFile() { blob_handle_->GetNumber(), blob_handle_->GetFile()->GetFileSize(), blob_builder_->NumEntries(), target_level_, blob_builder_->GetSmallestKey(), blob_builder_->GetLargestKey(), - /*block_size=*/0); + blob_builder_->GetBlockSize()); file->set_live_data_size(blob_builder_->live_data_size()); + file->set_effective_file_size(blob_builder_->live_data_size()); file->FileStateTransit(BlobFileMeta::FileEvent::kFlushOrCompactionOutput); finished_blobs_.push_back({file, std::move(blob_handle_)}); // level merge is performed diff --git a/src/table_builder_test.cc b/src/table_builder_test.cc index 7c02912f..cf03ac75 100644 --- a/src/table_builder_test.cc +++ b/src/table_builder_test.cc @@ -18,6 +18,15 @@ const uint64_t kMinBlobSize = 128; const uint64_t kTestFileNumber = 123; const uint64_t kTargetBlobFileSize = 4096; +void DeleteDir(Env* env, const std::string& dirname) { + std::vector filenames; + env->GetChildren(dirname, &filenames); + for (auto& fname : filenames) { + env->DeleteFile(dirname + "/" + fname); + } + env->DeleteDir(dirname); +} + class FileManager : public BlobFileManager { public: FileManager(const TitanDBOptions& db_options, BlobFileSet* blob_file_set) @@ -149,6 +158,8 @@ class TableBuilderTest : public testing::Test { db_options_.dirname = tmpdir_; db_options_.statistics = nullptr; cf_options_.min_blob_size = kMinBlobSize; + DeleteDir(env_, tmpdir_); + env_->CreateDirIfMissing(tmpdir_); Open(); } diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index b4985786..b59019c2 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -1328,7 +1328,7 @@ TEST_F(TitanDBTest, GCAfterDropCF) { SyncPoint::GetInstance()->LoadDependency( {{"TitanDBTest::GCAfterDropCF:AfterDropCF", "TitanDBImpl::BackgroundCallGC:BeforeGCRunning"}, - {"TitanDBImpl::BackgroundGC:Finish", + {"TitanDBImpl::BackgroundGC:NothingToDo", "TitanDBTest::GCAfterDropCF:WaitGC"}}); SyncPoint::GetInstance()->SetCallBack( 
"TitanDBImpl::BackgroundGC:CFDropped", diff --git a/src/titan_stats.h b/src/titan_stats.h index a8db5f40..b458437a 100644 --- a/src/titan_stats.h +++ b/src/titan_stats.h @@ -72,6 +72,8 @@ class TitanInternalStats { NUM_DISCARDABLE_RATIO_LE80, NUM_DISCARDABLE_RATIO_LE100, + PENDING_PUNCH_HOLE_SIZE, + INTERNAL_STATS_ENUM_MAX, }; diff --git a/src/util.cc b/src/util.cc index de19468f..f8028b4f 100644 --- a/src/util.cc +++ b/src/util.cc @@ -56,5 +56,9 @@ Status SyncTitanManifest(TitanStats* stats, return file->Sync(db_options->use_fsync); } +uint64_t Roundup(uint64_t offset, uint64_t align) { + return (offset + align - 1) / align * align; +} + } // namespace titandb } // namespace rocksdb diff --git a/src/util.h b/src/util.h index 62d3b8a4..5493c84b 100644 --- a/src/util.h +++ b/src/util.h @@ -82,5 +82,6 @@ Status SyncTitanManifest(TitanStats* stats, const ImmutableDBOptions* db_options, WritableFileWriter* file); +uint64_t Roundup(uint64_t offset, uint64_t align); } // namespace titandb } // namespace rocksdb