Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add statistic to track compaction prefetching memory usage #13302

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ Status BuildTable(
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
tboptions.read_options, file_options, tboptions.internal_comparator,
*meta, nullptr /* range_del_agg */, mutable_cf_options, nullptr,
internal_stats,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
TableReaderCaller::kFlush, /*arena=*/nullptr,
Expand Down
2 changes: 1 addition & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class ColumnFamilyData {
const DBOptions& db_options,
const std::unordered_map<std::string, std::string>& options_map);

InternalStats* internal_stats() { return internal_stats_.get(); }
InternalStats* internal_stats() const { return internal_stats_.get(); }

MemTableList* imm() { return &imm_; }
MemTable* mem() { return mem_; }
Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ Status CompactionJob::Run() {
cfd->internal_comparator(), files_output[file_idx]->meta,
/*range_del_agg=*/nullptr,
compact_->compaction->mutable_cf_options(),
/*table_reader_ptr=*/nullptr,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the part that we care about the most, IMO: we want `internal_stats` to be passed in from `CompactionJob::Run`.

/*table_reader_ptr=*/nullptr, cfd->internal_stats(),
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
Expand Down
9 changes: 5 additions & 4 deletions db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,13 @@ class CompactionJobTestBase : public testing::Test {
TableReaderOptions(cfd->ioptions(), nullptr, FileOptions(),
cfd_->internal_comparator(),
0 /* block_protection_bytes_per_key */),
std::move(freader), file_size, &table_reader, false);
std::move(freader), file_size, &table_reader,
/*internal_stats=*/nullptr, false);
ASSERT_OK(s);
assert(table_reader);
std::unique_ptr<InternalIterator> iiter(
table_reader->NewIterator(read_opts, nullptr, nullptr, true,
TableReaderCaller::kUncategorized));
std::unique_ptr<InternalIterator> iiter(table_reader->NewIterator(
read_opts, /*prefix_extractor=*/nullptr, /*arena=*/nullptr,
/*internal_stats*/ nullptr, true, TableReaderCaller::kUncategorized));
assert(iiter);

mock::KVVector from_db;
Expand Down
18 changes: 12 additions & 6 deletions db/convenience.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,19 @@ Status DeleteFilesInRanges(DB* db, ColumnFamilyHandle* column_family,

Status VerifySstFileChecksum(const Options& options,
const EnvOptions& env_options,
const std::string& file_path) {
const std::string& file_path,
InternalStats* internal_stats) {
// TODO: plumb Env::IOActivity, Env::IOPriority
const ReadOptions read_options;
return VerifySstFileChecksum(options, env_options, read_options, file_path);
return VerifySstFileChecksum(options, env_options, read_options, file_path,
/*largest_seqno=*/0, internal_stats);
}
Status VerifySstFileChecksum(const Options& options,
const EnvOptions& env_options,
const ReadOptions& _read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno) {
const SequenceNumber& largest_seqno,
InternalStats* internal_stats) {
if (_read_options.io_activity != Env::IOActivity::kUnknown) {
return Status::InvalidArgument(
"Can only call VerifySstFileChecksum with `ReadOptions::io_activity` "
Expand All @@ -50,14 +53,16 @@ Status VerifySstFileChecksum(const Options& options,
}
ReadOptions read_options(_read_options);
return VerifySstFileChecksumInternal(options, env_options, read_options,
file_path, largest_seqno);
file_path, largest_seqno,
internal_stats);
}

Status VerifySstFileChecksumInternal(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno) {
const SequenceNumber& largest_seqno,
InternalStats* internal_stats) {
std::unique_ptr<FSRandomAccessFile> file;
uint64_t file_size;
InternalKeyComparator internal_comparator(options.comparator);
Expand Down Expand Up @@ -88,7 +93,8 @@ Status VerifySstFileChecksumInternal(const Options& options,
reader_options.largest_seqno = largest_seqno;
s = options.table_factory->NewTableReader(
read_options, reader_options, std::move(file_reader), file_size,
&table_reader, false /* prefetch_index_and_filter_in_cache */);
&table_reader, internal_stats,
false /* prefetch_index_and_filter_in_cache */);
if (!s.ok()) {
return s;
}
Expand Down
5 changes: 4 additions & 1 deletion db/convenience_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
#pragma once
#include "rocksdb/db.h"

class InternalStats;

namespace ROCKSDB_NAMESPACE {
Status VerifySstFileChecksumInternal(const Options& options,
const EnvOptions& env_options,
const ReadOptions& read_options,
const std::string& file_path,
const SequenceNumber& largest_seqno = 0);
const SequenceNumber& largest_seqno,
InternalStats* internal_stats);
} // namespace ROCKSDB_NAMESPACE
3 changes: 2 additions & 1 deletion db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6548,7 +6548,8 @@ Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
read_options);
} else {
s = ROCKSDB_NAMESPACE::VerifySstFileChecksumInternal(
opts, file_options_, read_options, fname, fd.largest_seqno);
opts, file_options_, read_options, fname, fd.largest_seqno,
default_cf_internal_stats_);
}
RecordTick(stats_, VERIFY_CHECKSUM_READ_BYTES,
IOSTATS(bytes_read) - prev_bytes_read);
Expand Down
2 changes: 2 additions & 0 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,7 @@ Status ExternalSstFileIngestionJob::ResetTableReader(
/* unique_id */ {}, /* largest_seqno */ 0,
/* tail_size */ 0, user_defined_timestamps_persisted),
std::move(sst_file_reader), file_to_ingest->file_size, table_reader,
cfd_->internal_stats(),
// No need to prefetch index/filter if caching is not needed.
/*prefetch_index_and_filter_in_cache=*/ingestion_options_.fill_cache);
return status;
Expand Down Expand Up @@ -923,6 +924,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
ro.fill_cache = ingestion_options_.fill_cache;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
cfd_->internal_stats(),
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

