Skip to content

bench mutex #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions bench/bench_async_mutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
//
// Created by konstantin on 26.07.24.
//

#include <chrono>
#include <coroutine>
#include <condition_variable>
#include <latch>
#include <thread>
#include <vector>

#include <benchmark/benchmark.h>
#include <components/async_mutex/async_mutex.h>

using namespace std::chrono_literals;

static constexpr size_t CountIteration = 5;
static constexpr size_t CountRepetitions = 10;
static constexpr size_t MaxCountThreads = 8;
static constexpr size_t CountIterationsInThread = 100'00;

struct TestAsyncMutex final {
NComponents::AsyncMutex mutex;
size_t number{};

std::latch latch;
const size_t count_iterations;

explicit TestAsyncMutex(size_t n, size_t count_iterations)
: latch(n), count_iterations(count_iterations) {}

NComponents::ResumableNoOwn run() {
{
std::unique_lock lock(cv_wait);
while (!wait_flag) cv.wait(lock);
}
for (size_t i = 0; i < count_iterations; i++) {
co_await mutex.lock();
number += 1;
mutex.unlock();

co_await mutex.lock();
number += 1;
mutex.unlock();
}

latch.count_down();
}

NComponents::ResumableNoOwn StartAll() {
std::unique_lock lock(cv_wait);
wait_flag.store(true);
cv.notify_all();

co_await mutex.lock();
mutex.unlock();
}

void Wait() { latch.wait(); }

private:
std::mutex cv_wait;
std::condition_variable cv;
std::atomic<bool> wait_flag{};
};

struct TestStdMutex final {
std::mutex mutex;
size_t number{};

std::latch latch;
const size_t count_iterations;

explicit TestStdMutex(size_t n, size_t count_iterations)
: latch(n), count_iterations(count_iterations) {}

void run() {
{
std::unique_lock lock(cv_wait);
while (!wait_flag) cv.wait(lock);
}
for (size_t i = 0; i < count_iterations; i++) {
mutex.lock();
number += 1;
mutex.unlock();

mutex.lock();
number += 1;
mutex.unlock();
}

latch.count_down();
}

void StartAll() {
std::unique_lock lock(cv_wait);
wait_flag.store(true);
cv.notify_all();

mutex.lock();
mutex.unlock();
}

void Wait() { latch.wait(); }

private:
std::mutex cv_wait;
std::condition_variable cv;
std::atomic<bool> wait_flag{};
};

static void BenchAsyncMutex(benchmark::State& state) {
for (auto _ : state) {


TestAsyncMutex worker(MaxCountThreads, CountIterationsInThread);
{
std::vector<std::jthread> workers;
for (size_t i = 0; i < MaxCountThreads; i++) {
workers.emplace_back(&TestAsyncMutex::run, &worker);
}

worker.StartAll();
worker.Wait();
}
}
}
BENCHMARK(BenchAsyncMutex)
->Name("AsyncMutex")
->Repetitions(CountRepetitions)
->Iterations(CountIteration)
->Unit(benchmark::kMillisecond);

static void BenchStdMutex(benchmark::State& state) {
for (auto _ : state) {
TestStdMutex worker(MaxCountThreads, CountIterationsInThread);
{
std::vector<std::jthread> workers;
for (size_t i = 0; i < MaxCountThreads; i++) {
workers.emplace_back(&TestStdMutex::run, &worker);
}

worker.StartAll();
worker.Wait();
}
}
}
BENCHMARK(BenchStdMutex)
->Name("StdMutex")
->Repetitions(CountRepetitions)
->Iterations(CountIteration)
->Unit(benchmark::kMillisecond);
17 changes: 15 additions & 2 deletions bench/meson.build
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
bench_inc = include_directories('.')

