Skip to content

Commit b030e5e

Browse files
Merge pull request #25904 from Lazin/manual-backport-24566-v24.2.x
[v24.2.x] archival: Disable cross-term compaction
2 parents a26470b + 71fd7e3 commit b030e5e

File tree

2 files changed

+115
-39
lines changed

2 files changed

+115
-39
lines changed

src/v/archival/segment_reupload.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include "segment_reupload.h"
1212

13+
#include "base/vlog.h"
1314
#include "cloud_storage/partition_manifest.h"
1415
#include "config/configuration.h"
1516
#include "logger.h"
@@ -190,6 +191,21 @@ void segment_collector::do_collect(segment_collector_mode mode) {
190191
result.segment->offsets().get_base_offset()});
191192
align_begin_offset_to_manifest();
192193
}
194+
195+
// Only segments from the same term can be concatenated together.
196+
if (
197+
!_segments.empty()
198+
&& _segments.back()->offsets().get_term()
199+
!= result.segment->offsets().get_term()) {
200+
archival_log.debug(
201+
"Segment collect for ntp {} stopping collection, last segment "
202+
"term {} is different from current segment term: {}",
203+
_manifest.get_ntp(),
204+
_segments.back()->offsets().get_term(),
205+
result.segment->offsets().get_term());
206+
break;
207+
}
208+
193209
_segments.push_back(result.segment);
194210
current_segment_end = result.segment->offsets().get_committed_offset();
195211
start = current_segment_end + model::offset{1};

src/v/archival/tests/ntp_archiver_reupload_test.cc

Lines changed: 99 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "config/configuration.h"
1818
#include "storage/ntp_config.h"
1919
#include "test_utils/fixture.h"
20+
#include "test_utils/scoped_config.h"
2021

2122
#include <seastar/core/sharded.hh>
2223

@@ -226,7 +227,7 @@ struct reupload_fixture : public archiver_fixture {
226227
manifest_view);
227228
}
228229

