Skip to content

Commit fbfb190

Browse files
committed
opt log and apply mutex duration
Signed-off-by: Connor1996 <zbk602423539@gmail.com>
1 parent 2b04d05 commit fbfb190

File tree

8 files changed

+270
-77
lines changed

8 files changed

+270
-77
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,7 @@ set(SOURCES
755755
db/version_edit.cc
756756
db/version_edit_handler.cc
757757
db/version_set.cc
758+
db/version_set_deletion_scheduler.cc
758759
db/wal_edit.cc
759760
db/wal_manager.cc
760761
db/wide/wide_column_serialization.cc

TARGETS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
9999
"db/version_edit.cc",
100100
"db/version_edit_handler.cc",
101101
"db/version_set.cc",
102+
"db/version_set_deletion_scheduler.cc",
102103
"db/wal_edit.cc",
103104
"db/wal_manager.cc",
104105
"db/wide/wide_column_serialization.cc",

db/listener_test.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,7 @@ TEST_F(EventListenerTest, BlobDBFileTest) {
15851585
// delete the oldest blob file and create new blob file during compaction.
15861586
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
15871587
ASSERT_OK(dbfull()->TEST_WaitForCompact());
1588+
db_->Close();
15881589

15891590
blob_event_listener->CheckCounters();
15901591
}

db/version_set.cc

Lines changed: 93 additions & 71 deletions
Large diffs are not rendered by default.

db/version_set.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "db/table_cache.h"
4747
#include "db/version_builder.h"
4848
#include "db/version_edit.h"
49+
#include "db/version_set_deletion_scheduler.h"
4950
#include "db/write_controller.h"
5051
#include "env/file_system_tracer.h"
5152
#if USE_COROUTINES
@@ -994,7 +995,7 @@ class Version {
994995
std::shared_ptr<const TableProperties>* tp, int level = -1);
995996

996997
uint64_t GetEstimatedActiveKeys() {
997-
return storage_info_.GetEstimatedActiveKeys();
998+
return storage_info_->GetEstimatedActiveKeys();
998999
}
9991000

10001001
size_t GetMemoryUsageByTableReaders(const ReadOptions& read_options);
@@ -1006,8 +1007,8 @@ class Version {
10061007

10071008
int TEST_refs() const { return refs_; }
10081009

1009-
VersionStorageInfo* storage_info() { return &storage_info_; }
1010-
const VersionStorageInfo* storage_info() const { return &storage_info_; }
1010+
VersionStorageInfo* storage_info() { return storage_info_; }
1011+
const VersionStorageInfo* storage_info() const { return storage_info_; }
10111012

10121013
VersionSet* version_set() { return vset_; }
10131014

@@ -1038,10 +1039,10 @@ class Version {
10381039
friend class VersionEditHandlerPointInTime;
10391040

10401041
const InternalKeyComparator* internal_comparator() const {
1041-
return storage_info_.internal_comparator_;
1042+
return storage_info_->internal_comparator_;
10421043
}
10431044
const Comparator* user_comparator() const {
1044-
return storage_info_.user_comparator_;
1045+
return storage_info_->user_comparator_;
10451046
}
10461047

10471048
// Returns true if the filter blocks in the specified level will not be
@@ -1097,7 +1098,7 @@ class Version {
10971098
BlobSource* blob_source_;
10981099
const MergeOperator* merge_operator_;
10991100

1100-
VersionStorageInfo storage_info_;
1101+
VersionStorageInfo* storage_info_;
11011102
VersionSet* vset_; // VersionSet to which this Version belongs
11021103
Version* next_; // Next version in linked list
11031104
Version* prev_; // Previous version in linked list
@@ -1678,6 +1679,8 @@ class VersionSet {
16781679

16791680
// Pointer to the DB's ErrorHandler.
16801681
ErrorHandler* const error_handler_;
1682+
// Dedicated background thread scheduler for storage deletion operations
1683+
std::unique_ptr<VersionSetDeletionScheduler> deletion_scheduler_;
16811684

16821685
private:
16831686
// REQUIRES db mutex at beginning. may release and re-acquire db mutex
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
6+
#include "db/version_set_deletion_scheduler.h"
7+
8+
#include <memory>
9+
10+
#include "db/version_set.h"
11+
#include "logging/logging.h"
12+
#include "port/port.h"
13+
#include "rocksdb/env.h"
14+
#include "util/mutexlock.h"
15+
16+
namespace ROCKSDB_NAMESPACE {
17+
18+
VersionSetDeletionScheduler::VersionSetDeletionScheduler(Logger* info_log)
19+
: version_delete_mutex_(),
20+
cv_(&version_delete_mutex_),
21+
deletion_queue_(),
22+
bg_thread_(nullptr),
23+
shutting_down_(false),
24+
pending_deletion_count_(0),
25+
info_log_(info_log) {
26+
// Start the background thread
27+
bg_thread_.reset(new port::Thread(
28+
&VersionSetDeletionScheduler::BackgroundDeletionThread, this));
29+
30+
ROCKS_LOG_INFO(info_log_,
31+
"VersionSetDeletionScheduler: Created dedicated background "
32+
"thread for storage deletion");
33+
}
34+
35+
VersionSetDeletionScheduler::~VersionSetDeletionScheduler() { Shutdown(); }
36+
37+
void VersionSetDeletionScheduler::ScheduleDeletion(
38+
VersionStorageInfo* storage_info) {
39+
MutexLock lock(&version_delete_mutex_);
40+
41+
if (shutting_down_) {
42+
return;
43+
}
44+
45+
deletion_queue_.push(storage_info);
46+
pending_deletion_count_++;
47+
48+
// Notify the background thread that work is available
49+
cv_.Signal();
50+
}
51+
52+
void VersionSetDeletionScheduler::Shutdown() {
53+
{
54+
MutexLock lock(&version_delete_mutex_);
55+
if (shutting_down_) {
56+
return; // Already shutting down
57+
}
58+
shutting_down_ = true;
59+
cv_.Signal(); // Wake up the background thread
60+
}
61+
62+
// Wait for the background thread to finish
63+
if (bg_thread_ != nullptr) {
64+
bg_thread_->join();
65+
bg_thread_.reset();
66+
}
67+
}
68+
69+
void VersionSetDeletionScheduler::BackgroundDeletionThread() {
70+
version_delete_mutex_.Lock();
71+
72+
while (!shutting_down_) {
73+
// Wait for work or shutdown signal
74+
while (deletion_queue_.empty() && !shutting_down_) {
75+
cv_.Wait();
76+
}
77+
78+
// Process all pending deletion operations
79+
while (!deletion_queue_.empty() && !shutting_down_) {
80+
VersionStorageInfo* storage_info = deletion_queue_.front();
81+
deletion_queue_.pop();
82+
pending_deletion_count_--;
83+
84+
// Unlock mutex while executing the deletion operation
85+
version_delete_mutex_.Unlock();
86+
delete storage_info;
87+
88+
// Re-acquire the mutex for the next iteration
89+
version_delete_mutex_.Lock();
90+
}
91+
}
92+
93+
version_delete_mutex_.Unlock();
94+
}
95+
96+
} // namespace ROCKSDB_NAMESPACE
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
6+
#pragma once
7+
8+
#include <memory>
9+
#include <queue>
10+
11+
#include "port/port.h"
12+
#include "rocksdb/rocksdb_namespace.h"
13+
14+
namespace ROCKSDB_NAMESPACE {
15+
16+
class VersionSet;
17+
class VersionStorageInfo;
18+
class Logger;
19+
20+
// VersionSetDeletionScheduler provides a dedicated background thread for
21+
// handling VersionStorageInfo deletion operations that were previously executed
22+
// via env_->Schedule() in the LOW priority thread pool.
23+
//
24+
// This dedicated thread helps to:
25+
// 1. Isolate version storage deletion operations from other background work
26+
// 2. Avoid potential interference with other low-priority operations
27+
// 3. Provide better control over deletion timing and parallelism
28+
//
29+
class VersionSetDeletionScheduler {
30+
public:
31+
explicit VersionSetDeletionScheduler(Logger* info_log);
32+
~VersionSetDeletionScheduler();
33+
34+
// Schedule a VersionStorageInfo deletion to be executed in the background
35+
// thread
36+
void ScheduleDeletion(VersionStorageInfo* storage_info);
37+
38+
// Wait for all pending operations to complete and shutdown the background
39+
// thread
40+
void Shutdown();
41+
42+
private:
43+
// Entry point for the background thread
44+
void BackgroundDeletionThread();
45+
46+
// Mutex to protect internal state
47+
mutable port::Mutex version_delete_mutex_;
48+
49+
// Condition variable for thread coordination
50+
port::CondVar cv_;
51+
52+
// Queue of deletion operations
53+
std::queue<VersionStorageInfo*> deletion_queue_;
54+
55+
// Background thread for executing deletion operations
56+
std::unique_ptr<port::Thread> bg_thread_;
57+
58+
// Flag to indicate shutdown
59+
bool shutting_down_;
60+
61+
// Number of pending deletion operations
62+
int pending_deletion_count_;
63+
64+
// Logger for debug/info messages
65+
Logger* info_log_;
66+
};
67+
68+
} // namespace ROCKSDB_NAMESPACE

src.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ LIB_SOURCES = \
9292
db/version_edit.cc \
9393
db/version_edit_handler.cc \
9494
db/version_set.cc \
95+
db/version_set_deletion_scheduler.cc \
9596
db/wal_edit.cc \
9697
db/wal_manager.cc \
9798
db/wide/wide_column_serialization.cc \

0 commit comments

Comments
 (0)