Skip to content

Commit c11989f

Browse files
committed
CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot
Spinning on the atomic counter is inefficient and schedules waiters unfairly, so waiting times on busy nodes can be surprisingly long. Instead, if there are no free slots, wait via AsioConditionVariable#Wait(). The waiter is woken by another CpuBoundWork's CpuBoundWork#~CpuBoundWork() once that work has finished.
1 parent fe082a2 commit c11989f

File tree

2 files changed

+35
-12
lines changed

2 files changed

+35
-12
lines changed

lib/base/io-engine.cpp

+24-11
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,33 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
2020
: m_Done(false)
2121
{
2222
auto& ioEngine (IoEngine::Get());
23+
auto& sem (ioEngine.m_CpuBoundSemaphore);
24+
std::unique_lock<std::mutex> lock (sem.Mutex);
2325

24-
for (;;) {
25-
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
26+
if (sem.FreeSlots) {
27+
--sem.FreeSlots;
28+
return;
29+
}
2630

27-
if (availableSlots < 1) {
28-
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
29-
IoEngine::YieldCurrentCoroutine(yc);
30-
continue;
31-
}
31+
AsioConditionVariable cv (ioEngine.GetIoContext());
3232

33-
break;
34-
}
33+
sem.Waiting.emplace(&cv);
34+
lock.unlock();
35+
cv.Wait(yc);
3536
}
3637

3738
void CpuBoundWork::Done()
3839
{
3940
if (!m_Done) {
40-
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
41+
auto& sem (IoEngine::Get().m_CpuBoundSemaphore);
42+
std::unique_lock<std::mutex> lock (sem.Mutex);
43+
44+
if (sem.Waiting.empty()) {
45+
++sem.FreeSlots;
46+
} else {
47+
sem.Waiting.front()->Set();
48+
sem.Waiting.pop();
49+
}
4150

4251
m_Done = true;
4352
}
@@ -58,7 +67,11 @@ boost::asio::io_context& IoEngine::GetIoContext()
5867
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
5968
{
6069
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
61-
m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
70+
71+
{
72+
std::unique_lock<std::mutex> lock (m_CpuBoundSemaphore.Mutex);
73+
m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u;
74+
}
6275

6376
for (auto& thread : m_Threads) {
6477
thread = std::thread(&IoEngine::RunEventLoop, this);

lib/base/io-engine.hpp

+11-1
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88
#include "base/logger.hpp"
99
#include "base/shared-object.hpp"
1010
#include <atomic>
11+
#include <cstdint>
1112
#include <exception>
1213
#include <memory>
14+
#include <mutex>
15+
#include <queue>
1316
#include <thread>
1417
#include <utility>
1518
#include <vector>
@@ -47,6 +50,8 @@ class CpuBoundWork
4750
bool m_Done;
4851
};
4952

53+
class AsioConditionVariable;
54+
5055
/**
5156
* Async I/O engine
5257
*
@@ -120,7 +125,12 @@ class IoEngine
120125
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
121126
std::vector<std::thread> m_Threads;
122127
boost::asio::deadline_timer m_AlreadyExpiredTimer;
123-
std::atomic_int_fast32_t m_CpuBoundSemaphore;
128+
129+
struct {
130+
std::mutex Mutex;
131+
uint_fast32_t FreeSlots;
132+
std::queue<AsioConditionVariable*> Waiting;
133+
} m_CpuBoundSemaphore;
124134
};
125135

126136
class TerminateIoThread : public std::exception

0 commit comments

Comments
 (0)