Skip to content

Add compaction explicit prefetch stats #13520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db_stress_tool/db_stress_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class DbStressListener : public EventListener {
fault_fs_guard->DisableAllThreadLocalErrorInjection();
// TODO(hx235): only exempt the flush thread during error recovery instead
// of all the flush threads from error injection
fault_fs_guard->SetIOActivtiesExcludedFromFaultInjection(
fault_fs_guard->SetIOActivitiesExcludedFromFaultInjection(
{Env::IOActivity::kFlush});
}
}
Expand All @@ -275,7 +275,7 @@ class DbStressListener : public EventListener {
RandomSleep();
if (FLAGS_error_recovery_with_no_fault_injection && fault_fs_guard) {
fault_fs_guard->EnableAllThreadLocalErrorInjection();
fault_fs_guard->SetIOActivtiesExcludedFromFaultInjection({});
fault_fs_guard->SetIOActivitiesExcludedFromFaultInjection({});
}
}

Expand Down
6 changes: 5 additions & 1 deletion file/file_prefetch_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ Status FilePrefetchBuffer::Read(BufferInfo* buf, const IOOptions& opts,

if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) {
RecordTick(stats_, PREFETCH_BYTES, read_len);
} else if (usage_ == FilePrefetchBufferUsage::kCompactionPrefetch) {
RecordInHistogram(stats_, COMPACTION_PREFETCH_BYTES, read_len);
}
if (!use_fs_buffer) {
// Update the buffer size.
Expand Down Expand Up @@ -154,7 +156,9 @@ Status FilePrefetchBuffer::ReadAsync(BufferInfo* buf, const IOOptions& opts,
&(buf->del_fn_), /*aligned_buf =*/nullptr);
req.status.PermitUncheckedError();
if (s.ok()) {
RecordTick(stats_, PREFETCH_BYTES, read_len);
if (usage_ == FilePrefetchBufferUsage::kUserScanPrefetch) {
RecordTick(stats_, PREFETCH_BYTES, read_len);
}
buf->async_read_in_progress_ = true;
}
return s;
Expand Down
4 changes: 4 additions & 0 deletions file/file_prefetch_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ struct BufferInfo {
enum class FilePrefetchBufferUsage {
kTableOpenPrefetchTail,
kUserScanPrefetch,
kCompactionPrefetch,
kUnknown,
};

Expand Down Expand Up @@ -574,6 +575,9 @@ class FilePrefetchBuffer {
size_t& read_len, uint64_t& aligned_useful_len);

void UpdateStats(bool found_in_buffer, size_t length_found) {
if (usage_ != FilePrefetchBufferUsage::kUserScanPrefetch) {
return;
}
if (found_in_buffer) {
RecordTick(stats_, PREFETCH_HITS);
}
Expand Down
36 changes: 31 additions & 5 deletions file/prefetch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,18 @@ TEST_P(PrefetchTest, Basic) {
const uint64_t prev_table_open_prefetch_tail_hit =
options.statistics->getTickerCount(TABLE_OPEN_PREFETCH_TAIL_HIT);

HistogramData pre_compaction_prefetch_bytes;
options.statistics->histogramData(COMPACTION_PREFETCH_BYTES,
&pre_compaction_prefetch_bytes);
ASSERT_EQ(pre_compaction_prefetch_bytes.count, 0);

// commenting out the line below causes the example to work correctly
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

HistogramData post_compaction_prefetch_bytes;
options.statistics->histogramData(COMPACTION_PREFETCH_BYTES,
&post_compaction_prefetch_bytes);

HistogramData cur_table_open_prefetch_tail_read;
options.statistics->histogramData(TABLE_OPEN_PREFETCH_TAIL_READ_BYTES,
&cur_table_open_prefetch_tail_read);
Expand All @@ -318,6 +327,7 @@ TEST_P(PrefetchTest, Basic) {
ASSERT_GT(fs->GetPrefetchCount(), 1);
ASSERT_EQ(0, buff_prefetch_count);
fs->ClearPrefetchCount();
ASSERT_EQ(post_compaction_prefetch_bytes.count, 0);
} else {
ASSERT_FALSE(fs->IsPrefetchCalled());
// To rule out false positive by the SST file tail prefetch during
Expand All @@ -331,6 +341,20 @@ TEST_P(PrefetchTest, Basic) {
prev_table_open_prefetch_tail_hit);
ASSERT_GE(cur_table_open_prefetch_tail_miss,
prev_table_open_prefetch_tail_miss);

ASSERT_GT(post_compaction_prefetch_bytes.count, 0);

// Not an exact match due to potential roundup/down for alignment
auto expected_compaction_readahead_size =
Options().compaction_readahead_size;
ASSERT_LE(post_compaction_prefetch_bytes.max,
expected_compaction_readahead_size * 1.1);
ASSERT_GE(post_compaction_prefetch_bytes.max,
expected_compaction_readahead_size * 0.9);
ASSERT_LE(post_compaction_prefetch_bytes.average,
expected_compaction_readahead_size * 1.1);
ASSERT_GE(post_compaction_prefetch_bytes.average,
expected_compaction_readahead_size * 0.9);
}

for (bool disable_io : {false, true}) {
Expand Down Expand Up @@ -3251,8 +3275,9 @@ TEST_F(FilePrefetchBufferTest, SyncReadaheadStats) {
ReadaheadParams readahead_params;
readahead_params.initial_readahead_size = 8192;
readahead_params.max_readahead_size = 8192;
FilePrefetchBuffer fpb(readahead_params, true, false, fs(), nullptr,
stats.get());
FilePrefetchBuffer fpb(
readahead_params, true, false, fs(), nullptr, stats.get(),
nullptr /* cb */, FilePrefetchBufferUsage::kUserScanPrefetch /* usage */);
Slice result;
// Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings,
// it will do a read of offset 0 and length - (4096 + 8192) 12288.
Expand Down Expand Up @@ -3497,9 +3522,10 @@ TEST_P(FSBufferPrefetchTest, FSBufferPrefetchStatsInternals) {
size_t num_buffers = use_async_prefetch ? 2 : 1;
readahead_params.num_buffers = num_buffers;

FilePrefetchBuffer fpb(readahead_params, true /* enable */,
false /* track_min_offset */, fs(), clock(),
stats.get());
FilePrefetchBuffer fpb(
readahead_params, true /* enable */, false /* track_min_offset */, fs(),
clock(), stats.get(), nullptr /* cb */,
FilePrefetchBufferUsage::kUserScanPrefetch /* usage */);

int overlap_buffer_write_ct = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
Expand Down
11 changes: 8 additions & 3 deletions include/rocksdb/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -520,10 +520,11 @@ enum Tickers : uint32_t {
// Number of bytes prefetched during user initiated scan
PREFETCH_BYTES,

// Number of prefetched bytes that were actually useful
// Number of prefetched bytes that were actually useful during user initiated
// scan
PREFETCH_BYTES_USEFUL,

// Number of FS reads avoided due to scan prefetching
// Number of FS reads avoided due to prefetching during user initiated scan
PREFETCH_HITS,

// Footer corruption detected when opening an SST file for reading
Expand Down Expand Up @@ -657,13 +658,17 @@ enum Histograms : uint32_t {
ASYNC_READ_BYTES,
POLL_WAIT_MICROS,

// Number of bytes for RocksDB's prefetching (as opposed to file
// system's prefetch) on SST file during compaction read
COMPACTION_PREFETCH_BYTES,

// Number of prefetched bytes discarded by RocksDB.
PREFETCHED_BYTES_DISCARDED,

// Wait time for aborting async read in FilePrefetchBuffer destructor
ASYNC_PREFETCH_ABORT_MICROS,

// Number of bytes read for RocksDB's prefetching contents (as opposed to file
// Number of bytes for RocksDB's prefetching contents (as opposed to file
// system's prefetch) from the end of SST table during block based table open
TABLE_OPEN_PREFETCH_TAIL_READ_BYTES,

Expand Down
10 changes: 8 additions & 2 deletions java/rocksjni/portal.h
Original file line number Diff line number Diff line change
Expand Up @@ -5889,8 +5889,11 @@ class HistogramTypeJni {
return 0x3C;
case ROCKSDB_NAMESPACE::Histograms::TABLE_OPEN_PREFETCH_TAIL_READ_BYTES:
return 0x3D;
case ROCKSDB_NAMESPACE::Histograms::COMPACTION_PREFETCH_BYTES:
return 0x3F;
case ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX:
// 0x3D for backwards compatibility on current minor version.
// 0x3E is reserved for backwards compatibility on current minor
// version.
return 0x3E;
default:
// undefined/default
Expand Down Expand Up @@ -6033,8 +6036,11 @@ class HistogramTypeJni {
case 0x3D:
return ROCKSDB_NAMESPACE::Histograms::
TABLE_OPEN_PREFETCH_TAIL_READ_BYTES;
case 0x3F:
return ROCKSDB_NAMESPACE::Histograms::COMPACTION_PREFETCH_BYTES;
case 0x3E:
// 0x1F for backwards compatibility on current minor version.
// 0x3E is reserved for backwards compatibility on current minor
// version.
return ROCKSDB_NAMESPACE::Histograms::HISTOGRAM_ENUM_MAX;

default:
Expand Down
4 changes: 3 additions & 1 deletion java/src/main/java/org/rocksdb/HistogramType.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,9 @@ public enum HistogramType {
*/
TABLE_OPEN_PREFETCH_TAIL_READ_BYTES((byte) 0x3D),

// 0x3E for backwards compatibility on current minor version.
COMPACTION_PREFETCH_BYTES((byte) 0x3F),

// 0x3E is reserved for backwards compatibility on current minor version.
HISTOGRAM_ENUM_MAX((byte) 0x3E);

private final byte value;
Expand Down
1 change: 1 addition & 0 deletions monitoring/statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
"rocksdb.error.handler.autoresume.retry.count"},
{ASYNC_READ_BYTES, "rocksdb.async.read.bytes"},
{POLL_WAIT_MICROS, "rocksdb.poll.wait.micros"},
{COMPACTION_PREFETCH_BYTES, "rocksdb.compaction.prefetch.bytes"},
{PREFETCHED_BYTES_DISCARDED, "rocksdb.prefetched.bytes.discarded"},
{ASYNC_PREFETCH_ABORT_MICROS, "rocksdb.async.prefetch.abort.micros"},
{TABLE_OPEN_PREFETCH_TAIL_READ_BYTES,
Expand Down
7 changes: 4 additions & 3 deletions table/block_based/block_prefetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ void BlockPrefetcher::PrefetchIfNeeded(
// implicit_auto_readahead is set.
readahead_params.initial_readahead_size = compaction_readahead_size_;
readahead_params.max_readahead_size = compaction_readahead_size_;
rep->CreateFilePrefetchBufferIfNotExists(readahead_params,
&prefetch_buffer_,
/*readaheadsize_cb=*/nullptr);
rep->CreateFilePrefetchBufferIfNotExists(
readahead_params, &prefetch_buffer_,
/*readaheadsize_cb=*/nullptr,
/*usage=*/FilePrefetchBufferUsage::kCompactionPrefetch);
return;
}

Expand Down
1 change: 1 addition & 0 deletions unreleased_history/behavior_changes/ra_stats_user_only.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make the stats `PREFETCH_BYTES_USEFUL`, `PREFETCH_HITS`, and `PREFETCH_BYTES` only account for prefetching during user-initiated scans
2 changes: 2 additions & 0 deletions unreleased_history/new_features/compact_ra_stats.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Provide a new histogram stat `COMPACTION_PREFETCH_BYTES` that measures the number of bytes read by RocksDB's own prefetching (as opposed to the file
system's prefetching) from SST files during compaction reads
4 changes: 2 additions & 2 deletions utilities/fault_injection_fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1379,7 +1379,7 @@ IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalReadError(
ErrorContext* ctx =
static_cast<ErrorContext*>(injected_thread_local_read_error_.Get());
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
ShouldIOActivtiesExcludedFromFaultInjection(io_options.io_activity)) {
ShouldIOActivitiesExcludedFromFaultInjection(io_options.io_activity)) {
return IOStatus::OK();
}

Expand Down Expand Up @@ -1465,7 +1465,7 @@ IOStatus FaultInjectionTestFS::MaybeInjectThreadLocalError(

ErrorContext* ctx = GetErrorContextFromFaultInjectionIOType(type);
if (ctx == nullptr || !ctx->enable_error_injection || !ctx->one_in ||
ShouldIOActivtiesExcludedFromFaultInjection(io_options.io_activity) ||
ShouldIOActivitiesExcludedFromFaultInjection(io_options.io_activity) ||
(type == FaultInjectionIOType::kWrite &&
ShouldExcludeFromWriteFaultInjection(file_name))) {
return IOStatus::OK();
Expand Down
15 changes: 8 additions & 7 deletions utilities/fault_injection_fs.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,11 @@ class FaultInjectionTestFS : public FileSystemWrapper {
allow_link_open_file_ = allow_link_open_file;
}

// Returns true iff fault injection is currently disabled for the given IO
// activity (as configured via SetIOActivitiesExcludedFromFaultInjection()).
// Thread-safe: the excluded-activities set is guarded by mutex_.
bool ShouldIOActivitiesExcludedFromFaultInjection(
    Env::IOActivity io_activity) {
  MutexLock l(&mutex_);
  return io_activities_excluded_from_fault_injection.count(io_activity) > 0;
}

void AssertNoOpenFile() { assert(open_managed_files_.empty()); }
Expand Down Expand Up @@ -520,10 +521,10 @@ class FaultInjectionTestFS : public FileSystemWrapper {
return count;
}

void SetIOActivtiesExcludedFromFaultInjection(
const std::set<Env::IOActivity>& io_activties) {
void SetIOActivitiesExcludedFromFaultInjection(
const std::set<Env::IOActivity>& io_activities) {
MutexLock l(&mutex_);
io_activties_excluded_from_fault_injection = io_activties;
io_activities_excluded_from_fault_injection = io_activities;
}

void SetFileTypesExcludedFromWriteFaultInjection(
Expand Down Expand Up @@ -627,7 +628,7 @@ class FaultInjectionTestFS : public FileSystemWrapper {
};

std::set<FileType> file_types_excluded_from_write_fault_injection_;
std::set<Env::IOActivity> io_activties_excluded_from_fault_injection;
std::set<Env::IOActivity> io_activities_excluded_from_fault_injection;
ThreadLocalPtr injected_thread_local_read_error_;
ThreadLocalPtr injected_thread_local_write_error_;
ThreadLocalPtr injected_thread_local_metadata_read_error_;
Expand Down
Loading