Skip to content

Commit

Permalink
Add compaction prefetching internal stats (#13302)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #13302

Differential Revision: D68224419

Pulled By: archang19
  • Loading branch information
archang19 authored and facebook-github-bot committed Jan 17, 2025
1 parent 9798807 commit d1ec645
Show file tree
Hide file tree
Showing 52 changed files with 211 additions and 116 deletions.
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ Status BuildTable(
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
tboptions.read_options, file_options, tboptions.internal_comparator,
*meta, nullptr /* range_del_agg */, mutable_cf_options, nullptr,
internal_stats,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
TableReaderCaller::kFlush, /*arena=*/nullptr,
Expand Down
2 changes: 1 addition & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class ColumnFamilyData {
const DBOptions& db_options,
const std::unordered_map<std::string, std::string>& options_map);

InternalStats* internal_stats() { return internal_stats_.get(); }
InternalStats* internal_stats() const { return internal_stats_.get(); }

// Accessor: hands back a pointer to the `imm_` member of this object.
MemTableList* imm() {
  MemTableList* const list = &imm_;
  return list;
}
// Accessor: returns the current value of the `mem_` member unchanged.
MemTable* mem() {
  MemTable* const current = mem_;
  return current;
}
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ Status CompactionJob::Run() {
/*range_del_agg=*/nullptr,
compact_->compaction->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
cfd->internal_stats(),
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,11 @@ class CompactionJobTestBase : public testing::Test {
TableReaderOptions(cfd->ioptions(), nullptr, FileOptions(),
cfd_->internal_comparator(),
0 /* block_protection_bytes_per_key */),
std::move(freader), file_size, &table_reader, false);
std::move(freader), file_size, &table_reader, /*internal_stats=*/nullptr, false);
ASSERT_OK(s);
assert(table_reader);
std::unique_ptr<InternalIterator> iiter(
table_reader->NewIterator(read_opts, nullptr, nullptr, true,
table_reader->NewIterator(read_opts, nullptr, nullptr, nullptr, true,
TableReaderCaller::kUncategorized));
assert(iiter);

Expand Down
15 changes: 9 additions & 6 deletions db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ Status DeleteFilesInRanges(DB* db, ColumnFamilyHandle* column_family,

Status VerifySstFileChecksum(const Options& options,
const EnvOptions& env_options,
const std::string& file_path) {
const std::string& file_path,
InternalStats* internal_stats) {
// TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
return VerifySstFileChecksum(options, env_options, read_options, file_path);
return VerifySstFileChecksum(options, env_options, read_options, file_path, /*largest_seqno=*/0, internal_stats);
}
Status VerifySstFileChecksum(const Options& options,
const EnvOptions& env_options,
const ReadOptions& _read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno) {
const SequenceNumber& largest_seqno,
InternalStats* internal_stats) {
if (_read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Can only call VerifySstFileChecksum with `ReadOptions::io_activity` "
Expand All @@ -50,14 +52,15 @@ Status VerifySstFileChecksum(const Options& options,
}
ReadOptions read_options(_read_options);
return VerifySstFileChecksumInternal(options, env_options, read_options,
file_path, largest_seqno);
file_path, largest_seqno, internal_stats);
}

Status VerifySstFileChecksumInternal(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno) {
const SequenceNumber& largest_seqno,
InternalStats* internal_stats) {
std::unique_ptr<FSRandomAccessFile> file;
uint64_t file_size;
InternalKeyComparator internal_comparator(options.comparator);
Expand Down Expand Up @@ -88,7 +91,7 @@ Status VerifySstFileChecksumInternal(const Options& options,
reader_options.largest_seqno = largest_seqno;
s = options.table_factory->NewTableReader(
read_options, reader_options, std::move(file_reader), file_size,
&table_reader, false /* prefetch_index_and_filter_in_cache */);
&table_reader, internal_stats, false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) {
return s;
}
Expand Down
5 changes: 4 additions & 1 deletion db/convenience_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
#pragma once
#include "rocksdb/db.h"

class InternalStats;

namespace ROCKSDB_NAMESPACE {
Status VerifySstFileChecksumInternal(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno = 0);
const SequenceNumber& largest_seqno,
InternalStats* internal_stats);
} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6548,7 +6548,8 @@ Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
read_options);
} else {
s = ROCKSDB_NAMESPACE::VerifySstFileChecksumInternal(
opts, file_options_, read_options, fname, fd.largest_seqno);
opts, file_options_, read_options, fname, fd.largest_seqno,
default_cf_internal_stats_);
}
RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
IOSTATS(bytes_read) - prev_bytes_read);
Expand Down
3 changes: 2 additions & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
/* unique_id */ {}, /* largest_seqno */ 0,
/* tail_size */ 0, user_defined_timestamps_persisted),
std::move(sst_file_reader), file_to_ingest->file_size, table_reader,
/*internal_stats=*/nullptr,
// No need to prefetch index/filter if caching is not needed.
/*prefetch_index_and_filter_in_cache=*/ingestion_options_.fill_cache);
return status;
Expand Down Expand Up @@ -922,7 +923,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, /*internal_stats=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

// Get first (smallest) and last (largest) key from file.
Expand Down
7 changes: 4 additions & 3 deletions db/forward_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ForwardLevelIterator : public InternalIterator {
*files_[file_index_],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
mutable_cf_options_, /*table_reader_ptr=*/nullptr,
cfd_->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
/*max_file_size_for_l0_meta_pin=*/0,
Expand Down Expand Up @@ -749,7 +750,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
Expand Down Expand Up @@ -837,7 +838,7 @@ void ForwardIterator::RenewIterators() {
*l0_files_new[inew],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
svnew->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
Expand Down Expand Up @@ -902,7 +903,7 @@ void ForwardIterator::ResetIncompleteIterators() {
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
*l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
Expand Down
4 changes: 2 additions & 2 deletions db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
std::move(sst_file_reader), file_to_import->file_size, &table_reader);
std::move(sst_file_reader), file_to_import->file_size, &table_reader, /*internal_stats=*/nullptr);
if (!status.ok()) {
return status;
}
Expand All @@ -361,7 +361,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
// TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, /*internal_stats=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

