Skip to content

Commit 065cf80

Browse files
authored
Merge pull request #26656 from WillemKauf/backport-pr-26513-v25.1.x-81
[v25.1.x] `storage`: set cached disk usage after compaction rounds & more cached disk usage improvements (MANUAL BACKPORT)
2 parents e7cdea9 + 2cd9564 commit 065cf80

17 files changed

+285
-63
lines changed

src/v/storage/compacted_index_writer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ class compacted_index_writer {
7878
virtual void set_flag(compacted_index::footer_flags) = 0;
7979
virtual void print(std::ostream&) const = 0;
8080
const ss::sstring& filename() const { return _name; }
81+
virtual size_t size_bytes() const = 0;
8182

8283
private:
8384
friend std::ostream&

src/v/storage/disk_log_impl.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,6 @@ ss::future<> disk_log_impl::adjacent_merge_compact(
530530
segment->reader().filename(),
531531
result);
532532
if (result.did_compact()) {
533-
segment->clear_cached_disk_usage();
534533
_compaction_ratio.update(result.compaction_ratio());
535534
const ssize_t removed_bytes = ssize_t(result.size_before)
536535
- ssize_t(result.size_after);
@@ -1064,7 +1063,12 @@ ss::future<compaction_result> disk_log_impl::do_compact_adjacent_segments(
10641063
}
10651064
// transfer segment state from replacement to target
10661065
locks = co_await internal::transfer_segment(
1067-
target, replacement, cfg, *_probe, std::move(locks));
1066+
target,
1067+
replacement,
1068+
cfg,
1069+
*_probe,
1070+
std::move(locks),
1071+
ret.cmp_idx_size_after);
10681072

10691073
auto segment_to_remove = segments.back();
10701074
// We are going to remove one of the segments, but its bytes have not
@@ -1576,7 +1580,8 @@ ss::future<> disk_log_impl::rewrite_segment_with_offset_map(
15761580
co_await seg->index().drop_all_data();
15771581

15781582
// Rename the data file.
1579-
co_await internal::do_swap_data_file_handles(tmpname, seg, cfg, *_probe);
1583+
co_await internal::do_swap_data_file_handles(
1584+
tmpname, seg, cfg, *_probe, compacted_idx_writer->size_bytes());
15801585

15811586
// Persist the state of our indexes in their new names.
15821587
seg->index().swap_index_state(std::move(new_idx));

src/v/storage/segment.cc

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ ss::future<usage> segment::persistent_size() {
120120
* segment appender will transparently extend the size of the segment file
121121
* using fallocate, always stat the on disk size for the head partition.
122122
*/
123-
if (!_appender && _data_disk_usage_size.has_value()) {
123+
if (_appender) {
124+
u.data = _appender->size_bytes();
125+
} else if (_data_disk_usage_size.has_value()) {
124126
u.data = _data_disk_usage_size.value();
125127
} else {
126128
try {
@@ -139,7 +141,9 @@ ss::future<usage> segment::persistent_size() {
139141
* however, we pay for that stat() with no guarantee that the information
140142
* will be used.
141143
*/
142-
if (_compaction_index_size.has_value()) {
144+
if (_compaction_index) {
145+
u.compaction = _compaction_index.value()->size_bytes();
146+
} else if (_compaction_index_size.has_value()) {
143147
u.compaction = _compaction_index_size.value();
144148
} else {
145149
auto path = reader().path().to_compacted_index();
@@ -189,6 +193,14 @@ void segment::clear_cached_disk_usage() {
189193
_compaction_index_size.reset();
190194
}
191195

196+
void segment::set_cached_disk_usage(
197+
size_t new_seg_size, std::optional<size_t> new_compacted_index_size) {
198+
_data_disk_usage_size = new_seg_size;
199+
if (new_compacted_index_size.has_value()) {
200+
_compaction_index_size = new_compacted_index_size.value();
201+
}
202+
}
203+
192204
ss::future<size_t> segment::remove_persistent_state() {
193205
vassert(is_closed(), "Cannot clear state from unclosed segment");
194206

@@ -243,13 +255,22 @@ ss::future<> segment::do_release_appender(
243255
std::optional<std::unique_ptr<compacted_index_writer>>&
244256
compacted_index) {
245257
return appender->close()
246-
.then([this] { return _idx.flush(); })
247-
.then([this, &compacted_index] {
258+
.then([this] {
259+
clear_cached_disk_usage();
260+
return _idx.flush();
261+
})
262+
.then([&compacted_index] {
248263
if (compacted_index) {
249264
return compacted_index.value()->close();
250265
}
251-
clear_cached_disk_usage();
252266
return ss::now();
267+
})
268+
.then([this, &compacted_index, &appender] {
269+
std::optional<size_t> cmp_idx_size{std::nullopt};
270+
if (compacted_index) {
271+
cmp_idx_size = compacted_index.value()->size_bytes();
272+
}
273+
set_cached_disk_usage(appender->size_bytes(), cmp_idx_size);
253274
});
254275
});
255276
}

src/v/storage/segment.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ struct segment_closed_exception final : std::exception {
3737
}
3838
};
3939

40+
namespace testing_details {
41+
class segment_accessor;
42+
}; // namespace testing_details
43+
4044
class segment {
4145
public:
4246
using generation_id = named_type<uint64_t, struct segment_gen_tag>;
@@ -271,6 +275,17 @@ class segment {
271275
}
272276

273277
void clear_cached_disk_usage();
278+
// Sets the cached disk usage for the `segment` and optionally the
279+
// `compacted_index`. These are usually the values from the underlying
280+
// `segment_{appender/reader}` for either of these objects. The base
281+
// `segment_index` sets its disk usage in `flush_to_file()`.
282+
//
283+
// This function is widely used within the compaction subsystem to cut down
284+
// on future calls to `stat()`, since the cached disk usage is cleared
285+
// during every round of compaction, and we know from in-memory state how
286+
// large these objects are going to be on disk.
287+
void set_cached_disk_usage(
288+
size_t new_seg_size, std::optional<size_t> new_compacted_index_size);
274289

275290
/// Fallback timestamp method, for use if the timestamps in the index
276291
/// appear to be invalid (e.g. too far in the future)
@@ -350,6 +365,7 @@ class segment {
350365
std::optional<ss::lowres_clock::time_point> _first_write;
351366

352367
friend std::ostream& operator<<(std::ostream&, const segment&);
368+
friend class testing_details::segment_accessor;
353369
};
354370

355371
/**

src/v/storage/segment_appender.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,12 @@ ss::future<> segment_appender::close() {
326326
_closed = true;
327327
return hard_flush()
328328
.then([this] { return do_truncation(_committed_offset); })
329-
.then([this] { return _out.close(); });
329+
.then([this] {
330+
_fallocation_offset = _committed_offset;
331+
_flushed_offset = _committed_offset;
332+
_stable_offset = _committed_offset;
333+
return _out.close();
334+
});
330335
}
331336

332337
ss::future<> segment_appender::do_next_adaptive_fallocation() {

src/v/storage/segment_appender.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ class segment_appender {
8989
return _committed_offset + _bytes_flush_pending;
9090
}
9191

92+
// The size of the underlying file on disk, which is the
93+
// _fallocation_offset.
94+
size_t size_bytes() const { return _fallocation_offset; }
95+
9296
/**
9397
* @brief Appends a batch.
9498
*

src/v/storage/segment_index.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ ss::future<bool> segment_index::materialize_index() {
181181
ss::future<bool> segment_index::materialize_index_from_file(ss::file f) {
182182
auto size = co_await f.size();
183183
auto buf = co_await f.dma_read_bulk<char>(0, size);
184+
_disk_usage_size = size;
184185
if (buf.empty()) {
185186
co_return false;
186187
}
@@ -229,6 +230,9 @@ ss::future<> segment_index::flush_to_file(ss::file backing_file) {
229230
for (const auto& f : b) {
230231
co_await out.write(f.get(), f.size());
231232
}
233+
234+
_disk_usage_size = b.size_bytes();
235+
232236
co_await out.flush();
233237
}
234238

src/v/storage/segment_reader.cc

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ segment_reader::~segment_reader() noexcept {
5151
ss::future<> segment_reader::load_size() {
5252
ss::gate::holder guard{_gate};
5353

54-
auto s = co_await stat();
55-
set_file_size(s.st_size);
54+
auto s = co_await fsize();
55+
set_file_size(s);
5656
};
5757

5858
ss::future<segment_reader_handle>
@@ -132,6 +132,15 @@ ss::future<> segment_reader::put() {
132132
}
133133
}
134134

135+
ss::future<size_t> segment_reader::fsize() {
136+
ss::gate::holder guard{_gate};
137+
138+
auto handle = co_await get();
139+
auto r = co_await _data_file.size();
140+
co_await handle.close();
141+
co_return r;
142+
}
143+
135144
ss::future<struct stat> segment_reader::stat() {
136145
ss::gate::holder guard{_gate};
137146

src/v/storage/segment_reader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ class segment_reader {
132132
/// close the underlying file handle
133133
ss::future<> close();
134134

135+
// Performs a syscall to get the size of _data_file on disk.
136+
ss::future<size_t> fsize();
137+
135138
/// perform syscall stat
136139
ss::future<struct stat> stat();
137140

src/v/storage/segment_set.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,8 @@ static ss::future<segment_set> unsafe_do_recover(
281281
to_recover.begin(),
282282
to_recover.end(),
283283
[](ss::lw_shared_ptr<segment>& segment) {
284-
auto stat = segment->reader().stat().get();
285-
if (stat.st_size != 0) {
284+
auto size = segment->reader().fsize().get();
285+
if (size != 0) {
286286
return true;
287287
}
288288
vlog(stlog.info, "Removing empty segment: {}", segment);

0 commit comments

Comments
 (0)