Skip to content

Commit 8d24525

Browse files
committed
CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot
This is inefficient and involves unfair scheduling. The latter implies possible bad surprises regarding waiting durations on busy nodes. Instead, use AsioConditionVariable#Wait() if there are no free slots. It's notified by others' CpuBoundWork#~CpuBoundWork() once finished.
1 parent fe082a2 commit 8d24525

File tree

2 files changed

+57
-29
lines changed

2 files changed

+57
-29
lines changed

lib/base/io-engine.cpp

+29-10
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,39 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
2020
: m_Done(false)
2121
{
2222
auto& ioEngine (IoEngine::Get());
23+
auto& sem (ioEngine.m_CpuBoundSemaphore);
24+
std::unique_lock<std::mutex> lock (sem.Mutex);
2325

24-
for (;;) {
25-
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
26+
if (sem.FreeSlots) {
27+
--sem.FreeSlots;
28+
return;
29+
}
2630

27-
if (availableSlots < 1) {
28-
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
29-
IoEngine::YieldCurrentCoroutine(yc);
30-
continue;
31-
}
31+
auto cv (Shared<AsioConditionVariable>::Make(ioEngine.GetIoContext()));
32+
33+
sem.Waiting.emplace(cv);
34+
lock.unlock();
3235

33-
break;
36+
try {
37+
cv->Wait(yc);
38+
} catch (...) {
39+
Done();
40+
throw;
3441
}
3542
}
3643

3744
void CpuBoundWork::Done()
3845
{
3946
if (!m_Done) {
40-
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
47+
auto& sem (IoEngine::Get().m_CpuBoundSemaphore);
48+
std::unique_lock<std::mutex> lock (sem.Mutex);
49+
50+
if (sem.Waiting.empty()) {
51+
++sem.FreeSlots;
52+
} else {
53+
sem.Waiting.front()->Set();
54+
sem.Waiting.pop();
55+
}
4156

4257
m_Done = true;
4358
}
@@ -58,7 +73,11 @@ boost::asio::io_context& IoEngine::GetIoContext()
5873
IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
5974
{
6075
m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
61-
m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
76+
77+
{
78+
std::unique_lock<std::mutex> lock (m_CpuBoundSemaphore.Mutex);
79+
m_CpuBoundSemaphore.FreeSlots = Configuration::Concurrency * 3u / 2u;
80+
}
6281

6382
for (auto& thread : m_Threads) {
6483
thread = std::thread(&IoEngine::RunEventLoop, this);

lib/base/io-engine.hpp

+28-19
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
#include "base/exception.hpp"
77
#include "base/lazy-init.hpp"
88
#include "base/logger.hpp"
9+
#include "base/shared.hpp"
910
#include "base/shared-object.hpp"
1011
#include <atomic>
12+
#include <cstdint>
1113
#include <exception>
1214
#include <memory>
15+
#include <mutex>
16+
#include <queue>
1317
#include <thread>
1418
#include <utility>
1519
#include <vector>
@@ -47,6 +51,24 @@ class CpuBoundWork
4751
bool m_Done;
4852
};
4953

54+
/**
55+
* Condition variable which doesn't block I/O threads
56+
*
57+
* @ingroup base
58+
*/
59+
class AsioConditionVariable
60+
{
61+
public:
62+
AsioConditionVariable(boost::asio::io_context& io, bool init = false);
63+
64+
void Set();
65+
void Clear();
66+
void Wait(boost::asio::yield_context yc);
67+
68+
private:
69+
boost::asio::deadline_timer m_Timer;
70+
};
71+
5072
/**
5173
* Async I/O engine
5274
*
@@ -120,29 +142,16 @@ class IoEngine
120142
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
121143
std::vector<std::thread> m_Threads;
122144
boost::asio::deadline_timer m_AlreadyExpiredTimer;
123-
std::atomic_int_fast32_t m_CpuBoundSemaphore;
124-
};
125145

126-
class TerminateIoThread : public std::exception
127-
{
146+
struct {
147+
std::mutex Mutex;
148+
uint_fast32_t FreeSlots;
149+
std::queue<Shared<AsioConditionVariable>::Ptr> Waiting;
150+
} m_CpuBoundSemaphore;
128151
};
129152

130-
/**
131-
* Condition variable which doesn't block I/O threads
132-
*
133-
* @ingroup base
134-
*/
135-
class AsioConditionVariable
153+
class TerminateIoThread : public std::exception
136154
{
137-
public:
138-
AsioConditionVariable(boost::asio::io_context& io, bool init = false);
139-
140-
void Set();
141-
void Clear();
142-
void Wait(boost::asio::yield_context yc);
143-
144-
private:
145-
boost::asio::deadline_timer m_Timer;
146155
};
147156

148157
/**

0 commit comments

Comments
 (0)