// Get first (smallest) key from file
Expand Down
12 changes: 12 additions & 0 deletions db/internal_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class InternalStats {
// So we should improve, rename or clarify it
kIntStatsWriteStallMicros,
kIntStatsWriteBufferManagerLimitStopsCounts,
kIntStatsPrefetchBufferSizeBytes,
kIntStatsNumMax,
};

Expand Down Expand Up @@ -615,6 +616,17 @@ class InternalStats {
}
}

// Decrements the DB-wide counter `db_stats_[type]` by `value`.
//
// type:       which counter slot of `db_stats_` to update.
// value:      amount to subtract. No check is made that it does not exceed
//             the current counter value.
// concurrent: when true, the subtraction is a single atomic `fetch_sub`.
//             When false (the default), the counter is read and then written
//             back as two separate relaxed operations, so the update as a
//             whole is not one atomic read-modify-write.
//             NOTE(review): presumably the non-concurrent path relies on the
//             caller serializing writers -- confirm against call sites.
void SubDBStats(InternalDBStatsType type, uint64_t value,
                bool concurrent = false) {
  auto& counter = db_stats_[type];
  if (!concurrent) {
    // Separate load and store; both use relaxed ordering.
    const auto current = counter.load(std::memory_order_relaxed);
    counter.store(current - value, std::memory_order_relaxed);
    return;
  }
  counter.fetch_sub(value, std::memory_order_relaxed);
}

