Skip to content

Commit 288aa73

Browse files
authored
[BugFix] support lazy delta column compact for size tiered compaction in pk table to reduce cost (StarRocks#61930)
Signed-off-by: luohaha <[email protected]>
1 parent 6172f84 commit 288aa73

File tree

2 files changed

+105
-37
lines changed

2 files changed

+105
-37
lines changed

be/src/storage/tablet_updates.cpp

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2181,7 +2181,9 @@ Status TabletUpdates::_commit_compaction(std::unique_ptr<CompactionInfo>* pinfo,
21812181
EditVersionMetaPB edit;
21822182
auto lastv = _edit_version_infos.back().get();
21832183
// handle conflicts with column-mode partial updates
2184-
RETURN_IF_ERROR(_check_conflict_with_partial_update((*pinfo).get()));
2184+
if (rowset->num_rows() > 0) {
2185+
RETURN_IF_ERROR(_check_conflict_with_partial_update((*pinfo).get()));
2186+
}
21852187
auto edit_version_pb = edit.mutable_version();
21862188
edit_version_pb->set_major_number(lastv->version.major_number());
21872189
edit_version_pb->set_minor_number(lastv->version.minor_number() + 1);
@@ -3080,6 +3082,7 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
30803082

30813083
size_t total_valid_rowsets = 0;
30823084
size_t total_valid_segments = 0;
3085+
bool has_partial_update_by_column = false;
30833086
// level -1 keep empty rowsets and have no IO overhead, so we can merge them with any level
30843087
std::map<int, vector<CompactionEntry>> candidates_by_level;
30853088
{
@@ -3106,6 +3109,7 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
31063109
e.num_dels = stat.num_dels;
31073110
e.bytes = stat.byte_size;
31083111
e.num_segments = stat.num_segments;
3112+
has_partial_update_by_column |= stat.partial_update_by_column;
31093113
}
31103114
}
31113115
}
@@ -3116,7 +3120,21 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
31163120
int64_t max_score = 0;
31173121
for (auto& [level, candidates] : candidates_by_level) {
31183122
if (level == -1) {
3119-
continue;
3123+
// When lazy delta column compaction is enabled, we don't want to merge
3124+
// delta columns back into the main segment file too soon, to save compaction IO cost.
3125+
// Keeping delta columns in separate files won't affect query performance.
3126+
// If there are rowsets with column updates and more than one candidate, trigger the lazy compaction strategy.
3127+
if (has_partial_update_by_column && candidates.size() > 1 && config::enable_lazy_delta_column_compaction) {
3128+
for (auto& e : candidates) {
3129+
info->inputs.emplace_back(e.rowsetid);
3130+
}
3131+
VLOG(1) << "trigger lazy compaction strategy for tablet:" << _tablet.tablet_id()
3132+
<< " because of column update rowset count:" << candidates.size();
3133+
// only merge empty rowsets, so there is no need to consider other levels
3134+
break;
3135+
} else {
3136+
continue;
3137+
}
31203138
}
31213139
int64_t total_segments = 0;
31223140
int64_t del_rows = 0;
@@ -3135,46 +3153,50 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
31353153
int64_t total_merged_segments = 0;
31363154
RowsetStats stat;
31373155
std::set<int32_t> compaction_level_candidate;
3138-
max_score = 0;
3139-
do {
3140-
auto iter = candidates_by_level.find(compaction_level);
3141-
if (iter == candidates_by_level.end()) {
3142-
break;
3143-
}
3144-
for (auto& e : iter->second) {
3145-
size_t new_rows = stat.num_rows + e.num_rows - e.num_dels;
3146-
size_t new_bytes = stat.byte_size;
3147-
if (e.num_rows != 0) {
3148-
new_bytes += e.bytes * (e.num_rows - e.num_dels) / e.num_rows;
3149-
}
3150-
if ((stat.byte_size > 0 && new_bytes > config::update_compaction_result_bytes * 2) ||
3151-
info->inputs.size() >= config::max_update_compaction_num_singleton_deltas) {
3156+
3157+
if (info->inputs.empty()) {
3158+
// lazy compaction strategy was not triggered, so try to merge level by level
3159+
max_score = 0;
3160+
do {
3161+
auto iter = candidates_by_level.find(compaction_level);
3162+
if (iter == candidates_by_level.end()) {
31523163
break;
31533164
}
3154-
max_score += e.score_per_row * (e.num_rows - e.num_dels);
3155-
info->inputs.emplace_back(e.rowsetid);
3156-
stat.num_rows = new_rows;
3157-
stat.byte_size = new_bytes;
3158-
total_rows += e.num_rows;
3159-
total_bytes += e.bytes;
3160-
total_merged_segments += e.num_segments;
3161-
}
3162-
compaction_level_candidate.insert(compaction_level);
3163-
compaction_level = _calc_compaction_level(&stat);
3164-
stat.num_segments = stat.byte_size > 0 ? (stat.byte_size - 1) / config::max_segment_file_size + 1 : 0;
3165-
_calc_compaction_score(&stat);
3166-
} while (stat.byte_size <= config::update_compaction_result_bytes * 2 &&
3167-
info->inputs.size() < config::max_update_compaction_num_singleton_deltas &&
3168-
compaction_level_candidate.find(compaction_level) == compaction_level_candidate.end() &&
3169-
candidates_by_level.find(compaction_level) != candidates_by_level.end() && stat.compaction_score > 0);
3170-
3171-
if (compaction_level_candidate.find(-1) == compaction_level_candidate.end()) {
3172-
if (candidates_by_level[-1].size() > 0) {
3173-
for (auto& e : candidates_by_level[-1]) {
3165+
for (auto& e : iter->second) {
3166+
size_t new_rows = stat.num_rows + e.num_rows - e.num_dels;
3167+
size_t new_bytes = stat.byte_size;
3168+
if (e.num_rows != 0) {
3169+
new_bytes += e.bytes * (e.num_rows - e.num_dels) / e.num_rows;
3170+
}
3171+
if ((stat.byte_size > 0 && new_bytes > config::update_compaction_result_bytes * 2) ||
3172+
info->inputs.size() >= config::max_update_compaction_num_singleton_deltas) {
3173+
break;
3174+
}
3175+
max_score += e.score_per_row * (e.num_rows - e.num_dels);
31743176
info->inputs.emplace_back(e.rowsetid);
3177+
stat.num_rows = new_rows;
3178+
stat.byte_size = new_bytes;
3179+
total_rows += e.num_rows;
3180+
total_bytes += e.bytes;
31753181
total_merged_segments += e.num_segments;
31763182
}
3177-
compaction_level_candidate.insert(-1);
3183+
compaction_level_candidate.insert(compaction_level);
3184+
compaction_level = _calc_compaction_level(&stat);
3185+
stat.num_segments = stat.byte_size > 0 ? (stat.byte_size - 1) / config::max_segment_file_size + 1 : 0;
3186+
_calc_compaction_score(&stat);
3187+
} while (stat.byte_size <= config::update_compaction_result_bytes * 2 &&
3188+
info->inputs.size() < config::max_update_compaction_num_singleton_deltas &&
3189+
compaction_level_candidate.find(compaction_level) == compaction_level_candidate.end() &&
3190+
candidates_by_level.find(compaction_level) != candidates_by_level.end() && stat.compaction_score > 0);
3191+
3192+
if (compaction_level_candidate.find(-1) == compaction_level_candidate.end()) {
3193+
if (candidates_by_level[-1].size() > 0) {
3194+
for (auto& e : candidates_by_level[-1]) {
3195+
info->inputs.emplace_back(e.rowsetid);
3196+
total_merged_segments += e.num_segments;
3197+
}
3198+
compaction_level_candidate.insert(-1);
3199+
}
31783200
}
31793201
}
31803202

be/test/storage/rowset_column_partial_update_test.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,52 @@ TEST_P(RowsetColumnPartialUpdateTest, test_dcg_file_size) {
14441444
ASSERT_GT(dcg_file_size, 0) << "dcg file size should be greater than 0";
14451445
}
14461446

1447+
TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_size_tier_compaction) {
1448+
const int N = 100;
1449+
auto tablet = create_tablet(rand(), rand());
1450+
ASSERT_EQ(1, tablet->updates()->version_history_count());
1451+
1452+
// create full rowsets first
1453+
std::vector<int64_t> keys(N);
1454+
for (int i = 0; i < N; i++) {
1455+
keys[i] = i;
1456+
}
1457+
std::vector<RowsetSharedPtr> rowsets;
1458+
rowsets.emplace_back(create_rowset(tablet, keys));
1459+
int64_t version = 1;
1460+
commit_rowsets(tablet, rowsets, version);
1461+
// check data
1462+
ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2, int32_t v3) {
1463+
return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
1464+
}));
1465+
1466+
std::vector<int32_t> column_indexes = {0, 1};
1467+
auto v1_func = [](int64_t k1) { return (int16_t)(k1 % 100 + 3); };
1468+
auto v2_func = [](int64_t k1) { return (int32_t)(k1 % 1000 + 4); };
1469+
std::shared_ptr<TabletSchema> partial_schema = TabletSchema::create(tablet->tablet_schema(), column_indexes);
1470+
for (int i = 0; i < 10; i++) {
1471+
// create partial rowset
1472+
RowsetSharedPtr partial_rowset =
1473+
create_partial_rowset(tablet, keys, column_indexes, v1_func, v2_func, partial_schema, 1);
1474+
// commit partial update
1475+
auto st = tablet->rowset_commit(++version, partial_rowset, 10000);
1476+
ASSERT_TRUE(st.ok()) << st.to_string();
1477+
}
1478+
// check data
1479+
ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2, int32_t v3) {
1480+
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
1481+
}));
1482+
// trigger size tiered compaction
1483+
config::enable_pk_size_tiered_compaction_strategy = true;
1484+
ASSERT_TRUE(tablet->updates()->compaction(_compaction_mem_tracker.get()).ok());
1485+
// check data
1486+
ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2, int32_t v3) {
1487+
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
1488+
}));
1489+
// there will be two rowsets
1490+
ASSERT_TRUE(tablet->updates()->num_rowsets() == 2);
1491+
}
1492+
14471493
INSTANTIATE_TEST_SUITE_P(RowsetColumnPartialUpdateTest, RowsetColumnPartialUpdateTest,
14481494
::testing::Values(RowsetColumnPartialUpdateParam{1, false},
14491495
RowsetColumnPartialUpdateParam{1024, true},

0 commit comments

Comments
 (0)