
CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot #9990

Open

Al2Klimov wants to merge 8 commits into master.
158 changes: 121 additions & 37 deletions lib/base/io-engine.cpp
@@ -16,60 +16,121 @@

 using namespace icinga;
 
-CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
+/**
+ * Acquires a slot for CPU-bound work.
+ *
+ * If and as long as the lock-free TryAcquireSlot() doesn't succeed,
+ * subscribes to the slow path by waiting on a condition variable.
+ * It is woken up by Done() which is called by the destructor.
+ *
+ * @param yc Needed to asynchronously wait for the condition variable.
+ * @param strand Where to post the wake-up of the condition variable.
+ */
+CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand& strand)
     : m_Done(false)
 {
-    auto& ioEngine (IoEngine::Get());
+    VERIFY(strand.running_in_this_thread());
 
-    for (;;) {
-        auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
+    auto& ie (IoEngine::Get());
+    Shared<AsioConditionVariable>::Ptr cv;
+
+    while (!TryAcquireSlot()) {
+        if (!cv) {
+            cv = Shared<AsioConditionVariable>::Make(ie.GetIoContext());
 
-        if (availableSlots < 1) {
-            ioEngine.m_CpuBoundSemaphore.fetch_add(1);
-            IoEngine::YieldCurrentCoroutine(yc);
-            continue;
+            // The above line may take a little bit, so let's optimistically re-check
+            if (TryAcquireSlot()) {
+                break;
+            }
         }
 
-        break;
+        {
+            std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
+
+            // The above line may take even longer, so let's check again.
+            // Also mitigate lost wake-ups by re-checking during the lock:
+            //
+            // During our lock, Done() can't retrieve the subscribers to wake up,
+            // so any ongoing wake-up is either done at this point or has not started yet.
+            // If such a wake-up is done, it's a lost wake-up to us unless we re-check here
+            // whether the slot being freed (just before the wake-up) is still available.
+            if (TryAcquireSlot()) {
+                break;
+            }
+
+            // If the (hypothetical) slot mentioned above was taken by another coroutine,
+            // there are no free slots again, just as if no wake-ups happened just now.
+            ie.m_CpuBoundWaiting.emplace_back(strand, cv);
+        }
+
+        cv->Wait(yc);
     }
 }
 
-CpuBoundWork::~CpuBoundWork()
+/**
+ * Tries to acquire a slot for CPU-bound work.
+ *
+ * Specifically, decrements the number of free slots (semaphore) by one,
+ * but only if it's currently greater than zero.
+ * Not falling below zero requires an atomic#compare_exchange_weak() loop
+ * instead of a simple atomic#fetch_sub() call, but it's also atomic.
+ *
+ * @return Whether a slot was acquired.
+ */
+bool CpuBoundWork::TryAcquireSlot()
 {
-    if (!m_Done) {
-        IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
+    auto& ie (IoEngine::Get());
+    auto freeSlots (ie.m_CpuBoundSemaphore.load());
+
+    while (freeSlots > 0) {
+        // If ie.m_CpuBoundSemaphore was changed after the last load,
+        // compare_exchange_weak() will load its latest value into freeSlots for us to retry until...
+        if (ie.m_CpuBoundSemaphore.compare_exchange_weak(freeSlots, freeSlots - 1)) {
+            // ... either we successfully decrement ie.m_CpuBoundSemaphore by one, ...
+            return true;
+        }
     }
+
+    // ... or it becomes zero due to another coroutine.
+    return false;
 }
 
+/**
+ * Releases the slot acquired by the constructor (TryAcquireSlot()) if not already done.
+ *
+ * Precisely, increments the number of free slots (semaphore) by one.
+ * Also wakes up all waiting constructors (slow path) if necessary.
+ */
 void CpuBoundWork::Done()
 {
     if (!m_Done) {
-        IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
-
         m_Done = true;
-    }
-}
-
-IoBoundWorkSlot::IoBoundWorkSlot(boost::asio::yield_context yc)
-    : yc(yc)
-{
-    IoEngine::Get().m_CpuBoundSemaphore.fetch_add(1);
-}
-
-IoBoundWorkSlot::~IoBoundWorkSlot()
-{
-    auto& ioEngine (IoEngine::Get());
-
-    for (;;) {
-        auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
-
-        if (availableSlots < 1) {
-            ioEngine.m_CpuBoundSemaphore.fetch_add(1);
-            IoEngine::YieldCurrentCoroutine(yc);
-            continue;
+
+        auto& ie (IoEngine::Get());
+
+        // The constructor takes the slow path only if the semaphore is full,
+        // so we only have to wake up constructors if the semaphore was full.
+        // This works because after fetch_add(), TryAcquireSlot() (fast path) will succeed.
+        if (ie.m_CpuBoundSemaphore.fetch_add(1) < 1) {
+            // So now there are only slow path subscribers from just before the fetch_add() to be woken up.
+            // Precisely, only subscribers from just before the fetch_add() which turned 0 to 1.
+
+            decltype(ie.m_CpuBoundWaiting) subscribers;
+
+            {
+                // Locking after fetch_add() is safe because a delayed wake-up is fine.
+                // Wake-up of constructors which subscribed after the fetch_add() is also not a problem.
+                // In worst case, they will just re-subscribe to the slow path.
+                // Lost wake-ups are mitigated by the constructor, see its implementation comments.
+                std::unique_lock lock (ie.m_CpuBoundWaitingMutex);
+                std::swap(subscribers, ie.m_CpuBoundWaiting);
+            }
+
+            // Again, a delayed wake-up is fine, hence unlocked.
+            for (auto& [strand, cv] : subscribers) {
+                boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); });
+            }
         }
-
-        break;
     }
 }
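
In isolation, the new acquire/release pair amounts to the following standalone sketch (illustrative only, with made-up names, not code from this PR). The point is that the counter never drops below zero, which a plain fetch_sub() cannot guarantee:

#include <atomic>
#include <cassert>

std::atomic<int> freeSlots{2}; // the sketch's semaphore capacity

// Like TryAcquireSlot(): decrement, but never let the counter drop below zero.
bool TryAcquire()
{
    int current = freeSlots.load();

    while (current > 0) {
        // On failure, compare_exchange_weak() reloads the latest value
        // into `current`, so the loop simply retries.
        if (freeSlots.compare_exchange_weak(current, current - 1)) {
            return true; // decremented from a positive value
        }
    }

    return false; // the counter was (or meanwhile became) zero
}

// Like the fast part of Done(): releasing is a plain increment.
void Release()
{
    freeSlots.fetch_add(1);
}

int main()
{
    assert(TryAcquire());
    assert(TryAcquire());
    assert(!TryAcquire()); // a fetch_sub() here would have gone below zero
    Release();
    assert(TryAcquire());
}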

@@ -85,9 +146,8 @@ boost::asio::io_context& IoEngine::GetIoContext()
     return m_IoContext;
 }
 
-IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u)), m_AlreadyExpiredTimer(m_IoContext)
+IoEngine::IoEngine() : m_IoContext(), m_KeepAlive(boost::asio::make_work_guard(m_IoContext)), m_Threads(decltype(m_Threads)::size_type(Configuration::Concurrency * 2u))
 {
-    m_AlreadyExpiredTimer.expires_at(boost::posix_time::neg_infin);
     m_CpuBoundSemaphore.store(Configuration::Concurrency * 3u / 2u);
 
     for (auto& thread : m_Threads) {
@@ -173,6 +233,30 @@ void AsioDualEvent::WaitForClear(boost::asio::yield_context yc)
     m_IsFalse.Wait(std::move(yc));
 }
 
+AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io)
+    : m_Timer(io)
+{
+    m_Timer.expires_at(boost::posix_time::pos_infin);
+}
+
+void AsioConditionVariable::Wait(boost::asio::yield_context yc)
+{
+    boost::system::error_code ec;
+    m_Timer.async_wait(yc[ec]);
+}
+
+bool AsioConditionVariable::NotifyOne()
+{
+    boost::system::error_code ec;
+    return m_Timer.cancel_one(ec);
+}
+
+size_t AsioConditionVariable::NotifyAll()
+{
+    boost::system::error_code ec;
+    return m_Timer.cancel(ec);
+}
Comment on lines +248 to +258
Contributor:

What's the deal with AsioEvent vs. AsioConditionVariable? You're introducing a new NotifyOne(), however this only seems to be used on an instance with one coroutine waiting, so what does this improve over the previous AsioConditionVariable::Set()? (At least I don't immediately see why that shouldn't work here.)

Member Author (Al2Klimov):

> What's the deal with AsioEvent vs. AsioConditionVariable?

Well, the one is (now) an actual CV (and complies with its name). The other is like Python's threading.Event – an awaitable bool.

> You're introducing a new NotifyOne(), however this only seems to be used on an instance with one coroutine waiting,

I know I'm not using NotifyAll(). I just fully implemented the new AsioConditionVariable class.

> so what does this improve over the previous AsioConditionVariable::Set()?

I could also use AsioEvent#Set(). But then I'd eventually have to call AsioEvent#Clear() somewhere, resetting the timer to where it was before. What for?
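
The never-expiring-timer trick behind such a CV is small enough to show standalone. A minimal sketch in plain Boost.Asio (illustrative only; none of these names come from the PR):

#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <iostream>

int main()
{
    boost::asio::io_context io;

    // A timer that never expires by itself behaves like a condition variable.
    boost::asio::deadline_timer cv (io);
    cv.expires_at(boost::posix_time::pos_infin);

    // "Wait()": parked until woken; there is no deadline to run into.
    cv.async_wait([](boost::system::error_code ec) {
        std::cout << "woken up: " << ec.message() << "\n"; // operation canceled
    });

    // "NotifyOne()": wakes exactly one pending wait; no sticky state to reset.
    boost::asio::post(io, [&cv] { cv.cancel_one(); });

    io.run();
}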

Member Author (Al2Klimov):

E.g.:

msg->second.Wait(yc);
msg->second.Clear();

Member:

> But then I'd eventually have to call AsioEvent#Clear() somewhere, resetting the timer to where it was before. What for?

You don't have to if you don't want to reuse the same instance elsewhere. AsioEvent provides literally the same functionality, so why reinvent the wheel? You don't even need the new Notify* methods, just call AsioEvent::Set() where you're currently calling NotifyOne(). They're bound to a specific coroutine, so there won't be any other waiters on that timer but only the coroutine that owns the strand.

Your argument for introducing yet another duplicate applies to your new class as well. As far as I know, you can't just call async_wait() on an already cancelled timer, so you have to reset it the same way as you would with AsioEvent. So, please use the existing class instead of adding another useless duplicate.

Member Author (Al2Klimov):

> you can't just call async_wait() on an already cancelled timer, so you have to reset it

Source...?

> Cancelling the timer does not change the expiry time.

https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/basic_deadline_timer/cancel/overload1.html

Member Author (Al2Klimov):

> just call AsioEvent::Set() where you're currently calling NotifyOne().

> You don't have to [call AsioEvent#Clear()] if you don't want to reuse the same instance elsewhere.

Once an AsioEvent has been Set(), Wait() returns instantly until the next Clear().
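
That sticky behaviour is easy to reproduce with the underlying timer. A sketch assuming the neg_infin/pos_infin scheme AsioEvent uses (illustrative, not code from this PR):

#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <iostream>

int main()
{
    boost::asio::io_context io;
    boost::asio::deadline_timer event (io);

    // "Set()": expiry in the infinite past, so every wait completes at once.
    event.expires_at(boost::posix_time::neg_infin);

    event.async_wait([](boost::system::error_code) { std::cout << "wait 1 returns\n"; });
    event.async_wait([](boost::system::error_code) { std::cout << "wait 2 returns\n"; });

    io.run(); // both handlers run immediately: the "set" state is sticky

    // "Clear()": expiry in the infinite future, so waits would block again.
    event.expires_at(boost::posix_time::pos_infin);
}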

Member:

> you can't just call async_wait() on an already cancelled timer, so you have to reset it
>
> Source...?

Well, my source was something like the following, and I didn't get to see the ec.message() output, but stepping through this with the debugger, it actually hits that block, so my bad!

BOOST_AUTO_TEST_CASE(test_test)
{
	boost::asio::io_context io;
	boost::asio::deadline_timer timer(io);
	timer.expires_from_now(boost::posix_time::seconds(5));
	timer.async_wait([&timer](const boost::system::error_code& ec) {
		BOOST_TEST(ec == boost::system::errc::operation_canceled);
	});
	boost::asio::post(io, [&timer]() {
		timer.cancel();
		timer.async_wait([](boost::system::error_code ec) {
			std::cout << "ec.message()" << "\n";
		});
	});
	io.run();
}

Member Author (@Al2Klimov), May 15, 2025:

That Heisenbug doesn't speak for the test's reliability. Mine, in contrast, confirms that one CAN wait on the timer after it has been cancelled:

BOOST_AUTO_TEST_CASE(test_test)
{
    boost::asio::io_context io;
    boost::asio::deadline_timer timer(io);

    // OK yet, will change once lambda below called
    boost::system::error_code ec1;

    // !OK yet, will change once lambda below called
    boost::system::error_code ec2;
    ec2.assign(boost::system::errc::operation_canceled, boost::system::generic_category());
    BOOST_CHECK_NE(ec2, boost::system::error_code());

    timer.expires_from_now(boost::posix_time::seconds(5));
    timer.async_wait([&ec1](const boost::system::error_code& ec) {
        ec1 = ec; // (2)
    });

    boost::asio::post(io, [&timer, &ec2] {
        timer.cancel(); // (1)
        timer.async_wait([&ec2](boost::system::error_code ec) {
            ec2 = ec; // (3), after 5s
        });
    });

    io.run();

    // Free-standing checks, so that not-called lambda doesn't fake an OK
    BOOST_CHECK_EQUAL(ec1, boost::system::errc::operation_canceled);
    BOOST_CHECK_EQUAL(ec2, boost::system::error_code());
}


 /**
  * Cancels any pending timeout callback.
  *
40 changes: 21 additions & 19 deletions lib/base/io-engine.hpp
@@ -12,6 +12,7 @@
 #include <atomic>
 #include <exception>
 #include <memory>
+#include <mutex>
 #include <thread>
 #include <utility>
 #include <vector>
@@ -20,6 +21,7 @@
 #include <boost/exception/all.hpp>
 #include <boost/asio/deadline_timer.hpp>
 #include <boost/asio/io_context.hpp>
+#include <boost/asio/io_context_strand.hpp>
 #include <boost/asio/spawn.hpp>
 
 #if BOOST_VERSION >= 108700
@@ -37,36 +39,41 @@ namespace icinga
 class CpuBoundWork
 {
 public:
-    CpuBoundWork(boost::asio::yield_context yc);
+    CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_context::strand&);
     CpuBoundWork(const CpuBoundWork&) = delete;
     CpuBoundWork(CpuBoundWork&&) = delete;
     CpuBoundWork& operator=(const CpuBoundWork&) = delete;
     CpuBoundWork& operator=(CpuBoundWork&&) = delete;
-    ~CpuBoundWork();
+
+    inline ~CpuBoundWork()
+    {
+        Done();
+    }
 
     void Done();
 
 private:
+    static bool TryAcquireSlot();
+
     bool m_Done;
 };
 
 /**
- * Scope break for CPU-bound work done in an I/O thread
+ * Condition variable which doesn't block I/O threads
  *
  * @ingroup base
  */
-class IoBoundWorkSlot
+class AsioConditionVariable
 {
 public:
-    IoBoundWorkSlot(boost::asio::yield_context yc);
-    IoBoundWorkSlot(const IoBoundWorkSlot&) = delete;
-    IoBoundWorkSlot(IoBoundWorkSlot&&) = delete;
-    IoBoundWorkSlot& operator=(const IoBoundWorkSlot&) = delete;
-    IoBoundWorkSlot& operator=(IoBoundWorkSlot&&) = delete;
-    ~IoBoundWorkSlot();
+    AsioConditionVariable(boost::asio::io_context& io);
+
+    void Wait(boost::asio::yield_context yc);
+    bool NotifyOne();
+    size_t NotifyAll();
 
 private:
-    boost::asio::yield_context yc;
+    boost::asio::deadline_timer m_Timer;
 };
 
 /**
@@ -77,7 +84,6 @@ class IoBoundWorkSlot
 class IoEngine
 {
     friend CpuBoundWork;
-    friend IoBoundWorkSlot;
 
 public:
     IoEngine(const IoEngine&) = delete;
@@ -133,12 +139,6 @@
 #endif // BOOST_VERSION >= 108700
     }
 
-    static inline
-    void YieldCurrentCoroutine(boost::asio::yield_context yc)
-    {
-        Get().m_AlreadyExpiredTimer.async_wait(yc);
-    }
-
 private:
     IoEngine();
 
@@ -149,8 +149,10 @@
     boost::asio::io_context m_IoContext;
     boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_KeepAlive;
     std::vector<std::thread> m_Threads;
-    boost::asio::deadline_timer m_AlreadyExpiredTimer;
 
     std::atomic_int_fast32_t m_CpuBoundSemaphore;
+    std::mutex m_CpuBoundWaitingMutex;
+    std::vector<std::pair<boost::asio::io_context::strand, Shared<AsioConditionVariable>::Ptr>> m_CpuBoundWaiting;
 };
 
 class TerminateIoThread : public std::exception
2 changes: 0 additions & 2 deletions lib/remote/eventshandler.cpp
@@ -106,8 +106,6 @@ bool EventsHandler::HandleRequest(
     response.result(http::status::ok);
     response.set(http::field::content_type, "application/json");
 
-    IoBoundWorkSlot dontLockTheIoThread (yc);
-
     http::async_write(stream, response, yc);
     stream.async_flush(yc);
 
15 changes: 11 additions & 4 deletions lib/remote/httpserverconnection.cpp
@@ -12,7 +12,6 @@
 #include "base/configtype.hpp"
 #include "base/defer.hpp"
 #include "base/exception.hpp"
-#include "base/io-engine.hpp"
 #include "base/logger.hpp"
 #include "base/objectlock.hpp"
 #include "base/timer.hpp"
@@ -105,6 +104,9 @@ void HttpServerConnection::StartStreaming()
 
     m_HasStartedStreaming = true;
 
+    VERIFY(m_HandlingRequest);
+    m_HandlingRequest->Done();
+
     HttpServerConnection::Ptr keepAlive (this);
 
     IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) {
@@ -418,20 +420,25 @@ bool ProcessRequest(
     ApiUser::Ptr& authenticatedUser,
     boost::beast::http::response<boost::beast::http::string_body>& response,
     HttpServerConnection& server,
+    CpuBoundWork*& m_HandlingRequest,
     bool& hasStartedStreaming,
     const WaitGroup::Ptr& waitGroup,
     std::chrono::steady_clock::duration& cpuBoundWorkTime,
-    boost::asio::yield_context& yc
+    boost::asio::yield_context& yc,
+    boost::asio::io_context::strand& strand
 )
 {
     namespace http = boost::beast::http;
 
     try {
         // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
         auto start (std::chrono::steady_clock::now());
-        CpuBoundWork handlingRequest (yc);
+        CpuBoundWork handlingRequest (yc, strand);
         cpuBoundWorkTime = std::chrono::steady_clock::now() - start;
 
+        Defer resetHandlingRequest ([&m_HandlingRequest] { m_HandlingRequest = nullptr; });
+        m_HandlingRequest = &handlingRequest;
+
         HttpHandler::ProcessRequest(waitGroup, stream, authenticatedUser, request, response, yc, server);
     } catch (const std::exception& ex) {
         if (hasStartedStreaming) {
@@ -549,7 +556,7 @@ void HttpServerConnection::ProcessMessages(boost::asio::yield_context yc)
 
         m_Seen = std::numeric_limits<decltype(m_Seen)>::max();
 
-        if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HasStartedStreaming, m_WaitGroup, cpuBoundWorkTime, yc)) {
+        if (!ProcessRequest(*m_Stream, request, authenticatedUser, response, *this, m_HandlingRequest, m_HasStartedStreaming, m_WaitGroup, cpuBoundWorkTime, yc, m_IoStrand)) {
             break;
         }
 
2 changes: 2 additions & 0 deletions lib/remote/httpserverconnection.hpp
@@ -4,6 +4,7 @@
 #define HTTPSERVERCONNECTION_H
 
 #include "remote/apiuser.hpp"
+#include "base/io-engine.hpp"
 #include "base/string.hpp"
 #include "base/tlsstream.hpp"
 #include "base/wait-group.hpp"
@@ -42,6 +43,7 @@ class HttpServerConnection final : public Object
     boost::asio::io_context::strand m_IoStrand;
     bool m_ShuttingDown;
     bool m_HasStartedStreaming;
+    CpuBoundWork* m_HandlingRequest = nullptr;
     boost::asio::deadline_timer m_CheckLivenessTimer;
 
     HttpServerConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
2 changes: 1 addition & 1 deletion lib/remote/jsonrpcconnection.cpp
@@ -91,7 +91,7 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
         auto start (ch::steady_clock::now());
 
         try {
-            CpuBoundWork handleMessage (yc);
+            CpuBoundWork handleMessage (yc, m_IoStrand);
 
             // Cache the elapsed time to acquire a CPU semaphore used to detect extremely heavy workloads.
             cpuBoundDuration = ch::steady_clock::now() - start;