Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c5921df

Browse files
jaykorean and facebook-github-bot
authored and committed Mar 12, 2025
Add PerKeyPlacement support (#13459)
Summary: This PR adds support for PerKeyPlacement in Remote Compaction. The `seqno_to_time_mapping` is already available from the table properties of the input files. `preserve_internal_time_seconds` and `preclude_last_level_data_seconds` are read directly from the OPTIONS file when the remote worker opens the DB.

The necessary changes include:
- Add `is_proximal_level_output` (equivalent to `is_penultimate_level_output`) and `file_temperature` to `CompactionServiceOutputFile`.
- When building the output for the remote compaction, get the outputs for the penultimate level and the last level separately, and serialize them with the two additional fields added in this PR.
- When deserializing the result on the primary, SubcompactionState's new `Outputs()` accessor takes `is_penultimate_level`. This allows us to determine which level to place the output file in.
- Include stats from `compaction_stats.penultimate_level_stats` in the remote compaction result.

# To follow up
- Stats to be fixed. Stats are not being populated correctly for PerKeyPlacement, even for non-remote compactions.
- Clean up / reconcile the "penultimate" naming by replacing it with "proximal".

Pull Request resolved: #13459

Test Plan: Updated the unit test
```
./compaction_service_test
```

Reviewed By: pdillinger
Differential Revision: D71007211
Pulled By: jaykorean
fbshipit-source-id: f926e56df17239875d849d46b8b940f8cd5f1825
1 parent 8e16f8f commit c5921df

7 files changed

+74
-38
lines changed
 

‎db/compaction/compaction_job.h

+8-2
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,9 @@ struct CompactionServiceOutputFile {
431431
bool marked_for_compaction;
432432
UniqueId64x2 unique_id{};
433433
TableProperties table_properties;
434+
// TODO: clean up the rest of the "penultimate" naming in the codebase
435+
bool is_proximal_level_output; // == is_penultimate_level_output
436+
Temperature file_temperature;
434437

435438
CompactionServiceOutputFile() = default;
436439
CompactionServiceOutputFile(
@@ -440,7 +443,8 @@ struct CompactionServiceOutputFile {
440443
uint64_t _epoch_number, const std::string& _file_checksum,
441444
const std::string& _file_checksum_func_name, uint64_t _paranoid_hash,
442445
bool _marked_for_compaction, UniqueId64x2 _unique_id,
443-
const TableProperties& _table_properties)
446+
const TableProperties& _table_properties, bool _is_proximal_level_output,
447+
Temperature _file_temperature)
444448
: file_name(name),
445449
smallest_seqno(smallest),
446450
largest_seqno(largest),
@@ -454,7 +458,9 @@ struct CompactionServiceOutputFile {
454458
paranoid_hash(_paranoid_hash),
455459
marked_for_compaction(_marked_for_compaction),
456460
unique_id(std::move(_unique_id)),
457-
table_properties(_table_properties) {}
461+
table_properties(_table_properties),
462+
is_proximal_level_output(_is_proximal_level_output),
463+
file_temperature(_file_temperature) {}
458464
};
459465

460466
// CompactionServiceResult contains the compaction result from a different db

‎db/compaction/compaction_job_test.cc

+4-1
Original file line numberDiff line numberDiff line change
@@ -1682,7 +1682,8 @@ TEST_F(CompactionJobTest, ResultSerialization) {
16821682
file_checksum /* file_checksum */,
16831683
file_checksum_func_name /* file_checksum_func_name */,
16841684
rnd64.Uniform(UINT64_MAX) /* paranoid_hash */,
1685-
rnd.OneIn(2) /* marked_for_compaction */, id /* unique_id */, tp);
1685+
rnd.OneIn(2) /* marked_for_compaction */, id /* unique_id */, tp,
1686+
false /* is_proximal_level_output */, Temperature::kHot);
16861687
}
16871688
result.output_level = rnd.Uniform(10);
16881689
result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
@@ -1736,6 +1737,8 @@ TEST_F(CompactionJobTest, ResultSerialization) {
17361737
ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum, file_checksum);
17371738
ASSERT_EQ(deserialized_tmp.output_files[0].file_checksum_func_name,
17381739
file_checksum_func_name);
1740+
ASSERT_EQ(deserialized_tmp.output_files[0].file_temperature,
1741+
Temperature::kHot);
17391742
}
17401743

17411744
// Test unknown field

‎db/compaction/compaction_outputs.h

