Skip to content

Commit

Permalink
new seqno assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
cbi42 committed Feb 14, 2025
1 parent f6b2cdd commit ddd724e
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 161 deletions.
6 changes: 5 additions & 1 deletion include/rocksdb/utilities/write_batch_with_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ class WBWIIterator {

virtual void Prev() = 0;

virtual Status status() const = 0;

// The returned WriteEntry is only valid until the next mutation of
// WriteBatchWithIndex.
virtual WriteEntry Entry() const = 0;
Expand All @@ -85,7 +87,9 @@ class WBWIIterator {
// and it was overwritten by another update.
virtual bool HasOverWrittenSingleDel() const { return false; }

virtual Status status() const = 0;
// The number of times an update has issued for the current user key in this
// write batch up to this entry.
virtual uint32_t GetUpdateCount() const { return 0; }
};

// A WriteBatchWithIndex with a binary searchable index built for all the keys
Expand Down
4 changes: 4 additions & 0 deletions memtable/wbwi_memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ bool WBWIMemTable::Get(const LookupKey& key, std::string* value,

[[maybe_unused]] SequenceNumber read_seq =
GetInternalKeySeqno(key.internal_key());
// This is memtable is a single write batch, no snapshot can be taken within
// assigned seqnos for this memtable.
assert(read_seq >= assigned_seqno_.upper_bound ||
read_seq < assigned_seqno_.lower_bound);
std::unique_ptr<InternalIterator> iter{NewIterator()};
iter->Seek(key.internal_key());
const Slice lookup_user_key = key.user_key();
Expand Down
89 changes: 47 additions & 42 deletions memtable/wbwi_memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,27 @@ namespace ROCKSDB_NAMESPACE {
// a transaction (which is based on WBWI) into the DB as an immutable memtable.
//
// REQUIRE overwrite_key to be true for the WBWI
// Since the keys in WBWI do not have sequence number, the memtable needs to be
// Since the keys in WBWI do not have sequence number, this class is responsible
// for assigning sequence numbers to the keys. This memtable needs to be
// assigned a range of sequence numbers through AssignSequenceNumbers(seqno)
// before being available for reads.
// With overwrite_key = true, WBWI keeps track of the most recent update for
// each key, and each such key will be assigned seqno.upper_bound during reads.
// One exception is during flush, consider the following scenario:
//
// The sequence number assignment uses the update count for each key
// tracked in WBWI (see WBWIIterator::GetUpdateCount()). For each key, the
// sequence number assigned is seqno.lower_bound + update_count - 1. So more
// recent updates will have higher sequence number.
//
// There is a special case where this memtable needs to emit an extra
// SingleDelete even when the SD is overwritten by another update.
// Consider the following scenario:
// - WBWI has SD(k) then PUT(k, v1)
// - DB has PUT(k, v2) in L1
// - flush WBWI adds PUT(k, v1) into L0
// - live memtable contains SD(k)
// - flush live memtable and compact it with L0 will drop SD(k) and PUT(k, v1)
// - the PUT(k, v2) in L1 incorrectly becomes visible
// So during flush, iterator from this memtable will need emit overwritten
// single deletion. These single deletion entries will be
// assigned seqno.upper_bound - 1.
// single deletion. This SD will be assigned seqno.lower_bound.
class WBWIMemTable final : public ReadOnlyMemTable {
public:
struct SeqnoRange {
Expand Down Expand Up @@ -258,51 +264,40 @@ class WBWIMemTableIterator final : public InternalIterator {
}

void Seek(const Slice& target) override {
// `emit_overwritten_single_del_` is only used for flush, which does
// sequential forward scan from the beginning.
assert(!emit_overwritten_single_del_);
Slice target_user_key = ExtractUserKey(target);
// Moves to first update >= target_user_key
it_->Seek(target_user_key);
if (it_->Valid()) {
// compare seqno
SequenceNumber seqno = GetInternalKeySeqno(target);
assert(!emit_overwritten_single_del_);
// For now all keys are assigned seqno_ub_, this may change after merge
// is supported.
assert(seqno <= assigned_seqno_.lower_bound ||
seqno >= assigned_seqno_.upper_bound);
if (seqno < assigned_seqno_.upper_bound &&
comparator_->Compare(it_->Entry().key, target_user_key) == 0) {
it_->Next();
// TODO: cannot assume distinct keys once Merge is supported
if (it_->Valid()) {
assert(comparator_->Compare(it_->Entry().key, target_user_key) > 0);
}
}
SequenceNumber target_seqno = GetInternalKeySeqno(target);
// Compare seqno
while (it_->Valid() &&
comparator_->Compare(it_->Entry().key, target_user_key) == 0 &&
target_seqno < CurrentKeySeqno()) {
it_->Next();
}
UpdateKey();
}

void SeekForPrev(const Slice& target) override {
assert(!emit_overwritten_single_del_);
Slice target_user_key = ExtractUserKey(target);
// Moves to last update <= target_user_key
it_->SeekForPrev(target_user_key);
if (it_->Valid()) {
SequenceNumber seqno = GetInternalKeySeqno(target);
assert(seqno <= assigned_seqno_.lower_bound ||
seqno >= assigned_seqno_.upper_bound);
if (seqno > assigned_seqno_.upper_bound &&
comparator_->Compare(it_->Entry().key, target_user_key) == 0) {
it_->Prev();
if (it_->Valid()) {
// TODO: cannot assume distinct keys once Merge is supported
assert(comparator_->Compare(it_->Entry().key, target_user_key) < 0);
}
}
SequenceNumber target_seqno = GetInternalKeySeqno(target);
// Need to move to the first entry with seqno >= target_seqno for the same
// user key or a different user key.
// Compare seqno
while (it_->Valid() &&
comparator_->Compare(it_->Entry().key, target_user_key) == 0 &&
CurrentKeySeqno() < target_seqno) {
it_->Prev();
}
UpdateKey();
}

void Next() override {
// Only need to emit single deletion during flush. Since Flush does
// sequential forward scan, we only need to emit single deletion in Next(),
// and do not need to consider iterator direction change.
assert(Valid());
if (emit_overwritten_single_del_) {
if (it_->HasOverWrittenSingleDel() && !at_overwritten_single_del_) {
Expand All @@ -329,6 +324,7 @@ class WBWIMemTableIterator final : public InternalIterator {
}

void Prev() override {
assert(!emit_overwritten_single_del_);
assert(Valid());
it_->Prev();
UpdateKey();
Expand All @@ -341,7 +337,6 @@ class WBWIMemTableIterator final : public InternalIterator {

Slice value() const override {
assert(Valid());
// TODO: it_->Entry() is not trivial, cache it
return it_->Entry().value;
}

Expand All @@ -355,6 +350,16 @@ class WBWIMemTableIterator final : public InternalIterator {
private:
static const std::unordered_map<WriteType, ValueType> WriteTypeToValueTypeMap;

SequenceNumber CurrentKeySeqno() {
assert(it_->Valid());
assert(it_->GetUpdateCount() >= 1);
auto seq = assigned_seqno_.lower_bound + it_->GetUpdateCount() - 1;
assert(seq <= assigned_seqno_.upper_bound);
return seq;
}

// If it_ is valid, udate key_ to an internal key containing it_ current
// key, CurrentKeySeqno() and a type corresponding to it_ current entry type.
void UpdateKey() {
valid_ = it_->Valid();
if (!Valid()) {
Expand All @@ -370,16 +375,16 @@ class WBWIMemTableIterator final : public InternalIterator {
std::to_string(it_->Entry().type));
return;
}
key_buf_.SetInternalKey(it_->Entry().key, assigned_seqno_.upper_bound,
t->second);
key_buf_.SetInternalKey(it_->Entry().key, CurrentKeySeqno(), t->second);
key_ = key_buf_.GetInternalKey();
}

void UpdateSingleDeleteKey() {
assert(it_->Valid());
assert(Valid());
assert(assigned_seqno_.lower_bound < assigned_seqno_.upper_bound);
key_buf_.SetInternalKey(it_->Entry().key, assigned_seqno_.upper_bound - 1,
// The key that overwrites this SingleDelete will be assigned at least
// seqno lower_bound + 1 (see CurrentKeySeqno()).
key_buf_.SetInternalKey(it_->Entry().key, assigned_seqno_.lower_bound,
kTypeSingleDeletion);
key_ = key_buf_.GetInternalKey();
at_overwritten_single_del_ = true;
Expand Down
Loading

0 comments on commit ddd724e

Please sign in to comment.