Skip to content

Commit 10ac170

Browse files
[BugFix] avoid publish compaction crash when input rowset not found (backport #67154) (#67226)
Signed-off-by: luohaha <[email protected]> Co-authored-by: Yixin Luo <[email protected]>
1 parent 0a931d4 commit 10ac170

File tree

2 files changed

+60
-0
lines changed

2 files changed

+60
-0
lines changed

be/src/storage/lake/update_manager.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -948,6 +948,11 @@ Status UpdateManager::publish_primary_compaction(const TxnLogPB_OpCompaction& op
948948
// then get the max src rssid, to solve conflict between write and compaction
949949
auto input_rowset = std::find_if(metadata.rowsets().begin(), metadata.rowsets().end(),
950950
[&](const RowsetMetadata& r) { return r.id() == max_rowset_id; });
951+
if (input_rowset == metadata.rowsets().end()) {
952+
LOG(ERROR) << "cannot find input rowset in tablet metadata, rowset_id: " << max_rowset_id
953+
<< ", meta : " << metadata.ShortDebugString();
954+
return Status::InternalError("cannot find input rowset in tablet metadata");
955+
}
951956
uint32_t max_src_rssid = max_rowset_id + input_rowset->segments_size() - 1;
952957
std::map<uint32_t, size_t> segment_id_to_add_dels;
953958

be/test/storage/lake/primary_key_compaction_task_test.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1562,6 +1562,61 @@ TEST_P(LakePrimaryKeyCompactionTest, test_concurrent_compaction_and_publish) {
15621562
ASSERT_EQ(kChunkSize, read(version));
15631563
}
15641564

1565+
TEST_P(LakePrimaryKeyCompactionTest, test_publish_compaction_with_invalid_rowset_id) {
1566+
auto chunk0 = generate_data(kChunkSize, 0);
1567+
auto indexes = std::vector<uint32_t>(kChunkSize);
1568+
for (int i = 0; i < kChunkSize; i++) {
1569+
indexes[i] = i;
1570+
}
1571+
1572+
auto version = 1;
1573+
auto tablet_id = _tablet_metadata->id();
1574+
1575+
for (int i = 0; i < 3; i++) {
1576+
auto txn_id = next_id();
1577+
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
1578+
.set_tablet_manager(_tablet_mgr.get())
1579+
.set_tablet_id(tablet_id)
1580+
.set_txn_id(txn_id)
1581+
.set_partition_id(_partition_id)
1582+
.set_mem_tracker(_mem_tracker.get())
1583+
.set_schema_id(_tablet_schema->id())
1584+
.set_profile(&_dummy_runtime_profile)
1585+
.build());
1586+
ASSERT_OK(delta_writer->open());
1587+
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
1588+
ASSERT_OK(delta_writer->finish_with_txnlog());
1589+
delta_writer->close();
1590+
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
1591+
version++;
1592+
}
1593+
1594+
ASSERT_EQ(kChunkSize, read(version));
1595+
ASSIGN_OR_ABORT(auto metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
1596+
EXPECT_EQ(metadata->rowsets_size(), 3);
1597+
1598+
auto txn_id = next_id();
1599+
TxnLogPB txn_log;
1600+
txn_log.set_tablet_id(tablet_id);
1601+
txn_log.set_txn_id(txn_id);
1602+
auto* op_compaction = txn_log.mutable_op_compaction();
1603+
1604+
op_compaction->add_input_rowsets(0);
1605+
op_compaction->add_input_rowsets(1);
1606+
op_compaction->add_input_rowsets(999);
1607+
1608+
auto* output_rowset = op_compaction->mutable_output_rowset();
1609+
output_rowset->set_num_rows(kChunkSize);
1610+
output_rowset->set_data_size(1024);
1611+
output_rowset->add_segments("fake_segment.dat");
1612+
output_rowset->add_segment_size(1024);
1613+
1614+
ASSERT_OK(_tablet_mgr->put_txn_log(txn_log));
1615+
1616+
auto res = publish_single_version(tablet_id, version + 1, txn_id);
1617+
EXPECT_TRUE(res.status().is_internal_error());
1618+
}
1619+
15651620
INSTANTIATE_TEST_SUITE_P(
15661621
LakePrimaryKeyCompactionTest, LakePrimaryKeyCompactionTest,
15671622
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5, false},

0 commit comments

Comments
 (0)