Skip to content

Commit

Permalink
Add compaction prefetching internal stats
Browse files Browse the repository at this point in the history
Differential Revision: D68224419
  • Loading branch information
archang19 authored and facebook-github-bot committed Jan 16, 2025
1 parent b333358 commit c76a690
Show file tree
Hide file tree
Showing 44 changed files with 125 additions and 98 deletions.
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ Status BuildTable(
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
tboptions.read_options, file_options, tboptions.internal_comparator,
*meta, nullptr /* range_del_agg */, mutable_cf_options, nullptr,
(internal_stats == nullptr) ? nullptr : internal_stats,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
TableReaderCaller::kFlush, /*arena=*/nullptr,
Expand Down
2 changes: 1 addition & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class ColumnFamilyData {
const DBOptions& db_options,
const std::unordered_map<std::string, std::string>& options_map);

InternalStats* internal_stats() { return internal_stats_.get(); }
InternalStats* internal_stats() const { return internal_stats_.get(); }

MemTableList* imm() { return &imm_; }
MemTable* mem() { return mem_; }
Expand Down
1 change: 1 addition & 0 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ Status CompactionJob::Run() {
/*range_del_agg=*/nullptr,
compact_->compaction->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
cfd->internal_stats(),
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,11 @@ class CompactionJobTestBase : public testing::Test {
TableReaderOptions(cfd->ioptions(), nullptr, FileOptions(),
cfd_->internal_comparator(),
0 /* block_protection_bytes_per_key */),
std::move(freader), file_size, &table_reader, false);
std::move(freader), file_size, &table_reader, /*internal_stast=*/nullptr, false);
ASSERT_OK(s);
assert(table_reader);
std::unique_ptr<InternalIterator> iiter(
table_reader->NewIterator(read_opts, nullptr, nullptr, true,
table_reader->NewIterator(read_opts, nullptr, nullptr, nullptr, true,
TableReaderCaller::kUncategorized));
assert(iiter);

Expand Down
2 changes: 1 addition & 1 deletion db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status VerifySstFileChecksumInternal(const Options& options,
reader_options.largest_seqno = largest_seqno;
s = options.table_factory->NewTableReader(
read_options, reader_options, std::move(file_reader), file_size,
&table_reader, false /* prefetch_index_and_filter_in_cache */);
&table_reader, /*internal_stats=*/nullptr, false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) {
return s;
}
Expand Down
3 changes: 2 additions & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
/* unique_id */ {}, /* largest_seqno */ 0,
/* tail_size */ 0, user_defined_timestamps_persisted),
std::move(sst_file_reader), file_to_ingest->file_size, table_reader,
/*internal_stats=*/nullptr,
// No need to prefetch index/filter if caching is not needed.
/*prefetch_index_and_filter_in_cache=*/ingestion_options_.fill_cache);
return status;
Expand Down Expand Up @@ -922,7 +923,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ReadOptions ro;
ro.fill_cache = ingestion_options_.fill_cache;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, /*internal_stats=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

// Get first (smallest) and last (largest) key from file.
Expand Down
7 changes: 4 additions & 3 deletions db/forward_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ForwardLevelIterator : public InternalIterator {
*files_[file_index_],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
mutable_cf_options_, /*table_reader_ptr=*/nullptr,
cfd_->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
/*max_file_size_for_l0_meta_pin=*/0,
Expand Down Expand Up @@ -749,7 +750,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
Expand Down Expand Up @@ -837,7 +838,7 @@ void ForwardIterator::RenewIterators() {
*l0_files_new[inew],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
svnew->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
Expand Down Expand Up @@ -902,7 +903,7 @@ void ForwardIterator::ResetIncompleteIterators() {
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
*l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
Expand Down
4 changes: 2 additions & 2 deletions db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
std::move(sst_file_reader), file_to_import->file_size, &table_reader);
std::move(sst_file_reader), file_to_import->file_size, &table_reader, /*internal_stats=*/nullptr);
if (!status.ok()) {
return status;
}
Expand All @@ -361,7 +361,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
// TODO: plumb Env::IOActivity, Env::IOPriority
ReadOptions ro;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, /*internal_stats=*/nullptr,
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

