Skip to content

Commit 16a469f

Browse files
committed
CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot
This is inefficient and involves possible bad surprises regarding waiting durations on busy nodes. Instead, use AsioConditionVariable#Wait() if there are no free slots. It's notified by others' CpuBoundWork#~CpuBoundWork() once finished.
1 parent 3d9fe89 commit 16a469f

File tree

2 files changed

+70
-11
lines changed

2 files changed

+70
-11
lines changed

lib/base/io-engine.cpp

+65-11
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,84 @@
1616

1717
using namespace icinga;
1818

19-
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&)
19+
CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
2020
: m_Done(false)
2121
{
22-
auto& ioEngine (IoEngine::Get());
22+
VERIFY(strand.running_in_this_thread());
2323

24-
for (;;) {
25-
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
24+
auto& ie (IoEngine::Get());
25+
Shared<AsioConditionVariable>::Ptr cv;
26+
27+
auto fastPath = [&ie] {
28+
auto freeSlots (ie.m_CpuBoundSemaphore.load());
2629

27-
if (availableSlots < 1) {
28-
ioEngine.m_CpuBoundSemaphore.fetch_add(1);
29-
IoEngine::YieldCurrentCoroutine(yc);
30-
continue;
30+
while (freeSlots > 0) {
31+
// In case of failure, compare_exchange_weak() updates the first argument with the current value.
32+
// https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
33+
if (ie.m_CpuBoundSemaphore.compare_exchange_weak(freeSlots, freeSlots - 1)) {
34+
return true;
35+
}
3136
}
3237

33-
break;
38+
return false;
39+
};
40+
41+
while (!fastPath()) {
42+
if (!cv) {
43+
cv = Shared<AsioConditionVariable>::Make(ie.GetIoContext());
44+
45+
// The above line may take a little bit, so let's optimistically re-check
46+
if (fastPath()) {
47+
break;
48+
}
49+
}
50+
51+
{
52+
std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
53+
54+
// The above line may take even longer, so let's check again.
55+
// Also mitigate lost wake-ups by synchronizing with Done().
56+
// On details, see Done() implementation inline comments.
57+
if (fastPath()) {
58+
break;
59+
}
60+
61+
ie.m_CpuBoundWaiting.emplace_back(strand, cv);
62+
}
63+
64+
cv->Wait(yc);
3465
}
3566
}
3667

3768
void CpuBoundWork::Done()
3869
{
3970
if (!m_Done) {
40-
IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
41-
4271
m_Done = true;
72+
73+
auto& ie (IoEngine::Get());
74+
75+
// The constructor takes the slow path only if the semaphore is full,
76+
// so we only have to wake up constructors if the semaphore was full.
77+
// This works because after fetch_add(), fast paths in the constructor will succeed.
78+
if (ie.m_CpuBoundSemaphore.fetch_add(1) < 1) {
79+
// So now there are only slow path subscribers from just before the fetch_add() to be woken up.
80+
// Precisely, only subscribers from just before the fetch_add() which turned 0 to 1.
81+
82+
decltype(ie.m_CpuBoundWaiting) subscribers;
83+
84+
{
85+
// Due to the mutex, wake-ups obviously may happen only before or after constructor subscribtions.
86+
// Ideally, this wake-up happens after any ongoing subscription (obviously).
87+
std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
88+
std::swap(subscribers, ie.m_CpuBoundWaiting);
89+
}
90+
// But even if it happens before (and gets lost), it's not a problem because now the ongoing subscriber
91+
// will lock the mutex and re-check the semaphore which is already >0 (fast path) due to fetch_add().
92+
93+
for (auto& subscriber : subscribers) {
94+
boost::asio::post(subscriber.first, [cv = std::move(subscriber.second)] { cv->NotifyOne(); });
95+
}
96+
}
4397
}
4498
}
4599

lib/base/io-engine.hpp

+5
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
#include "base/exception.hpp"
77
#include "base/lazy-init.hpp"
88
#include "base/logger.hpp"
9+
#include "base/shared.hpp"
910
#include "base/shared-object.hpp"
1011
#include <atomic>
1112
#include <exception>
1213
#include <memory>
14+
#include <mutex>
1315
#include <thread>
1416
#include <utility>
1517
#include <vector>
@@ -138,7 +140,10 @@ class IoEngine
138140
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
139141
std::vector<std::thread> m_Threads;
140142
boost::asio::deadline_timer m_AlreadyExpiredTimer;
143+
141144
std::atomic_int_fast32_t m_CpuBoundSemaphore;
145+
std::mutex m_CpuBoundWaitingMutex;
146+
std::vector<std::pair<boost::asio::io_context::strand, Shared<AsioConditionVariable>::Ptr>> m_CpuBoundWaiting;
142147
};
143148

144149
class TerminateIoThread : public std::exception

0 commit comments

Comments
 (0)