
Commit 1f87c4f: queue spinlock (#59)

* add queue spinlock

Parent: 11eb58d

8 files changed: +171, -22 lines

bench/bench_pool.cpp (+13, -13)
@@ -46,19 +46,19 @@ void bench_logic(NExecutors::IExecutor& pool) {
     });
 }
 
-static void IntrusiveThreadPool(benchmark::State& state) {
-    for (auto _ : state) {
-        NExecutors::IntrusiveThreadPool pool{CountThreads};
-        pool.Start();
-        bench_logic(pool);
-        pool.WaitIdle();
-    }
-}
-BENCHMARK(IntrusiveThreadPool)
-    ->Name(std::format("IntrusiveThreadPool_task_{}", kTasks))
-    ->Repetitions(CountRepetitions)
-    ->Iterations(CountIteration)
-    ->Unit(benchmark::kMillisecond);
+//static void IntrusiveThreadPool(benchmark::State& state) {
+//    for (auto _ : state) {
+//        NExecutors::IntrusiveThreadPool pool{CountThreads};
+//        pool.Start();
+//        bench_logic(pool);
+//        pool.WaitIdle();
+//    }
+//}
+//BENCHMARK(IntrusiveThreadPool)
+//    ->Name(std::format("IntrusiveThreadPool_task_{}", kTasks))
+//    ->Repetitions(CountRepetitions)
+//    ->Iterations(CountIteration)
+//    ->Unit(benchmark::kMillisecond);
 
 static void DistributedPool(benchmark::State& state) {
     for (auto _ : state) {

bench/readme.md (+14)
@@ -28,6 +28,20 @@
 | DistributedPool_task_100000/iterations:10/repeats:5_stddev | 8.96 ms| 0.477 ms| 5 |
 | DistributedPool_task_100000/iterations:10/repeats:5_cv | 7.05 % | 0.80 % | 5 |
 
+### DistributedPool Task 100,000 (queue spinlock)
+
+| Benchmark | Time | CPU | Iterations |
+|---------------------------------------------------------------|--------|--------|------------|
+| DistributedPool_task_100000/iterations:10/repeats:5 | 304 ms | 126 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 263 ms | 107 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 205 ms | 79.7 ms| 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 190 ms | 78.7 ms| 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 460 ms | 189 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5_mean | 285 ms | 116 ms | 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_median | 263 ms | 107 ms | 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 108 ms | 45.5 ms| 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_cv | 37.98 %| 39.18 %| 5 |
+
 ### DistributedPool Task 100,000 (std::mutex)
 
 | Benchmark | Time | CPU | Iterations |

src/components/sync/queue_spinlock.h (new file, +73)
@@ -0,0 +1,73 @@
+//
+// Created by konstantin on 28.07.24.
+//
+
+#pragma once
+
+#include <atomic>
+
+namespace NSync {
+
+class QueueSpinLock final {
+public:
+    class Guard final {
+    public:
+        explicit Guard(QueueSpinLock& host) : host(host) { host.Acquire(this); }
+        ~Guard() {
+            if (is_owner) Release();
+        }
+
+        void Release() {
+            host.Release(this);
+            is_owner.store(false);
+        }
+
+        void SetOwner() { is_owner.store(true); }
+
+        void SetNext(Guard* guard) { next.store(guard); }
+
+        bool IsOwner() const { return is_owner.load(); }
+
+        bool HasNext() const { return next.load() != nullptr; }
+
+        void SetNextOwner() { next.load()->SetOwner(); }
+
+    private:
+        QueueSpinLock& host;
+        std::atomic<Guard*> next{};
+        std::atomic<bool> is_owner{};
+    };
+
+private:
+    void Acquire(Guard* guard) {
+        auto ancestor = tail_.exchange(guard);
+        if (ancestor == nullptr) {
+            guard->SetOwner();
+            return;
+        }
+
+        ancestor->SetNext(guard);
+        while (!guard->IsOwner()) {
+        }
+    }
+
+    void Release(Guard* guard) {
+        if (guard->HasNext()) {
+            guard->SetNextOwner();
+            return;
+        }
+
+        Guard* old_guard = guard;
+        while (!tail_.compare_exchange_weak(old_guard, nullptr)) {
+            if (guard->HasNext()) {
+                guard->SetNextOwner();
+                return;
+            }
+            old_guard = guard;
+        }
+    }
+
+    std::atomic<Guard*> tail_{};
+};
+
+} // namespace NSync
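For context: `QueueSpinLock` is an MCS-style queue lock. `Acquire` swaps the incoming `Guard` into `tail_`; if a predecessor existed, the guard links itself behind it and spins on its own `is_owner` flag, so each waiter busy-waits on a separate location and ownership passes in FIFO order. `Release` hands the lock to the linked successor, or CASes `tail_` back to `nullptr` when no successor has enqueued yet, retrying while a late arrival finishes linking itself. A minimal usage sketch (mine, not part of the commit; the include path is assumed from the repo layout):

```cpp
#include <components/sync/queue_spinlock.h>

#include <cstddef>
#include <thread>
#include <vector>

int main() {
    NSync::QueueSpinLock lock;
    std::size_t counter = 0;

    std::vector<std::thread> threads;
    for (int t = 0; t < 4; ++t) {
        threads.emplace_back([&] {
            for (int i = 0; i < 100'000; ++i) {
                // Constructing the Guard enqueues it on tail_ and spins
                // until the predecessor hands ownership over (FIFO acquire).
                NSync::QueueSpinLock::Guard guard(lock);
                ++counter;  // critical section
            }  // ~Guard releases the lock or passes it to the next waiter
        });
    }
    for (auto& t : threads) t.join();
    // With correct mutual exclusion, counter == 400'000 here.
}
```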

src/fiber/awaiter/mutex_awaiter.h (+9, -7)
@@ -8,6 +8,7 @@
 
 #include <components/intrusive/list.h>
 #include <components/sync/spinLock.h>
+#include <components/sync/queue_spinlock.h>
 #include <fiber/awaiter/awaiter.h>
 
 namespace NFibers {
@@ -16,25 +17,26 @@ template <class M>
 class AsyncMutexWaiter : public IAwaiter,
                          public NComponents::Node<AsyncMutexWaiter<M>> {
 public:
-    using Guard = std::unique_lock<typename M::Spinlock>;
+    using Guard = NSync::QueueSpinLock::Guard;
 
-    AsyncMutexWaiter(M* mutex, Guard guard)
-        : mutex(mutex), guard(std::move(guard)){};
+    AsyncMutexWaiter(M* async_mutex, Guard& guard)
+        : async_mutex(async_mutex), guard(guard){};
 
     void AwaitSuspend(StoppedFiber handle) override {
         assert(handle.IsValid());
 
         stopped_handle = handle;
-        mutex->Park(this);
-        guard.release()->unlock();
+        async_mutex->Park(this);
+
+        guard.Release();
     }
 
     void Schedule() { stopped_handle.Schedule(); }
 
 private:
-    M* mutex;
+    M* async_mutex;
     StoppedFiber stopped_handle;
-    Guard guard;
+    Guard& guard;
 };
 
 } // namespace NFibers
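A note on the `Guard&` change: the old code owned the lock via `std::unique_lock` and could take the guard by move. A `QueueSpinLock::Guard` cannot be handled that way, since the lock's `tail_` and `next` pointers hold the guard's address, and its `std::atomic` members make it non-movable anyway. The waiter therefore keeps a reference to the caller's stack-resident guard and releases it explicitly with `guard.Release()` once the fiber is parked. This also hard-wires `Guard` to `NSync::QueueSpinLock::Guard` instead of deriving it from `typename M::Spinlock`.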

src/fiber/sync/async_mutex.cpp (+1, -1)
@@ -11,7 +11,7 @@ namespace NFibers {
 void AsyncMutex::Lock() {
     Waiter::Guard guard(spinlock_);
     if (locked_) {
-        Waiter waiter(this, std::move(guard));
+        Waiter waiter(this, guard);
         Suspend(&waiter);
     } else {
         locked_ = true;
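The ordering in `Lock` appears deliberate: the waiter is enqueued via `Park` while the spinlock is still held, and the guard is only released inside `AwaitSuspend` (see `mutex_awaiter.h` above), so an `Unlock` on another fiber cannot slip in between enqueuing and suspending and cause a lost wakeup.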

src/fiber/sync/async_mutex.h (+2, -1)
@@ -8,12 +8,13 @@
 
 #include <components/intrusive/list.h>
 #include <components/sync/spinLock.h>
+#include <components/sync/queue_spinlock.h>
 #include <fiber/awaiter/mutex_awaiter.h>
 
 namespace NFibers {
 
 class AsyncMutex {
-    using Spinlock = NSync::SpinLock;
+    using Spinlock = NSync::QueueSpinLock;
     using Waiter = AsyncMutexWaiter<AsyncMutex>;
 
     friend Waiter;

test/components/meson.build (+1)
@@ -5,6 +5,7 @@ test_source_files = [
     'test_intrusive_list.cpp',
     'test_ms_queue.cpp',
     'test_async_mutex.cpp',
+    'test_queue_spinlock.cpp',
 ]
 
 cpp = meson.get_compiler('cpp')
test/components/test_queue_spinlock.cpp (new file, +58)
@@ -0,0 +1,58 @@
+//
+// Created by konstantin on 28.07.24.
+//
+
+#include "gtest/gtest.h"
+
+#include <thread>
+
+#include <components/sync/queue_spinlock.h>
+
+TEST(TestQueueSpinlock, LockUnlock) {
+    NSync::QueueSpinLock spinlock;
+
+    {
+        NSync::QueueSpinLock::Guard guard(spinlock);  // <-- Acquired
+        // Critical section
+    }  // <-- Released
+}
+
+TEST(TestQueueSpinlock, SequentialLockUnlock) {
+    NSync::QueueSpinLock spinlock;
+
+    {
+        NSync::QueueSpinLock::Guard guard(spinlock);
+        // Critical section
+    }
+
+    {
+        NSync::QueueSpinLock::Guard guard(spinlock);
+        // Critical section
+    }
+}
+
+TEST(TestQueueSpinlock, ConcurrentIncrements) {
+    NSync::QueueSpinLock spinlock;
+    size_t counter = 0;
+
+    const size_t kIncrementsPerThread = 1000;
+
+    auto contender = [&] {
+        for (size_t i = 0; i < kIncrementsPerThread; ++i) {
+            NSync::QueueSpinLock::Guard guard(spinlock);
+
+            size_t current = counter;
+            std::this_thread::yield();
+            counter = current + 1;
+        }
+    };
+
+    std::thread t1(contender);
+    std::thread t2(contender);
+    t1.join();
+    t2.join();
+
+    std::cout << "Shared counter value: " << counter << std::endl;
+
+    ASSERT_EQ(counter, 2 * kIncrementsPerThread);
+}
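The `ConcurrentIncrements` test splits the increment into a read, a `std::this_thread::yield()`, and a write, which widens the race window: with a broken lock the two contenders would interleave these read-modify-write sequences and lose updates, so the final `ASSERT_EQ` fails with high probability. One small nit: the test streams to `std::cout` without including `<iostream>` directly, relying on it arriving transitively through the gtest headers; an explicit include would be more robust.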
