Skip to content

queue spinlock #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions bench/bench_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ void bench_logic(NExecutors::IExecutor& pool) {
});
}

// Benchmark: one full lifecycle of an IntrusiveThreadPool per iteration —
// construct with CountThreads workers, start, run the shared workload,
// then wait for the pool to drain.
static void IntrusiveThreadPool(benchmark::State& state) {
    for (auto _ : state) {
        NExecutors::IntrusiveThreadPool pool{CountThreads};
        pool.Start();
        bench_logic(pool);  // shared workload defined above (submits kTasks tasks)
        pool.WaitIdle();    // presumably blocks until all submitted work completes — TODO confirm
    }
}
// google-benchmark registration: fixed iteration/repetition counts,
// results reported in milliseconds.
BENCHMARK(IntrusiveThreadPool)
    ->Name(std::format("IntrusiveThreadPool_task_{}", kTasks))
    ->Repetitions(CountRepetitions)
    ->Iterations(CountIteration)
    ->Unit(benchmark::kMillisecond);
//static void IntrusiveThreadPool(benchmark::State& state) {
// for (auto _ : state) {
// NExecutors::IntrusiveThreadPool pool{CountThreads};
// pool.Start();
// bench_logic(pool);
// pool.WaitIdle();
// }
//}
//BENCHMARK(IntrusiveThreadPool)
// ->Name(std::format("IntrusiveThreadPool_task_{}", kTasks))
// ->Repetitions(CountRepetitions)
// ->Iterations(CountIteration)
// ->Unit(benchmark::kMillisecond);