// Reads the DB-wide counter `db_stats_[type]` with relaxed memory ordering
// and returns its current value.
uint64_t GetDBStats(InternalDBStatsType type) {
  const uint64_t snapshot = db_stats_[type].load(std::memory_order_relaxed);
  return snapshot;
}
Expand Down
2 changes: 1 addition & 1 deletion db/plain_table_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class TestPlainTableFactory : public PlainTableFactory {
Status NewTableReader(
const ReadOptions& /*ro*/, const TableReaderOptions& table_reader_options,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* table,
std::unique_ptr<TableReader>* table,InternalStats* /*internal_stats*/,
bool /*prefetch_index_and_filter_in_cache*/) const override {
std::unique_ptr<TableProperties> props;
const ReadOptions read_options;
Expand Down
2 changes: 1 addition & 1 deletion db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ class Repairer {
InternalIterator* iter = table_cache_->NewIterator(
ropts, file_options_, cfd->internal_comparator(), t->meta,
nullptr /* range_del_agg */, cfd->GetLatestMutableCFOptions(),
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd == nullptr ? nullptr : cfd->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
/*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
Expand Down
6 changes: 4 additions & 2 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Status TableCache::GetTableReader(
file_meta.fd.GetNumber(), expected_unique_id,
file_meta.fd.largest_seqno, file_meta.tail_size,
file_meta.user_defined_timestamps_persisted),
std::move(file_reader), file_meta.fd.GetFileSize(), table_reader,
std::move(file_reader), file_meta.fd.GetFileSize(), table_reader, /*internal_stats=*/nullptr,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
}
Expand Down Expand Up @@ -222,14 +222,15 @@ InternalIterator* TableCache::NewIterator(
const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
RangeDelAggregator* range_del_agg,
const MutableCFOptions& mutable_cf_options, TableReader** table_reader_ptr,
InternalStats* internal_stats,
HistogramImpl* file_read_hist, TableReaderCaller caller, Arena* arena,
bool skip_filters, int level, size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
const SequenceNumber* read_seqno,
std::unique_ptr<TruncatedRangeDelIterator>* range_del_iter) {
PERF_TIMER_GUARD(new_table_iterator_nanos);

(void) internal_stats;
Status s;
TableReader* table_reader = nullptr;
TypedHandle* handle = nullptr;
Expand Down Expand Up @@ -258,6 +259,7 @@ InternalIterator* TableCache::NewIterator(
} else {
result = table_reader->NewIterator(
options, mutable_cf_options.prefix_extractor.get(), arena,
internal_stats,
skip_filters, caller, file_options.compaction_readahead_size,
allow_unprepared_value);
}
Expand Down
3 changes: 2 additions & 1 deletion db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Arena;
struct FileDescriptor;
class GetContext;
class HistogramImpl;
class InternalStats;

// Manages caching for TableReader objects for a column family. The actual
// cache is allocated separately and passed to the constructor. TableCache
Expand Down Expand Up @@ -93,7 +94,7 @@ class TableCache {
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, RangeDelAggregator* range_del_agg,
const MutableCFOptions& mutable_cf_options,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
TableReader** table_reader_ptr, InternalStats* internal_stats, HistogramImpl* file_read_hist,
TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
Expand Down
14 changes: 11 additions & 3 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ class LevelIterator final : public InternalIterator {
TableCache* table_cache, const ReadOptions& read_options,
const FileOptions& file_options, const InternalKeyComparator& icomparator,
const LevelFilesBrief* flevel, const MutableCFOptions& mutable_cf_options,
bool should_sample, HistogramImpl* file_read_hist,
bool should_sample, InternalStats* internal_stats,HistogramImpl* file_read_hist,
TableReaderCaller caller, bool skip_filters, int level,
RangeDelAggregator* range_del_agg,
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
Expand All @@ -987,6 +987,7 @@ class LevelIterator final : public InternalIterator {
flevel_(flevel),
mutable_cf_options_(mutable_cf_options),
prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
internal_stats_(internal_stats),
file_read_hist_(file_read_hist),
caller_(caller),
file_index_(flevel_->num_files),
Expand Down Expand Up @@ -1149,7 +1150,7 @@ class LevelIterator final : public InternalIterator {
return table_cache_->NewIterator(
read_options_, file_options_, icomparator_, *file_meta.file_metadata,
range_del_agg_, mutable_cf_options_,
nullptr /* don't need reference to table */, file_read_hist_, caller_,
nullptr /* don't need reference to table */, internal_stats_, file_read_hist_, caller_,
/*arena=*/nullptr, skip_filters_, level_,
/*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
largest_compaction_key, allow_unprepared_value_, &read_seq_,
Expand Down Expand Up @@ -1180,6 +1181,7 @@ class LevelIterator final : public InternalIterator {
const MutableCFOptions& mutable_cf_options_;
const SliceTransform* prefix_extractor_;

InternalStats* internal_stats_;
HistogramImpl* file_read_hist_;
TableReaderCaller caller_;
size_t file_index_;
Expand Down Expand Up @@ -1931,6 +1933,7 @@ InternalIterator* Version::TEST_GetLevelIterator(
cfd_->table_cache(), read_options, file_options_,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
nullptr /* range_del_agg */, nullptr /* compaction_boundaries */,
Expand Down Expand Up @@ -2038,7 +2041,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
auto table_iter = cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(),
*file.file_metadata, /*range_del_agg=*/nullptr, mutable_cf_options_,
nullptr, cfd_->internal_stats()->GetFileReadHist(0),
nullptr, cfd_->internal_stats(), cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
Expand Down Expand Up @@ -2070,6 +2073,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
/*range_del_agg=*/nullptr,
Expand Down Expand Up @@ -2111,6 +2115,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
ScopedArenaPtr<InternalIterator> iter(cfd_->table_cache()->NewIterator(
read_options, file_options, cfd_->internal_comparator(),
*file->file_metadata, &range_del_agg, mutable_cf_options_, nullptr,
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, &arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
Expand All @@ -2129,6 +2134,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
cfd_->table_cache(), read_options, file_options,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
&range_del_agg, nullptr, false));
Expand Down Expand Up @@ -7075,6 +7081,7 @@ InternalIterator* VersionSet::MakeInputIterator(
cfd->internal_comparator(), fmd, range_del_agg,
c->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
cfd->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
/*arena=*/nullptr,
/*skip_filters=*/false,
Expand All @@ -7096,6 +7103,7 @@ InternalIterator* VersionSet::MakeInputIterator(
cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), flevel, c->mutable_cf_options(),
/*should_sample=*/false,
cfd->internal_stats(),
/*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false,
/*level=*/static_cast<int>(c->level(which)), range_del_agg,
Expand Down
Loading

0 comments on commit d1ec645

Please sign in to comment.