Commit a8bd6a3

archang19 authored and facebook-github-bot committed
Verify values in secondary database against expected state (#13281)
Summary:

TLDR: This PR enables secondary DB verification inside the "simple" crash tests (`NonBatchedOpsStressTest`). Essentially, we want to be able to verify that the secondary is a valid "prefix" of the primary. This PR allows us to do this by piggybacking on the existing verification of the primary through `Get()` requests.

I originally proposed replaying the trace file to recreate the `ExpectedState` as of a specific sequence number. This could be used to run verifications against the secondary database. I did some experimenting in #13266 and got a "mostly working" implementation of this approach. I could sometimes get through entire key space verifications, but eventually one of the keys would fail verification. I have not figured out the root cause yet, but I assume that something broke the alignment between sequence numbers and trace records.

The approach in this PR is considerably simpler. We can just check that the secondary database's value is in the correct "range," which we already have functionality for checking. Compared to the approach in #13266, this approach is _much, much simpler_ since we do not have to go through the whole headache of replaying the trace and creating an entirely new `ExpectedState`. (Look at #13266 to see how much of a mess that creates.) I think this approach is better than my original approach in almost all aspects: it's faster, uses less space, and has less room for implementation errors.

Other nice aspects of this approach:

1. We don't need to block the primary. (Another approach you could imagine would be to block writes to the primary, have the secondary catch up, do the whole verification, and then re-enable writes to the primary.)
2. We don't need to block the secondary or do any special coordination (locks, sync points, etc.). (If we insist on one "golden" expected value to be read from the secondary, then we need to make sure that another thread does not call `TryCatchUpWithPrimary` while we are trying to perform a `Get()`.)
3. More "realistic" usage of the secondary. For instance, writes to the primary and secondary would continue in production while we try to read from the secondary.

The main drawback, of course, is that we verify against a range of expected values, rather than one particular expected value. However, I think this is acceptable and "good enough," especially with all of the other aforementioned benefits.

Historical context: There is some very old code that attempted to verify secondaries, but it is not enabled. This code has not been touched or executed in an extremely long time, and the crash tests started failing when I tried enabling it, most likely because the code is not compatible with certain other crash test options. This code is for the "continuous verification" and involves long iterator scans over the secondary database. Some of the code involved the cross CF consistency test type. I don't think the old checks are what we really want for our purposes of verifying the secondary functionality.

Since I don't think we will get much value out of this old "continuous verification" code, I integrated my secondary verification with the "regular" database verification. This also makes the rollout simpler on my end, since I can control whether my secondary verifications are enabled through one `test_secondary` configuration. To make sure the old code does not execute for our recurring crash test runs, I had to enforce that `continuous_verification_interval` is 0 whenever `test_secondary` is set.

Monitoring: I will want to monitor the Sandcastle "simple" runs for failures where `test_secondary` is set. All of my error messages are prefixed with "Secondary" so it should be easy to tell if this PR causes any crash test issues.

Future work:

1. Extend this to followers. I think the same verification method should work, so most of the code from this PR should be reusable.
2. Add additional checks to make sure the sequence number of the follower/secondary is actually increasing. For instance, if the primary's sequence number has advanced, and in that period the secondary's has not (even after calling `TryCatchUpWithPrimary`), then we know there is a problem.
3. Potentially check things other than `Get()` for the secondary (e.g. iterators). I think the focus here should be testing replication-specific logic, and since we will already have separate unit tests, we do not need to repeat all of the tests against both the primary and the secondary.

Pull Request resolved: #13281

Test Plan: The primary crash test commands I ran were:

```
python3 tools/db_crashtest.py --simple blackbox --test_secondary=1
python3 tools/db_crashtest.py --simple whitebox --test_secondary=1
```

As a sanity check, I added an `assert(false)` right after my secondary verification code to make sure that my code was actually being run.

Reviewed By: anand1976
Differential Revision: D67953821
Pulled By: archang19
fbshipit-source-id: 0bd853580ea53566be41639f5499eb9b5e0e9376
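The range check described in the summary can be illustrated with a toy model. The sketch below is not RocksDB code: `ToyExpectedState`, `ToySecondary`, and `VerifyKey` are hypothetical names, each key is assumed to carry a single monotonically increasing version number, and deletes and counter wraparound are ignored. It only aims to show why a lower bound read before the catch-up and an upper bound read after the `Get()` are enough to accept a lagging but correct secondary.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the expected state of the primary: one
// monotonically increasing version number per key.
struct ToyExpectedState {
  std::vector<uint32_t> versions;
  uint32_t Get(size_t key) const {
    assert(key < versions.size());
    return versions[key];
  }
  void Put(size_t key) { ++versions[key]; }  // a write bumps the version
};

// Hypothetical secondary that only sees what it copied at its last catch-up.
struct ToySecondary {
  std::vector<uint32_t> versions;
  void TryCatchUp(const ToyExpectedState& primary) {
    versions = primary.versions;
  }
  uint32_t Get(size_t key) const {
    assert(key < versions.size());
    return versions[key];
  }
};

// Reads `key` from the secondary, then reads the expected state again to get
// the upper bound. `pre_read` is the lower bound captured before the catch-up.
bool VerifyKey(const ToyExpectedState& expected, const ToySecondary& secondary,
               size_t key, uint32_t pre_read) {
  const uint32_t from_secondary = secondary.Get(key);
  const uint32_t post_read = expected.Get(key);
  return pre_read <= from_secondary && from_secondary <= post_read;
}
```

In this model, if the primary keeps writing to a key after the secondary has caught up, the secondary's older version still passes because it lies between the two bounds. A version below the pre-read bound (lost updates) or above the post-read bound is rejected.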
1 parent ac2ad21 commit a8bd6a3

5 files changed, +157 -23 lines changed

db_stress_tool/cf_consistency_stress.cc

+5 -5
@@ -1040,17 +1040,17 @@ class CfConsistencyStressTest : public StressTest {
     assert(thread);
     Status status;

-    DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
-    const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
+    DB* db_ptr = secondary_db_ ? secondary_db_ : db_;
+    const auto& cfhs = secondary_db_ ? secondary_cfhs_ : column_families_;

     // Take a snapshot to preserve the state of primary db.
     ManagedSnapshot snapshot_guard(db_);

     SharedState* shared = thread->shared;
     assert(shared);

-    if (cmp_db_) {
-      status = cmp_db_->TryCatchUpWithPrimary();
+    if (secondary_db_) {
+      status = secondary_db_->TryCatchUpWithPrimary();
       if (!status.ok()) {
         fprintf(stderr, "TryCatchUpWithPrimary: %s\n",
                 status.ToString().c_str());
@@ -1083,7 +1083,7 @@ class CfConsistencyStressTest : public StressTest {
     // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
     ReadOptions ropts(FLAGS_verify_checksum, true);
     ropts.total_order_seek = true;
-    if (nullptr == cmp_db_) {
+    if (nullptr == secondary_db_) {
       ropts.snapshot = snapshot_guard.snapshot();
     }
     uint32_t crc = 0;

db_stress_tool/db_stress_test_base.cc

+7 -6
@@ -69,7 +69,7 @@ StressTest::StressTest()
       new_column_family_name_(1),
       num_times_reopened_(0),
       db_preload_finished_(false),
-      cmp_db_(nullptr),
+      secondary_db_(nullptr),
       is_db_stopped_(false) {
   if (FLAGS_destroy_db_initially) {
     std::vector<std::string> files;
@@ -114,11 +114,11 @@ void StressTest::CleanUp() {
   delete db_;
   db_ = nullptr;

-  for (auto* cf : cmp_cfhs_) {
+  for (auto* cf : secondary_cfhs_) {
     delete cf;
   }
-  cmp_cfhs_.clear();
-  delete cmp_db_;
+  secondary_cfhs_.clear();
+  delete secondary_db_;
 }

 std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
@@ -3696,9 +3696,10 @@ void StressTest::Open(SharedState* shared, bool reopen) {
         tmp_opts.env = db_stress_env;
         const std::string& secondary_path = FLAGS_secondaries_base;
         s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
-                                cf_descriptors, &cmp_cfhs_, &cmp_db_);
+                                cf_descriptors, &secondary_cfhs_, &secondary_db_);
         assert(s.ok());
-        assert(cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
+        assert(secondary_cfhs_.size() ==
+               static_cast<size_t>(FLAGS_column_families));
       }
     } else {
       DBWithTTL* db_with_ttl;

db_stress_tool/db_stress_test_base.h

+2 -3
@@ -412,9 +412,8 @@ class StressTest {
   std::atomic<bool> db_preload_finished_;
   std::shared_ptr<SstQueryFilterConfigsManager::Factory> sqfc_factory_;

-  // Fields used for continuous verification from another thread
-  DB* cmp_db_;
-  std::vector<ColumnFamilyHandle*> cmp_cfhs_;
+  DB* secondary_db_;
+  std::vector<ColumnFamilyHandle*> secondary_cfhs_;
   bool is_db_stopped_;
 };

db_stress_tool/no_batched_ops_stress.cc

+139 -9
@@ -153,6 +153,49 @@ class NonBatchedOpsStressTest : public StressTest {
                               from_db.data(), from_db.size());
           }
         }
+
+        if (secondary_db_) {
+          assert(secondary_cfhs_.size() == column_families_.size());
+          // We are going to read in the expected values before catching the
+          // secondary up to the primary. This sets the lower bound of the
+          // acceptable values that can be returned from the secondary. After
+          // each Get() to the secondary, we are going to read in the expected
+          // value again to determine the upper bound. As long as the returned
+          // value from Get() is within these bounds, we consider that okay. The
+          // lower bound will always be moving forwards anyways as
+          // TryCatchUpWithPrimary() gets called.
+          std::vector<ExpectedValue> pre_read_expected_values;
+          for (int64_t i = start; i < end; ++i) {
+            pre_read_expected_values.push_back(
+                shared->Get(static_cast<int>(cf), i));
+          }
+
+          Status s = secondary_db_->TryCatchUpWithPrimary();
+          if (!s.ok()) {
+            VerificationAbort(shared,
+                              "Secondary failed to catch up to the primary");
+          }
+
+          for (int64_t i = start; i < end; ++i) {
+            if (thread->shared->HasVerificationFailedYet()) {
+              break;
+            }
+
+            const std::string key = Key(i);
+            std::string from_db;
+
+            s = secondary_db_->Get(options, column_families_[cf], key,
+                                   &from_db);
+
+            assert(!pre_read_expected_values.empty() &&
+                   static_cast<size_t>(i - start) <
+                       pre_read_expected_values.size());
+            VerifyValueRange(static_cast<int>(cf), i, options, shared, from_db,
+                             /* msg_prefix */ "Secondary get verification", s,
+                             pre_read_expected_values[i - start]);
+          }
+        }
+
       } else if (method == VerificationMethod::kGetEntity) {
         for (int64_t i = start; i < end; ++i) {
           if (thread->shared->HasVerificationFailedYet()) {
@@ -333,12 +376,19 @@ class NonBatchedOpsStressTest : public StressTest {
   }

   void ContinuouslyVerifyDb(ThreadState* thread) const override {
-    if (!cmp_db_) {
+    // For automated crash tests, we only want to run this continous
+    // verification when continuous_verification_interval > 0 and there is
+    // a secondary db. This continous verification currently fails when there is
+    // a secondary db during the iterator scan. The stack trace mentions
+    // BlobReader/BlobSource but it may not necessarily be related to BlobDB.
+    // Regardless, we only want to run this function if we are experimenting and
+    // explicitly setting continuous_verification_interval.
+    if (!secondary_db_ || !FLAGS_continuous_verification_interval) {
       return;
     }
-    assert(cmp_db_);
-    assert(!cmp_cfhs_.empty());
-    Status s = cmp_db_->TryCatchUpWithPrimary();
+    assert(secondary_db_);
+    assert(!secondary_cfhs_.empty());
+    Status s = secondary_db_->TryCatchUpWithPrimary();
     if (!s.ok()) {
       assert(false);
       exit(1);
@@ -372,7 +422,7 @@ class NonBatchedOpsStressTest : public StressTest {

     {
       uint32_t crc = 0;
-      std::unique_ptr<Iterator> it(cmp_db_->NewIterator(read_opts));
+      std::unique_ptr<Iterator> it(secondary_db_->NewIterator(read_opts));
       s = checksum_column_family(it.get(), &crc);
       if (!s.ok()) {
         fprintf(stderr, "Computing checksum of default cf: %s\n",
@@ -381,19 +431,21 @@ class NonBatchedOpsStressTest : public StressTest {
       }
     }

-    for (auto* handle : cmp_cfhs_) {
+    for (auto* handle : secondary_cfhs_) {
       if (thread->rand.OneInOpt(3)) {
         // Use Get()
         uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
         std::string key_str = Key(key);
         std::string value;
         std::string key_ts;
-        s = cmp_db_->Get(read_opts, handle, key_str, &value,
-                         FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
+        s = secondary_db_->Get(
+            read_opts, handle, key_str, &value,
+            FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
         s.PermitUncheckedError();
       } else {
         // Use range scan
-        std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(read_opts, handle));
+        std::unique_ptr<Iterator> iter(
+            secondary_db_->NewIterator(read_opts, handle));
         uint32_t rnd = (thread->rand.Next()) % 4;
         if (0 == rnd) {
           // SeekToFirst() + Next()*5
@@ -2810,6 +2862,84 @@ class NonBatchedOpsStressTest : public StressTest {
     return true;
   }

+  // Compared to VerifyOrSyncValue, VerifyValueRange takes in a
+  // pre_read_expected_value to determine the lower bound of acceptable values.
+  // Anything from the pre_read_expected_value to the post_read_expected_value
+  // is considered acceptable. VerifyValueRange does not perform the initial
+  // "sync" step and does not compare the exact data/lengths for the values.
+  // This verification is suitable for verifying secondary or follower databases
+  bool VerifyValueRange(int cf, int64_t key, const ReadOptions& opts,
+                        SharedState* shared, const std::string& value_from_db,
+                        const std::string& msg_prefix, const Status& s,
+                        const ExpectedValue& pre_read_expected_value) const {
+    if (shared->HasVerificationFailedYet()) {
+      return false;
+    }
+    const ExpectedValue post_read_expected_value = shared->Get(cf, key);
+    char expected_value_data[kValueMaxLen];
+    size_t expected_value_data_size =
+        GenerateValue(post_read_expected_value.GetValueBase(),
+                      expected_value_data, sizeof(expected_value_data));
+
+    std::ostringstream read_u64ts;
+    if (opts.timestamp) {
+      read_u64ts << " while read with timestamp: ";
+      uint64_t read_ts;
+      if (DecodeU64Ts(*opts.timestamp, &read_ts).ok()) {
+        read_u64ts << std::to_string(read_ts) << ", ";
+      } else {
+        read_u64ts << s.ToString()
+                   << " Encoded read timestamp: " << opts.timestamp->ToString()
+                   << ", ";
+      }
+    }
+
+    // Compare value_from_db with the range of possible values from
+    // pre_read_expected_value to post_read_expected_value
+    if (s.ok()) {
+      const Slice slice(value_from_db);
+      const uint32_t value_base_from_db = GetValueBase(slice);
+      if (ExpectedValueHelper::MustHaveNotExisted(pre_read_expected_value,
+                                                  post_read_expected_value)) {
+        VerificationAbort(shared,
+                          msg_prefix +
+                              ": Unexpected value found that should not exist" +
+                              read_u64ts.str(),
+                          cf, key, value_from_db, "");
+        return false;
+      }
+      if (!ExpectedValueHelper::InExpectedValueBaseRange(
+              value_base_from_db, pre_read_expected_value,
+              post_read_expected_value)) {
+        VerificationAbort(
+            shared,
+            msg_prefix +
+                ": Unexpected value found outside of the value base range" +
+                read_u64ts.str(),
+            cf, key, value_from_db,
+            Slice(expected_value_data, expected_value_data_size));
+        return false;
+      }
+    } else if (s.IsNotFound()) {
+      if (ExpectedValueHelper::MustHaveExisted(pre_read_expected_value,
+                                               post_read_expected_value)) {
+        VerificationAbort(shared,
+                          msg_prefix + ": Value not found which should exist" +
+                              read_u64ts.str() + s.ToString(),
+                          cf, key, "",
+                          Slice(expected_value_data, expected_value_data_size));
+        return false;
+      }
+    } else {
+      VerificationAbort(
+          shared,
+          msg_prefix + "Non-OK status" + read_u64ts.str() + s.ToString(), cf,
+          key, "", Slice(expected_value_data, expected_value_data_size));
+      return false;
+    }
+    return true;
+  }
+
   void PrepareTxnDbOptions(SharedState* shared,
                            TransactionDBOptions& txn_db_opts) override {
     txn_db_opts.rollback_deletion_type_callback =
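
The three-way branch in `VerifyValueRange` (OK, NotFound, any other status) can be modeled in isolation. The sketch below is a simplified assumption, not RocksDB's `ExpectedValue` or `ExpectedValueHelper`: `ToyExpectedValue`, its fields, the `Toy*` helpers, and `Classify` are hypothetical names, and the helper conditions are a guess at the intent suggested by the function names in the diff (a key that was absent throughout, a key that was present throughout, and a value base between the two snapshots).

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified expected-value record (not RocksDB's ExpectedValue).
struct ToyExpectedValue {
  bool deleted;          // key is expected to be absent
  uint32_t value_base;   // version of the most recent write
  uint32_t del_counter;  // number of deletes applied so far
};

enum class ReadStatus { kOk, kNotFound, kOtherError };
enum class Verdict {
  kPass,
  kUnexpectedValue,
  kOutOfRange,
  kMissingValue,
  kBadStatus
};

// Absent before and after the read, with no write in between.
bool ToyMustHaveNotExisted(const ToyExpectedValue& pre,
                           const ToyExpectedValue& post) {
  return pre.deleted && post.deleted && pre.value_base == post.value_base;
}

// Present before and after the read, with no delete in between.
bool ToyMustHaveExisted(const ToyExpectedValue& pre,
                        const ToyExpectedValue& post) {
  return !pre.deleted && !post.deleted && pre.del_counter == post.del_counter;
}

// The value base read from the DB lies between the two snapshots.
bool ToyInValueBaseRange(uint32_t value_base, const ToyExpectedValue& pre,
                         const ToyExpectedValue& post) {
  assert(pre.value_base <= post.value_base);  // expected state moves forward
  return pre.value_base <= value_base && value_base <= post.value_base;
}

// Mirrors the branch order of the diff: OK, then NotFound, then anything else.
Verdict Classify(ReadStatus status, uint32_t value_base_from_db,
                 const ToyExpectedValue& pre, const ToyExpectedValue& post) {
  if (status == ReadStatus::kOk) {
    if (ToyMustHaveNotExisted(pre, post)) {
      return Verdict::kUnexpectedValue;
    }
    if (!ToyInValueBaseRange(value_base_from_db, pre, post)) {
      return Verdict::kOutOfRange;
    }
    return Verdict::kPass;
  }
  if (status == ReadStatus::kNotFound) {
    return ToyMustHaveExisted(pre, post) ? Verdict::kMissingValue
                                         : Verdict::kPass;
  }
  return Verdict::kBadStatus;
}
```

Under these assumptions, a NotFound result is only a failure when the key was present at both snapshots with no delete in between; if a delete raced with the read, either outcome is accepted.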

tools/db_crashtest.py

+4
@@ -485,6 +485,7 @@ def is_direct_io_supported(dbname):
     "write_buffer_size": 32 * 1024 * 1024,
     "level_compaction_dynamic_level_bytes": lambda: random.randint(0, 1),
     "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
+    "test_secondary": lambda: random.choice([0, 1]),
 }

 blackbox_simple_default_params = {
@@ -1022,6 +1023,9 @@ def finalize_and_sanitize(src_params):
     if dest_params.get("track_and_verify_wals", 0) == 1:
         dest_params["metadata_write_fault_one_in"] = 0
         dest_params["write_fault_one_in"] = 0
+    # Continuous verification fails with secondaries inside NonBatchedOpsStressTest
+    if dest_params.get("test_secondary") == 1:
+        dest_params["continuous_verification_interval"] = 0
     return dest_params
