Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify values in secondary database against expected state #13281

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
10 changes: 5 additions & 5 deletions db_stress_tool/cf_consistency_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1040,17 +1040,17 @@ class CfConsistencyStressTest : public StressTest {
assert(thread);
Status status;

DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
DB* db_ptr = secondary_db_ ? secondary_db_ : db_;
const auto& cfhs = secondary_db_ ? secondary_cfhs_ : column_families_;

// Take a snapshot to preserve the state of primary db.
ManagedSnapshot snapshot_guard(db_);

SharedState* shared = thread->shared;
assert(shared);

if (cmp_db_) {
status = cmp_db_->TryCatchUpWithPrimary();
if (secondary_db_) {
status = secondary_db_->TryCatchUpWithPrimary();
if (!status.ok()) {
fprintf(stderr, "TryCatchUpWithPrimary: %s\n",
status.ToString().c_str());
Expand Down Expand Up @@ -1083,7 +1083,7 @@ class CfConsistencyStressTest : public StressTest {
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts(FLAGS_verify_checksum, true);
ropts.total_order_seek = true;
if (nullptr == cmp_db_) {
if (nullptr == secondary_db_) {
ropts.snapshot = snapshot_guard.snapshot();
}
uint32_t crc = 0;
Expand Down
13 changes: 7 additions & 6 deletions db_stress_tool/db_stress_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ StressTest::StressTest()
new_column_family_name_(1),
num_times_reopened_(0),
db_preload_finished_(false),
cmp_db_(nullptr),
secondary_db_(nullptr),
is_db_stopped_(false) {
if (FLAGS_destroy_db_initially) {
std::vector<std::string> files;
Expand Down Expand Up @@ -114,11 +114,11 @@ void StressTest::CleanUp() {
delete db_;
db_ = nullptr;

for (auto* cf : cmp_cfhs_) {
for (auto* cf : secondary_cfhs_) {
delete cf;
}
cmp_cfhs_.clear();
delete cmp_db_;
secondary_cfhs_.clear();
delete secondary_db_;
}

std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
Expand Down Expand Up @@ -3696,9 +3696,10 @@ void StressTest::Open(SharedState* shared, bool reopen) {
tmp_opts.env = db_stress_env;
const std::string& secondary_path = FLAGS_secondaries_base;
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
cf_descriptors, &cmp_cfhs_, &cmp_db_);
cf_descriptors, &secondary_cfhs_, &secondary_db_);
assert(s.ok());
assert(cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
assert(secondary_cfhs_.size() ==
static_cast<size_t>(FLAGS_column_families));
}
} else {
DBWithTTL* db_with_ttl;
Expand Down
5 changes: 2 additions & 3 deletions db_stress_tool/db_stress_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,9 +412,8 @@ class StressTest {
std::atomic<bool> db_preload_finished_;
std::shared_ptr<SstQueryFilterConfigsManager::Factory> sqfc_factory_;

// Fields used for continuous verification from another thread
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the original others intended for cmp_db_ to potentially be used for other purposes, but right now the only usages are for opening secondary databases. So I think we can improve the naming

DB* cmp_db_;
std::vector<ColumnFamilyHandle*> cmp_cfhs_;
DB* secondary_db_;
std::vector<ColumnFamilyHandle*> secondary_cfhs_;
bool is_db_stopped_;
};

Expand Down
149 changes: 140 additions & 9 deletions db_stress_tool/no_batched_ops_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,49 @@ class NonBatchedOpsStressTest : public StressTest {
from_db.data(), from_db.size());
}
}

if (secondary_db_) {
assert(secondary_cfhs_.size() == column_families_.size());
// We are going to read in the expected values before catching the
// secondary up to the primary. This sets the lower bound of the
// acceptable values that can be returned from the secondary. After
// each Get() to the secondary, we are going to read in the expected
// value again to determine the upper bound. As long as the returned
// value from Get() is within these bounds, we consider that okay. The
// lower bound will always be moving forwards anyways as
// TryCatchUpWithPrimary() gets called.
std::vector<ExpectedValue> pre_read_expected_values;
for (int64_t i = start; i < end; ++i) {
pre_read_expected_values.push_back(
shared->Get(static_cast<int>(cf), i));
}

Status s = secondary_db_->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared,
"Secondary failed to catch up to the primary");
}

for (int64_t i = start; i < end; ++i) {
if (thread->shared->HasVerificationFailedYet()) {
break;
}

const std::string key = Key(i);
std::string from_db;

s = secondary_db_->Get(options, column_families_[cf], key,
&from_db);

assert(!pre_read_expected_values.empty() &&
Copy link
Contributor Author

@archang19 archang19 Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was to get our internal code linter to stop complaining about the vector index access

static_cast<size_t>(i - start) <
pre_read_expected_values.size());
VerifyValueRange(static_cast<int>(cf), i, options, shared, from_db,
/* msg_prefix */ "Secondary get verification", s,
pre_read_expected_values[i - start]);
}
}

} else if (method == VerificationMethod::kGetEntity) {
for (int64_t i = start; i < end; ++i) {
if (thread->shared->HasVerificationFailedYet()) {
Expand Down Expand Up @@ -333,12 +376,20 @@ class NonBatchedOpsStressTest : public StressTest {
}

void ContinuouslyVerifyDb(ThreadState* thread) const override {
if (!cmp_db_) {
// Currently this method gets called even when
// FLAGS_continuous_verification_interval == 0 as long as
// FLAGS_verify_db_one_in > 0. Previously, this was not causing a problem in
// the crash tests since test_secondary was always equal to 0, and thus we
// returned early from this method. When test_secondary is set and we have a
// secondary_db_, the crash test fails during this iterator scan. The stack
// trace mentions BlobReader/BlobSource but it may not necessarily be
// related to BlobDB
if (!secondary_db_ || !FLAGS_continuous_verification_interval) {
return;
}
assert(cmp_db_);
assert(!cmp_cfhs_.empty());
Status s = cmp_db_->TryCatchUpWithPrimary();
assert(secondary_db_);
assert(!secondary_cfhs_.empty());
Status s = secondary_db_->TryCatchUpWithPrimary();
if (!s.ok()) {
assert(false);
exit(1);
Expand Down Expand Up @@ -372,7 +423,7 @@ class NonBatchedOpsStressTest : public StressTest {

{
uint32_t crc = 0;
std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
std::unique_ptr<Iterator> it(secondary_db_->NewIterator(read_opts));
s = checksum_column_family(it.get(), &crc);
if (!s.ok()) {
fprintf(stderr, "Computing checksum of default cf: %s\n",
Expand All @@ -381,19 +432,21 @@ class NonBatchedOpsStressTest : public StressTest {
}
}

for (auto* handle : cmp_cfhs_) {
for (auto* handle : secondary_cfhs_) {
if (thread->rand.OneInOpt(3)) {
// Use Get()
uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
std::string key_str = Key(key);
std::string value;
std::string key_ts;
s = cmp_db_->Get(read_opts, handle, key_str, &value,
FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
s = secondary_db_->Get(
read_opts, handle, key_str, &value,
FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
s.PermitUncheckedError();
} else {
// Use range scan
std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(read_opts, handle));
std::unique_ptr<Iterator> iter(
secondary_db_->NewIterator(read_opts, handle));
uint32_t rnd = (thread->rand.Next()) % 4;
if (0 == rnd) {
// SeekToFirst() + Next()*5
Expand Down Expand Up @@ -2810,6 +2863,84 @@ class NonBatchedOpsStressTest : public StressTest {
return true;
}

// Compared to VerifyOrSyncValue, VerifyValueRange takes in a
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about adding this functionality into VerifyOrSyncValue but that would make the method signature and the implementation even more complicated

// pre_read_expected_value to determine the lower bound of acceptable values.
// Anything from the pre_read_expected_value to the post_read_expected_value
// is considered acceptable. VerifyValueRange does not perform the initial
// "sync" step and does not compare the exact data/lengths for the values.
// This verification is suitable for verifying secondary or follower databases
bool VerifyValueRange(int cf, int64_t key, const ReadOptions& opts,
SharedState* shared, const std::string& value_from_db,
const std::string& msg_prefix, const Status& s,
const ExpectedValue& pre_read_expected_value) const {
if (shared->HasVerificationFailedYet()) {
return false;
}
const ExpectedValue post_read_expected_value = shared->Get(cf, key);
char expected_value_data[kValueMaxLen];
size_t expected_value_data_size =
GenerateValue(post_read_expected_value.GetValueBase(),
expected_value_data, sizeof(expected_value_data));

std::ostringstream read_u64ts;
if (opts.timestamp) {
read_u64ts << " while read with timestamp: ";
uint64_t read_ts;
if (DecodeU64Ts(*opts.timestamp, &read_ts).ok()) {
read_u64ts << std::to_string(read_ts) << ", ";
} else {
read_u64ts << s.ToString()
<< " Encoded read timestamp: " << opts.timestamp->ToString()
<< ", ";
}
}

// Compare value_from_db with the range of possible values from
// pre_read_expected_value to post_read_expected_value
if (s.ok()) {
const Slice slice(value_from_db);
const uint32_t value_base_from_db = GetValueBase(slice);
if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value,
post_read_expected_value)) {
VerificationAbort(shared,
msg_prefix +
": Unexpected value found that should not exist" +
read_u64ts.str(),
cf, key, value_from_db, "");
return false;
}
if (!ExpectedValueHelper::InExpectedValueBaseRange(
value_base_from_db, pre_read_expected_value,
post_read_expected_value)) {
VerificationAbort(
shared,
msg_prefix +
": Unexpected value found outside of the value base range" +
read_u64ts.str(),
cf, key, value_from_db,
Slice(expected_value_data, expected_value_data_size));
return false;
}
} else if (s.IsNotFound()) {
if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
post_read_expected_value)) {
VerificationAbort(shared,
msg_prefix + ": Value not found which should exist" +
read_u64ts.str() + s.ToString(),
cf, key, "",
Slice(expected_value_data, expected_value_data_size));
return false;
}
} else {
VerificationAbort(
shared,
msg_prefix + "Non-OK status" + read_u64ts.str() + s.ToString(), cf,
key, "", Slice(expected_value_data, expected_value_data_size));
return false;
}
return true;
}

void PrepareTxnDbOptions(SharedState* shared,
TransactionDBOptions& txn_db_opts) override {
txn_db_opts.rollback_deletion_type_callback =
Expand Down
4 changes: 4 additions & 0 deletions tools/db_crashtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ def is_direct_io_supported(dbname):
"write_buffer_size": 32 * 1024 * 1024,
"level_compaction_dynamic_level_bytes": lambda: random.randint(0, 1),
"paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
"test_secondary": lambda: random.choice([0, 1]),
}

blackbox_simple_default_params = {
Expand Down Expand Up @@ -1022,6 +1023,9 @@ def finalize_and_sanitize(src_params):
if dest_params.get("track_and_verify_wals", 0) == 1:
dest_params["metadata_write_fault_one_in"] = 0
dest_params["write_fault_one_in"] = 0
# Continuous verification fails with secondaries inside NonBatchedOpsStressTest
if dest_params.get("test_secondary") == 1:
dest_params["continuous_verification_interval"] = 0
return dest_params


Expand Down
Loading