+6-3
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@ class CompactionOutputs {
3030
// compaction output file
3131
struct Output {
3232
Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
33-
bool _enable_hash, bool _finished, uint64_t precalculated_hash)
33+
bool _enable_hash, bool _finished, uint64_t precalculated_hash,
34+
bool _is_penultimate_level)
3435
: meta(std::move(_meta)),
3536
validator(_icmp, _enable_hash, precalculated_hash),
36-
finished(_finished) {}
37+
finished(_finished),
38+
is_penultimate_level(_is_penultimate_level) {}
3739
FileMetaData meta;
3840
OutputValidator validator;
3941
bool finished;
42+
bool is_penultimate_level;
4043
std::shared_ptr<const TableProperties> table_properties;
4144
};
4245

@@ -52,7 +55,7 @@ class CompactionOutputs {
5255
bool enable_hash, bool finished = false,
5356
uint64_t precalculated_hash = 0) {
5457
outputs_.emplace_back(std::move(meta), icmp, enable_hash, finished,
55-
precalculated_hash);
58+
precalculated_hash, is_penultimate_level_);
5659
}
5760

5861
// Set new table builder for the current output

‎db/compaction/compaction_service_job.cc

+31-17
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,15 @@ CompactionJob::ProcessKeyValueCompactionWithCompactionService(
239239
meta.file_checksum_func_name = file.file_checksum_func_name;
240240
meta.marked_for_compaction = file.marked_for_compaction;
241241
meta.unique_id = file.unique_id;
242+
meta.temperature = file.file_temperature;
242243

243244
auto cfd = compaction->column_family_data();
244-
sub_compact->Current().AddOutput(std::move(meta),
245-
cfd->internal_comparator(), false, true,
246-
file.paranoid_hash);
247-
sub_compact->Current().UpdateTableProperties(file.table_properties);
245+
CompactionOutputs* compaction_outputs =
246+
sub_compact->Outputs(file.is_proximal_level_output);
247+
assert(compaction_outputs);
248+
compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(),
249+
false, true, file.paranoid_hash);
250+
compaction_outputs->UpdateTableProperties(file.table_properties);
248251
}
249252
sub_compact->compaction_job_stats = compaction_result.stats;
250253
sub_compact->Current().SetNumOutputRecords(
@@ -273,14 +276,12 @@ void CompactionServiceCompactionJob::RecordCompactionIOStats() {
273276

274277
void CompactionServiceCompactionJob::UpdateCompactionJobStats(
275278
const InternalStats::CompactionStats& stats) const {
276-
compaction_job_stats_->elapsed_micros = stats.micros;
277-
278279
// output information only in remote compaction
279-
compaction_job_stats_->total_output_bytes = stats.bytes_written;
280-
compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
281-
compaction_job_stats_->num_output_records = stats.num_output_records;
282-
compaction_job_stats_->num_output_files = stats.num_output_files;
283-
compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
280+
compaction_job_stats_->total_output_bytes += stats.bytes_written;
281+
compaction_job_stats_->total_output_bytes_blob += stats.bytes_written_blob;
282+
compaction_job_stats_->num_output_records += stats.num_output_records;
283+
compaction_job_stats_->num_output_files += stats.num_output_files;
284+
compaction_job_stats_->num_output_files_blob += stats.num_output_files_blob;
284285
}
285286

286287
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
@@ -344,15 +345,15 @@ Status CompactionServiceCompactionJob::Run() {
344345

345346
ProcessKeyValueCompaction(sub_compact);
346347

347-
compaction_stats_.stats.micros =
348+
compaction_job_stats_->elapsed_micros =
348349
db_options_.clock->NowMicros() - start_micros;
349-
compaction_stats_.stats.cpu_micros =
350+
compaction_job_stats_->cpu_micros =
350351
sub_compact->compaction_job_stats.cpu_micros;
351352

352353
RecordTimeToHistogram(stats_, COMPACTION_TIME,
353-
compaction_stats_.stats.micros);
354+
compaction_job_stats_->elapsed_micros);
354355
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
355-
compaction_stats_.stats.cpu_micros);
356+
compaction_job_stats_->cpu_micros);
356357

357358
Status status = sub_compact->status;
358359
IOStatus io_s = sub_compact->io_status;
@@ -390,6 +391,9 @@ Status CompactionServiceCompactionJob::Run() {
390391
// 2. Update the Output information in the Compaction Job Stats with
391392
// aggregated Internal Compaction Stats.
392393
UpdateCompactionJobStats(compaction_stats_.stats);
394+
if (compaction_stats_.has_penultimate_level_output) {
395+
UpdateCompactionJobStats(compaction_stats_.penultimate_level_stats);
396+
}
393397

394398
// 3. Set fields that are not propagated as part of aggregations above
395399
compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();
@@ -413,7 +417,8 @@ Status CompactionServiceCompactionJob::Run() {
413417
meta.file_creation_time, meta.epoch_number, meta.file_checksum,
414418
meta.file_checksum_func_name, output_file.validator.GetHash(),
415419
meta.marked_for_compaction, meta.unique_id,
416-
*output_file.table_properties);
420+
*output_file.table_properties, output_file.is_penultimate_level,
421+
meta.temperature);
417422
}
418423
}
419424

@@ -585,7 +590,16 @@ static std::unordered_map<std::string, OptionTypeInfo>
585590
const auto this_one = static_cast<const TableProperties*>(addr1);
586591
const auto that_one = static_cast<const TableProperties*>(addr2);
587592
return this_one->AreEqual(opts, that_one, mismatch);
588-
}}}};
593+
}}},
594+
{"is_proximal_level_output",
595+
{offsetof(struct CompactionServiceOutputFile,
596+
is_proximal_level_output),
597+
OptionType::kBoolean, OptionVerificationType::kNormal,
598+
OptionTypeFlags::kNone}},
599+
{"file_temperature",
600+
{offsetof(struct CompactionServiceOutputFile, file_temperature),
601+
OptionType::kTemperature, OptionVerificationType::kNormal,
602+
OptionTypeFlags::kNone}}};
589603

590604
static std::unordered_map<std::string, OptionTypeInfo>
591605
compaction_job_stats_type_info = {

‎db/compaction/compaction_service_test.cc

+15-15
Original file line numberDiff line numberDiff line change
@@ -1188,34 +1188,34 @@ TEST_F(CompactionServiceTest, PrecludeLastLevel) {
11881188

11891189
for (int i = 0; i < kNumTrigger; i++) {
11901190
for (int j = 0; j < kNumKeys; j++) {
1191-
// FIXME: need to assign outputs to levels to allow overlapping ranges:
1192-
// ASSERT_OK(Put(Key(j * kNumTrigger + i), "v" + std::to_string(i)));
1193-
// instead of this (too easy):
1194-
ASSERT_OK(Put(Key(i * kNumKeys + j), "v" + std::to_string(i)));
1191+
ASSERT_OK(Put(Key(j * kNumTrigger + i), "v" + std::to_string(i)));
11951192
}
11961193
ASSERT_OK(Flush());
11971194
}
11981195
ASSERT_OK(dbfull()->TEST_WaitForCompact());
11991196

12001197
// Data split between penultimate (kUnknown) and last (kCold) levels
1201-
// FIXME: need to assign outputs to levels to get this:
1202-
// ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
1203-
// ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
1204-
// ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
1205-
// instead of this (WRONG but currently expected):
1206-
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
1207-
// Check manifest temperatures
1198+
ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
12081199
ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
1209-
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
1200+
ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
1201+
12101202
// TODO: Check FileSystem temperatures with FileTemperatureTestFS
12111203

12121204
for (int i = 0; i < kNumTrigger; i++) {
12131205
for (int j = 0; j < kNumKeys; j++) {
1214-
// FIXME
1215-
// ASSERT_EQ(Get(Key(j * kNumTrigger + i)), "v" + std::to_string(i));
1216-
ASSERT_EQ(Get(Key(i * kNumKeys + j)), "v" + std::to_string(i));
1206+
ASSERT_EQ(Get(Key(j * kNumTrigger + i)), "v" + std::to_string(i));
12171207
}
12181208
}
1209+
1210+
// Verify Output Stats
1211+
auto my_cs = GetCompactionService();
1212+
CompactionServiceResult result;
1213+
my_cs->GetResult(&result);
1214+
ASSERT_OK(result.status);
1215+
ASSERT_GT(result.stats.cpu_micros, 0);
1216+
ASSERT_GT(result.stats.elapsed_micros, 0);
1217+
ASSERT_EQ(result.stats.num_output_records, kNumTrigger * kNumKeys);
1218+
ASSERT_EQ(result.stats.num_output_files, 2);
12191219
}
12201220

12211221
TEST_F(CompactionServiceTest, ConcurrentCompaction) {

‎db/compaction/subcompaction_state.h

+9
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,15 @@ class SubcompactionState {
169169
return *current_outputs_;
170170
}
171171

172+
CompactionOutputs* Outputs(bool is_penultimate_level) {
173+
assert(compaction);
174+
if (is_penultimate_level) {
175+
assert(compaction->SupportsPerKeyPlacement());
176+
return &penultimate_level_outputs_;
177+
}
178+
return &compaction_outputs_;
179+
}
180+
172181
CompactionRangeDelAggregator* RangeDelAgg() const {
173182
return range_del_agg_.get();
174183
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added per-key-placement feature in Remote Compaction

0 commit comments

Comments
 (0)
Please sign in to comment.