Skip to content

Commit

Permalink
DisableManualCompaction may fail to cancel an unscheduled task (#9659)
Browse files Browse the repository at this point in the history
Summary:
#9625 didn't change the unschedule condition which was waiting for the background thread to clean-up the compaction.
make sure we only unschedule the task when it's scheduled.

Pull Request resolved: #9659

Reviewed By: ajkr

Differential Revision: D34651820

Pulled By: jay-zhuang

fbshipit-source-id: 23f42081b15ec8886cd81cbf131b116e0c74dc2f
  • Loading branch information
jay-zhuang authored and ajkr committed Mar 13, 2022
1 parent 22e011f commit 9fe3a53
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 66 deletions.
3 changes: 3 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Rocksdb Change Log
## Unreleased
* Fixed a bug that DisableManualCompaction may assert when disable an unscheduled manual compaction.

## 7.0.1 (03/02/2022)
### Bug Fixes
* Fix a race condition when cancel manual compaction with `DisableManualCompaction`. Also DB close can cancel the manual compaction thread.
Expand Down
1 change: 0 additions & 1 deletion db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,6 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
GetL0ThresholdSpeedupCompaction(
mutable_cf_options.level0_file_num_compaction_trigger,
mutable_cf_options.level0_slowdown_writes_trigger)) {
fprintf(stdout, "JJJ2\n");
write_controller_token_ =
write_controller->GetCompactionPressureToken();
ROCKS_LOG_INFO(
Expand Down
1 change: 0 additions & 1 deletion db/compaction/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
const int kLevel0 = 0;
score_ = vstorage_->CompactionScore(kLevel0);
sorted_runs_ = CalculateSortedRuns(*vstorage_);
fprintf(stdout, "JJJ1\n");

if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
Expand Down
106 changes: 106 additions & 0 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6889,6 +6889,112 @@ TEST_F(DBCompactionTest, FIFOWarm) {
Destroy(options);
}

TEST_F(DBCompactionTest, DisableMultiManualCompaction) {
const int kNumL0Files = 10;

Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);

// Generate 2 levels of file to make sure the manual compaction is not skipped
for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), "value"));
if (i % 2) {
ASSERT_OK(Flush());
}
}
MoveFilesToLevel(2);

for (int i = 0; i < 10; i++) {
ASSERT_OK(Put(Key(i), "value"));
if (i % 2) {
ASSERT_OK(Flush());
}
}
MoveFilesToLevel(1);

// Block compaction queue
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);

port::Thread compact_thread1([&]() {
CompactRangeOptions cro;
cro.exclusive_manual_compaction = false;
std::string begin_str = Key(0);
std::string end_str = Key(3);
Slice b = begin_str;
Slice e = end_str;
auto s = db_->CompactRange(cro, &b, &e);
ASSERT_TRUE(s.IsIncomplete());
});

port::Thread compact_thread2([&]() {
CompactRangeOptions cro;
cro.exclusive_manual_compaction = false;
std::string begin_str = Key(4);
std::string end_str = Key(7);
Slice b = begin_str;
Slice e = end_str;
auto s = db_->CompactRange(cro, &b, &e);
ASSERT_TRUE(s.IsIncomplete());
});

// Disable manual compaction should cancel both manual compactions and both
// compaction should return incomplete.
db_->DisableManualCompaction();

compact_thread1.join();
compact_thread2.join();

sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
}

TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) {
const int kNumL0Files = 4;

Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);

// generate files, but avoid trigger auto compaction
for (int i = 0; i < kNumL0Files / 2; i++) {
ASSERT_OK(Put(Key(1), "value1"));
ASSERT_OK(Put(Key(2), "value2"));
ASSERT_OK(Flush());
}

// make sure the manual compaction background is started but not yet set the
// status to in_progress, then cancel the manual compaction, which should not
// result in segfault
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BGWorkCompaction",
"DBCompactionTest::DisableJustStartedManualCompaction:"
"PreDisableManualCompaction"},
{"DBCompactionTest::DisableJustStartedManualCompaction:"
"ManualCompactionReturn",
"BackgroundCallCompaction:0"}});
SyncPoint::GetInstance()->EnableProcessing();

port::Thread compact_thread([&]() {
CompactRangeOptions cro;
cro.exclusive_manual_compaction = true;
auto s = db_->CompactRange(cro, nullptr, nullptr);
ASSERT_TRUE(s.IsIncomplete());
TEST_SYNC_POINT(
"DBCompactionTest::DisableJustStartedManualCompaction:"
"ManualCompactionReturn");
});
TEST_SYNC_POINT(
"DBCompactionTest::DisableJustStartedManualCompaction:"
"PreDisableManualCompaction");
db_->DisableManualCompaction();

compact_thread.join();
}

TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
const int kNumL0Files = 4;

Expand Down
31 changes: 22 additions & 9 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1514,19 +1514,31 @@ class DBImpl : public DB {

// Information for a manual compaction
struct ManualCompactionState {
ManualCompactionState(ColumnFamilyData* _cfd, int _input_level,
int _output_level, uint32_t _output_path_id,
bool _exclusive, bool _disallow_trivial_move,
std::atomic<bool>* _canceled)
: cfd(_cfd),
input_level(_input_level),
output_level(_output_level),
output_path_id(_output_path_id),
exclusive(_exclusive),
disallow_trivial_move(_disallow_trivial_move),
canceled(_canceled) {}

ColumnFamilyData* cfd;
int input_level;
int output_level;
uint32_t output_path_id;
Status status;
bool done;
bool in_progress; // compaction request being processed?
bool incomplete; // only part of requested range compacted
bool exclusive; // current behavior of only one manual
bool disallow_trivial_move; // Force actual compaction to run
const InternalKey* begin; // nullptr means beginning of key range
const InternalKey* end; // nullptr means end of key range
InternalKey* manual_end; // how far we are compacting
bool done = false;
bool in_progress = false; // compaction request being processed?
bool incomplete = false; // only part of requested range compacted
bool exclusive; // current behavior of only one manual
bool disallow_trivial_move; // Force actual compaction to run
const InternalKey* begin = nullptr; // nullptr means beginning of key range
const InternalKey* end = nullptr; // nullptr means end of key range
InternalKey* manual_end = nullptr; // how far we are compacting
InternalKey tmp_storage; // Used to keep track of compaction progress
InternalKey tmp_storage1; // Used to keep track of compaction progress
std::atomic<bool>* canceled; // Compaction canceled by the user?
Expand All @@ -1536,7 +1548,8 @@ class DBImpl : public DB {
Compaction* compaction;
// caller retains ownership of `manual_compaction_state` as it is reused
// across background compactions.
ManualCompactionState* manual_compaction_state; // nullptr if non-manual
std::shared_ptr<ManualCompactionState>
manual_compaction_state; // nullptr if non-manual
// task limiter token is requested during compaction picking.
std::unique_ptr<TaskLimiterToken> task_token;
};
Expand Down
Loading

0 comments on commit 9fe3a53

Please sign in to comment.