229-
ss::lw_shared_ptr<storage::segment> self_compact_next_segment(
230+
ss::lw_shared_ptr<storage::segment> run_disk_log_housekeeping(
230231
model::offset max_collectible = model::offset::max()) {
231232
auto& seg_set = disk_log_impl()->segments();
232233
auto size_before = seg_set.size();
@@ -322,7 +323,7 @@ FIXTURE_TEST(test_upload_compacted_segments, reupload_fixture) {
322323
// Mark first segment compacted, and re-upload, now only one segment is
323324
// uploaded.
324325
reset_http_call_state();
325-
auto seg = self_compact_next_segment(
326+
auto seg = run_disk_log_housekeeping(
326327
stm_manifest.first_addressable_segment()->committed_offset);
327328

328329
expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {1, 0, 0}};
@@ -340,7 +341,7 @@ FIXTURE_TEST(test_upload_compacted_segments, reupload_fixture) {
340341
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{0});
341342

342343
// Mark second segment as compacted and re-upload.
343-
seg = self_compact_next_segment(
344+
seg = run_disk_log_housekeeping(
344345
std::next(stm_manifest.first_addressable_segment())->committed_offset);
345346

346347
reset_http_call_state();
@@ -366,7 +367,7 @@ FIXTURE_TEST(test_upload_compacted_segments, reupload_fixture) {
366367
FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
367368
std::vector<segment_desc> segments = {
368369
{manifest_ntp, model::offset(0), model::term_id(1), 1000, 2},
369-
{manifest_ntp, model::offset(1000), model::term_id(4), 10, 2},
370+
{manifest_ntp, model::offset(1000), model::term_id(1), 10, 2},
370371
};
371372

372373
initialize(segments);
@@ -384,7 +385,7 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
384385

385386
auto manifest = verify_manifest_request(*part);
386387
verify_segment_request("0-1-v1.log", manifest);
387-
verify_segment_request("1000-4-v1.log", manifest);
388+
verify_segment_request("1000-1-v1.log", manifest);
388389

389390
BOOST_REQUIRE(part->archival_meta_stm());
390391
const cloud_storage::partition_manifest& stm_manifest
@@ -397,7 +398,7 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
397398
// Mark both segments compacted, and re-upload. One concatenated segment is
398399
// uploaded.
399400
reset_http_call_state();
400-
auto seg = self_compact_next_segment();
401+
auto seg = run_disk_log_housekeeping();
401402

402403
expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {1, 0, 0}};
403404
upload_and_verify(archiver.value(), expected);
@@ -412,8 +413,7 @@ FIXTURE_TEST(test_upload_compacted_segments_concat, reupload_fixture) {
412413
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{0});
413414
BOOST_REQUIRE_EQUAL(replaced[1].base_offset, model::offset{1000});
414415

415-
verify_concat_segment_request(
416-
{"0-1-v1.log", "1000-4-v1.log"}, part->archival_meta_stm()->manifest());
416+
BOOST_REQUIRE_EQUAL(stm_manifest.size(), 1);
417417
}
418418

419419
FIXTURE_TEST(
@@ -438,7 +438,7 @@ FIXTURE_TEST(
438438
listen();
439439

440440
// Self-compact just the first couple segments.
441-
self_compact_next_segment(model::offset{999});
441+
run_disk_log_housekeeping(model::offset{999});
442442

443443
archival::ntp_archiver::batch_result expected{{0, 0, 0}, {1, 0, 0}};
444444
upload_and_verify(archiver.value(), expected);
@@ -475,7 +475,7 @@ FIXTURE_TEST(test_upload_compacted_segments_fill_gap, reupload_fixture) {
475475

476476
listen();
477477

478-
self_compact_next_segment();
478+
run_disk_log_housekeeping();
479479

480480
archival::ntp_archiver::batch_result expected{{0, 0, 0}, {1, 0, 0}};
481481
upload_and_verify(archiver.value(), expected);
@@ -541,7 +541,7 @@ FIXTURE_TEST(test_upload_both_compacted_and_non_compacted, reupload_fixture) {
541541
// NOTE: we can only compact up to what's been uploaded, since that
542542
// determines the max collectible offset.
543543
reset_http_call_state();
544-
auto seg = self_compact_next_segment(
544+
auto seg = run_disk_log_housekeeping(
545545
manifest.first_addressable_segment()->committed_offset);
546546

547547
expected = archival::ntp_archiver::batch_result{{1, 0, 0}, {1, 0, 0}};
@@ -607,7 +607,7 @@ FIXTURE_TEST(test_both_uploads_with_one_failing, reupload_fixture) {
607607
// Self-compact the first segment and re-upload. One compacted
608608
// and one non-compacted segments are uploaded.
609609
reset_http_call_state();
610-
auto seg = self_compact_next_segment(disk_log_impl()
610+
auto seg = run_disk_log_housekeeping(disk_log_impl()
611611
->segments()
612612
.begin()
613613
->get()
@@ -671,11 +671,9 @@ FIXTURE_TEST(test_upload_when_compaction_disabled, reupload_fixture) {
671671
BOOST_REQUIRE_EQUAL(
672672
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});
673673

674-
// Self-compact the first segment, since the topic has compaction
675-
// disabled, and re-upload, nothing is uploaded.
674+
// Since the topic has compaction is disabled nothing should
675+
// be reuploaded
676676
reset_http_call_state();
677-
auto seg = self_compact_next_segment();
678-
679677
expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {0, 0, 0}};
680678
upload_and_verify(archiver.value(), expected);
681679
BOOST_REQUIRE_EQUAL(get_requests().size(), 0);
@@ -719,7 +717,7 @@ FIXTURE_TEST(test_upload_when_reupload_disabled, reupload_fixture) {
719717
// Mark first segment compacted artificially, since the topic has compaction
720718
// disabled, and re-upload, nothing is uploaded.
721719
reset_http_call_state();
722-
auto seg = self_compact_next_segment();
720+
auto seg = run_disk_log_housekeeping();
723721

724722
expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {0, 0, 0}};
725723

@@ -745,10 +743,10 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
745743
// NOTE: different terms so compaction leaves one segment each.
746744
std::vector<segment_desc> segments = {
747745
{manifest_ntp, model::offset(0), model::term_id(1), 10, 2},
748-
{manifest_ntp, model::offset(10), model::term_id(2), 10, 2},
749-
{manifest_ntp, model::offset(20), model::term_id(3), 10, 2},
750-
{manifest_ntp, model::offset(30), model::term_id(4), 10, 2},
751-
{manifest_ntp, model::offset(40), model::term_id(5), 10, 2},
746+
{manifest_ntp, model::offset(10), model::term_id(1), 10, 2},
747+
{manifest_ntp, model::offset(20), model::term_id(1), 10, 2},
748+
{manifest_ntp, model::offset(30), model::term_id(1), 10, 2},
749+
{manifest_ntp, model::offset(40), model::term_id(1), 10, 2},
752750
};
753751

754752
initialize(segments);
@@ -767,9 +765,9 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
767765
get_targets().find(manifest_url)->second.content);
768766

769767
verify_segment_request("0-1-v1.log", manifest);
770-
verify_segment_request("10-2-v1.log", manifest);
771-
verify_segment_request("20-3-v1.log", manifest);
772-
verify_segment_request("30-4-v1.log", manifest);
768+
verify_segment_request("10-1-v1.log", manifest);
769+
verify_segment_request("20-1-v1.log", manifest);
770+
verify_segment_request("30-1-v1.log", manifest);
773771

774772
BOOST_REQUIRE(part->archival_meta_stm());
775773
const cloud_storage::partition_manifest& stm_manifest
@@ -789,28 +787,28 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
789787
create_segment(
790788
{manifest_ntp,
791789
last_segment->offsets().get_committed_offset() + model::offset{1},
792-
model::term_id{6},
790+
model::term_id{2},
793791
10});
794792
}
795793

796794
reset_http_call_state();
797795

798796
// Mark four segments as compacted, so they are valid for upload
799797
ss::lw_shared_ptr<storage::segment> seg;
800-
seg = self_compact_next_segment(model::offset(39));
798+
seg = run_disk_log_housekeeping(model::offset(39));
801799

802800
expected = archival::ntp_archiver::batch_result{{4, 0, 0}, {0, 0, 0}};
803801
upload_and_verify(archiver.value(), expected, model::offset::max());
804802
BOOST_REQUIRE_EQUAL(get_requests().size(), 9);
805803

806804
verify_segment_request(
807-
"40-5-v1.log", part->archival_meta_stm()->manifest());
805+
"40-1-v1.log", part->archival_meta_stm()->manifest());
808806
verify_segment_request(
809-
"50-6-v1.log", part->archival_meta_stm()->manifest());
807+
"50-2-v1.log", part->archival_meta_stm()->manifest());
810808
verify_segment_request(
811-
"65-6-v1.log", part->archival_meta_stm()->manifest());
809+
"65-2-v1.log", part->archival_meta_stm()->manifest());
812810
verify_segment_request(
813-
"85-6-v1.log", part->archival_meta_stm()->manifest());
811+
"85-2-v1.log", part->archival_meta_stm()->manifest());
814812

815813
BOOST_REQUIRE_EQUAL(
816814
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});
@@ -824,19 +822,81 @@ FIXTURE_TEST(test_upload_limit, reupload_fixture) {
824822
upload_and_verify(archiver.value(), expected);
825823
BOOST_REQUIRE_EQUAL(get_requests().size(), 3);
826824

827-
verify_concat_segment_request(
828-
{
829-
"0-1-v1.log",
830-
"10-2-v1.log",
831-
"20-3-v1.log",
832-
"30-4-v1.log",
833-
},
834-
part->archival_meta_stm()->manifest());
835-
836825
BOOST_REQUIRE_EQUAL(
837826
stm_manifest.get_last_uploaded_compacted_offset(),
838827
seg->offsets().get_committed_offset());
839828

840829
replaced = stm_manifest.replaced_segments();
841830
BOOST_REQUIRE_EQUAL(replaced.size(), 4);
831+
BOOST_REQUIRE_EQUAL(stm_manifest.size(), 5);
832+
}
833+
834+
FIXTURE_TEST(test_upload_compacted_segments_cross_term, reupload_fixture) {
835+
std::vector<segment_desc> segments = {
836+
{manifest_ntp, model::offset(0), model::term_id(1), 1000, 2},
837+
{manifest_ntp, model::offset(1000), model::term_id(4), 10, 2},
838+
};
839+
840+
initialize(segments);
841+
auto action = ss::defer([this] { archiver->stop().get(); });
842+
843+
auto part = app.partition_manager.local().get(manifest_ntp);
844+
listen();
845+
846+
// Upload two non compacted segments, no segment is compacted yet.
847+
archival::ntp_archiver::batch_result expected{{2, 0, 0}, {0, 0, 0}};
848+
upload_and_verify(archiver.value(), expected);
849+
850+
// Two segments, two indices, one manifest
851+
BOOST_REQUIRE_EQUAL(get_requests().size(), 5);
852+
853+
auto manifest = verify_manifest_request(*part);
854+
verify_segment_request("0-1-v1.log", manifest);
855+
verify_segment_request("1000-4-v1.log", manifest);
856+
857+
BOOST_REQUIRE(part->archival_meta_stm());
858+
const cloud_storage::partition_manifest& stm_manifest
859+
= part->archival_meta_stm()->manifest();
860+
verify_stm_manifest(stm_manifest, segments);
861+
862+
BOOST_REQUIRE_EQUAL(
863+
stm_manifest.get_last_uploaded_compacted_offset(), model::offset{});
864+
865+
// Mark both segments compacted, and re-upload. Both segments are
866+
// re-uploaded.
867+
reset_http_call_state();
868+
869+
vlog(test_log.info, "Waiting for segments to self-compact");
870+
auto seg = run_disk_log_housekeeping();
871+
vlog(test_log.info, "Self-compaction completed");
872+
873+
expected = archival::ntp_archiver::batch_result{{0, 0, 0}, {2, 0, 0}};
874+
upload_and_verify(archiver.value(), expected);
875+
BOOST_REQUIRE_EQUAL(get_requests().size(), 5);
876+
877+
BOOST_REQUIRE_EQUAL(
878+
stm_manifest.get_last_uploaded_compacted_offset(),
879+
seg->offsets().get_committed_offset());
880+
881+
auto replaced = stm_manifest.replaced_segments();
882+
BOOST_REQUIRE_EQUAL(replaced.size(), 2);
883+
BOOST_REQUIRE_EQUAL(replaced[0].base_offset, model::offset{0});
884+
BOOST_REQUIRE_EQUAL(replaced[1].base_offset, model::offset{1000});
885+
886+
// We can't reupload x-term so we should end up with two
887+
// compacted uploads.
888+
889+
{
890+
auto it = stm_manifest.get(model::offset(0));
891+
BOOST_REQUIRE_EQUAL(it->base_offset, model::offset(0));
892+
BOOST_REQUIRE_EQUAL(it->committed_offset, model::offset(999));
893+
BOOST_REQUIRE_EQUAL(it->segment_term, model::term_id(1));
894+
}
895+
896+
{
897+
auto it = stm_manifest.get(model::offset(1000));
898+
BOOST_REQUIRE_EQUAL(it->base_offset, model::offset(1000));
899+
BOOST_REQUIRE_EQUAL(it->committed_offset, model::offset(1009));
900+
BOOST_REQUIRE_EQUAL(it->segment_term, model::term_id(4));
901+
}
842902
}

0 commit comments

Comments
 (0)