Skip to content

Commit c9c6a23

Browse files
committed
storage: always schedule adjacent segment compaction
CONFLICT: - this branch had code that takes a range instead of just the compaction config for compact_adjacent_segments() We previously fell back on adjacent segment compaction only if there was no new data to compact. In some situations, we've seen the rate of incoming data outpace the compaction interval, causing segments to pile up without ever being merged. This change tweaks the logic to always run adjacent segment compaction after running sliding window compaction. Along the way, a couple tests needed to be tweaked to handle the fact that housekeeping now may merge segments. (cherry picked from commit 08d0433)
1 parent 8b80b67 commit c9c6a23

File tree

4 files changed

+26
-36
lines changed

4 files changed

+26
-36
lines changed

src/v/archival/tests/ntp_archiver_reupload_test.cc

+3-9
Original file line numberDiff line numberDiff line change
@@ -226,10 +226,11 @@ struct reupload_fixture : public archiver_fixture {
226226
manifest_view);
227227
}
228228

229+
// XXX: rename this! It doesn't just run self compaction -- it performs
230+
// houskeeping which may also merge segments.
229231
ss::lw_shared_ptr<storage::segment> self_compact_next_segment(
230232
model::offset max_collectible = model::offset::max()) {
231233
auto& seg_set = disk_log_impl()->segments();
232-
auto size_before = seg_set.size();
233234

234235
disk_log_impl()
235236
->housekeeping(storage::housekeeping_config{
@@ -240,13 +241,6 @@ struct reupload_fixture : public archiver_fixture {
240241
abort_source})
241242
.get();
242243

243-
auto size_after = seg_set.size();
244-
245-
// We are only looking to trigger self-compaction here.
246-
// If the segment count reduced, adjacent segment compaction must
247-
// have occurred.
248-
BOOST_REQUIRE_EQUAL(size_before, size_after);
249-
250244
ss::lw_shared_ptr<storage::segment> last_compacted_segment;
251245
for (auto& i : seg_set) {
252246
if (i->finished_self_compaction()) {
@@ -447,11 +441,11 @@ FIXTURE_TEST(
447441
std::stringstream st;
448442
stm_manifest.serialize_json(st);
449443
vlog(test_log.debug, "manifest: {}", st.str());
450-
verify_segment_request("500-1-v1.log", stm_manifest);
451444

452445
BOOST_REQUIRE_EQUAL(
453446
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{999});
454447
auto replaced = stm_manifest.replaced_segments();
448+
BOOST_REQUIRE_EQUAL(replaced.size(), 1);
455449
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{500});
456450
}
457451

src/v/archival/tests/segment_reupload_test.cc

+8-6
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,7 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
832832

833833
auto first = spec.segment_starts.begin();
834834
auto second = std::next(first);
835+
// Segment boundaries: [5, 14][15, 24][25, 34][35, 50]...
835836
for (; second != spec.segment_starts.end(); ++first, ++second) {
836837
b | storage::add_segment(*first);
837838
for (auto curr_offset = *first; curr_offset < *second; ++curr_offset) {
@@ -843,19 +844,18 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
843844
.get();
844845
}
845846
auto seg = b.get_log_segments().back();
846-
seg->appender().close().get();
847-
seg->release_appender().get();
847+
seg->release_appender(&b.get_disk_log_impl().readers()).get();
848848
}
849849

850850
b | storage::add_segment(*first)
851851
| storage::add_random_batch(*first, spec.last_segment_num_records);
852852

853-
// Compaction will rewrite each segment.
853+
// Compaction will rewrite each segment, and merge the first two.
854+
// Segment boundaries: [5, 24][25, 34][35, 50]...
854855
b.gc(model::timestamp::max(), std::nullopt).get();
855856

856857
size_t max_size = b.get_segment(0).size_bytes()
857-
+ b.get_segment(1).size_bytes()
858-
+ b.get_segment(2).size_bytes();
858+
+ b.get_segment(1).size_bytes();
859859
archival::segment_collector collector{
860860
model::offset{5}, m, b.get_disk_log_impl(), max_size};
861861

@@ -868,6 +868,8 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
868868
ss::default_priority_class(), segment_lock_timeout)
869869
.get());
870870

871+
// The upload candidate should align with the manifest's segment
872+
// boundaries.
871873
auto upload_candidate = upload_with_locks.candidate;
872874
BOOST_REQUIRE(!upload_candidate.sources.empty());
873875
BOOST_REQUIRE_EQUAL(upload_candidate.starting_offset, model::offset{10});
@@ -885,7 +887,7 @@ SEASTAR_THREAD_TEST_CASE(test_upload_aligned_to_non_existent_offset) {
885887
BOOST_REQUIRE_EQUAL(
886888
expected_content_length, upload_candidate.content_length);
887889

888-
BOOST_REQUIRE_EQUAL(upload_with_locks.read_locks.size(), 3);
890+
BOOST_REQUIRE_EQUAL(upload_with_locks.read_locks.size(), 2);
889891
}
890892

891893
SEASTAR_THREAD_TEST_CASE(test_same_size_reupload_skipped) {

src/v/storage/disk_log_impl.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -1243,7 +1243,12 @@ ss::future<> disk_log_impl::do_compact(
12431243
std::rethrow_exception(eptr);
12441244
}
12451245
bool compacted = did_compact_fut.get();
1246-
if (!compacted) {
1246+
vlog(
1247+
gclog.debug,
1248+
"Sliding compaction of {} did {}compact data, proceeding to adjacent "
1249+
"segment compaction",
1250+
config().ntp(),
1251+
compacted ? "" : "not ");
12471252
if (auto range = find_compaction_range(compact_cfg); range) {
12481253
auto r = co_await compact_adjacent_segments(
12491254
std::move(*range), compact_cfg);
@@ -1261,7 +1266,6 @@ ss::future<> disk_log_impl::do_compact(
12611266
"Adjacent segments of {}, no adjacent pair",
12621267
config().ntp());
12631268
}
1264-
}
12651269
}
12661270

12671271
ss::future<> disk_log_impl::gc(gc_config cfg) {

src/v/storage/tests/storage_e2e_test.cc

+9-19
Original file line numberDiff line numberDiff line change
@@ -1805,22 +1805,21 @@ FIXTURE_TEST(adjacent_segment_compaction, storage_test_fixture) {
18051805

18061806
// There are 4 segments, and the last is the active segments. The first two
18071807
// will merge, and the third will be compacted but not merged.
1808+
18081809
log->housekeeping(c_cfg).get0();
1809-
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);
1810+
BOOST_REQUIRE_EQUAL(log->segment_count(), 3);
18101811

18111812
// Check if it honors max_compactible offset by resetting it to the base
18121813
// offset of first segment. Nothing should be compacted.
18131814
const auto first_segment_offsets = log->segments().front()->offsets();
18141815
c_cfg.compact.max_collectible_offset
18151816
= first_segment_offsets.get_base_offset();
18161817
log->housekeeping(c_cfg).get0();
1817-
BOOST_REQUIRE_EQUAL(log->segment_count(), 4);
1818-
1819-
// The segment count will be reduced again.
1820-
c_cfg.compact.max_collectible_offset = model::offset::max();
1821-
log->housekeeping(c_cfg).get0();
18221818
BOOST_REQUIRE_EQUAL(log->segment_count(), 3);
18231819

1820+
// Now compact without restricting collectible offset. The segment count
1821+
// will be reduced again.
1822+
c_cfg.compact.max_collectible_offset = model::offset::max();
18241823
log->housekeeping(c_cfg).get0();
18251824
BOOST_REQUIRE_EQUAL(log->segment_count(), 2);
18261825

@@ -1872,9 +1871,6 @@ FIXTURE_TEST(adjacent_segment_compaction_terms, storage_test_fixture) {
18721871
as);
18731872

18741873
// compact all the individual segments
1875-
log->housekeeping(c_cfg).get0();
1876-
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);
1877-
18781874
// the two segments with term 2 can be combined
18791875
log->housekeeping(c_cfg).get0();
18801876
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
@@ -1946,21 +1942,15 @@ FIXTURE_TEST(max_adjacent_segment_compaction, storage_test_fixture) {
19461942
as);
19471943

19481944
// self compaction steps
1949-
log->housekeeping(c_cfg).get0();
1950-
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 6);
1951-
19521945
// the first two segments are combined 2+2=4 < 6 MB
19531946
log->housekeeping(c_cfg).get0();
19541947
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
19551948

