Skip to content

Commit d6556b5

Browse files
Refactor, still poc
1 parent edc6ff9 commit d6556b5

File tree

1 file changed

+49
-42
lines changed

1 file changed

+49
-42
lines changed

cpp/arcticdb/util/test/test_storage_lock.cpp

Lines changed: 49 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -365,69 +365,76 @@ TEST(StorageLock, OptimisticForceReleaseLock) {
365365
first_lock._test_release_local_lock();
366366
}
367367

368-
TEST(StorageLock, ConcurrentWrites) {
369-
log::lock().set_level(spdlog::level::debug);
370-
const StorageFailureSimulator::ParamActionSequence SLOW_ON_2ND_CALL = {
371-
action_factories::slow_action(1, 10, 10), action_factories::slow_action(1, 1700, 1700)
368+
369+
class StorageLockWithSlowWrites : public ::testing::TestWithParam<std::tuple<int, int>> {
370+
protected:
371+
void SetUp() override {
372+
log::lock().set_level(spdlog::level::debug);
373+
StorageFailureSimulator::instance()->reset();
374+
}
375+
};
376+
377+
TEST_P(StorageLockWithSlowWrites, ConcurrentWrites) {
378+
const int first_delay = std::get<0>(GetParam());
379+
const int second_delay = std::get<1>(GetParam());
380+
381+
const StorageFailureSimulator::ParamActionSequence SLOW_ACTIONS = {
382+
action_factories::slow_action(1, first_delay, first_delay),
383+
action_factories::slow_action(1, second_delay, second_delay)
372384
};
385+
373386
constexpr size_t num_writers = 2;
387+
FutureExecutor<CPUThreadPoolExecutor> exec{num_writers};
388+
389+
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_ACTIONS}});
374390

375-
using StorageLockType = StorageLock<>;
376-
std::vector<std::unique_ptr<StorageLockType>> locks;
377-
locks.reserve(num_writers);
378-
while (locks.size() < num_writers) { locks.emplace_back(std::make_unique<StorageLockType>("test")); }
379-
auto store = std::make_shared<InMemoryStore>();
380-
folly::FutureExecutor<folly::CPUThreadPoolExecutor> exec{num_writers};
381-
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_ON_2ND_CALL}});
382391
std::vector<Future<Unit>> futures;
383-
std::vector<bool> writer_has_lock;
384-
writer_has_lock.resize(num_writers);
385-
for(size_t i = 0; i < num_writers; ++i) {
386-
futures.emplace_back(exec.addFuture([&store, &lock = locks[i], i, &writer_has_lock] {
387-
writer_has_lock[i] = lock->try_lock(store);
388-
}));
392+
auto lock_data = std::make_shared<LockData>(num_writers);
393+
lock_data->store_ = std::make_shared<InMemoryStore>();
394+
395+
for (size_t i = 0; i < num_writers; ++i) {
396+
futures.emplace_back(exec.addFuture(OptimisticLockTask(lock_data)));
389397
}
390398
collect(futures).get();
391399

392-
const long have_lock = std::ranges::count(writer_has_lock, true);
393-
ASSERT_EQ(have_lock, 0); // Both should fail
400+
const int expected_locks = second_delay < 1000 ? 0 : 1;
401+
ASSERT_EQ(lock_data->atomic_, expected_locks); // Both should fail
394402
}
395403

396-
TEST(StorageLock, ConcurrentWrites2) {
397-
log::lock().set_level(spdlog::level::debug);
398-
log::lock().set_level(spdlog::level::debug);
399-
const StorageFailureSimulator::ParamActionSequence SLOW_ON_2ND_CALL = {
400-
action_factories::slow_action(1, 10, 10), action_factories::slow_action(1, 500, 600)
404+
TEST_P(StorageLockWithSlowWrites, ConcurrentWritesWithRetrying) {
405+
const int first_delay = std::get<0>(GetParam());
406+
const int second_delay = std::get<1>(GetParam());
407+
408+
const StorageFailureSimulator::ParamActionSequence SLOW_ACTIONS = {
409+
action_factories::slow_action(1, first_delay, first_delay),
410+
action_factories::slow_action(1, second_delay, second_delay)
401411
};
402412
constexpr size_t num_writers = 2;
403413

404-
using StorageLockType = StorageLock<>;
405-
std::vector<std::unique_ptr<StorageLockType>> locks;
406-
locks.reserve(num_writers);
407-
while (locks.size() < num_writers) { locks.emplace_back(std::make_unique<StorageLockType>("test")); }
408-
auto store = std::make_shared<InMemoryStore>();
409-
folly::FutureExecutor<folly::CPUThreadPoolExecutor> exec{num_writers};
410-
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_ON_2ND_CALL}});
414+
auto lock_data = std::make_shared<LockData>(num_writers);
415+
lock_data->store_ = std::make_shared<InMemoryStore>();
416+
FutureExecutor<CPUThreadPoolExecutor> exec{num_writers};
417+
StorageFailureSimulator::instance()->configure({{FailureType::WRITE_SLOW, SLOW_ACTIONS}});
411418
std::vector<Future<Unit>> futures;
412419
std::vector<bool> writer_has_lock;
413420
writer_has_lock.resize(num_writers);
414421
for(size_t i = 0; i < num_writers; ++i) {
415-
futures.emplace_back(exec.addFuture([&store, &lock = locks[i], i, &writer_has_lock] {
416-
try {
417-
lock->lock_timeout(store, 2000);
418-
writer_has_lock[i] = true;
419-
}
420-
catch(const StorageLockTimeout&) {
421-
writer_has_lock[i] = false;
422-
}
423-
}));
422+
futures.emplace_back(exec.addFuture(PessimisticLockTask(lock_data, 3000)));
424423
}
425424
collect(futures).get();
426425

427-
const long have_lock = std::ranges::count(writer_has_lock, true);
428-
ASSERT_EQ(have_lock, 1);
426+
ASSERT_EQ(lock_data->atomic_, 1);
429427
}
430428

429+
INSTANTIATE_TEST_SUITE_P(
430+
DelayPairs,
431+
StorageLockWithSlowWrites,
432+
::testing::Values(
433+
std::make_tuple(10, 800),
434+
std::make_tuple(10, 1700),
435+
std::make_tuple(10, 2000)
436+
)
437+
);
431438

432439
TEST(StorageLock, SlowWrites) {
433440
const auto current_lock_sleep_wait_ms = ConfigsMap::instance()->get_int("StorageLock.WaitMs", StorageLock<>::DEFAULT_WAIT_MS);

0 commit comments

Comments
 (0)