Skip to content

Commit d464ce0

Browse files
Refactor, still poc
1 parent edc6ff9 commit d464ce0

File tree

3 files changed

+56
-49
lines changed

3 files changed

+56
-49
lines changed

cpp/arcticdb/storage/failure_simulation.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace arcticdb {
2323
enum class FailureType : int {
2424
WRITE = 0,
2525
READ,
26-
WRITE_SLOW,
26+
WRITE_LOCK,
2727
ITERATE,
2828
DELETE,
2929
};
@@ -176,7 +176,7 @@ class StorageFailureSimulator {
176176
Params default_actions = {
177177
{WRITE, {action_factories::fault(cfg.write_failure_prob())}},
178178
{READ, {action_factories::fault(cfg.read_failure_prob())}},
179-
{WRITE_SLOW, {action_factories::slow_action(cfg.write_slowdown_prob(), cfg.slow_down_min_ms(), cfg.slow_down_max_ms())}}
179+
{WRITE_LOCK, {action_factories::slow_action(cfg.write_slowdown_prob(), cfg.slow_down_min_ms(), cfg.slow_down_max_ms())}}
180180
};
181181

182182
configure(default_actions);

cpp/arcticdb/util/storage_lock.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ class StorageLock {
168168
auto read_ts = read_timestamp(store);
169169
auto duration = ClockType::coarse_nanos_since_epoch() - start;
170170
auto duration_in_ms = duration / ONE_MILLISECOND;
171-
ARCTICDB_INFO(log::lock(), "Took {} ms", duration_in_ms);
172-
ARCTICDB_INFO(log::lock(), "Max is {} ms", 1.5 * lock_sleep_ms);
171+
ARCTICDB_DEBUG(log::lock(), "Took {} ms", duration_in_ms);
172+
ARCTICDB_DEBUG(log::lock(), "Max is {} ms", 1.5 * lock_sleep_ms);
173173
if (duration > 1.5 * lock_sleep_ms * ONE_MILLISECOND) {
174174
ARCTICDB_DEBUG(log::lock(), "Took too long to read and write the lock. Aborting.");
175175
return false;
@@ -194,7 +194,7 @@ class StorageLock {
194194

195195
timestamp create_ref_key(const std::shared_ptr<Store>& store) {
196196
auto ts = ClockType::nanos_since_epoch();
197-
StorageFailureSimulator::instance()->go(FailureType::WRITE_SLOW);
197+
StorageFailureSimulator::instance()->go(FailureType::WRITE_LOCK);
198198
store->write_sync(KeyType::LOCK, name_, lock_segment(name_, ts));
199199
ARCTICDB_DEBUG(log::lock(), "Created lock with timestamp {}", ts);
200200
return ts;

cpp/arcticdb/util/test/test_storage_lock.cpp

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -365,69 +365,76 @@ TEST(StorageLock, OptimisticForceReleaseLock) {
365365
first_lock._test_release_local_lock();
366366
}
367367

368-
TEST(StorageLock, ConcurrentWrites) {
369-
log::lock().set_level(spdlog::level::debug);
370-
const StorageFailureSimulator::ParamActionSequence SLOW_ON_2ND_CALL = {
371-
action_factories::slow_action(1, 10, 10), action_factories::slow_action(1, 1700, 1700)
368+
369+
class StorageLockWithSlowWrites : public ::testing::TestWithParam<std::tuple<int, int>> {
370+
protected:
371+
void SetUp() override {
372+
log::lock().set_level(spdlog::level::debug);
373+
StorageFailureSimulator::instance()->reset();
374+
}
375+
};
376+
377+
TEST_P(StorageLockWithSlowWrites, ConcurrentWrites) {
378+
const int first_delay = std::get<0>(GetParam());
379+
const int second_delay = std::get<1>(GetParam());
380+
381+
const StorageFailureSimulator::ParamActionSequence SLOW_ACTIONS = {
382+
action_factories::slow_action(1, first_delay, first_delay),
383+
action_factories::slow_action(1, second_delay, second_delay)
372384
};
385+
373386
constexpr size_t num_writers = 2;
387+
FutureExecutor<CPUThreadPoolExecutor> exec{num_writers};
388+
389+
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_LOCK, SLOW_ACTIONS}});
374390

375-
using StorageLockType = StorageLock<>;
376-
std::vector<std::unique_ptr<StorageLockType>> locks;
377-
locks.reserve(num_writers);
378-
while (locks.size() < num_writers) { locks.emplace_back(std::make_unique<StorageLockType>("test")); }
379-
auto store = std::make_shared<InMemoryStore>();
380-
folly::FutureExecutor<folly::CPUThreadPoolExecutor> exec{num_writers};
381-
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_ON_2ND_CALL}});
382391
std::vector<Future<Unit>> futures;
383-
std::vector<bool> writer_has_lock;
384-
writer_has_lock.resize(num_writers);
385-
for(size_t i = 0; i < num_writers; ++i) {
386-
futures.emplace_back(exec.addFuture([&store, &lock = locks[i], i, &writer_has_lock] {
387-
writer_has_lock[i] = lock->try_lock(store);
388-
}));
392+
auto lock_data = std::make_shared<LockData>(num_writers);
393+
lock_data->store_ = std::make_shared<InMemoryStore>();
394+
395+
for (size_t i = 0; i < num_writers; ++i) {
396+
futures.emplace_back(exec.addFuture(OptimisticLockTask(lock_data)));
389397
}
390398
collect(futures).get();
391399

392-
const long have_lock = std::ranges::count(writer_has_lock, true);
393-
ASSERT_EQ(have_lock, 0); // Both should fail
400+
const int expected_locks = second_delay < 1000 ? 0 : 1;
401+
ASSERT_EQ(lock_data->atomic_, expected_locks); // Both should fail
394402
}
395403

396-
TEST(StorageLock, ConcurrentWrites2) {
397-
log::lock().set_level(spdlog::level::debug);
398-
log::lock().set_level(spdlog::level::debug);
399-
const StorageFailureSimulator::ParamActionSequence SLOW_ON_2ND_CALL = {
400-
action_factories::slow_action(1, 10, 10), action_factories::slow_action(1, 500, 600)
404+
TEST_P(StorageLockWithSlowWrites, ConcurrentWritesWithRetrying) {
405+
const int first_delay = std::get<0>(GetParam());
406+
const int second_delay = std::get<1>(GetParam());
407+
408+
const StorageFailureSimulator::ParamActionSequence SLOW_ACTIONS = {
409+
action_factories::slow_action(1, first_delay, first_delay),
410+
action_factories::slow_action(1, second_delay, second_delay)
401411
};
402412
constexpr size_t num_writers = 2;
403413

404-
using StorageLockType = StorageLock<>;
405-
std::vector<std::unique_ptr<StorageLockType>> locks;
406-
locks.reserve(num_writers);
407-
while (locks.size() < num_writers) { locks.emplace_back(std::make_unique<StorageLockType>("test")); }
408-
auto store = std::make_shared<InMemoryStore>();
409-
folly::FutureExecutor<folly::CPUThreadPoolExecutor> exec{num_writers};
410-
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_ON_2ND_CALL}});
414+
auto lock_data = std::make_shared<LockData>(num_writers);
415+
lock_data->store_ = std::make_shared<InMemoryStore>();
416+
FutureExecutor<CPUThreadPoolExecutor> exec{num_writers};
417+
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_LOCK, SLOW_ACTIONS}});
411418
std::vector<Future<Unit>> futures;
412419
std::vector<bool> writer_has_lock;
413420
writer_has_lock.resize(num_writers);
414421
for(size_t i = 0; i < num_writers; ++i) {
415-
futures.emplace_back(exec.addFuture([&store, &lock = locks[i], i, &writer_has_lock] {
416-
try {
417-
lock->lock_timeout(store, 2000);
418-
writer_has_lock[i] = true;
419-
}
420-
catch(const StorageLockTimeout&) {
421-
writer_has_lock[i] = false;
422-
}
423-
}));
422+
futures.emplace_back(exec.addFuture(PessimisticLockTask(lock_data, 3000)));
424423
}
425424
collect(futures).get();
426425

