Skip to content

Commit 8b80b67

Browse files
committed
storage: fix bounds check in offset range size method
The methods to find offset range size currently use segment bounds to determine whether a given offset is available to be queried. This doesn't account for the case when the segment set contains offsets that don't fall in the log's offset range (e.g. following a delete-records request that trims mid-segment). This commit adds appropriate bounds checks to both methods. With an upcoming change to run merge compaction after windowed compaction, test_offset_range_size2_compacted would fail because it would prefix-truncate mid-segment following a merge compaction, and then trip over this, hitting an unexpected exception when creating a reader: ``` std::runtime_error: Reader cannot read before start of the log 0 < 887 ``` (cherry picked from commit 69e4666)
1 parent a3d6730 commit 8b80b67

File tree

2 files changed

+80
-2
lines changed

2 files changed

+80
-2
lines changed

src/v/storage/disk_log_impl.cc

+26-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "model/adl_serde.h"
1616
#include "model/fundamental.h"
1717
#include "model/namespace.h"
18+
#include "model/offset_interval.h"
1819
#include "model/record_batch_types.h"
1920
#include "model/timeout_clock.h"
2021
#include "model/timestamp.h"
@@ -1898,12 +1899,23 @@ ss::future<size_t> disk_log_impl::get_file_offset(
18981899
ss::future<std::optional<log::offset_range_size_result_t>>
18991900
disk_log_impl::offset_range_size(
19001901
model::offset first, model::offset last, ss::io_priority_class io_priority) {
1902+
auto log_offsets = offsets();
19011903
vlog(
19021904
stlog.debug,
19031905
"Offset range size, first: {}, last: {}, lstat: {}",
19041906
first,
19051907
last,
1906-
offsets());
1908+
log_offsets);
1909+
auto log_interval = model::bounded_offset_interval::optional(
1910+
log_offsets.start_offset, log_offsets.committed_offset);
1911+
if (!log_interval.has_value()) {
1912+
vlog(stlog.debug, "Log is empty, returning early");
1913+
co_return std::nullopt;
1914+
}
1915+
if (!log_interval->contains(first) || !log_interval->contains(last)) {
1916+
vlog(stlog.debug, "Log does not include entire range");
1917+
co_return std::nullopt;
1918+
}
19071919

19081920
// build the collection
19091921
const auto segments = [&] {
@@ -2066,13 +2078,25 @@ disk_log_impl::offset_range_size(
20662078
model::offset first,
20672079
offset_range_size_requirements_t target,
20682080
ss::io_priority_class io_priority) {
2081+
auto log_offsets = offsets();
20692082
vlog(
20702083
stlog.debug,
20712084
"Offset range size, first: {}, target size: {}/{}, lstat: {}",
20722085
first,
20732086
target.target_size,
20742087
target.min_size,
2075-
offsets());
2088+
log_offsets);
2089+
auto log_interval = model::bounded_offset_interval::optional(
2090+
log_offsets.start_offset, log_offsets.committed_offset);
2091+
if (!log_interval.has_value()) {
2092+
vlog(stlog.debug, "Log is empty, returning early");
2093+
co_return std::nullopt;
2094+
}
2095+
if (!log_interval->contains(first)) {
2096+
vlog(stlog.debug, "Log does not include offset {}", first);
2097+
co_return std::nullopt;
2098+
}
2099+
20762100
auto base_it = _segs.lower_bound(first);
20772101

20782102
// Invariant: 'first' offset should be present in the log. If the segment is

src/v/storage/tests/storage_e2e_test.cc

+54
Original file line numberDiff line numberDiff line change
@@ -3884,6 +3884,60 @@ struct batch_size_accumulator {
38843884
size_t* size_bytes;
38853885
};
38863886

3887+
FIXTURE_TEST(
3888+
test_offset_range_size_after_mid_segment_truncation, storage_test_fixture) {
// Regression test: after prefix-truncating the log to offset 1 (intended,
// per the test name, to land mid-segment), both offset_range_size overloads
// are expected to return std::nullopt for a query that starts at offset 0.
3889+
size_t num_segments = 2;
3890+
model::offset first_segment_last_offset;
// NOTE(review): assigned once inside the loop below, but no check in this
// test reads it afterwards.
3891+
auto cfg = default_log_config(test_dir);
3892+
storage::log_manager mgr = make_log_manager(cfg);
3893+
info("Configuration: {}", mgr.config());
3894+
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get(); });
// The deferred action calls mgr.stop().get(); presumably it runs when
// `deferred` goes out of scope at the end of the test.
3895+
auto ntp = model::ntp("redpanda", "test-topic", 0);
3896+
3897+
storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);
3898+
3899+
auto log = mgr.manage(std::move(ntp_cfg)).get();
3900+
for (size_t i = 0; i < num_segments; i++) {
// Each iteration appends random batches (the literal 10 is presumably the
// batch count - TODO confirm) and then force-rolls the log.
3901+
append_random_batches(
3902+
log,
3903+
10,
3904+
model::term_id(0),
3905+
custom_ts_batch_generator(model::timestamp::now()));
3906+
if (first_segment_last_offset == model::offset{}) {
// A default-constructed model::offset acts as the "not yet recorded"
// marker, so this is intended to capture the dirty offset after the
// first append only.
3907+
first_segment_last_offset = log->offsets().dirty_offset;
3908+
}
3909+
log->force_roll(ss::default_priority_class()).get();
3910+
}
3911+
3912+
// Prefix truncate such that offset 1 is the new log start.
3913+
log
3914+
->truncate_prefix(storage::truncate_prefix_config(
3915+
model::offset(1), ss::default_priority_class()))
3916+
.get();
3917+
3918+
// Run size queries on ranges that don't exist in the log, but whose range
3919+
// is still included in a segment.
// First check: the (first, last) overload with first = 0 and last = 1.
3920+
3921+
BOOST_CHECK(
3922+
log
3923+
->offset_range_size(
3924+
model::offset(0), model::offset(1), ss::default_priority_class())
3925+
.get()
3926+
== std::nullopt);
// Second check: same starting offset (0), but through the overload that
// takes offset_range_size_requirements_t (target_size = 1, min_size = 0).
3927+
3928+
BOOST_CHECK(
3929+
log
3930+
->offset_range_size(
3931+
model::offset(0),
3932+
storage::log::offset_range_size_requirements_t{
3933+
.target_size = 1,
3934+
.min_size = 0,
3935+
},
3936+
ss::default_priority_class())
3937+
.get()
3938+
== std::nullopt);
3939+
}
3940+
38873941
FIXTURE_TEST(test_offset_range_size, storage_test_fixture) {
38883942
#ifdef NDEBUG
38893943
size_t num_test_cases = 5000;

0 commit comments

Comments
 (0)