Skip to content

Commit

Permalink
Add a new TransactionDBOptions txn_commit_bypass_memtable_threshold (
Browse files Browse the repository at this point in the history
…#13304)

Summary:
... to make it easier to use the new transaction feature `commit_bypass_memtable`. Instead of having to set the option on every transaction at creation time, users can now specify a DB-wide threshold on the number of updates in a transaction: a transaction whose update count exceeds the threshold skips memtable writes at commit.

Pull Request resolved: #13304

Test Plan: a new unit test for the new option

Reviewed By: pdillinger

Differential Revision: D68288579

Pulled By: cbi42

fbshipit-source-id: d3076629891d8b1d427878d20f0ac40dc0dadd35
  • Loading branch information
cbi42 authored and facebook-github-bot committed Jan 21, 2025
1 parent 5405835 commit f6c1489
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 13 deletions.
2 changes: 0 additions & 2 deletions include/rocksdb/utilities/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,6 @@ class Transaction {

virtual TxnTimestamp GetCommitTimestamp() const { return kMaxTxnTimestamp; }

virtual bool GetCommitBypassMemTable() const { return false; }

protected:
explicit Transaction(const TransactionDB* /*db*/) {}
Transaction() : log_number_(0), txn_state_(STARTED) {}
Expand Down
12 changes: 11 additions & 1 deletion include/rocksdb/utilities/transaction_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ struct TransactionDBOptions {
// for more details.
std::vector<std::shared_ptr<SecondaryIndex>> secondary_indices;

// EXPERIMENTAL, SUBJECT TO CHANGE
// This option is only valid for the write-committed policy. If the number of
// updates in a transaction exceeds this threshold, then the transaction commit
// will skip insertions into memtable as an optimization to reduce commit
// latency.
// See comment for TransactionOptions::commit_bypass_memtable for more detail.
// Setting TransactionOptions::commit_bypass_memtable to true takes precedence
// over this option.
uint32_t txn_commit_bypass_memtable_threshold =
std::numeric_limits<uint32_t>::max();

private:
// 128 entries
// Should the default value change, please also update wp_snapshot_cache_bits
Expand Down Expand Up @@ -347,7 +357,7 @@ struct TransactionOptions {
// DeleteRange, SingleDelete.
bool write_batch_track_timestamp_size = false;

// EXPERIMENTAL
// EXPERIMENTAL, SUBJECT TO CHANGE
// Only supports write-committed policy. If set to true, the transaction will
// skip memtable write and ingest into the DB directly during Commit(). This
// makes Commit() much faster for transactions with many operations.
Expand Down
17 changes: 13 additions & 4 deletions utilities/transactions/pessimistic_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,14 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
read_timestamp_ = kMaxTxnTimestamp;
commit_timestamp_ = kMaxTxnTimestamp;

commit_bypass_memtable_ = txn_options.commit_bypass_memtable;
write_batch_.SetTrackPerCFStat(txn_options.commit_bypass_memtable);
if (txn_options.commit_bypass_memtable) {
commit_bypass_memtable_threshold_ = 0;
} else {
commit_bypass_memtable_threshold_ =
db_options.txn_commit_bypass_memtable_threshold;
}
write_batch_.SetTrackPerCFStat(commit_bypass_memtable_threshold_ <
std::numeric_limits<uint32_t>::max());
}

PessimisticTransaction::~PessimisticTransaction() {
Expand Down Expand Up @@ -846,7 +852,8 @@ Status WriteCommittedTxn::CommitInternal() {
if (!needs_ts) {
s = WriteBatchInternal::MarkCommit(working_batch, name_);
} else {
assert(!commit_bypass_memtable_);
assert(commit_bypass_memtable_threshold_ ==
std::numeric_limits<uint32_t>::max());
assert(commit_timestamp_ != kMaxTxnTimestamp);
char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
EncodeFixed64(commit_ts_buf, commit_timestamp_);
Expand Down Expand Up @@ -882,7 +889,7 @@ Status WriteCommittedTxn::CommitInternal() {
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();

const bool bypass_memtable = commit_bypass_memtable_ && wb->Count() > 0;
bool bypass_memtable = wb->Count() > commit_bypass_memtable_threshold_;
if (!bypass_memtable) {
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
Expand All @@ -904,6 +911,8 @@ Status WriteCommittedTxn::CommitInternal() {
}
}
assert(log_number_ > 0);
TEST_SYNC_POINT_CALLBACK("WriteCommittedTxn::CommitInternal:bypass_memtable",
static_cast<void*>(&bypass_memtable));
if (bypass_memtable) {
s = db_impl_->WriteImpl(
write_options_, working_batch, /*callback*/ nullptr,
Expand Down
7 changes: 2 additions & 5 deletions utilities/transactions/pessimistic_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ class PessimisticTransaction : public TransactionBaseImpl {
bool skip_prepare_ = false;
// Refer to
// TransactionOptions::commit_bypass_memtable and
// TransactionDBOptions::txn_commit_bypass_memtable_threshold
bool commit_bypass_memtable_ = false;
uint32_t commit_bypass_memtable_threshold_ =
std::numeric_limits<uint32_t>::max();

private:
friend class TransactionTest_ValidateSnapshotTest_Test;
Expand Down Expand Up @@ -307,10 +308,6 @@ class WriteCommittedTxn : public PessimisticTransaction {
Status SetCommitTimestamp(TxnTimestamp ts) override;
TxnTimestamp GetCommitTimestamp() const override { return commit_timestamp_; }

bool GetCommitBypassMemTable() const override {
return commit_bypass_memtable_;
}

private:
template <typename TValue>
Status GetForUpdateImpl(const ReadOptions& read_options,
Expand Down
65 changes: 64 additions & 1 deletion utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8830,7 +8830,8 @@ class CommitBypassMemtableTest : public DBTestBase,
Options options;
TransactionDBOptions txn_db_opts;

void SetUpTransactionDB() {
void SetUpTransactionDB(
uint32_t threshold = std::numeric_limits<uint32_t>::max()) {
options = CurrentOptions();
options.create_if_missing = true;
options.allow_2pc = true;
Expand All @@ -8842,6 +8843,7 @@ class CommitBypassMemtableTest : public DBTestBase,
Destroy(options, true);

txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
txn_db_opts.txn_commit_bypass_memtable_threshold = threshold;
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
ASSERT_NE(txn_db, nullptr);
db_ = txn_db;
Expand Down Expand Up @@ -9297,6 +9299,67 @@ TEST_P(CommitBypassMemtableTest, Recovery) {
VerifyDBFromMap(expected);
}

// Tests TransactionDBOptions::txn_commit_bypass_memtable_threshold.
//
// The sync point "WriteCommittedTxn::CommitInternal:bypass_memtable" reports
// the bypass decision taken by each commit; the test records it and checks:
//   1. TransactionOptions::commit_bypass_memtable == true forces a bypass even
//      when the transaction has fewer updates than the DB-wide threshold.
//   2. With the per-transaction option off, a commit bypasses the memtable
//      only when its update count is strictly greater than the threshold.
//   3. The same holds when the updates are spread over two column families.
TEST_P(CommitBypassMemtableTest, ThresholdTxnDBOption) {
  const uint32_t threshold = 10;
  SetUpTransactionDB(/*threshold=*/threshold);
  bool commit_bypass_memtable = false;
  // TODO: add and use stats for this
  SyncPoint::GetInstance()->SetCallBack(
      "WriteCommittedTxn::CommitInternal:bypass_memtable",
      [&](void* arg) { commit_bypass_memtable = *(static_cast<bool*>(arg)); });
  SyncPoint::GetInstance()->EnableProcessing();

  // TransactionOptions::commit_bypass_memtable takes precedence: two updates
  // is well below the threshold, yet the commit must bypass the memtable.
  WriteOptions wopts;
  TransactionOptions txn_opts;
  txn_opts.commit_bypass_memtable = true;
  Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
  ASSERT_OK(txn1->SetName("xid1"));
  ASSERT_OK(txn1->Put("k2", "v2"));
  ASSERT_OK(txn1->Put("k1", "v1"));
  ASSERT_OK(txn1->Prepare());
  ASSERT_OK(txn1->Commit());
  ASSERT_TRUE(commit_bypass_memtable);

  // Exactly at the threshold (no bypass expected) and one above it (bypass
  // expected).
  for (auto num_ops : {threshold, threshold + 1}) {
    commit_bypass_memtable = false;
    txn_opts.commit_bypass_memtable = false;
    // On the first iteration txn1 is handed over as the old transaction;
    // NOTE(review): presumably BeginTransaction() reuses that object and
    // returns it, so `delete txn` below is what releases it - confirm.
    // The second iteration passes nullptr and gets a fresh transaction.
    auto txn = txn_db->BeginTransaction(wopts, txn_opts, txn1);
    txn1 = nullptr;
    ASSERT_OK(txn->SetName("xid" + std::to_string(num_ops)));
    for (uint32_t i = 0; i < num_ops; ++i) {
      ASSERT_OK(
          txn->Put("key" + std::to_string(i), "value" + std::to_string(i)));
    }
    ASSERT_OK(txn->Prepare());
    ASSERT_OK(txn->Commit());
    ASSERT_EQ(commit_bypass_memtable, num_ops > threshold);
    delete txn;
  }

  // Repeat the same test with updates to two CFs
  std::vector<std::string> cfs = {"pk", "sk"};
  CreateColumnFamilies(cfs, options);

  // Exactly at the threshold (no bypass expected) and one above it (bypass
  // expected); updates alternate between handles_[0] and handles_[1].
  for (auto num_ops : {threshold, threshold + 1}) {
    commit_bypass_memtable = false;
    txn_opts.commit_bypass_memtable = false;
    auto txn_cf = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
    ASSERT_OK(txn_cf->SetName("xid_cf" + std::to_string(num_ops)));
    for (uint32_t i = 0; i < num_ops; ++i) {
      ASSERT_OK(txn_cf->Put(handles_[i % 2], "key" + std::to_string(i),
                            "value" + std::to_string(i)));
    }
    ASSERT_OK(txn_cf->Prepare());
    ASSERT_OK(txn_cf->Commit());
    ASSERT_EQ(commit_bypass_memtable, num_ops > threshold);
    delete txn_cf;
  }

  // The callback captures `commit_bypass_memtable` by reference; unregister it
  // before that local goes out of scope so no later commit can write through
  // a dangling reference.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down

0 comments on commit f6c1489

Please sign in to comment.