427-
const long have_lock = std::ranges::count(writer_has_lock, true);
428-
ASSERT_EQ(have_lock, 1);
426+
ASSERT_EQ(lock_data->atomic_, 1);
429427
}
430428

429+
INSTANTIATE_TEST_SUITE_P(
430+
DelayPairs,
431+
StorageLockWithSlowWrites,
432+
::testing::Values(
433+
std::make_tuple(10, 800),
434+
std::make_tuple(10, 1700),
435+
std::make_tuple(10, 2000)
436+
)
437+
);
431438

432439
TEST(StorageLock, SlowWrites) {
433440
const auto current_lock_sleep_wait_ms = ConfigsMap::instance()->get_int("StorageLock.WaitMs", StorageLock<>::DEFAULT_WAIT_MS);
@@ -436,7 +443,7 @@ TEST(StorageLock, SlowWrites) {
436443
const StorageFailureSimulator::ParamActionSequence SLOW_WRITE = {
437444
action_factories::slow_action(1, min_ms, max_ms)
438445
};
439-
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_WRITE}});
446+
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_LOCK, SLOW_WRITE}});
440447
auto lock = StorageLock("test");
441448
ASSERT_FALSE(lock.try_lock(std::make_shared<InMemoryStore>()));
442449
}
@@ -455,7 +462,7 @@ TEST(StorageLock, DISABLED_LockSameTimestamp) { // Not yet implemented
455462
while (locks.size() < num_writers) { locks.emplace_back(std::make_unique<StorageLockType>("test")); };
456463
auto store = std::make_shared<InMemoryStore>();
457464
folly::FutureExecutor<folly::CPUThreadPoolExecutor> exec{num_writers};
458-
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_ON_2ND_CALL}});
465+
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_LOCK, SLOW_ON_2ND_CALL}});
459466
std::vector<Future<Unit>> futures;
460467
std::vector<bool> writer_has_lock;
461468
writer_has_lock.resize(num_writers);

0 commit comments

Comments
 (0)