Skip to content

Commit fbfd7dd

Browse files
Add test + some comments addressed
1 parent d464ce0 commit fbfd7dd

File tree

3 files changed

+37
-28
lines changed

3 files changed

+37
-28
lines changed

cpp/arcticdb/storage/failure_simulation.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ enum class FailureType : int {
3131
static const char* failure_names[] = {
3232
"WRITE",
3333
"READ",
34+
"WRITE_LOCK",
3435
"ITERATE",
3536
"DELETE",
3637
};

cpp/arcticdb/util/storage_lock.hpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ class StorageLock {
159159

160160
bool try_acquire_lock(const std::shared_ptr<Store>& store) {
161161
auto start = ClockType::coarse_nanos_since_epoch();
162-
if(!ref_key_exists(store) || !ttl_not_expired(store)) {
162+
if(!ttl_not_expired(store)) {
163163
ts_= create_ref_key(store);
164164
auto lock_sleep_ms = ConfigsMap::instance()->get_int("StorageLock.WaitMs", DEFAULT_WAIT_MS);
165165
ARCTICDB_DEBUG(log::lock(), "Waiting for {} ms, thread id: {}", lock_sleep_ms, std::this_thread::get_id());
@@ -208,12 +208,6 @@ class StorageLock {
208208
return get_ref_key(name_);
209209
}
210210

211-
bool ref_key_exists(const std::shared_ptr<Store>& store) {
212-
auto exists = store->key_exists_sync(ref_key());
213-
ARCTICDB_DEBUG(log::lock(), "Ref key exists: {}", exists ? "true" : "false");
214-
return exists;
215-
}
216-
217211
static void do_remove_ref_key(const std::shared_ptr<Store>& store, const StreamId& name) {
218212
ARCTICDB_DEBUG(log::lock(), "Removing ref key");
219213
try {

cpp/arcticdb/util/test/test_storage_lock.cpp

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,16 @@ TEST(StorageLock, Timeout) {
4848
}
4949

5050
struct LockData {
51-
std::string lock_name_;
52-
std::shared_ptr<InMemoryStore> store_;
53-
volatile uint64_t vol_;
54-
std::atomic<uint64_t> atomic_;
55-
std::atomic<bool> contended_;
51+
std::string lock_name_ = "stress_test_lock";
52+
std::shared_ptr<InMemoryStore> store_ = std::make_shared<InMemoryStore>();
53+
volatile uint64_t vol_ { 0 };
54+
std::atomic<uint64_t> atomic_ = { 0 };
55+
std::atomic<bool> contended_ { false };
56+
std::atomic<bool> timedout_ { false };
5657
const size_t num_tests_;
57-
std::atomic<bool> timedout_;
58-
59-
LockData(size_t num_tests) :
60-
lock_name_("stress_test_lock"),
61-
store_(std::make_shared<InMemoryStore>()),
62-
vol_(0),
63-
atomic_(0),
64-
contended_(false),
65-
num_tests_(num_tests),
66-
timedout_(false){
67-
}
58+
59+
explicit LockData(size_t num_tests) :
60+
num_tests_(num_tests){}
6861

6962
};
7063

@@ -77,6 +70,7 @@ struct OptimisticLockTask {
7770

7871
folly::Future<folly::Unit> operator()() {
7972
StorageLock<> lock{data_->lock_name_};
73+
using namespace std::chrono_literals;
8074

8175
for (auto i = size_t(0); i < data_->num_tests_; ++i) {
8276
if (!lock.try_lock(data_->store_)) {
@@ -85,6 +79,7 @@ struct OptimisticLockTask {
8579
else {
8680
// As of C++20, '++' expression of 'volatile'-qualified type is deprecated.
8781
const uint64_t vol_ = data_->vol_ + 1;
82+
std::this_thread::sleep_for(10ms);
8883
data_->vol_ = vol_;
8984
++data_->atomic_;
9085
lock.unlock(data_->store_);
@@ -370,7 +365,7 @@ class StorageLockWithSlowWrites : public ::testing::TestWithParam<std::tuple<int
370365
protected:
371366
void SetUp() override {
372367
log::lock().set_level(spdlog::level::debug);
373-
StorageFailureSimulator::instance()->reset();
368+
StorageFailureSimulator::reset();
374369
}
375370
};
376371

@@ -426,6 +421,29 @@ TEST_P(StorageLockWithSlowWrites, ConcurrentWritesWithRetrying) {
426421
ASSERT_EQ(lock_data->atomic_, 1);
427422
}
428423

424+
TEST(StorageLock, StressManyWriters) {
425+
426+
const StorageFailureSimulator::ParamActionSequence SLOW_ACTIONS = {
427+
action_factories::slow_action(0.3, 600, 1100),
428+
};
429+
430+
constexpr size_t num_writers = 50;
431+
FutureExecutor<CPUThreadPoolExecutor> exec{num_writers};
432+
433+
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_LOCK, SLOW_ACTIONS}});
434+
435+
std::vector<Future<Unit>> futures;
436+
auto lock_data = std::make_shared<LockData>(num_writers);
437+
lock_data->store_ = std::make_shared<InMemoryStore>();
438+
439+
for (size_t i = 0; i < num_writers; ++i) {
440+
futures.emplace_back(exec.addFuture(OptimisticLockTask(lock_data)));
441+
}
442+
collect(futures).get();
443+
444+
ASSERT_EQ(lock_data->atomic_, lock_data->vol_);
445+
}
446+
429447
INSTANTIATE_TEST_SUITE_P(
430448
DelayPairs,
431449
StorageLockWithSlowWrites,
@@ -450,9 +468,6 @@ TEST(StorageLock, SlowWrites) {
450468

451469
TEST(StorageLock, DISABLED_LockSameTimestamp) { // Not yet implemented
452470
log::lock().set_level(spdlog::level::debug);
453-
const StorageFailureSimulator::ParamActionSequence SLOW_ON_2ND_CALL = {
454-
action_factories::slow_action(1, 10, 10), action_factories::slow_action(1, 10, 10)
455-
};
456471
constexpr size_t num_writers = 2;
457472

458473
using StorageLockType = StorageLock<util::ManualClock>;
@@ -462,7 +477,6 @@ TEST(StorageLock, DISABLED_LockSameTimestamp) { // Not yet implemented
462477
while (locks.size() < num_writers) { locks.emplace_back(std::make_unique<StorageLockType>("test")); };
463478
auto store = std::make_shared<InMemoryStore>();
464479
folly::FutureExecutor<folly::CPUThreadPoolExecutor> exec{num_writers};
465-
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_LOCK, SLOW_ON_2ND_CALL}});
466480
std::vector<Future<Unit>> futures;
467481
std::vector<bool> writer_has_lock;
468482
writer_has_lock.resize(num_writers);

0 commit comments

Comments
 (0)