bench_source_files = [
bench_pool_source_files = [
'main.cpp',
'bench_pool.cpp',
]

bench_mutex_source_files = [
'main.cpp',
'bench_async_mutex.cpp',
]

bench_dependencies = dependency(
[
'benchmark',
Expand All @@ -13,8 +18,16 @@ bench_dependencies = dependency(

bench_pool = executable(
'bench_pool',
sources : bench_source_files,
sources : bench_pool_source_files,
dependencies : bench_dependencies,
include_directories : [bench_inc, root_src_dir],
link_with : [executor, fiber, go]
)

bench_async_mutex = executable(
'bench_async_mutex',
sources : bench_mutex_source_files,
dependencies : bench_dependencies,
include_directories : [bench_inc, root_src_dir],
link_with : [components]
)
25 changes: 24 additions & 1 deletion bench/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,27 @@
| DistributedPool_task_100000/iterations:10/repeats:5_mean | 127 ms | 59.3 ms | 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_median | 123 ms | 59.3 ms | 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 8.96 ms| 0.477 ms| 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_cv | 7.05 % | 0.80 % | 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_cv | 7.05 % | 0.80 % | 5 |

### Mutex

| Benchmark | Time | CPU | Iterations |
|-----------------------------------------------|----------|---------|------------|
| AsyncMutex/iterations:10/repeats:5 | 29.0 ms | 0.243 ms| 10 |
| AsyncMutex/iterations:10/repeats:5 | 29.1 ms | 0.189 ms| 10 |
| AsyncMutex/iterations:10/repeats:5 | 29.7 ms | 0.227 ms| 10 |
| AsyncMutex/iterations:10/repeats:5 | 28.3 ms | 0.223 ms| 10 |
| AsyncMutex/iterations:10/repeats:5 | 28.7 ms | 0.221 ms| 10 |
| AsyncMutex/iterations:10/repeats:5_mean | 29.0 ms | 0.221 ms| 5 |
| AsyncMutex/iterations:10/repeats:5_median | 29.0 ms | 0.223 ms| 5 |
| AsyncMutex/iterations:10/repeats:5_stddev | 0.522 ms | 0.020 ms| 5 |
| AsyncMutex/iterations:10/repeats:5_cv | 1.80 % | 8.91 % | 5 |
| StdMutex/iterations:10/repeats:5 | 154 ms | 0.205 ms| 10 |
| StdMutex/iterations:10/repeats:5 | 156 ms | 0.187 ms| 10 |
| StdMutex/iterations:10/repeats:5 | 157 ms | 0.182 ms| 10 |
| StdMutex/iterations:10/repeats:5 | 158 ms | 0.193 ms| 10 |
| StdMutex/iterations:10/repeats:5 | 159 ms | 0.190 ms| 10 |
| StdMutex/iterations:10/repeats:5_mean | 157 ms | 0.191 ms| 5 |
| StdMutex/iterations:10/repeats:5_median | 157 ms | 0.190 ms| 5 |
| StdMutex/iterations:10/repeats:5_stddev | 1.90 ms | 0.009 ms| 5 |
| StdMutex/iterations:10/repeats:5_cv | 1.21 % | 4.54 % | 5 |
53 changes: 35 additions & 18 deletions src/components/async_mutex/async_mutex_coro_impl.cpp
Original file line number Diff line number Diff line change
@@ -1,38 +1,49 @@
//
// Created by konstantin on 19.07.24.
//
#include "async_mutex_coro_impl.h"

#include <iostream>
#include <syncstream>

#include "async_mutex_coro_impl.h"
#include <components/async_mutex/config.h>

namespace NComponents {

bool AsyncMutexCoroImpl::TryLock() {
if (lock_flag) {
std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::TryLock][thread_id=" << std::this_thread::get_id()
<< "] lock_flag was locked. Need park." << std::endl;
if (ENABLE_DEBUG) {
std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::TryLock][thread_id="
<< std::this_thread::get_id()
<< "] lock_flag was locked. Need park." << std::endl;
}

return false;
}

std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::TryLock][thread_id=" << std::this_thread::get_id()
<< "] lock lock_flag." << std::endl;
if (ENABLE_DEBUG) {
std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::TryLock][thread_id="
<< std::this_thread::get_id() << "] lock lock_flag." << std::endl;
}
lock_flag = true;
return true;
}

void AsyncMutexCoroImpl::Unlock() {
std::unique_lock lock(spinlock);

std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::Unlock][thread_id=" << std::this_thread::get_id()
<< "] call" << std::endl;

if (ENABLE_DEBUG) {
std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::Unlock][thread_id="
<< std::this_thread::get_id() << "] call" << std::endl;
}
if (!waiters.empty()) {
std::cout << "[AsyncMutexCoroImpl::Unlock] Waiters size=" << waiters.size()
<< ", wake up first" << std::endl;
if (ENABLE_DEBUG) {
std::cout << "[AsyncMutexCoroImpl::Unlock] Waiters size="
<< waiters.size() << ", wake up first" << std::endl;
}
MutexAwaiter waiter = std::move(waiters.front());
waiters.pop_front();
lock.unlock();
Expand All @@ -41,17 +52,23 @@ void AsyncMutexCoroImpl::Unlock() {
return;
}

std::cout << "[AsyncMutexCoroImpl::Unlock] Waiters empty. Set lock_flag to false"
<< std::endl;
if (ENABLE_DEBUG) {
std::cout << "[AsyncMutexCoroImpl::Unlock] Waiters empty. Set "
"lock_flag to false"
<< std::endl;
}
lock_flag = false;
}

void AsyncMutexCoroImpl::ParkAwaiter(MutexAwaiter* awaiter) {
assert(awaiter);

std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::ParkAwaiter][thead_id=" << std::this_thread::get_id()
<< "] add waiter, new size=" << waiters.size() + 1 << std::endl;
if (ENABLE_DEBUG) {
std::osyncstream(std::cout)
<< "[AsyncMutexCoroImpl::ParkAwaiter][thead_id="
<< std::this_thread::get_id()
<< "] add waiter, new size=" << waiters.size() + 1 << std::endl;
}

waiters.emplace_back(std::move(*awaiter));
waiters.back().ReleaseLock();
Expand Down
11 changes: 11 additions & 0 deletions src/components/async_mutex/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//
// Created by konstantin on 26.07.24.
//

#pragma once

namespace NComponents {

static constexpr bool ENABLE_DEBUG = false;

}
Loading
Loading