// Get first (smallest) key from file
Expand Down
2 changes: 1 addition & 1 deletion db/plain_table_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class TestPlainTableFactory : public PlainTableFactory {
Status NewTableReader(
const ReadOptions& /*ro*/, const TableReaderOptions& table_reader_options,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* table,
std::unique_ptr<TableReader>* table,InternalStats* /*internal_stats*/,
bool /*prefetch_index_and_filter_in_cache*/) const override {
std::unique_ptr<TableProperties> props;
const ReadOptions read_options;
Expand Down
2 changes: 1 addition & 1 deletion db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ class Repairer {
InternalIterator* iter = table_cache_->NewIterator(
ropts, file_options_, cfd->internal_comparator(), t->meta,
nullptr /* range_del_agg */, cfd->GetLatestMutableCFOptions(),
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd == nullptr ? nullptr : cfd->internal_stats(),/*file_read_hist=*/nullptr,
TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
/*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
Expand Down
6 changes: 4 additions & 2 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Status TableCache::GetTableReader(
file_meta.fd.GetNumber(), expected_unique_id,
file_meta.fd.largest_seqno, file_meta.tail_size,
file_meta.user_defined_timestamps_persisted),
std::move(file_reader), file_meta.fd.GetFileSize(), table_reader,
std::move(file_reader), file_meta.fd.GetFileSize(), table_reader, /*internal_stats=*/nullptr,
prefetch_index_and_filter_in_cache);
TEST_SYNC_POINT("TableCache::GetTableReader:0");
}
Expand Down Expand Up @@ -222,14 +222,15 @@ InternalIterator* TableCache::NewIterator(
const InternalKeyComparator& icomparator, const FileMetaData& file_meta,
RangeDelAggregator* range_del_agg,
const MutableCFOptions& mutable_cf_options, TableReader** table_reader_ptr,
InternalStats* internal_stats,
HistogramImpl* file_read_hist, TableReaderCaller caller, Arena* arena,
bool skip_filters, int level, size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
const InternalKey* largest_compaction_key, bool allow_unprepared_value,
const SequenceNumber* read_seqno,
std::unique_ptr<TruncatedRangeDelIterator>* range_del_iter) {
PERF_TIMER_GUARD(new_table_iterator_nanos);

(void) internal_stats;
Status s;
TableReader* table_reader = nullptr;
TypedHandle* handle = nullptr;
Expand Down Expand Up @@ -258,6 +259,7 @@ InternalIterator* TableCache::NewIterator(
} else {
result = table_reader->NewIterator(
options, mutable_cf_options.prefix_extractor.get(), arena,
internal_stats,
skip_filters, caller, file_options.compaction_readahead_size,
allow_unprepared_value);
}
Expand Down
4 changes: 3 additions & 1 deletion db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class Arena;
struct FileDescriptor;
class GetContext;
class HistogramImpl;
class InternalStats;

// Manages caching for TableReader objects for a column family. The actual
// cache is allocated separately and passed to the constructor. TableCache
Expand Down Expand Up @@ -93,7 +94,7 @@ class TableCache {
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, RangeDelAggregator* range_del_agg,
const MutableCFOptions& mutable_cf_options,
TableReader** table_reader_ptr, HistogramImpl* file_read_hist,
TableReader** table_reader_ptr, InternalStats* internal_stats, HistogramImpl* file_read_hist,
TableReaderCaller caller, Arena* arena, bool skip_filters, int level,
size_t max_file_size_for_l0_meta_pin,
const InternalKey* smallest_compaction_key,
Expand Down Expand Up @@ -278,6 +279,7 @@ class TableCache {
Status* read_status,
SequenceNumber seq_no = kMaxSequenceNumber);

InternalStats* internal_stats_;
const ImmutableOptions& ioptions_;
const FileOptions& file_options_;
CacheInterface cache_;
Expand Down
14 changes: 11 additions & 3 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ class LevelIterator final : public InternalIterator {
TableCache* table_cache, const ReadOptions& read_options,
const FileOptions& file_options, const InternalKeyComparator& icomparator,
const LevelFilesBrief* flevel, const MutableCFOptions& mutable_cf_options,
bool should_sample, HistogramImpl* file_read_hist,
bool should_sample, InternalStats* internal_stats,HistogramImpl* file_read_hist,
TableReaderCaller caller, bool skip_filters, int level,
RangeDelAggregator* range_del_agg,
const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries =
Expand All @@ -987,6 +987,7 @@ class LevelIterator final : public InternalIterator {
flevel_(flevel),
mutable_cf_options_(mutable_cf_options),
prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
internal_stats_(internal_stats),
file_read_hist_(file_read_hist),
caller_(caller),
file_index_(flevel_->num_files),
Expand Down Expand Up @@ -1149,7 +1150,7 @@ class LevelIterator final : public InternalIterator {
return table_cache_->NewIterator(
read_options_, file_options_, icomparator_, *file_meta.file_metadata,
range_del_agg_, mutable_cf_options_,
nullptr /* don't need reference to table */, file_read_hist_, caller_,
nullptr /* don't need reference to table */, internal_stats_, file_read_hist_, caller_,
/*arena=*/nullptr, skip_filters_, level_,
/*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
largest_compaction_key, allow_unprepared_value_, &read_seq_,
Expand Down Expand Up @@ -1180,6 +1181,7 @@ class LevelIterator final : public InternalIterator {
const MutableCFOptions& mutable_cf_options_;
const SliceTransform* prefix_extractor_;

InternalStats* internal_stats_;
HistogramImpl* file_read_hist_;
TableReaderCaller caller_;
size_t file_index_;
Expand Down Expand Up @@ -1931,6 +1933,7 @@ InternalIterator* Version::TEST_GetLevelIterator(
cfd_->table_cache(), read_options, file_options_,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
nullptr /* range_del_agg */, nullptr /* compaction_boundaries */,
Expand Down Expand Up @@ -2038,7 +2041,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
auto table_iter = cfd_->table_cache()->NewIterator(
read_options, soptions, cfd_->internal_comparator(),
*file.file_metadata, /*range_del_agg=*/nullptr, mutable_cf_options_,
nullptr, cfd_->internal_stats()->GetFileReadHist(0),
nullptr, cfd_->internal_stats(), cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
/*smallest_compaction_key=*/nullptr,
Expand Down Expand Up @@ -2070,6 +2073,7 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
cfd_->table_cache(), read_options, soptions,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
/*range_del_agg=*/nullptr,
Expand Down Expand Up @@ -2111,6 +2115,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
ScopedArenaPtr<InternalIterator> iter(cfd_->table_cache()->NewIterator(
read_options, file_options, cfd_->internal_comparator(),
*file->file_metadata, &range_del_agg, mutable_cf_options_, nullptr,
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(0),
TableReaderCaller::kUserIterator, &arena,
/*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
Expand All @@ -2129,6 +2134,7 @@ Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
cfd_->table_cache(), read_options, file_options,
cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
mutable_cf_options_, should_sample_file_read(),
cfd_->internal_stats(),
cfd_->internal_stats()->GetFileReadHist(level),
TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
&range_del_agg, nullptr, false));
Expand Down Expand Up @@ -7085,6 +7091,7 @@ InternalIterator* VersionSet::MakeInputIterator(
cfd->internal_comparator(), fmd, range_del_agg,
c->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
cfd->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
/*arena=*/nullptr,
/*skip_filters=*/false,
Expand All @@ -7106,6 +7113,7 @@ InternalIterator* VersionSet::MakeInputIterator(
cfd->table_cache(), read_options, file_options_compactions,
cfd->internal_comparator(), flevel, c->mutable_cf_options(),
/*should_sample=*/false,
cfd->internal_stats(),
/*no per level latency histogram=*/nullptr,
TableReaderCaller::kCompaction, /*skip_filters=*/false,
/*level=*/static_cast<int>(c->level(which)), range_del_agg,
Expand Down
4 changes: 3 additions & 1 deletion file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class FilePrefetchBuffer {
FilePrefetchBuffer(
const ReadaheadParams& readahead_params = {}, bool enable = true,
bool track_min_offset = false, FileSystem* fs = nullptr,
SystemClock* clock = nullptr, Statistics* stats = nullptr,
SystemClock* clock = nullptr, InternalStats* internal_stats = nullptr, Statistics* stats = nullptr,
const std::function<void(bool, uint64_t&, uint64_t&)>& cb = nullptr,
FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown)
: readahead_size_(readahead_params.initial_readahead_size),
Expand All @@ -204,6 +204,7 @@ class FilePrefetchBuffer {
explicit_prefetch_submitted_(false),
fs_(fs),
clock_(clock),
internal_stats_(internal_stats),
stats_(stats),
usage_(usage),
readaheadsize_cb_(cb),
Expand Down Expand Up @@ -696,6 +697,7 @@ class FilePrefetchBuffer {

FileSystem* fs_;
SystemClock* clock_;
InternalStats* internal_stats_;
Statistics* stats_;

FilePrefetchBufferUsage usage_;
Expand Down
Loading

0 comments on commit c76a690

Please sign in to comment.