Skip to content

Commit 251f5b8

Browse files
committed
chore: clear logging and tx manager API
1 parent eee43f7 commit 251f5b8

File tree

10 files changed

+73
-95
lines changed

10 files changed

+73
-95
lines changed

include/leanstore/buffer-manager/guarded_buffer_frame.hpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include "leanstore/buffer-manager/buffer_frame.hpp"
44
#include "leanstore/buffer-manager/buffer_manager.hpp"
5-
#include "leanstore/concurrency/logging_impl.hpp"
65
#include "leanstore/concurrency/tx_manager.hpp"
76
#include "leanstore/concurrency/wal_payload_handler.hpp"
87
#include "leanstore/sync/hybrid_guard.hpp"
@@ -180,13 +179,12 @@ class GuardedBufferFrame {
180179

181180
template <typename WT, typename... Args>
182181
cr::WalPayloadHandler<WT> ReserveWALPayload(uint64_t wal_size, Args&&... args) {
183-
LEAN_DCHECK(CoroEnv::CurTxMgr().ActiveTx().is_durable_);
184182
LEAN_DCHECK(guard_.state_ == GuardState::kExclusivePessimistic);
185183

186184
const auto page_id = bf_->header_.page_id_;
187185
const auto tree_id = bf_->page_.btree_id_;
188186
wal_size = ((wal_size - 1) / 8 + 1) * 8;
189-
auto handler = CoroEnv::CurLogging().ReserveWALEntryComplex<WT, Args...>(
187+
auto handler = CoroEnv::CurTxMgr().ReserveWALEntryComplex<WT, Args...>(
190188
sizeof(WT) + wal_size, page_id, bf_->page_.psn_, tree_id, std::forward<Args>(args)...);
191189

192190
return handler;

include/leanstore/concurrency/logging.hpp

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "leanstore-c/perf_counters.h"
44
#include "leanstore/concurrency/transaction.hpp"
5+
#include "leanstore/concurrency/wal_entry.hpp"
56
#include "leanstore/sync/optimistic_guarded.hpp"
67
#include "leanstore/units.hpp"
78
#include "leanstore/utils/counter_util.hpp"
@@ -54,6 +55,10 @@ class WalPayloadHandler;
5455
/// Helps to transaction concurrenct control and write-ahead logging.
5556
class Logging {
5657
public:
58+
/// Logical sequence number, i.e., the unique ID of each WAL.
59+
LID lsn_clock_ = 0;
60+
61+
/// The previous LSN of the current transaction, used to link WAL entries.
5762
LID prev_lsn_;
5863

5964
/// The active complex WalEntry for the current transaction, usually used for insert, update,
@@ -85,18 +90,17 @@ class Logging {
8590

8691
storage::OptimisticGuarded<WalFlushReq> wal_flush_req_;
8792

88-
/// The ring buffer of the current worker thread. All the wal entries of the current worker are
89-
/// writtern to this ring buffer firstly, then flushed to disk by the group commit thread.
90-
ALIGNAS(512) uint8_t* wal_buffer_;
93+
/// The maximum writtern system transaction ID in the worker.
94+
TXID sys_tx_writtern_ = 0;
9195

92-
/// The size of the wal ring buffer.
93-
uint64_t wal_buffer_bytes_;
96+
/// File descriptor for the write-ahead log.
97+
int32_t wal_fd_ = -1;
9498

95-
/// Used to track the write order of wal entries.
96-
LID lsn_clock_ = 0;
99+
/// Start offset of the next WalEntry.
100+
uint64_t wal_size_ = 0;
97101

98-
/// The maximum writtern system transaction ID in the worker.
99-
TXID sys_tx_writtern_ = 0;
102+
/// The size of the wal ring buffer.
103+
uint64_t wal_buffer_bytes_;
100104

101105
/// Written offset of the wal ring buffer.
102106
uint64_t wal_buffered_ = 0;
@@ -106,19 +110,14 @@ class Logging {
106110
/// thread.
107111
std::atomic<uint64_t> wal_flushed_ = 0;
108112

109-
/// The first WAL record of the current active transaction.
110-
uint64_t tx_wal_begin_;
111-
112-
/// File descriptor for the write-ahead log.
113-
int32_t wal_fd_ = -1;
114-
115-
/// Start offset of the next WalEntry.
116-
uint64_t wal_size_ = 0;
113+
/// The ring buffer of the current worker thread. All the wal entries of the current worker are
114+
/// writtern to this ring buffer firstly, then flushed to disk by the group commit thread.
115+
ALIGNAS(512) uint8_t* wal_buffer_;
117116

118117
public:
119118
Logging(uint64_t wal_buffer_bytes)
120-
: wal_buffer_((uint8_t*)(std::aligned_alloc(512, wal_buffer_bytes))),
121-
wal_buffer_bytes_(wal_buffer_bytes) {
119+
: wal_buffer_bytes_(wal_buffer_bytes),
120+
wal_buffer_((uint8_t*)(std::aligned_alloc(512, wal_buffer_bytes))) {
122121
std::memset(wal_buffer_, 0, wal_buffer_bytes);
123122
}
124123

@@ -129,6 +128,12 @@ class Logging {
129128
}
130129
}
131130

131+
LID ReserveLsn() {
132+
return lsn_clock_++;
133+
}
134+
135+
void ReserveWalBuffer(uint32_t requested_size);
136+
132137
void UpdateSignaledCommitTs(const LID signaled_commit_ts) {
133138
signaled_commit_ts_.store(signaled_commit_ts, std::memory_order_release);
134139
}
@@ -139,19 +144,14 @@ class Logging {
139144
}
140145
}
141146

142-
void ReserveContiguousBuffer(uint32_t requested_size);
143-
144147
/// Iterate over current TX entries
145-
void IterateCurrentTxWALs(std::function<void(const WalEntry& entry)> callback);
148+
void IterateCurrentTxWALs(uint64_t first_wal,
149+
std::function<void(const WalEntry& entry)> callback);
146150

147151
void WriteWalTxAbort();
148152
void WriteWalTxFinish();
149153
void WriteWalCarriageReturn();
150154

151-
template <typename T, typename... Args>
152-
WalPayloadHandler<T> ReserveWALEntryComplex(uint64_t payload_size, PID page_id, LID psn,
153-
TREEID tree_id, Args&&... args);
154-
155155
/// Submits wal record to group committer when it is ready to flush to disk.
156156
/// @param totalSize size of the wal record to be flush.
157157
void SubmitWALEntryComplex(uint64_t total_size);

include/leanstore/concurrency/logging_impl.hpp

Lines changed: 0 additions & 41 deletions
This file was deleted.

include/leanstore/concurrency/transaction.hpp

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55
#include "leanstore/units.hpp"
66
#include "leanstore/utils/managed_thread.hpp"
77

8-
namespace leanstore {
9-
namespace cr {
10-
8+
namespace leanstore ::cr {
119
enum class TxState { kIdle, kStarted, kCommitted, kAborted };
1210

1311
struct TxStatUtil {
@@ -59,6 +57,13 @@ class Transaction {
5957
/// tx_isolation_level_ is the isolation level for the current transaction.
6058
IsolationLevel tx_isolation_level_ = IsolationLevel::kSnapshotIsolation;
6159

60+
/// The first WAL record of the current active transaction.
61+
uint64_t first_wal_ = 0;
62+
63+
/// The previous WAL record of the current active transaction, 0 for the first
64+
/// WAL record. Used when writing compensation log records for tx abort.
65+
uint64_t prev_wal_lsn_ = 0;
66+
6267
/// Whether the transaction has any data writes. Transaction writes can be
6368
/// detected once it generates a WAL entry.
6469
bool has_wrote_ = false;
@@ -80,16 +85,13 @@ class Transaction {
8085

8186
// Start a new transaction, initialize all fields
8287
void Start(TxMode mode, IsolationLevel level) {
88+
Transaction tx_inited;
89+
*this = tx_inited;
90+
8391
state_ = TxState::kStarted;
84-
start_ts_ = 0;
85-
commit_ts_ = 0;
86-
max_observed_sys_tx_id_ = 0;
87-
has_remote_dependency_ = false;
8892
tx_mode_ = mode;
8993
tx_isolation_level_ = level;
90-
has_wrote_ = false;
9194
is_durable_ = CoroEnv::CurStore()->store_option_->enable_wal_;
92-
wal_exceed_buffer_ = false;
9395
}
9496

9597
/// Check whether a user transaction with remote dependencies can be committed.
@@ -98,5 +100,4 @@ class Transaction {
98100
}
99101
};
100102

101-
} // namespace cr
102-
} // namespace leanstore
103+
} // namespace leanstore::cr

include/leanstore/concurrency/tx_manager.hpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,29 @@ class TxManager {
9898
last_committed_usr_tx_.store(usr_tx_id, std::memory_order_release);
9999
}
100100

101+
template <typename T, typename... Args>
102+
WalPayloadHandler<T> ReserveWALEntryComplex(uint64_t payload_size, PID page_id, LID psn,
103+
TREEID tree_id, Args&&... args) {
104+
auto& logging = CoroEnv::CurLogging();
105+
106+
// write transaction start on demand
107+
auto prev_lsn = active_tx_.prev_wal_lsn_;
108+
active_tx_.has_wrote_ = true;
109+
SCOPED_DEFER(active_tx_.prev_wal_lsn_ = logging.active_walentry_complex_->lsn_);
110+
111+
auto entry_lsn = logging.lsn_clock_++;
112+
auto* entry_ptr = logging.wal_buffer_ + logging.wal_buffered_;
113+
auto entry_size = sizeof(WalEntryComplex) + payload_size;
114+
logging.ReserveWalBuffer(entry_size);
115+
116+
logging.active_walentry_complex_ = new (entry_ptr) WalEntryComplex(
117+
entry_lsn, prev_lsn, entry_size, worker_id_, active_tx_.start_ts_, psn, page_id, tree_id);
118+
119+
auto* payload_ptr = logging.active_walentry_complex_->payload_;
120+
auto wal_payload = new (payload_ptr) T(std::forward<Args>(args)...);
121+
return {wal_payload, entry_size};
122+
}
123+
101124
static constexpr uint64_t kRcBit = (1ull << 63);
102125
static constexpr uint64_t kLongRunningBit = (1ull << 62);
103126
static constexpr uint64_t kCleanBitsMask = ~(kRcBit | kLongRunningBit);

include/leanstore/concurrency/wal_entry.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,8 @@ class PACKED WalEntryComplex : public WalEntry {
132132

133133
uint32_t ComputeCRC32() const {
134134
auto type_field_size = sizeof(Type);
135-
auto crc32FieldSize = sizeof(uint32_t);
136-
auto crc_skip_size = type_field_size + crc32FieldSize;
135+
auto crc_field_size = sizeof(uint32_t);
136+
auto crc_skip_size = type_field_size + crc_field_size;
137137
const auto* src = reinterpret_cast<const uint8_t*>(this) + crc_skip_size;
138138
auto src_size = size_ - crc_skip_size;
139139
return utils::CRC(src, src_size);

include/leanstore/concurrency/wal_payload_handler.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
#pragma once
22

3-
#include "leanstore/concurrency/group_committer.hpp"
43
#include "leanstore/concurrency/tx_manager.hpp"
54

65
namespace leanstore::cr {

src/concurrency/logging.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include "leanstore/exceptions.hpp"
66
#include "leanstore/utils/log.hpp"
77
#include "utils/coroutine/coro_env.hpp"
8-
#include "utils/coroutine/coroutine.hpp"
98
#include "utils/to_json.hpp"
109

1110
#include <cstring>
@@ -20,7 +19,7 @@ uint32_t Logging::WalContiguousFreeSpace() {
2019
return flushed - wal_buffered_;
2120
}
2221

23-
void Logging::ReserveContiguousBuffer(uint32_t bytes_required) {
22+
void Logging::ReserveWalBuffer(uint32_t bytes_required) {
2423
// Spin until there is enough space. The wal ring buffer space is reclaimed
2524
// when the group commit thread commits the written wal entries.
2625
while (true) {
@@ -52,7 +51,7 @@ void Logging::ReserveContiguousBuffer(uint32_t bytes_required) {
5251
void Logging::WriteWalTxAbort() {
5352
// Reserve space
5453
auto size = sizeof(WalTxAbort);
55-
ReserveContiguousBuffer(size);
54+
ReserveWalBuffer(size);
5655

5756
// Initialize a WalTxAbort
5857
auto* data = wal_buffer_ + wal_buffered_;
@@ -70,7 +69,7 @@ void Logging::WriteWalTxAbort() {
7069
void Logging::WriteWalTxFinish() {
7170
// Reserve space
7271
auto size = sizeof(WalTxFinish);
73-
ReserveContiguousBuffer(size);
72+
ReserveWalBuffer(size);
7473

7574
// Initialize a WalTxFinish
7675
auto* data = wal_buffer_ + wal_buffered_;
@@ -116,8 +115,9 @@ void Logging::PublishWalFlushReq() {
116115
}
117116

118117
// Called by worker, so concurrent writes on the buffer
119-
void Logging::IterateCurrentTxWALs(std::function<void(const WalEntry& entry)> callback) {
120-
uint64_t cursor = tx_wal_begin_;
118+
void Logging::IterateCurrentTxWALs(uint64_t first_wal,
119+
std::function<void(const WalEntry& entry)> callback) {
120+
uint64_t cursor = first_wal;
121121
while (cursor != wal_buffered_) {
122122
const WalEntry& entry = *reinterpret_cast<WalEntry*>(wal_buffer_ + cursor);
123123
DEBUG_BLOCK() {

src/concurrency/tx_manager.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ void TxManager::StartTx(TxMode mode, IsolationLevel level, bool is_read_only) {
5353
active_tx_.max_observed_sys_tx_id_ = store_->MvccManager()->GetMinCommittedSysTx();
5454

5555
// Init wal and group commit related transaction information
56-
CoroEnv::CurLogging().tx_wal_begin_ = CoroEnv::CurLogging().wal_buffered_;
56+
active_tx_.first_wal_ = CoroEnv::CurLogging().wal_buffered_;
5757

5858
// For now, we only support SI and SSI
5959
if (level < IsolationLevel::kSnapshotIsolation) {
@@ -172,7 +172,7 @@ void TxManager::AbortTx() {
172172
// TODO(jian.z): support reading from WAL file once
173173
LEAN_DCHECK(!active_tx_.wal_exceed_buffer_, "Aborting from WAL file is not supported yet");
174174
std::vector<const WalEntry*> entries;
175-
CoroEnv::CurLogging().IterateCurrentTxWALs([&](const WalEntry& entry) {
175+
CoroEnv::CurLogging().IterateCurrentTxWALs(active_tx_.first_wal_, [&](const WalEntry& entry) {
176176
if (entry.type_ == WalEntry::Type::kComplex) {
177177
entries.push_back(&entry);
178178
}

tests/coroutine/coro_leanstore_test.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,11 @@ namespace leanstore::test {
2525
class CoroLeanStoreTest : public LeanTestSuite {};
2626

2727
TEST_F(CoroLeanStoreTest, BasicKv) {
28-
// GTEST_SKIP() << "Skipping test BasicKv, as logging is not correctly implemented yet.";
29-
3028
static constexpr auto kBtreeName = "test_btree";
3129
static constexpr auto kNumKeys = 100;
3230
static constexpr auto kKeyPattern = "key_btree_LL_xxxxxxxxxxxx_{}";
3331
static constexpr auto kValPattern = "VAL_BTREE_LL_YYYYYYYYYYYY_{}";
34-
static constexpr auto kEnableWal = false;
32+
static constexpr auto kEnableWal = true;
3533
static constexpr auto kBtreeConfig = BTreeConfig{
3634
.enable_wal_ = kEnableWal,
3735
.use_bulk_insert_ = false,

0 commit comments

Comments
 (0)