Skip to content

Commit 462c4ec

Browse files
committed
chore: access TxManagers via CoroEnv
1 parent e91c705 commit 462c4ec

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+450
-395
lines changed

benchmarks/micro-benchmarks/insert_update_bench.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "leanstore/concurrency/cr_manager.hpp"
66
#include "leanstore/lean_store.hpp"
77
#include "leanstore/utils/random_generator.hpp"
8+
#include "utils/coroutine/coro_env.hpp"
89

910
#include <benchmark/benchmark.h>
1011

@@ -39,7 +40,7 @@ static void BenchUpdateInsert(benchmark::State& state) {
3940
std::unordered_set<std::string> dedup;
4041
for (auto _ : state) {
4142
leanstore->ExecSync(0, [&]() {
42-
cr::TxManager::My().StartTx();
43+
CoroEnv::CurTxMgr().StartTx();
4344
std::string key;
4445
std::string val;
4546
for (size_t i = 0; i < 16; i++) {
@@ -48,13 +49,13 @@ static void BenchUpdateInsert(benchmark::State& state) {
4849
btree->Insert(Slice((const uint8_t*)key.data(), key.size()),
4950
Slice((const uint8_t*)val.data(), val.size()));
5051
}
51-
cr::TxManager::My().CommitTx();
52+
CoroEnv::CurTxMgr().CommitTx();
5253
});
5354
}
5455

5556
leanstore->ExecSync(0, [&]() {
56-
cr::TxManager::My().StartTx();
57-
SCOPED_DEFER(cr::TxManager::My().CommitTx());
57+
CoroEnv::CurTxMgr().StartTx();
58+
SCOPED_DEFER(CoroEnv::CurTxMgr().CommitTx());
5859
leanstore->DropTransactionKV(btree_name);
5960
});
6061
}

benchmarks/ycsb/ycsb_leanstore.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ class YcsbLeanStore : public YcsbExecutor {
162162
utils::RandomGenerator::RandString(val, FLAGS_ycsb_val_size);
163163

164164
if (bench_transaction_kv_) {
165-
cr::TxManager::My().StartTx();
165+
CoroEnv::CurTxMgr().StartTx();
166166
}
167167

168168
auto op_code =
@@ -172,7 +172,7 @@ class YcsbLeanStore : public YcsbExecutor {
172172
}
173173

174174
if (bench_transaction_kv_) {
175-
cr::TxManager::My().CommitTx();
175+
CoroEnv::CurTxMgr().CommitTx();
176176
}
177177
}
178178
};
@@ -252,10 +252,10 @@ class YcsbLeanStore : public YcsbExecutor {
252252
// generate key for read
253253
GenYcsbKey(zipf_random, key);
254254
if (bench_transaction_kv_) {
255-
cr::TxManager::My().StartTx(TxMode::kShortRunning,
255+
CoroEnv::CurTxMgr().StartTx(TxMode::kShortRunning,
256256
IsolationLevel::kSnapshotIsolation, true);
257257
table->Lookup(Slice(key, FLAGS_ycsb_key_size), copy_value);
258-
cr::TxManager::My().CommitTx();
258+
CoroEnv::CurTxMgr().CommitTx();
259259
} else {
260260
table->Lookup(Slice(key, FLAGS_ycsb_key_size), copy_value);
261261
}
@@ -264,10 +264,10 @@ class YcsbLeanStore : public YcsbExecutor {
264264
GenYcsbKey(zipf_random, key);
265265
// generate val for update
266266
if (bench_transaction_kv_) {
267-
cr::TxManager::My().StartTx();
267+
CoroEnv::CurTxMgr().StartTx();
268268
table->UpdatePartial(Slice(key, FLAGS_ycsb_key_size), update_call_back,
269269
*update_desc);
270-
cr::TxManager::My().CommitTx();
270+
CoroEnv::CurTxMgr().CommitTx();
271271
} else {
272272
table->UpdatePartial(Slice(key, FLAGS_ycsb_key_size), update_call_back,
273273
*update_desc);

include/leanstore-c/store_option.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ typedef struct StoreOption {
4242
/// The number of worker threads.
4343
uint64_t worker_threads_;
4444

45+
/// Maximum number of concurrent transactions per worker thread.
46+
uint64_t max_concurrent_tx_per_worker_;
47+
4548
// ---------------------------------------------------------------------------
4649
// Buffer pool related options
4750
// ---------------------------------------------------------------------------

include/leanstore/btree/chained_tuple.hpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "leanstore/units.hpp"
88
#include "leanstore/utils/portable.hpp"
99
#include "tuple.hpp"
10+
#include "utils/coroutine/coro_env.hpp"
1011
#include "utils/coroutine/mvcc_manager.hpp"
1112

1213
namespace leanstore::storage::btree {
@@ -63,10 +64,10 @@ class PACKED ChainedTuple : public Tuple {
6364
std::tuple<OpCode, uint16_t> GetVisibleTuple(Slice payload, ValCallback callback) const;
6465

6566
void UpdateStats() {
66-
if (cr::TxManager::My().cc_.VisibleForAll(tx_id_) ||
67+
if (CoroEnv::CurTxMgr().cc_.VisibleForAll(tx_id_) ||
6768
oldest_tx_ !=
6869
static_cast<uint16_t>(
69-
cr::TxManager::My().store_->MvccManager()->GlobalWmkInfo().oldest_active_tx_ &
70+
CoroEnv::CurTxMgr().store_->MvccManager()->GlobalWmkInfo().oldest_active_tx_ &
7071
0xFFFF)) {
7172
oldest_tx_ = 0;
7273
total_updates_ = 0;
@@ -76,13 +77,14 @@ class PACKED ChainedTuple : public Tuple {
7677
}
7778

7879
bool ShouldConvertToFatTuple() {
80+
auto& tx_mgr = CoroEnv::CurTxMgr();
81+
auto* store = tx_mgr.store_;
82+
7983
bool command_valid = command_id_ != kInvalidCommandid;
80-
bool has_long_running_olap =
81-
cr::TxManager::My().store_->MvccManager()->GlobalWmkInfo().HasActiveLongRunningTx();
82-
bool frequently_updated =
83-
total_updates_ > cr::TxManager::My().store_->store_option_->worker_threads_;
84+
bool has_long_running_olap = store->MvccManager()->GlobalWmkInfo().HasActiveLongRunningTx();
85+
bool frequently_updated = total_updates_ > store->store_option_->worker_threads_;
8486
bool recent_updated_by_others =
85-
worker_id_ != cr::TxManager::My().worker_id_ || tx_id_ != cr::ActiveTx().start_ts_;
87+
worker_id_ != tx_mgr.worker_id_ || tx_id_ != tx_mgr.ActiveTx().start_ts_;
8688
return command_valid && has_long_running_olap && recent_updated_by_others && frequently_updated;
8789
}
8890

include/leanstore/btree/core/b_tree_node.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ class BTreeNode : public BTreeNodeHeader {
471471
}
472472

473473
static uint16_t Size() {
474-
return static_cast<uint16_t>(utils::tls_store->store_option_->page_size_ - sizeof(Page));
474+
return static_cast<uint16_t>(CoroEnv::CurStore()->store_option_->page_size_ - sizeof(Page));
475475
}
476476

477477
static uint16_t UnderFullSize() {
@@ -556,7 +556,7 @@ inline int16_t BTreeNode::LowerBound(Slice key, bool* is_equal) {
556556
SearchHint(key_head, lower, upper);
557557
while (lower < upper) {
558558
bool found_equal(false);
559-
if (utils::tls_store->store_option_->enable_head_optimization_) {
559+
if (CoroEnv::CurStore()->store_option_->enable_head_optimization_) {
560560
found_equal = shrink_search_range_with_head(lower, upper, key, key_head);
561561
} else {
562562
found_equal = shrink_search_range(lower, upper, key);

include/leanstore/btree/core/btree_iter_mut.hpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,15 +135,15 @@ class BTreeIterMut : public BTreeIterPessistic {
135135

136136
/// Updates contention statistics after each slot modification on the page.
137137
void UpdateContentionStats() {
138-
if (!utils::tls_store->store_option_->enable_contention_split_) {
138+
if (!CoroEnv::CurStore()->store_option_->enable_contention_split_) {
139139
return;
140140
}
141141
const uint64_t random_number = utils::RandomGenerator::RandU64();
142142

143143
// haven't met the contention stats update probability
144144
if ((random_number &
145-
((1ull << utils::tls_store->store_option_->contention_split_sample_probability_) - 1)) !=
146-
0) {
145+
((1ull << CoroEnv::CurStore()->store_option_->contention_split_sample_probability_) -
146+
1)) != 0) {
147147
return;
148148
}
149149
auto& contention_stats = guarded_leaf_.bf_->header_.contention_stats_;
@@ -155,13 +155,13 @@ class BTreeIterMut : public BTreeIterPessistic {
155155

156156
// haven't met the contention split validation probability
157157
if ((random_number &
158-
((1ull << utils::tls_store->store_option_->contention_split_probility_) - 1)) != 0) {
158+
((1ull << CoroEnv::CurStore()->store_option_->contention_split_probility_) - 1)) != 0) {
159159
return;
160160
}
161161
auto contention_pct = contention_stats.ContentionPercentage();
162162
contention_stats.Reset();
163163
if (last_updated_slot != slot_id_ &&
164-
contention_pct >= utils::tls_store->store_option_->contention_split_threshold_pct_ &&
164+
contention_pct >= CoroEnv::CurStore()->store_option_->contention_split_threshold_pct_ &&
165165
guarded_leaf_->num_slots_ > 2) {
166166
int16_t split_slot = std::min<int16_t>(last_updated_slot, slot_id_);
167167
guarded_leaf_.unlock();

include/leanstore/btree/core/btree_iter_pessistic.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ inline void BTreeIterPessistic::Next() {
337337
func_clean_up_ = nullptr;
338338
}
339339

340-
if (utils::tls_store->store_option_->enable_optimistic_scan_ && leaf_pos_in_parent_ != -1) {
340+
if (CoroEnv::CurStore()->store_option_->enable_optimistic_scan_ && leaf_pos_in_parent_ != -1) {
341341
JUMPMU_TRY() {
342342
if ((leaf_pos_in_parent_ + 1) <= guarded_parent_->num_slots_) {
343343
int32_t next_leaf_pos = leaf_pos_in_parent_ + 1;
@@ -435,7 +435,7 @@ inline void BTreeIterPessistic::Prev() {
435435
func_clean_up_ = nullptr;
436436
}
437437

438-
if (utils::tls_store->store_option_->enable_optimistic_scan_ && leaf_pos_in_parent_ != -1) {
438+
if (CoroEnv::CurStore()->store_option_->enable_optimistic_scan_ && leaf_pos_in_parent_ != -1) {
439439
JUMPMU_TRY() {
440440
if ((leaf_pos_in_parent_ - 1) >= 0) {
441441
int32_t next_leaf_pos = leaf_pos_in_parent_ - 1;

include/leanstore/btree/transaction_kv.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ class TransactionKV : public BasicKV {
111111
}
112112

113113
inline static uint64_t ConvertToFatTupleThreshold() {
114-
return cr::TxManager::My().store_->store_option_->worker_threads_;
114+
return CoroEnv::CurTxMgr().store_->store_option_->worker_threads_;
115115
}
116116

117117
/// Updates the value stored in FatTuple. The former newest version value is

include/leanstore/buffer-manager/buffer_frame.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ class Page {
152152

153153
public:
154154
uint64_t CRC() {
155-
return utils::CRC(payload_, utils::tls_store->store_option_->page_size_ - sizeof(Page));
155+
return utils::CRC(payload_, CoroEnv::CurStore()->store_option_->page_size_ - sizeof(Page));
156156
}
157157
};
158158

include/leanstore/buffer-manager/guarded_buffer_frame.hpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,13 +153,13 @@ class GuardedBufferFrame {
153153
LEAN_DCHECK(bf_ != nullptr);
154154

155155
// update last writer worker
156-
bf_->header_.last_writer_worker_ = cr::TxManager::My().worker_id_;
156+
bf_->header_.last_writer_worker_ = CoroEnv::CurTxMgr().worker_id_;
157157

158158
// update system transaction id
159159
bf_->page_.sys_tx_id_ = sys_tx_id;
160160

161161
// update the maximum system transaction id written by the worker
162-
cr::TxManager::My().logging_.UpdateSysTxToHarden(sys_tx_id);
162+
CoroEnv::CurTxMgr().logging_.UpdateSysTxToHarden(sys_tx_id);
163163
}
164164

165165
/// Check remote dependency
@@ -170,22 +170,22 @@ class GuardedBufferFrame {
170170
return;
171171
}
172172

173-
if (bf_->header_.last_writer_worker_ != cr::TxManager::My().worker_id_ &&
174-
bf_->page_.sys_tx_id_ > cr::ActiveTx().max_observed_sys_tx_id_) {
175-
cr::ActiveTx().max_observed_sys_tx_id_ = bf_->page_.sys_tx_id_;
176-
cr::ActiveTx().has_remote_dependency_ = true;
173+
if (bf_->header_.last_writer_worker_ != CoroEnv::CurTxMgr().worker_id_ &&
174+
bf_->page_.sys_tx_id_ > CoroEnv::CurTxMgr().ActiveTx().max_observed_sys_tx_id_) {
175+
CoroEnv::CurTxMgr().ActiveTx().max_observed_sys_tx_id_ = bf_->page_.sys_tx_id_;
176+
CoroEnv::CurTxMgr().ActiveTx().has_remote_dependency_ = true;
177177
}
178178
}
179179

180180
template <typename WT, typename... Args>
181181
cr::WalPayloadHandler<WT> ReserveWALPayload(uint64_t wal_size, Args&&... args) {
182-
LEAN_DCHECK(cr::ActiveTx().is_durable_);
182+
LEAN_DCHECK(CoroEnv::CurTxMgr().ActiveTx().is_durable_);
183183
LEAN_DCHECK(guard_.state_ == GuardState::kExclusivePessimistic);
184184

185185
const auto page_id = bf_->header_.page_id_;
186186
const auto tree_id = bf_->page_.btree_id_;
187187
wal_size = ((wal_size - 1) / 8 + 1) * 8;
188-
auto handler = cr::TxManager::My().logging_.ReserveWALEntryComplex<WT, Args...>(
188+
auto handler = CoroEnv::CurTxMgr().logging_.ReserveWALEntryComplex<WT, Args...>(
189189
sizeof(WT) + wal_size, page_id, bf_->page_.psn_, tree_id, std::forward<Args>(args)...);
190190

191191
return handler;

0 commit comments

Comments
 (0)