Skip to content

Commit 1e9bf25

Browse files
Little-Wallace authored and facebook-github-bot committed
Do not hold mutex when write keys if not necessary (#7516)
Summary: ## Problem Summary RocksDB will acquire the global mutex of db instance for every time when user calls `Write`. When RocksDB schedules a lot of compaction jobs, it will compete the mutex with write thread and it will hurt the write performance. ## Problem Solution: I want to use log_write_mutex to replace the global mutex in most case so that we do not acquire it in write-thread unless there is a write-stall event or a write-buffer-full event occur. Pull Request resolved: #7516 Test Plan: 1. make check 2. CI 3. COMPILE_WITH_TSAN=1 make db_stress make crash_test make crash_test_with_multiops_wp_txn make crash_test_with_multiops_wc_txn make crash_test_with_atomic_flush Reviewed By: siying Differential Revision: D36908702 Pulled By: riversand963 fbshipit-source-id: 59b13881f4f5c0a58fd3ca79128a396d9cd98efe
1 parent a0c6308 commit 1e9bf25

17 files changed

+340
-208
lines changed

HISTORY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999

100100
### Behavior changes
101101
* DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
102+
* DB::Write does not hold global `mutex_` if this db instance does not need to switch wal and mem-table (#7516).
102103
* Removed support for reading Bloom filters using obsolete block-based filter format. (Support for writing such filters was dropped in 7.0.) For good read performance on old DBs using these filters, a full compaction is required.
103104
* Per KV checksum in write batch is verified before a write batch is written to WAL to detect any corruption to the write batch (#10114).
104105

db/db_compaction_test.cc

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5659,18 +5659,10 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
56595659
for (int j = 0; j != kNumKeysPerFile; ++j) {
56605660
ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
56615661
}
5662-
if (0 == i) {
5663-
// When we reach here, the memtables have kNumKeysPerFile keys. Note that
5664-
// flush is not yet triggered. We need to write an extra key so that the
5665-
// write path will call PreprocessWrite and flush the previous key-value
5666-
// pairs to e flushed. After that, there will be the newest key in the
5667-
// memtable, and a bunch of L0 files. Since there is already one key in
5668-
// the memtable, then for i = 1, 2, ..., we do not have to write this
5669-
// extra key to trigger flush.
5670-
ASSERT_OK(Put("", ""));
5662+
if (i > 0) {
5663+
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
5664+
ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i);
56715665
}
5672-
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
5673-
ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
56745666
}
56755667
// When we reach this point, there will be level0_stop_writes_trigger L0
56765668
// files and one extra key (99) in memory, which overlaps with the external

db/db_impl/db_impl.cc

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
185185
log_dir_synced_(false),
186186
log_empty_(true),
187187
persist_stats_cf_handle_(nullptr),
188-
log_sync_cv_(&mutex_),
188+
log_sync_cv_(&log_write_mutex_),
189189
total_log_size_(0),
190190
is_snapshot_supported_(true),
191191
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
@@ -273,6 +273,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
273273
mutable_db_options_.Dump(immutable_db_options_.info_log.get());
274274
DumpSupportInfo(immutable_db_options_.info_log.get());
275275

276+
max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
277+
std::memory_order_relaxed);
276278
if (write_buffer_manager_) {
277279
wbm_stall_.reset(new WBMStallInterface());
278280
}
@@ -625,26 +627,28 @@ Status DBImpl::CloseHelper() {
625627
job_context.Clean();
626628
mutex_.Lock();
627629
}
628-
629-
for (auto l : logs_to_free_) {
630-
delete l;
631-
}
632-
for (auto& log : logs_) {
633-
uint64_t log_number = log.writer->get_log_number();
634-
Status s = log.ClearWriter();
635-
if (!s.ok()) {
636-
ROCKS_LOG_WARN(
637-
immutable_db_options_.info_log,
638-
"Unable to Sync WAL file %s with error -- %s",
639-
LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
640-
s.ToString().c_str());
641-
// Retain the first error
642-
if (ret.ok()) {
643-
ret = s;
630+
{
631+
InstrumentedMutexLock lock(&log_write_mutex_);
632+
for (auto l : logs_to_free_) {
633+
delete l;
634+
}
635+
for (auto& log : logs_) {
636+
uint64_t log_number = log.writer->get_log_number();
637+
Status s = log.ClearWriter();
638+
if (!s.ok()) {
639+
ROCKS_LOG_WARN(
640+
immutable_db_options_.info_log,
641+
"Unable to Sync WAL file %s with error -- %s",
642+
LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
643+
s.ToString().c_str());
644+
// Retain the first error
645+
if (ret.ok()) {
646+
ret = s;
647+
}
644648
}
645649
}
650+
logs_.clear();
646651
}
647-
logs_.clear();
648652

649653
// Table cache may have table handles holding blocks from the block cache.
650654
// We need to release them before the block cache is destroyed. The block
@@ -1108,6 +1112,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
11081112
}
11091113

11101114
void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
1115+
mutex_.AssertHeld();
11111116
if (!job_context->logs_to_free.empty()) {
11121117
for (auto l : job_context->logs_to_free) {
11131118
AddToLogsToFreeQueue(l);
@@ -1285,6 +1290,11 @@ Status DBImpl::SetDBOptions(
12851290
new_options.stats_persist_period_sec);
12861291
mutex_.Lock();
12871292
}
1293+
if (new_options.max_total_wal_size !=
1294+
mutable_db_options_.max_total_wal_size) {
1295+
max_total_wal_size_.store(new_options.max_total_wal_size,
1296+
std::memory_order_release);
1297+
}
12881298
write_controller_.set_max_delayed_write_rate(
12891299
new_options.delayed_write_rate);
12901300
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
@@ -1405,7 +1415,7 @@ Status DBImpl::SyncWAL() {
14051415
uint64_t current_log_number;
14061416

14071417
{
1408-
InstrumentedMutexLock l(&mutex_);
1418+
InstrumentedMutexLock l(&log_write_mutex_);
14091419
assert(!logs_.empty());
14101420

14111421
// This SyncWAL() call only cares about logs up to this number.
@@ -1462,19 +1472,37 @@ Status DBImpl::SyncWAL() {
14621472
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
14631473

14641474
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
1475+
VersionEdit synced_wals;
14651476
{
1466-
InstrumentedMutexLock l(&mutex_);
1477+
InstrumentedMutexLock l(&log_write_mutex_);
14671478
if (status.ok()) {
1468-
status = MarkLogsSynced(current_log_number, need_log_dir_sync);
1479+
MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals);
14691480
} else {
14701481
MarkLogsNotSynced(current_log_number);
14711482
}
14721483
}
1484+
if (status.ok() && synced_wals.IsWalAddition()) {
1485+
InstrumentedMutexLock l(&mutex_);
1486+
status = ApplyWALToManifest(&synced_wals);
1487+
}
1488+
14731489
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
14741490

14751491
return status;
14761492
}
14771493

1494+
Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
1495+
// not empty, write to MANIFEST.
1496+
mutex_.AssertHeld();
1497+
Status status =
1498+
versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_);
1499+
if (!status.ok() && versions_->io_status().IsIOError()) {
1500+
status = error_handler_.SetBGError(versions_->io_status(),
1501+
BackgroundErrorReason::kManifestWrite);
1502+
}
1503+
return status;
1504+
}
1505+
14781506
Status DBImpl::LockWAL() {
14791507
log_write_mutex_.Lock();
14801508
auto cur_log_writer = logs_.back().writer;
@@ -1494,24 +1522,22 @@ Status DBImpl::UnlockWAL() {
14941522
return Status::OK();
14951523
}
14961524

1497-
Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
1498-
mutex_.AssertHeld();
1525+
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
1526+
VersionEdit* synced_wals) {
1527+
log_write_mutex_.AssertHeld();
14991528
if (synced_dir && logfile_number_ == up_to) {
15001529
log_dir_synced_ = true;
15011530
}
1502-
VersionEdit synced_wals;
15031531
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
15041532
auto& wal = *it;
15051533
assert(wal.IsSyncing());
15061534

15071535
if (logs_.size() > 1) {
15081536
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
15091537
wal.GetPreSyncSize() > 0) {
1510-
synced_wals.AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
1538+
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
15111539
}
15121540
logs_to_free_.push_back(wal.ReleaseWriter());
1513-
// To modify logs_ both mutex_ and log_write_mutex_ must be held
1514-
InstrumentedMutexLock l(&log_write_mutex_);
15151541
it = logs_.erase(it);
15161542
} else {
15171543
wal.FinishSync();
@@ -1520,22 +1546,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
15201546
}
15211547
assert(logs_.empty() || logs_[0].number > up_to ||
15221548
(logs_.size() == 1 && !logs_[0].IsSyncing()));
1523-
1524-
Status s;
1525-
if (synced_wals.IsWalAddition()) {
1526-
// not empty, write to MANIFEST.
1527-
s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
1528-
if (!s.ok() && versions_->io_status().IsIOError()) {
1529-
s = error_handler_.SetBGError(versions_->io_status(),
1530-
BackgroundErrorReason::kManifestWrite);
1531-
}
1532-
}
15331549
log_sync_cv_.SignalAll();
1534-
return s;
15351550
}
15361551

15371552
void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
1538-
mutex_.AssertHeld();
1553+
log_write_mutex_.AssertHeld();
15391554
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
15401555
++it) {
15411556
auto& wal = *it;

0 commit comments

Comments
 (0)