Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new TransactionDBOptions txn_commit_bypass_memtable_threshold #13304

Closed
wants to merge 2 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
threshold option
cbi42 committed Jan 16, 2025
commit 1aafce45dea6575a3a9c9e19e4cad18c328100bc
2 changes: 0 additions & 2 deletions include/rocksdb/utilities/transaction.h
Original file line number Diff line number Diff line change
@@ -750,8 +750,6 @@ class Transaction {

virtual TxnTimestamp GetCommitTimestamp() const { return kMaxTxnTimestamp; }

virtual bool GetCommitBypassMemTable() const { return false; }

protected:
explicit Transaction(const TransactionDB* /*db*/) {}
Transaction() : log_number_(0), txn_state_(STARTED) {}
12 changes: 11 additions & 1 deletion include/rocksdb/utilities/transaction_db.h
Original file line number Diff line number Diff line change
@@ -247,6 +247,16 @@ struct TransactionDBOptions {
// for more details.
std::vector<std::shared_ptr<SecondaryIndex>> secondary_indices;

// EXPERIMENTAL, SUBJECT TO CHANGE
// This option is only valid for the write-committed policy. If the number of
// updates in a transaction exceeds this threshold, then the transaction commit
// will skip insertions into memtable as an optimization to reduce commit
// latency.
// See comment for TransactionOptions::commit_bypass_memtable for more detail.
// Setting TransactionOptions::commit_bypass_memtable to true takes precedence
// over this option.
uint32_t txn_commit_bypass_memtable_threshold =
std::numeric_limits<uint32_t>::max();

private:
// 128 entries
// Should the default value change, please also update wp_snapshot_cache_bits
@@ -347,7 +357,7 @@ struct TransactionOptions {
// DeleteRange, SingleDelete.
bool write_batch_track_timestamp_size = false;

// EXPERIMENTAL
// EXPERIMENTAL, SUBJECT TO CHANGE
// Only supports write-committed policy. If set to true, the transaction will
// skip memtable write and ingest into the DB directly during Commit(). This
// makes Commit() much faster for transactions with many operations.
17 changes: 13 additions & 4 deletions utilities/transactions/pessimistic_transaction.cc
Original file line number Diff line number Diff line change
@@ -104,8 +104,14 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
read_timestamp_ = kMaxTxnTimestamp;
commit_timestamp_ = kMaxTxnTimestamp;

commit_bypass_memtable_ = txn_options.commit_bypass_memtable;
write_batch_.SetTrackPerCFStat(txn_options.commit_bypass_memtable);
if (txn_options.commit_bypass_memtable) {
commit_bypass_memtable_threshold_ = 0;
} else {
commit_bypass_memtable_threshold_ =
db_options.txn_commit_bypass_memtable_threshold;
}
write_batch_.SetTrackPerCFStat(commit_bypass_memtable_threshold_ <
std::numeric_limits<uint32_t>::max());
}

PessimisticTransaction::~PessimisticTransaction() {
@@ -846,7 +852,8 @@ Status WriteCommittedTxn::CommitInternal() {
if (!needs_ts) {
s = WriteBatchInternal::MarkCommit(working_batch, name_);
} else {
assert(!commit_bypass_memtable_);
assert(commit_bypass_memtable_threshold_ ==
std::numeric_limits<uint32_t>::max());
assert(commit_timestamp_ != kMaxTxnTimestamp);
char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
EncodeFixed64(commit_ts_buf, commit_timestamp_);
@@ -882,7 +889,7 @@ Status WriteCommittedTxn::CommitInternal() {
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();

const bool bypass_memtable = commit_bypass_memtable_ && wb->Count() > 0;
bool bypass_memtable = wb->Count() > commit_bypass_memtable_threshold_;
if (!bypass_memtable) {
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
@@ -904,6 +911,8 @@ Status WriteCommittedTxn::CommitInternal() {
}
}
assert(log_number_ > 0);
TEST_SYNC_POINT_CALLBACK("WriteCommittedTxn::CommitInternal:bypass_memtable",
static_cast<void*>(&bypass_memtable));
if (bypass_memtable) {
s = db_impl_->WriteImpl(
write_options_, working_batch, /*callback*/ nullptr,
7 changes: 2 additions & 5 deletions utilities/transactions/pessimistic_transaction.h
Original file line number Diff line number Diff line change
@@ -168,7 +168,8 @@ class PessimisticTransaction : public TransactionBaseImpl {
bool skip_prepare_ = false;
// Refer to
// TransactionOptions::commit_bypass_memtable
bool commit_bypass_memtable_ = false;
uint32_t commit_bypass_memtable_threshold_ =
std::numeric_limits<uint32_t>::max();

private:
friend class TransactionTest_ValidateSnapshotTest_Test;
@@ -307,10 +308,6 @@ class WriteCommittedTxn : public PessimisticTransaction {
Status SetCommitTimestamp(TxnTimestamp ts) override;
TxnTimestamp GetCommitTimestamp() const override { return commit_timestamp_; }

bool GetCommitBypassMemTable() const override {
return commit_bypass_memtable_;
}

private:
template <typename TValue>
Status GetForUpdateImpl(const ReadOptions& read_options,
64 changes: 63 additions & 1 deletion utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
@@ -8830,7 +8830,8 @@ class CommitBypassMemtableTest : public DBTestBase,
Options options;
TransactionDBOptions txn_db_opts;

void SetUpTransactionDB() {
void SetUpTransactionDB(
uint32_t threshold = std::numeric_limits<uint32_t>::max()) {
options = CurrentOptions();
options.create_if_missing = true;
options.allow_2pc = true;
@@ -8842,6 +8843,7 @@ class CommitBypassMemtableTest : public DBTestBase,
Destroy(options, true);

txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
txn_db_opts.txn_commit_bypass_memtable_threshold = threshold;
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
ASSERT_NE(txn_db, nullptr);
db_ = txn_db;
@@ -9297,6 +9299,66 @@ TEST_P(CommitBypassMemtableTest, Recovery) {
VerifyDBFromMap(expected);
}

// Tests TransactionDBOptions::txn_commit_bypass_memtable_threshold.
//
// The bypass decision made inside WriteCommittedTxn::CommitInternal() is
// observed through the "WriteCommittedTxn::CommitInternal:bypass_memtable"
// sync point, which hands the callback a pointer to the `bypass_memtable`
// flag. The test checks that:
//   1. TransactionOptions::commit_bypass_memtable = true bypasses the
//      memtable even when the transaction is smaller than the threshold.
//   2. Without the per-transaction flag, a transaction bypasses the memtable
//      only when its number of updates is strictly greater than the
//      threshold (exactly `threshold` updates must NOT bypass).
//   3. The same holds when the updates are spread over two column families.
TEST_P(CommitBypassMemtableTest, ThresholdTxnDBOption) {
  const uint32_t threshold = 10;
  SetUpTransactionDB(/*threshold=*/threshold);

  // Overwritten by the sync point callback with the bypass decision of the
  // most recent commit.
  bool commit_bypass_memtable = false;
  // TODO: add and use stats for this
  SyncPoint::GetInstance()->SetCallBack(
      "WriteCommittedTxn::CommitInternal:bypass_memtable",
      [&](void* arg) { commit_bypass_memtable = *(static_cast<bool*>(arg)); });
  SyncPoint::GetInstance()->EnableProcessing();

  // TransactionOptions::commit_bypass_memtable takes precedence: two updates
  // is well below the threshold, yet the commit must bypass the memtable.
  WriteOptions wopts;
  TransactionOptions txn_opts;
  txn_opts.commit_bypass_memtable = true;
  Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put("k2", "v2"));
  ASSERT_OK(txn1->Put("k1", "v1"));
  ASSERT_OK(txn1->Prepare());
  ASSERT_OK(txn1->Commit());
  ASSERT_TRUE(commit_bypass_memtable);
  // Release the transaction, as is done for every other transaction below.
  delete txn1;

  // Exactly at the threshold (no bypass expected) and one update above the
  // threshold (bypass expected).
  for (auto num_ops : {threshold, threshold + 1}) {
    commit_bypass_memtable = false;
    txn_opts.commit_bypass_memtable = false;
    auto txn = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
    ASSERT_OK(txn->SetName("xid" + std::to_string(num_ops)));
    for (uint32_t i = 0; i < num_ops; ++i) {
      ASSERT_OK(
          txn->Put("key" + std::to_string(i), "value" + std::to_string(i)));
    }
    ASSERT_OK(txn->Prepare());
    ASSERT_OK(txn->Commit());
    ASSERT_EQ(commit_bypass_memtable, num_ops > threshold);
    delete txn;
  }

  // Repeat the same test with updates to two CFs
  std::vector<std::string> cfs = {"pk", "sk"};
  CreateColumnFamilies(cfs, options);

  // Exactly at the threshold (no bypass expected) and one update above the
  // threshold (bypass expected); updates alternate between the two CFs.
  for (auto num_ops : {threshold, threshold + 1}) {
    commit_bypass_memtable = false;
    txn_opts.commit_bypass_memtable = false;
    auto txn_cf = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
    ASSERT_OK(txn_cf->SetName("xid_cf" + std::to_string(num_ops)));
    for (uint32_t i = 0; i < num_ops; ++i) {
      ASSERT_OK(txn_cf->Put(handles_[i % 2], "key" + std::to_string(i),
                            "value" + std::to_string(i)));
    }
    ASSERT_OK(txn_cf->Prepare());
    ASSERT_OK(txn_cf->Commit());
    ASSERT_EQ(commit_bypass_memtable, num_ops > threshold);
    delete txn_cf;
  }

  // The callback captures `commit_bypass_memtable` by reference; unregister
  // it before that local goes out of scope.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {