Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new TransactionDBOptions txn_commit_bypass_memtable_threshold #13304

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions include/rocksdb/utilities/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,6 @@ class Transaction {

virtual TxnTimestamp GetCommitTimestamp() const { return kMaxTxnTimestamp; }

virtual bool GetCommitBypassMemTable() const { return false; }

protected:
explicit Transaction(const TransactionDB* /*db*/) {}
Transaction() : log_number_(0), txn_state_(STARTED) {}
Expand Down
12 changes: 11 additions & 1 deletion include/rocksdb/utilities/transaction_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,16 @@ struct TransactionDBOptions {
// for more details.
std::vector<std::shared_ptr<SecondaryIndex>> secondary_indices;

// EXPERIMENTAL, SUBJECT TO CHANGE
// This option is only valid for write committed. If the number of updates in
// a transaction exceeds this threshold, then the transaction commit will skip
// insertions into memtable as an optimization to reduce commit latency.
// See comment for TransactionOptions::commit_bypass_memtable for more detail.
// Setting TransactionOptions::commit_bypass_memtable to true takes precedence
// over this option.
uint32_t txn_commit_bypass_memtable_threshold =
std::numeric_limits<uint32_t>::max();

private:
// 128 entries
// Should the default value change, please also update wp_snapshot_cache_bits
Expand Down Expand Up @@ -347,7 +357,7 @@ struct TransactionOptions {
// DeleteRange, SingleDelete.
bool write_batch_track_timestamp_size = false;

// EXPERIMENTAL
// EXPERIMENTAL, SUBJECT TO CHANGE
// Only supports write-committed policy. If set to true, the transaction will
// skip memtable write and ingest into the DB directly during Commit(). This
// makes Commit() much faster for transactions with many operations.
Expand Down
17 changes: 13 additions & 4 deletions utilities/transactions/pessimistic_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,14 @@ void PessimisticTransaction::Initialize(const TransactionOptions& txn_options) {
read_timestamp_ = kMaxTxnTimestamp;
commit_timestamp_ = kMaxTxnTimestamp;

commit_bypass_memtable_ = txn_options.commit_bypass_memtable;
write_batch_.SetTrackPerCFStat(txn_options.commit_bypass_memtable);
if (txn_options.commit_bypass_memtable) {
commit_bypass_memtable_threshold_ = 0;
} else {
commit_bypass_memtable_threshold_ =
db_options.txn_commit_bypass_memtable_threshold;
}
write_batch_.SetTrackPerCFStat(commit_bypass_memtable_threshold_ <
std::numeric_limits<uint32_t>::max());
}

PessimisticTransaction::~PessimisticTransaction() {
Expand Down Expand Up @@ -846,7 +852,8 @@ Status WriteCommittedTxn::CommitInternal() {
if (!needs_ts) {
s = WriteBatchInternal::MarkCommit(working_batch, name_);
} else {
assert(!commit_bypass_memtable_);
assert(commit_bypass_memtable_threshold_ ==
std::numeric_limits<uint32_t>::max());
assert(commit_timestamp_ != kMaxTxnTimestamp);
char commit_ts_buf[sizeof(kMaxTxnTimestamp)];
EncodeFixed64(commit_ts_buf, commit_timestamp_);
Expand Down Expand Up @@ -882,7 +889,7 @@ Status WriteCommittedTxn::CommitInternal() {
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();

const bool bypass_memtable = commit_bypass_memtable_ && wb->Count() > 0;
bool bypass_memtable = wb->Count() > commit_bypass_memtable_threshold_;
if (!bypass_memtable) {
// insert prepared batch into Memtable only skipping WAL.
// Memtable will ignore BeginPrepare/EndPrepare markers
Expand All @@ -904,6 +911,8 @@ Status WriteCommittedTxn::CommitInternal() {
}
}
assert(log_number_ > 0);
TEST_SYNC_POINT_CALLBACK("WriteCommittedTxn::CommitInternal:bypass_memtable",
static_cast<void*>(&bypass_memtable));
if (bypass_memtable) {
s = db_impl_->WriteImpl(
write_options_, working_batch, /*callback*/ nullptr,
Expand Down
7 changes: 2 additions & 5 deletions utilities/transactions/pessimistic_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ class PessimisticTransaction : public TransactionBaseImpl {
bool skip_prepare_ = false;
// Refer to
// TransactionOptions::commit_bypass_memtable
bool commit_bypass_memtable_ = false;
uint32_t commit_bypass_memtable_threshold_ =
std::numeric_limits<uint32_t>::max();

private:
friend class TransactionTest_ValidateSnapshotTest_Test;
Expand Down Expand Up @@ -307,10 +308,6 @@ class WriteCommittedTxn : public PessimisticTransaction {
Status SetCommitTimestamp(TxnTimestamp ts) override;
TxnTimestamp GetCommitTimestamp() const override { return commit_timestamp_; }

bool GetCommitBypassMemTable() const override {
return commit_bypass_memtable_;
}

private:
template <typename TValue>
Status GetForUpdateImpl(const ReadOptions& read_options,
Expand Down
65 changes: 64 additions & 1 deletion utilities/transactions/transaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8830,7 +8830,8 @@ class CommitBypassMemtableTest : public DBTestBase,
Options options;
TransactionDBOptions txn_db_opts;

void SetUpTransactionDB() {
void SetUpTransactionDB(
uint32_t threshold = std::numeric_limits<uint32_t>::max()) {
options = CurrentOptions();
options.create_if_missing = true;
options.allow_2pc = true;
Expand All @@ -8842,6 +8843,7 @@ class CommitBypassMemtableTest : public DBTestBase,
Destroy(options, true);

txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
txn_db_opts.txn_commit_bypass_memtable_threshold = threshold;
ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
ASSERT_NE(txn_db, nullptr);
db_ = txn_db;
Expand Down Expand Up @@ -9297,6 +9299,67 @@ TEST_P(CommitBypassMemtableTest, Recovery) {
VerifyDBFromMap(expected);
}

TEST_P(CommitBypassMemtableTest, ThresholdTxnDBOption) {
// Tests TransactionDBOptions::txn_commit_bypass_memtable_threshold
const uint32_t threshold = 10;
SetUpTransactionDB(/*threshold=*/threshold);
bool commit_bypass_memtable = false;
// TODO: add and use stats for this
SyncPoint::GetInstance()->SetCallBack(
"WriteCommittedTxn::CommitInternal:bypass_memtable",
[&](void* arg) { commit_bypass_memtable = *(static_cast<bool*>(arg)); });
SyncPoint::GetInstance()->EnableProcessing();

// TransactionOptions::commit_bypass_memtable takes precedence
WriteOptions wopts;
TransactionOptions txn_opts;
txn_opts.commit_bypass_memtable = true;
Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
ASSERT_OK(txn1->SetName("xid1"));
ASSERT_OK(txn1->Put("k2", "v2"));
ASSERT_OK(txn1->Put("k1", "v1"));
ASSERT_OK(txn1->Prepare());
ASSERT_OK(txn1->Commit());
ASSERT_TRUE(commit_bypass_memtable);

// Below threshold
for (auto num_ops : {threshold, threshold + 1}) {
commit_bypass_memtable = false;
txn_opts.commit_bypass_memtable = false;
auto txn = txn_db->BeginTransaction(wopts, txn_opts, txn1);
txn1 = nullptr;
ASSERT_OK(txn->SetName("xid" + std::to_string(num_ops)));
for (uint32_t i = 0; i < num_ops; ++i) {
ASSERT_OK(
txn->Put("key" + std::to_string(i), "value" + std::to_string(i)));
}
ASSERT_OK(txn->Prepare());
ASSERT_OK(txn->Commit());
ASSERT_EQ(commit_bypass_memtable, num_ops > threshold);
delete txn;
}

// Repeat the same test with updates to two CFs
std::vector<std::string> cfs = {"pk", "sk"};
CreateColumnFamilies(cfs, options);

// Below threshold
for (auto num_ops : {threshold, threshold + 1}) {
commit_bypass_memtable = false;
txn_opts.commit_bypass_memtable = false;
auto txn_cf = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
ASSERT_OK(txn_cf->SetName("xid_cf" + std::to_string(num_ops)));
for (uint32_t i = 0; i < num_ops; ++i) {
ASSERT_OK(txn_cf->Put(handles_[i % 2], "key" + std::to_string(i),
"value" + std::to_string(i)));
}
ASSERT_OK(txn_cf->Prepare());
ASSERT_OK(txn_cf->Commit());
ASSERT_EQ(commit_bypass_memtable, num_ops > threshold);
delete txn_cf;
}
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
Loading