Skip to content

Commit 55b1dfd

Browse files
committed
chore: support mvcc in coroutine mode, part 1
1 parent b2bab2a commit 55b1dfd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+1303
-931
lines changed

.clang-format

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ Cpp11BracedListStyle: true
2121
PackConstructorInitializers: CurrentLine
2222

2323
# arguments
24+
BinPackArguments: true
25+
AllowAllArgumentsOnNextLine: true
2426
AlignAfterOpenBracket: Align
2527
QualifierAlignment: Custom
2628
QualifierOrder: ['inline', 'static', 'const', 'constexpr', 'volatile', 'type']

CMakePresets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,4 @@
7474
}
7575
}
7676
]
77-
}
77+
}

benchmarks/micro-benchmarks/insert_update_bench.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ static void BenchUpdateInsert(benchmark::State& state) {
3939
std::unordered_set<std::string> dedup;
4040
for (auto _ : state) {
4141
leanstore->ExecSync(0, [&]() {
42-
cr::WorkerContext::My().StartTx();
42+
cr::TxManager::My().StartTx();
4343
std::string key;
4444
std::string val;
4545
for (size_t i = 0; i < 16; i++) {
@@ -48,13 +48,13 @@ static void BenchUpdateInsert(benchmark::State& state) {
4848
btree->Insert(Slice((const uint8_t*)key.data(), key.size()),
4949
Slice((const uint8_t*)val.data(), val.size()));
5050
}
51-
cr::WorkerContext::My().CommitTx();
51+
cr::TxManager::My().CommitTx();
5252
});
5353
}
5454

5555
leanstore->ExecSync(0, [&]() {
56-
cr::WorkerContext::My().StartTx();
57-
SCOPED_DEFER(cr::WorkerContext::My().CommitTx());
56+
cr::TxManager::My().StartTx();
57+
SCOPED_DEFER(cr::TxManager::My().CommitTx());
5858
leanstore->DropTransactionKV(btree_name);
5959
});
6060
}

benchmarks/ycsb/ycsb_leanstore.hpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#include "leanstore/btree/basic_kv.hpp"
77
#include "leanstore/btree/transaction_kv.hpp"
88
#include "leanstore/concurrency/cr_manager.hpp"
9-
#include "leanstore/concurrency/worker_context.hpp"
9+
#include "leanstore/concurrency/tx_manager.hpp"
1010
#include "leanstore/kv_interface.hpp"
1111
#include "leanstore/lean_store.hpp"
1212
#include "leanstore/utils/jump_mu.hpp"
@@ -162,7 +162,7 @@ class YcsbLeanStore : public YcsbExecutor {
162162
utils::RandomGenerator::RandString(val, FLAGS_ycsb_val_size);
163163

164164
if (bench_transaction_kv_) {
165-
cr::WorkerContext::My().StartTx();
165+
cr::TxManager::My().StartTx();
166166
}
167167

168168
auto op_code =
@@ -172,7 +172,7 @@ class YcsbLeanStore : public YcsbExecutor {
172172
}
173173

174174
if (bench_transaction_kv_) {
175-
cr::WorkerContext::My().CommitTx();
175+
cr::TxManager::My().CommitTx();
176176
}
177177
}
178178
};
@@ -252,10 +252,10 @@ class YcsbLeanStore : public YcsbExecutor {
252252
// generate key for read
253253
GenYcsbKey(zipf_random, key);
254254
if (bench_transaction_kv_) {
255-
cr::WorkerContext::My().StartTx(TxMode::kShortRunning,
256-
IsolationLevel::kSnapshotIsolation, true);
255+
cr::TxManager::My().StartTx(TxMode::kShortRunning,
256+
IsolationLevel::kSnapshotIsolation, true);
257257
table->Lookup(Slice(key, FLAGS_ycsb_key_size), copy_value);
258-
cr::WorkerContext::My().CommitTx();
258+
cr::TxManager::My().CommitTx();
259259
} else {
260260
table->Lookup(Slice(key, FLAGS_ycsb_key_size), copy_value);
261261
}
@@ -264,10 +264,10 @@ class YcsbLeanStore : public YcsbExecutor {
264264
GenYcsbKey(zipf_random, key);
265265
// generate val for update
266266
if (bench_transaction_kv_) {
267-
cr::WorkerContext::My().StartTx();
267+
cr::TxManager::My().StartTx();
268268
table->UpdatePartial(Slice(key, FLAGS_ycsb_key_size), update_call_back,
269269
*update_desc);
270-
cr::WorkerContext::My().CommitTx();
270+
cr::TxManager::My().CommitTx();
271271
} else {
272272
table->UpdatePartial(Slice(key, FLAGS_ycsb_key_size), update_call_back,
273273
*update_desc);

benchmarks/ycsb/ycsb_leanstore_client.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
#include "leanstore/btree/basic_kv.hpp"
55
#include "leanstore/btree/transaction_kv.hpp"
66
#include "leanstore/concurrency/cr_manager.hpp"
7-
#include "leanstore/concurrency/worker_context.hpp"
7+
#include "leanstore/concurrency/tx_manager.hpp"
88
#include "leanstore/kv_interface.hpp"
99
#include "leanstore/lean_store.hpp"
1010
#include "leanstore/utils/log.hpp"

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ ENV VCPKG_DEFAULT_BINARY_CACHE=/opt/vcpkg_cache
3838
# install vcpkg packages
3939
RUN ./opt/vcpkg/bootstrap-vcpkg.sh \
4040
&& vcpkg install benchmark gflags spdlog gtest rapidjson gperftools \
41-
cpp-httplib crc32c libunwind cpptrace concurrentqueue boost-context
41+
cpp-httplib crc32c libunwind cpptrace concurrentqueue boost-context rocksdb
4242

4343

4444
# install gcov

examples/cpp/basic_kv_example.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include <leanstore-c/store_option.h>
22
#include <leanstore/LeanStore.hpp>
33
#include <leanstore/btree/BasicKV.hpp>
4-
#include <leanstore/concurrency/WorkerContext.hpp>
4+
#include <leanstore/concurrency/TxManager.hpp>
55

66
#include <iostream>
77
#include <memory>

include/leanstore/btree/chained_tuple.hpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
#include "leanstore/btree/basic_kv.hpp"
44
#include "leanstore/btree/core/btree_iter_mut.hpp"
55
#include "leanstore/concurrency/cr_manager.hpp"
6-
#include "leanstore/concurrency/worker_context.hpp"
6+
#include "leanstore/concurrency/tx_manager.hpp"
77
#include "leanstore/units.hpp"
88
#include "leanstore/utils/portable.hpp"
99
#include "tuple.hpp"
10+
#include "utils/coroutine/mvcc_manager.hpp"
1011

1112
namespace leanstore::storage::btree {
1213

@@ -62,10 +63,10 @@ class PACKED ChainedTuple : public Tuple {
6263
std::tuple<OpCode, uint16_t> GetVisibleTuple(Slice payload, ValCallback callback) const;
6364

6465
void UpdateStats() {
65-
if (cr::WorkerContext::My().cc_.VisibleForAll(tx_id_) ||
66+
if (cr::TxManager::My().cc_.VisibleForAll(tx_id_) ||
6667
oldest_tx_ !=
6768
static_cast<uint16_t>(
68-
cr::WorkerContext::My().store_->crmanager_->global_wmk_info_.oldest_active_tx_ &
69+
cr::TxManager::My().store_->MvccManager()->GlobalWmkInfo().oldest_active_tx_ &
6970
0xFFFF)) {
7071
oldest_tx_ = 0;
7172
total_updates_ = 0;
@@ -77,11 +78,11 @@ class PACKED ChainedTuple : public Tuple {
7778
bool ShouldConvertToFatTuple() {
7879
bool command_valid = command_id_ != kInvalidCommandid;
7980
bool has_long_running_olap =
80-
cr::WorkerContext::My().store_->crmanager_->global_wmk_info_.HasActiveLongRunningTx();
81+
cr::TxManager::My().store_->MvccManager()->GlobalWmkInfo().HasActiveLongRunningTx();
8182
bool frequently_updated =
82-
total_updates_ > cr::WorkerContext::My().store_->store_option_->worker_threads_;
83+
total_updates_ > cr::TxManager::My().store_->store_option_->worker_threads_;
8384
bool recent_updated_by_others =
84-
worker_id_ != cr::WorkerContext::My().worker_id_ || tx_id_ != cr::ActiveTx().start_ts_;
85+
worker_id_ != cr::TxManager::My().worker_id_ || tx_id_ != cr::ActiveTx().start_ts_;
8586
return command_valid && has_long_running_olap && recent_updated_by_others && frequently_updated;
8687
}
8788

include/leanstore/btree/core/b_tree_generic.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ inline void BTreeGeneric::FindLeafCanJump(Slice key, GuardedBufferFrame<BTreeNod
316316
volatile uint16_t level = 0;
317317
while (!guarded_target->is_leaf_) {
318318
auto& child_swip = guarded_target->LookupInner(key);
319-
LS_DCHECK(!child_swip.IsEmpty());
319+
LEAN_DCHECK(!child_swip.IsEmpty());
320320
guarded_parent = std::move(guarded_target);
321321
if (level == height_ - 1) {
322322
guarded_target =
@@ -352,7 +352,7 @@ inline void BTreeGeneric::FindLeafCanJump(Slice key, GuardedBufferFrame<BTreeNod
352352
// auto level = 0u;
353353
// while (!child_node->is_leaf_) {
354354
// auto& child_swip = child_node->LookupInner(key);
355-
// LS_DCHECK(!child_swip.IsEmpty());
355+
// LEAN_DCHECK(!child_swip.IsEmpty());
356356
//
357357
// // TODO: yield and retry from the begining
358358
// if (locked_parent.IsConflicted()) {
@@ -441,8 +441,8 @@ inline ParentSwipHandler BTreeGeneric::FindParent(BTreeGeneric& btree, BufferFra
441441
leanstore::JumpContext::Jump();
442442
}
443443

444-
LS_DCHECK(pos_in_parent != std::numeric_limits<uint32_t>::max(), "Invalid posInParent={}",
445-
pos_in_parent);
444+
LEAN_DCHECK(pos_in_parent != std::numeric_limits<uint32_t>::max(), "Invalid posInParent={}",
445+
pos_in_parent);
446446
ParentSwipHandler parent_handler = {.parent_guard_ = std::move(guarded_child.guard_),
447447
.parent_bf_ = guarded_child.bf_,
448448
.child_swip_ = *child_swip,

include/leanstore/btree/core/b_tree_node.hpp

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ class BTreeNode : public BTreeNodeHeader {
237237
}
238238

239239
Swip* ChildSwip(uint16_t slot_id) {
240-
LS_DCHECK(slot_id < num_slots_);
240+
LEAN_DCHECK(slot_id < num_slots_);
241241
return reinterpret_cast<Swip*>(ValData(slot_id));
242242
}
243243

@@ -247,28 +247,28 @@ class BTreeNode : public BTreeNodeHeader {
247247

248248
// Attention: the caller has to hold a copy of the existing payload
249249
void ShortenPayload(uint16_t slot_id, uint16_t target_size) {
250-
LS_DCHECK(target_size <= slot_[slot_id].val_size_);
250+
LEAN_DCHECK(target_size <= slot_[slot_id].val_size_);
251251
const uint16_t free_space = slot_[slot_id].val_size_ - target_size;
252252
space_used_ -= free_space;
253253
slot_[slot_id].val_size_ = target_size;
254254
}
255255

256256
bool CanExtendPayload(uint16_t slot_id, uint16_t target_size) {
257-
LS_DCHECK(target_size > ValSize(slot_id),
258-
"Target size must be larger than current size, "
259-
"targetSize={}, currentSize={}",
260-
target_size, ValSize(slot_id));
257+
LEAN_DCHECK(target_size > ValSize(slot_id),
258+
"Target size must be larger than current size, "
259+
"targetSize={}, currentSize={}",
260+
target_size, ValSize(slot_id));
261261

262262
const uint16_t extra_space_needed = target_size - ValSize(slot_id);
263263
return FreeSpaceAfterCompaction() >= extra_space_needed;
264264
}
265265

266266
/// Move key-value pair to a new location
267267
void ExtendPayload(uint16_t slot_id, uint16_t target_size) {
268-
LS_DCHECK(CanExtendPayload(slot_id, target_size),
269-
"ExtendPayload failed, not enough space in the current node, "
270-
"slotId={}, targetSize={}, FreeSpace={}, currentSize={}",
271-
slot_id, target_size, FreeSpaceAfterCompaction(), ValSize(slot_id));
268+
LEAN_DCHECK(CanExtendPayload(slot_id, target_size),
269+
"ExtendPayload failed, not enough space in the current node, "
270+
"slotId={}, targetSize={}, FreeSpace={}, currentSize={}",
271+
slot_id, target_size, FreeSpaceAfterCompaction(), ValSize(slot_id));
272272
auto key_size_without_prefix = KeySizeWithoutPrefix(slot_id);
273273
const uint16_t old_total_size = key_size_without_prefix + ValSize(slot_id);
274274
const uint16_t new_total_size = key_size_without_prefix + target_size;
@@ -285,7 +285,7 @@ class BTreeNode : public BTreeNodeHeader {
285285
if (FreeSpace() < new_total_size) {
286286
Compactify();
287287
}
288-
LS_DCHECK(FreeSpace() >= new_total_size);
288+
LEAN_DCHECK(FreeSpace() >= new_total_size);
289289
advance_data_offset(new_total_size);
290290
slot_[slot_id].offset_ = data_offset_;
291291
slot_[slot_id].key_size_without_prefix_ = key_size_without_prefix;
@@ -498,7 +498,7 @@ inline int16_t BTreeNode::LinearSearchWithBias(Slice key, uint16_t start_pos, bo
498498
return -1;
499499
}
500500

501-
LS_DCHECK(key.size() >= prefix_size_ && bcmp(key.data(), LowerFenceAddr(), prefix_size_) == 0);
501+
LEAN_DCHECK(key.size() >= prefix_size_ && bcmp(key.data(), LowerFenceAddr(), prefix_size_) == 0);
502502

503503
// the compared key has the same prefix
504504
key.remove_prefix(prefix_size_);
@@ -575,8 +575,8 @@ inline int16_t BTreeNode::LowerBound(Slice key, bool* is_equal) {
575575
inline void BTreeNode::set_fences(Slice lower_key, Slice upper_key) {
576576
InsertFence(lower_fence_, lower_key);
577577
InsertFence(upper_fence_, upper_key);
578-
LS_DCHECK(LowerFenceAddr() == nullptr || UpperFenceAddr() == nullptr ||
579-
*LowerFenceAddr() <= *UpperFenceAddr());
578+
LEAN_DCHECK(LowerFenceAddr() == nullptr || UpperFenceAddr() == nullptr ||
579+
*LowerFenceAddr() <= *UpperFenceAddr());
580580

581581
// prefix compression
582582
for (prefix_size_ = 0; (prefix_size_ < std::min(lower_key.size(), upper_key.size())) &&

0 commit comments

Comments
 (0)