19561949
// the new first and second are too big 4+5 > 6 MB but the second and third
19571950
// can be combined 5 + 15KB < 6 MB
1958-
log->housekeeping(c_cfg).get0();
1959-
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);
1960-
19611951
// then the next 16 KB can be folded in
19621952
log->housekeeping(c_cfg).get0();
1963-
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 3);
1953+
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);
19641954

19651955
// that's all that can be done. the next seg is an appender
19661956
log->housekeeping(c_cfg).get0();
@@ -2159,16 +2149,16 @@ FIXTURE_TEST(compaction_backlog_calculation, storage_test_fixture) {
21592149
// self compaction steps
21602150
log->housekeeping(c_cfg).get0();
21612151

2162-
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 5);
2152+
BOOST_REQUIRE_EQUAL(disk_log->segment_count(), 4);
21632153
auto new_backlog_size = log->compaction_backlog();
21642154
/**
21652155
* after all self segments are compacted they shouldn't be included into the
21662156
* backlog (only last segment is since it has appender and isn't self
21672157
* compacted)
21682158
*/
2169-
BOOST_REQUIRE_EQUAL(
2159+
BOOST_REQUIRE_LT(
21702160
new_backlog_size,
2171-
backlog_size - self_seg_compaction_sz + segments[4]->size_bytes());
2161+
backlog_size - self_seg_compaction_sz + segments[3]->size_bytes());
21722162
}
21732163

21742164
FIXTURE_TEST(not_compacted_log_backlog, storage_test_fixture) {

0 commit comments

Comments
 (0)