// Get first (smallest) and last (largest) key from file.
Expand Down
16 changes: 10 additions & 6 deletions db/forward_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ForwardLevelIterator : public InternalIterator {
*files_[file_index_],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
mutable_cf_options_, /*table_reader_ptr=*/nullptr,
cfd_->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr, /*skip_filters=*/false, /*level=*/-1,
/*max_file_size_for_l0_meta_pin=*/0,
Expand Down Expand Up @@ -749,8 +750,9 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0,
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
Expand Down Expand Up @@ -837,8 +839,9 @@ void ForwardIterator::RenewIterators() {
*l0_files_new[inew],
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
svnew->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(svnew->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
Expand Down Expand Up @@ -902,8 +905,9 @@ void ForwardIterator::ResetIncompleteIterators() {
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
*l0_files[i], /*range_del_agg=*/nullptr, sv_->mutable_cf_options,
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kUserIterator, /*arena=*/nullptr,
/*table_reader_ptr=*/nullptr, cfd_->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kUserIterator,
/*arena=*/nullptr,
/*skip_filters=*/false, /*level=*/-1,
MaxFileSizeForL0MetaPin(sv_->mutable_cf_options),
/*smallest_compaction_key=*/nullptr,
Expand Down
4 changes: 3 additions & 1 deletion db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
/*block_cache_tracer*/ nullptr,
/*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
/*cur_file_num*/ new_file_number),
std::move(sst_file_reader), file_to_import->file_size, &table_reader);
std::move(sst_file_reader), file_to_import->file_size, &table_reader,
cfd_->internal_stats());
if (!status.ok()) {
return status;
}
Expand All @@ -362,6 +363,7 @@ Status ImportColumnFamilyJob::GetIngestedFileInfo(
ReadOptions ro;
std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
cfd_->internal_stats(),
/*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

// Get first (smallest) key from file
Expand Down
2 changes: 2 additions & 0 deletions db/internal_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ const std::map<InternalStats::InternalDBStatsType, DBStatInfo>
DBStatInfo{WriteStallStatsMapKeys::CauseConditionCount(
WriteStallCause::kWriteBufferManagerLimit,
WriteStallCondition::kStopped)}},
{InternalStats::kIntStatsPrefetchBufferSizeBytes,
DBStatInfo{"db.prefetch_buffer_size_bytes"}},
};

namespace {
Expand Down
12 changes: 12 additions & 0 deletions db/internal_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class InternalStats {
// So we should improve, rename or clarify it
kIntStatsWriteStallMicros,
kIntStatsWriteBufferManagerLimitStopsCounts,
kIntStatsPrefetchBufferSizeBytes,
kIntStatsNumMax,
};

Expand Down Expand Up @@ -615,6 +616,17 @@ class InternalStats {
}
}

// Decreases the DB-level counter selected by `type` by `value`.
//
// When `concurrent` is true the update is a single relaxed atomic
// fetch_sub. Otherwise it is a relaxed load followed by a relaxed store,
// which is not an atomic read-modify-write.
// NOTE(review): no check that `value` does not exceed the current counter;
// presumably the counters are unsigned and would wrap -- confirm callers
// never subtract more than they previously added.
void SubDBStats(InternalDBStatsType type, uint64_t value,
                bool concurrent = false) {
  auto& counter = db_stats_[type];
  if (!concurrent) {
    const auto current = counter.load(std::memory_order_relaxed);
    counter.store(current - value, std::memory_order_relaxed);
    return;
  }
  counter.fetch_sub(value, std::memory_order_relaxed);
}

// Returns the current value of the DB-level counter selected by `type`,
// read with relaxed memory ordering.
uint64_t GetDBStats(InternalDBStatsType type) {
  auto& counter = db_stats_[type];
  return counter.load(std::memory_order_relaxed);
}
Expand Down
2 changes: 1 addition & 1 deletion db/plain_table_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ class TestPlainTableFactory : public PlainTableFactory {
Status NewTableReader(
const ReadOptions& /*ro*/, const TableReaderOptions& table_reader_options,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* table,
std::unique_ptr<TableReader>* table, InternalStats* /*internal_stats*/,
bool /*prefetch_index_and_filter_in_cache*/) const override {
std::unique_ptr<TableProperties> props;
const ReadOptions read_options;
Expand Down
10 changes: 6 additions & 4 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,8 @@ class Repairer {
std::move(range_del_iters), &meta, nullptr /* blob_file_additions */,
{}, kMaxSequenceNumber, kMaxSequenceNumber, kMaxSequenceNumber,
snapshot_checker, false /* paranoid_file_checks*/,
nullptr /* internal_stats */, &io_s, nullptr /*IOTracer*/,
BlobFileCreationReason::kRecovery,
cfd == nullptr ? nullptr : cfd->internal_stats(), &io_s,
nullptr /*IOTracer*/, BlobFileCreationReason::kRecovery,
nullptr /* seqno_to_time_mapping */, nullptr /* event_logger */,
0 /* job_id */, nullptr /* table_properties */, write_hint);
ROCKS_LOG_INFO(db_options_.info_log,
Expand Down Expand Up @@ -604,8 +604,10 @@ class Repairer {
InternalIterator* iter = table_cache_->NewIterator(
ropts, file_options_, cfd->internal_comparator(), t->meta,
nullptr /* range_del_agg */, cfd->GetLatestMutableCFOptions(),
/*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
/*table_reader_ptr=*/nullptr,
cfd == nullptr ? nullptr : cfd->internal_stats(),
/*file_read_hist=*/nullptr, TableReaderCaller::kRepair,
/*arena=*/nullptr, /*skip_filters=*/false,
/*level=*/-1, /*max_file_size_for_l0_meta_pin=*/0,
/*smallest_compaction_key=*/nullptr,
/*largest_compaction_key=*/nullptr,
Expand Down
Loading
Loading