static void DistributedPool(benchmark::State& state) {
for (auto _ : state) {
Expand Down
14 changes: 14 additions & 0 deletions bench/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@
| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 8.96 ms| 0.477 ms| 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_cv | 7.05 % | 0.80 % | 5 |

### DistributedPool Task 100,000 (queue spinlock)

| Benchmark | Time | CPU | Iterations |
|---------------------------------------------------------------|--------|--------|------------|
| DistributedPool_task_100000/iterations:10/repeats:5 | 304 ms | 126 ms | 10 |
| DistributedPool_task_100000/iterations:10/repeats:5 | 263 ms | 107 ms | 10 |
| DistributedPool_task_100000/iterations:10/repeats:5 | 205 ms | 79.7 ms| 10 |
| DistributedPool_task_100000/iterations:10/repeats:5 | 190 ms | 78.7 ms| 10 |
| DistributedPool_task_100000/iterations:10/repeats:5 | 460 ms | 189 ms | 10 |
| DistributedPool_task_100000/iterations:10/repeats:5_mean | 285 ms | 116 ms | 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_median | 263 ms | 107 ms | 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 108 ms | 45.5 ms| 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_cv | 37.98 %| 39.18 %| 5 |

### DistributedPool Task 100,000 (std::mutex)

| Benchmark | Time | CPU | Iterations |
Expand Down
73 changes: 73 additions & 0 deletions src/components/sync/queue_spinlock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//
// Created by konstantin on 28.07.24.
//

#pragma once

#include <atomic>

namespace NSync {

// MCS-style FIFO queue spinlock. Waiters form an intrusive singly-linked
// queue (tail_ points at the most recent waiter); each waiter spins on its
// OWN `is_owner` flag rather than one shared flag, so contending threads
// hammer distinct atomics. The lock is used exclusively through the RAII
// `Guard` — the lock object itself only holds the queue tail.
class QueueSpinLock final {
public:
    // RAII handle representing one position in the waiter queue.
    // Construction blocks until the lock is held; destruction (or an
    // explicit Release()) hands the lock to the next waiter.
    // NOTE(review): a Guard is linked into the queue by address, so it
    // must stay at a stable location for its whole critical section.
    class Guard final {
    public:
        // Enqueue self and spin until ownership is granted.
        explicit Guard(QueueSpinLock& host) : host(host) { host.Acquire(this); }
        ~Guard() {
            // Only release if still owning — supports an early manual Release().
            if (is_owner) Release();
        }

        // Pass the lock to the successor (if any) and drop ownership.
        void Release() {
            host.Release(this);
            is_owner.store(false);
        }

        // Called by the predecessor (or by Acquire on an empty queue)
        // to grant this guard the lock; ends this guard's spin loop.
        void SetOwner() { is_owner.store(true); }

        // Link `guard` as this guard's successor in the queue.
        void SetNext(Guard* guard) { next.store(guard); }

        bool IsOwner() const { return is_owner.load(); }

        bool HasNext() const { return next.load() != nullptr; }

        // Precondition: HasNext() — otherwise next.load() is null and
        // this dereferences nullptr.
        void SetNextOwner() { next.load()->SetOwner(); }

    private:
        QueueSpinLock& host;
        std::atomic<Guard*> next{};    // successor in the waiter queue (null = none yet)
        std::atomic<bool> is_owner{};  // the flag this guard spins on
    };

private:
    void Acquire(Guard* guard) {
        // Atomically append ourselves at the tail of the queue.
        auto ancestor = tail_.exchange(guard);
        if (ancestor == nullptr) {
            // Queue was empty — lock acquired immediately.
            guard->SetOwner();
            return;
        }

        // Publish ourselves to the predecessor, then busy-wait on our
        // own flag until the predecessor transfers ownership.
        // NOTE(review): pure spin with no pause/yield/backoff; consider
        // adding one for heavily contended paths.
        ancestor->SetNext(guard);
        while (!guard->IsOwner()) {
        }
    }

    void Release(Guard* guard) {
        // Fast path: a successor has already linked itself — hand over.
        if (guard->HasNext()) {
            guard->SetNextOwner();
            return;
        }

        // No visible successor: try to swing tail_ from us to null
        // (queue becomes empty). If the CAS fails, another thread has
        // already exchanged itself into tail_ but has not run SetNext()
        // yet — keep re-checking until its link appears.
        Guard* old_guard = guard;
        while (!tail_.compare_exchange_weak(old_guard, nullptr)) {
            if (guard->HasNext()) {
                guard->SetNextOwner();
                return;
            }
            // compare_exchange_weak overwrote old_guard with the tail it
            // observed (and may also fail spuriously); reset the expected
            // value before retrying.
            old_guard = guard;
        }
    }

    // Most recent waiter, or null when the lock is free.
    // NOTE(review): all atomics use default seq_cst ordering — correct but
    // conservative; acquire/release would likely suffice — TODO confirm.
    std::atomic<Guard*> tail_{};
};

} // namespace NSync
16 changes: 9 additions & 7 deletions src/fiber/awaiter/mutex_awaiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <components/intrusive/list.h>
#include <components/sync/spinLock.h>
#include <components/sync/queue_spinlock.h>
#include <fiber/awaiter/awaiter.h>

namespace NFibers {
Expand All @@ -16,25 +17,26 @@ template <class M>
class AsyncMutexWaiter : public IAwaiter,
public NComponents::Node<AsyncMutexWaiter<M>> {
public:
using Guard = std::unique_lock<typename M::Spinlock>;
using Guard = NSync::QueueSpinLock::Guard;

AsyncMutexWaiter(M* mutex, Guard guard)
: mutex(mutex), guard(std::move(guard)){};
AsyncMutexWaiter(M* async_mutex, Guard& guard)
: async_mutex(async_mutex), guard(guard){};

void AwaitSuspend(StoppedFiber handle) override {
assert(handle.IsValid());

stopped_handle = handle;
mutex->Park(this);
guard.release()->unlock();
async_mutex->Park(this);

guard.Release();
}

void Schedule() { stopped_handle.Schedule(); }

private:
M* mutex;
M* async_mutex;
StoppedFiber stopped_handle;
Guard guard;
Guard& guard;
};

} // namespace NFibers
2 changes: 1 addition & 1 deletion src/fiber/sync/async_mutex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NFibers {
void AsyncMutex::Lock() {
Waiter::Guard guard(spinlock_);
if (locked_) {
Waiter waiter(this, std::move(guard));
Waiter waiter(this, guard);
Suspend(&waiter);
} else {
locked_ = true;
Expand Down
3 changes: 2 additions & 1 deletion src/fiber/sync/async_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@

#include <components/intrusive/list.h>
#include <components/sync/spinLock.h>
#include <components/sync/queue_spinlock.h>
#include <fiber/awaiter/mutex_awaiter.h>

namespace NFibers {

class AsyncMutex {
using Spinlock = NSync::SpinLock;
using Spinlock = NSync::QueueSpinLock;
using Waiter = AsyncMutexWaiter<AsyncMutex>;

friend Waiter;
Expand Down
1 change: 1 addition & 0 deletions test/components/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ test_source_files = [
'test_intrusive_list.cpp',
'test_ms_queue.cpp',
'test_async_mutex.cpp',
'test_queue_spinlock.cpp',
]

cpp = meson.get_compiler('cpp')
Expand Down
58 changes: 58 additions & 0 deletions test/components/test_queue_spinlock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//
// Created by konstantin on 28.07.24.
//

#include "gtest/gtest.h"

#include <thread>

#include <components/sync/queue_spinlock.h>

// Smoke test: a single acquire/release cycle via the RAII guard.
TEST(TestQueueSpinlock, LockUnlock) {
    NSync::QueueSpinLock spinlock;

    {
        // Constructing the guard takes the lock; leaving the scope
        // returns it.
        NSync::QueueSpinLock::Guard lock(spinlock);
    }
}

// The lock must be re-acquirable after a full release: take and drop it
// twice in a row on the same thread.
TEST(TestQueueSpinlock, SequentialLockUnlock) {
    NSync::QueueSpinLock spinlock;

    auto lock_once = [&spinlock] {
        NSync::QueueSpinLock::Guard lock(spinlock);
        // Critical section
    };

    lock_once();
    lock_once();
}

// Mutual-exclusion test: two threads perform read-yield-write increments
// under the lock; any lost update means the critical section leaked.
TEST(TestQueueSpinlock, ConcurrentIncrements) {
    NSync::QueueSpinLock spinlock;
    size_t counter = 0;

    const size_t kIncrementsPerThread = 1000;

    auto contender = [&] {
        for (size_t i = 0; i < kIncrementsPerThread; ++i) {
            NSync::QueueSpinLock::Guard guard(spinlock);

            const size_t snapshot = counter;
            std::this_thread::yield();  // widen the race window on purpose
            counter = snapshot + 1;
        }
    };

    std::thread workers[] = {std::thread(contender), std::thread(contender)};
    for (auto& worker : workers) {
        worker.join();
    }

    std::cout << "Shared counter value: " << counter << std::endl;

    ASSERT_EQ(counter, 2 * kIncrementsPerThread);
}
Loading