
try optimize queue spinlock #60


Merged · 6 commits · Jul 30, 2024

38 changes: 26 additions & 12 deletions bench/readme.md
@@ -14,6 +14,20 @@
| IntrusiveThreadPool_task_100000/iterations:10/repeats:5_stddev | 20.8 ms | 5.86 ms| 5 |
| IntrusiveThreadPool_task_100000/iterations:10/repeats:5_cv | 6.50 % | 6.31 % | 5 |

+### DistributedPool Task 100,000 (std::mutex)
+
+| Benchmark | Time | CPU | Iterations |
+|-----------------------------------------------------------------|---------|--------|------------|
+| DistributedPool_task_100000/iterations:10/repeats:5 | 254 ms | 96.8 ms| 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 492 ms | 196 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 563 ms | 224 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 254 ms | 118 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 175 ms | 83.4 ms| 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5_mean | 348 ms | 144 ms | 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_median | 254 ms | 118 ms | 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 169 ms | 62.7 ms| 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_cv | 48.67 % | 43.65 %| 5 |

### DistributedPool Task 100,000 (spinlock)

| Benchmark | Time | CPU | Iterations |
@@ -42,19 +56,19 @@
| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 108 ms | 45.5 ms| 5 |
| DistributedPool_task_100000/iterations:10/repeats:5_cv | 37.98 %| 39.18 %| 5 |

-### DistributedPool Task 100,000 (std::mutex)
+### DistributedPool Task 100,000 (spinlock with weak mm)

-| Benchmark | Time | CPU | Iterations |
-|-----------------------------------------------------------------|---------|--------|------------|
-| DistributedPool_task_100000/iterations:10/repeats:5 | 254 ms | 96.8 ms| 10 |
-| DistributedPool_task_100000/iterations:10/repeats:5 | 492 ms | 196 ms | 10 |
-| DistributedPool_task_100000/iterations:10/repeats:5 | 563 ms | 224 ms | 10 |
-| DistributedPool_task_100000/iterations:10/repeats:5 | 254 ms | 118 ms | 10 |
-| DistributedPool_task_100000/iterations:10/repeats:5 | 175 ms | 83.4 ms| 10 |
-| DistributedPool_task_100000/iterations:10/repeats:5_mean | 348 ms | 144 ms | 5 |
-| DistributedPool_task_100000/iterations:10/repeats:5_median | 254 ms | 118 ms | 5 |
-| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 169 ms | 62.7 ms| 5 |
-| DistributedPool_task_100000/iterations:10/repeats:5_cv | 48.67 % | 43.65 %| 5 |
+| Benchmark | Time | CPU | Iterations |
+| --- | --- | --- | --- |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 186 ms | 73.0 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 132 ms | 62.2 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 145 ms | 59.1 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 126 ms | 58.9 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5 | 149 ms | 62.4 ms | 10 |
+| DistributedPool_task_100000/iterations:10/repeats:5_mean | 148 ms | 63.1 ms | 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_median | 145 ms | 62.2 ms | 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_stddev | 23.5 ms | 5.76 ms | 5 |
+| DistributedPool_task_100000/iterations:10/repeats:5_cv | 15.90 % | 9.13 % | 5 |

### Compare std::mutex and async_mutex with coro

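(The `iterations:10/repeats:5` names and the `_mean`/`_median`/`_stddev`/`_cv` aggregate rows are Google Benchmark's repetition output. A minimal sketch of how such a case could be registered is shown below; the pool setup and task body are placeholders, not the repository's actual benchmark code.)

```cpp
#include <benchmark/benchmark.h>

// Hypothetical registration sketch; the real benchmark lives in bench/.
static void DistributedPool_task_100000(benchmark::State& state) {
  for (auto _ : state) {
    // Submit 100,000 small tasks to the distributed pool and wait for
    // all of them to finish (e.g. with a WaitGroup) before the
    // iteration ends.
  }
}
// Produces the iterations:10/repeats:5 rows plus the mean/median/stddev/cv
// aggregates seen in the tables above.
BENCHMARK(DistributedPool_task_100000)->Iterations(10)->Repetitions(5);
```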
19 changes: 11 additions & 8 deletions src/components/sync/queue_spinlock.h
@@ -14,21 +14,23 @@ class QueueSpinLock final {
  public:
   explicit Guard(QueueSpinLock& host) : host(host) { host.Acquire(this); }
   ~Guard() {
-    if (is_owner) Release();
+    if (is_owner.load(std::memory_order_acquire)) Release();
   }

   void Release() {
     host.Release(this);
-    is_owner.store(false);
+    is_owner.store(false, std::memory_order_release);
   }

-  void SetOwner() { is_owner.store(true); }
+  void SetOwner() { is_owner.store(true, std::memory_order_release); }

-  void SetNext(Guard* guard) { next.store(guard); }
+  void SetNext(Guard* guard) { next.store(guard, std::memory_order_release); }

-  bool IsOwner() const { return is_owner.load(); }
+  bool IsOwner() const {
+    return is_owner.load(std::memory_order_acquire);
+  }

-  bool HasNext() const { return next.load() != nullptr; }
+  bool HasNext() const { return next.load(std::memory_order_acquire) != nullptr; }

   void SetNextOwner() { next.load()->SetOwner(); }

@@ -40,7 +42,7 @@

  private:
   void Acquire(Guard* guard) {
-    auto ancestor = tail_.exchange(guard);
+    auto ancestor = tail_.exchange(guard/*, std::memory_order_acquire*/);
     if (ancestor == nullptr) {
       guard->SetOwner();
       return;
@@ -58,7 +60,8 @@
     }

     Guard* old_guard = guard;
-    while (!tail_.compare_exchange_weak(old_guard, nullptr)) {
+    while (!tail_.compare_exchange_weak(old_guard, nullptr/*,
+                                        std::memory_order_release*/)) {
       if (guard->HasNext()) {
         guard->SetNextOwner();
         return;
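(The hunks above switch `Guard`'s atomic flags from defaulted sequentially consistent accesses to explicit acquire/release orderings, while the orderings for `tail_.exchange` and `compare_exchange_weak` are only added as comments, i.e. the tail operations stay seq_cst for now. A minimal usage sketch, assuming the header above; the function and variable names here are illustrative only.)

```cpp
#include <components/sync/queue_spinlock.h>

NSync::QueueSpinLock spinlock;
int shared_counter = 0;  // protected by spinlock

void IncrementUnderLock() {
  // The Guard acts as a node in an MCS-style queue: Acquire() swaps it into
  // tail_ and, if there is a predecessor, links itself via SetNext() and
  // waits until the predecessor hands ownership over with SetNextOwner().
  NSync::QueueSpinLock::Guard guard(spinlock);
  ++shared_counter;
}  // ~Guard() releases the lock if this node still owns it
```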
9 changes: 5 additions & 4 deletions src/fiber/awaiter/wait_group_awaiter.h
@@ -7,6 +7,7 @@
 #include <mutex>

 #include <components/intrusive/list.h>
+#include <components/sync/queue_spinlock.h>
 #include <fiber/awaiter/awaiter.h>

 namespace NFibers {
@@ -15,23 +16,23 @@ template <class W>
 class WaitGroupWaiter : public IAwaiter,
                         public NComponents::Node<WaitGroupWaiter<W>> {
  public:
-  using Guard = std::unique_lock<typename W::Spinlock>;
+  using Guard = NSync::QueueSpinLock::Guard;

-  WaitGroupWaiter(W* wg, Guard guard) : wg(wg), guard(std::move(guard)){};
+  WaitGroupWaiter(W* wg, Guard& guard) : wg(wg), guard(guard){};

   void AwaitSuspend(StoppedFiber fiber) override {
     assert(fiber.IsValid());

     stopped_fiber = fiber;
     wg->Park(this);
-    guard.release()->unlock();
+    guard.Release();
   }

   void Schedule() { stopped_fiber.Schedule(); }

  private:
   W* wg;
-  Guard guard;
+  Guard& guard;
   StoppedFiber stopped_fiber;
 };

2 changes: 1 addition & 1 deletion src/fiber/sync/wait_group.cpp
@@ -38,7 +38,7 @@ void WaitGroup::Done() {
 void WaitGroup::Wait() {
   Waiter::Guard guard(spinlock_);
   if (counter_ > 0) {
-    Waiter wg_waiter(this, std::move(guard));
+    Waiter wg_waiter(this, guard);
     Suspend(&wg_waiter);
   }
 }
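(Taken together, `Wait()` now keeps the queue-lock `Guard` on its own frame, hands the waiter a reference to it, and the awaiter releases it only after the fiber has been parked, so a concurrent `Done()` cannot miss the waiter. Below is a self-contained, thread-based analogue of that ordering using `std::mutex`/`std::condition_variable` instead of the fiber runtime; `MiniWaitGroup` is purely illustrative, not part of the repository.)

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Thread-based analogue: the waiter is registered (via the predicate) while
// the lock is held, and the lock is dropped only while actually parked.
class MiniWaitGroup {
 public:
  void Add(int n) {
    std::lock_guard<std::mutex> g(m_);
    counter_ += n;
  }

  void Done() {
    std::lock_guard<std::mutex> g(m_);
    if (--counter_ == 0) cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> guard(m_);  // lock lives on this frame
    cv_.wait(guard, [this] { return counter_ == 0; });  // released while parked
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  int counter_ = 0;
};

int main() {
  MiniWaitGroup wg;
  wg.Add(2);
  std::thread t1([&] { wg.Done(); });
  std::thread t2([&] { wg.Done(); });
  wg.Wait();  // returns once both Done() calls have run
  t1.join();
  t2.join();
}
```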
3 changes: 2 additions & 1 deletion src/fiber/sync/wait_group.h
@@ -8,12 +8,13 @@

 #include <components/intrusive/list.h>
 #include <components/sync/spinLock.h>
+#include <components/sync/queue_spinlock.h>
 #include <fiber/awaiter/wait_group_awaiter.h>

 namespace NFibers {

 class WaitGroup {
-  using Spinlock = NSync::SpinLock;
+  using Spinlock = NSync::QueueSpinLock;
   using Waiter = WaitGroupWaiter<WaitGroup>;

   friend Waiter;