Skip to content

Commit 824211b

Browse files
committed
CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot
This is inefficient and involves possible bad surprises regarding waiting durations on busy nodes. Instead, use AsioConditionVariable#Wait() if there are no free slots. It's notified by others' CpuBoundWork#~CpuBoundWork() once finished.
1 parent e8c7853 commit 824211b

File tree

2 files changed

+108
-11
lines changed

2 files changed

+108
-11
lines changed

lib/base/io-engine.cpp

+102-11
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,121 @@
1616

1717
using namespace icinga;
1818

19-
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&)
19+
/**
20+
* Acquires a slot for CPU-bound work.
21+
*
22+
* If and as long as the lock-free TryAcquireSlot() doesn't succeed,
23+
* subscribes to the slow path by waiting on a condition variable.
24+
* It is woken up by Done() which is called by the destructor.
25+
*
26+
* @param yc Needed to asynchronously wait for the condition variable.
27+
* @param strand Where to post the wake-up of the condition variable.
28+
*/
29+
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
2030
: m_Done(false)
2131
{
22-
auto& ioEngine (IoEngine::Get());
32+
VERIFY(strand.running_in_this_thread());
2333

24-
for (;;) {
25-
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
34+
auto& ie (IoEngine::Get());
35+
Shared<AsioConditionVariable>::Ptr cv;
36+
37+
while (!TryAcquireSlot()) {
38+
if (!cv) {
39+
cv = Shared<AsioConditionVariable>::Make(ie.GetIoContext());
2640

27-
if (availableSlots < 1) {
28-
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
29-
IoEngine::YieldCurrentCoroutine(yc);
30-
continue;
41+
// The above line may take a little bit, so let's optimistically re-check
42+
if (TryAcquireSlot()) {
43+
break;
44+
}
3145
}
3246

33-
break;
47+
{
48+
std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
49+
50+
// The above line may take even longer, so let's check again.
51+
// Also mitigate lost wake-ups by re-checking during the lock:
52+
//
53+
// During our lock, Done() can't retrieve the subscribers to wake up,
54+
// so any ongoing wake-up is either done at this point or has not started yet.
55+
// If such a wake-up is done, it's a lost wake-up to us unless we re-check here
56+
// whether the slot being freed (just before the wake-up) is still available.
57+
if (TryAcquireSlot()) {
58+
break;
59+
}
60+
61+
// If the (hypothetical) slot mentioned above was taken by another coroutine,
62+
// there are no free slots again, just as if no wake-ups happened just now.
63+
ie.m_CpuBoundWaiting.emplace_back(strand, cv);
64+
}
65+
66+
cv->Wait(yc);
3467
}
3568
}
3669

70+
/**
71+
* Tries to acquire a slot for CPU-bound work.
72+
*
73+
* Specifically, decrements the number of free slots (semaphore) by one,
74+
* but only if it's currently greater than zero.
75+
* Not falling below zero requires an atomic#compare_exchange_weak() loop
76+
* instead of a simple atomic#fetch_sub() call, but it's also atomic.
77+
*
78+
* @return Whether a slot was acquired.
79+
*/
80+
bool CpuBoundWork::TryAcquireSlot()
81+
{
82+
auto& ie (IoEngine::Get());
83+
auto freeSlots (ie.m_CpuBoundSemaphore.load());
84+
85+
while (freeSlots > 0) {
86+
// If ie.m_CpuBoundSemaphore was changed after the last load,
87+
// compare_exchange_weak() will load its latest value into freeSlots for us to retry until...
88+
if (ie.m_CpuBoundSemaphore.compare_exchange_weak(freeSlots, freeSlots - 1)) {
89+
// ... either we successfully decrement ie.m_CpuBoundSemaphore by one, ...
90+
return true;
91+
}
92+
}
93+
94+
// ... or it becomes zero due to another coroutine.
95+
return false;
96+
}
97+
98+
/**
99+
* Releases the own slot acquired by the constructor (TryAcquireSlot()) if not already done.
100+
*
101+
* Precisely, increments the number of free slots (semaphore) by one.
102+
* Also wakes up all waiting constructors (slow path) if necessary.
103+
*/
37104
void CpuBoundWork::Done()
38105
{
39106
if (!m_Done) {
40-
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
41-
42107
m_Done = true;
108+
109+
auto& ie (IoEngine::Get());
110+
111+
// The constructor takes the slow path only if the semaphore is full,
112+
// so we only have to wake up constructors if the semaphore was full.
113+
// This works because after fetch_add(), TryAcquireSlot() (fast path) will succeed.
114+
if (ie.m_CpuBoundSemaphore.fetch_add(1) < 1) {
115+
// So now there are only slow path subscribers from just before the fetch_add() to be woken up.
116+
// Precisely, only subscribers from just before the fetch_add() which turned 0 to 1.
117+
118+
decltype(ie.m_CpuBoundWaiting) subscribers;
119+
120+
{
121+
// Locking after fetch_add() is safe because a delayed wake-up is fine.
122+
// Wake-up of constructors which subscribed after the fetch_add() is also not a problem.
123+
// In worst case, they will just re-subscribe to the slow path.
124+
// Lost wake-ups are mitigated by the constructor, see its implementation comments.
125+
std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
126+
std::swap(subscribers, ie.m_CpuBoundWaiting);
127+
}
128+
129+
// Again, a delayed wake-up is fine, hence unlocked.
130+
for (auto& [strand, cv] : subscribers) {
131+
boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); });
132+
}
133+
}
43134
}
44135
}
45136

lib/base/io-engine.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <atomic>
1313
#include <exception>
1414
#include <memory>
15+
#include <mutex>
1516
#include <thread>
1617
#include <utility>
1718
#include <vector>
@@ -47,6 +48,8 @@ class CpuBoundWork
4748
void Done();
4849

4950
private:
51+
static bool TryAcquireSlot();
52+
5053
bool m_Done;
5154
};
5255

@@ -140,7 +143,10 @@ class IoEngine
140143
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
141144
std::vector<std::thread> m_Threads;
142145
boost::asio::deadline_timer m_AlreadyExpiredTimer;
146+
143147
std::atomic_int_fast32_t m_CpuBoundSemaphore;
148+
std::mutex m_CpuBoundWaitingMutex;
149+
std::vector<std::pair<boost::asio::io_context::strand, Shared<AsioConditionVariable>::Ptr>> m_CpuBoundWaiting;
144150
};
145151

146152
class TerminateIoThread : public std::exception

0 commit comments

Comments
 (0)