CpuBoundWork#CpuBoundWork(): don't spin on atomic int to acquire slot #9990
Conversation
Low load test: Just a few increments/decrements. 👍

High load test: If I literally DoS Icinga with https://github.com/Al2Klimov/i2all.tf/tree/master/i2dos, I get a few of these:

After I stop that program and fire one curl as in my low load test above, I get the same picture: still 12 free slots. 👍

Logs (produced with the following debug patch):

--- lib/base/io-engine.cpp
+++ lib/base/io-engine.cpp
@@ -24,6 +24,7 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
std::unique_lock<std::mutex> lock (sem.Mutex);
if (sem.FreeSlots) {
+ Log(LogInformation, "CpuBoundWork") << "Using one free slot, free: " << sem.FreeSlots << " => " << sem.FreeSlots - 1u;
--sem.FreeSlots;
return;
}
@@ -32,7 +33,9 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc)
sem.Waiting.emplace(&cv);
lock.unlock();
+ Log(LogInformation, "CpuBoundWork") << "Waiting...";
cv.Wait(yc);
+ Log(LogInformation, "CpuBoundWork") << "Waited!";
}
void CpuBoundWork::Done()
@@ -42,8 +45,10 @@ void CpuBoundWork::Done()
std::unique_lock<std::mutex> lock (sem.Mutex);
if (sem.Waiting.empty()) {
+ Log(LogInformation, "CpuBoundWork") << "Releasing one used slot, free: " << sem.FreeSlots << " => " << sem.FreeSlots + 1u;
++sem.FreeSlots;
} else {
+ Log(LogInformation, "CpuBoundWork") << "Handing over one used slot, free: " << sem.FreeSlots << " => " << sem.FreeSlots;
sem.Waiting.front()->Set();
sem.Waiting.pop();
}
In addition, v2.14.2 could theoretically misbehave once the free slot count falls "temporarily" noticeably below zero. Like, three requestors concurrently running through https://github.com/Icinga/icinga2/blob/v2.14.2/lib/base/io-engine.cpp#L24-L31. So that spinlock blocks not only CPU time, but also slots from legit requestors. The father of all spinlocks, so to say. 🙈 #10117 (comment)
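For illustration, here is a minimal sketch of an atomic-spin acquisition of that shape and of why the counter can temporarily drop below zero. The names and structure are simplified assumptions, not the actual code behind the link above:

#include <atomic>

// Simplified sketch of the kind of spin the linked v2.14.2 code performs.
std::atomic<int> freeSlots{12};

void AcquireSlotSpinning()
{
	for (;;) {
		// Optimistically take a slot...
		if (freeSlots.fetch_sub(1) > 0) {
			return; // got one
		}

		// ...and give it back if there was none. Between the fetch_sub() and
		// the fetch_add() the counter is lower than the real slot count, so
		// several concurrent losers can push it noticeably below zero and make
		// other, legitimate requestors fail their fetch_sub() as well.
		freeSlots.fetch_add(1);

		// Yield to other coroutines and try again, i.e. spin and burn CPU.
	}
}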
lib/remote/eventshandler.cpp
Outdated
@@ -105,7 +106,7 @@ bool EventsHandler::HandleRequest(
response.result(http::status::ok);
response.set(http::field::content_type, "application/json");

IoBoundWorkSlot dontLockTheIoThread (yc);
handlingRequest.Done();
I've figured out how to get rid of this and the 38 changed files, and pushed it as 75271fe.
$ git diff --stat 74009f0fc..a663f98b2
lib/base/io-engine.cpp | 77 +++++++++++++++++++++++++++++++++++++++--------------------------------------
lib/base/io-engine.hpp | 64 +++++++++++++++++++++++++++++++++-------------------------------
lib/remote/eventshandler.cpp | 2 --
lib/remote/httpserverconnection.cpp | 19 +++++++++++++------
lib/remote/httpserverconnection.hpp | 2 ++
lib/remote/jsonrpcconnection.cpp | 2 +-
6 files changed, 88 insertions(+), 78 deletions(-)
$
Please let me know whether you consider this better.
I've figured out how to get rid of this and the 38 changed files, and pushed it as 75271fe.
424e1bc (#9990) is an annoying commit, but not something that would have to block this PR.
On the change suggested in 75271fe: that sounds like moving in the direction I suggested in #10142:

Related: when doing bigger changes to the interface there, one other improvement that comes to mind is how HttpServerConnection::StartStreaming() works: currently, to take control over the whole connection, this has to be called, but the underlying ASIO stream is still passed to every handler and must not be used without calling StartStreaming(), otherwise there's a good chance the connection ends up in a broken state. This could be improved by only exposing the underlying stream as a return value of the StartStreaming() method, similar to how it works in Go's net/http package.

So if the point of StartStreaming() is to transfer ownership of the connection to the caller, it makes sense to me to also release other resources related to the connection, like the CpuBoundWork.

Thus, overall I'd say this is a sane change for StartStreaming(), so feel free to keep it in (but the commit history should be cleaned up; it doesn't need that revert commit in there).
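Purely as an illustration of that net/http-style idea, here is a hypothetical sketch; the types and the StartStreaming() signature are assumptions for this sketch, not the current Icinga 2 API:

#include <memory>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>

// Hypothetical sketch: the raw stream is only reachable through the return
// value of StartStreaming(), similar to Go's http.Hijacker.
using TlsStream = boost::asio::ssl::stream<boost::asio::ip::tcp::socket>;

class HttpServerConnection
{
public:
	// Takes over the connection: stops the connection's own processing,
	// releases per-request resources (e.g. the CpuBoundWork slot) and returns
	// the underlying stream, which the caller now owns exclusively.
	std::shared_ptr<TlsStream> StartStreaming()
	{
		// (stopping the read loop and releasing resources elided in this sketch)
		return std::move(m_Stream);
	}

private:
	std::shared_ptr<TlsStream> m_Stream; // no longer handed to every handler
};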
lib/base/io-engine.cpp
Outdated
}
try {
cv->Wait(yc);
} catch (...) {
Unfortunately, yes – this can throw.
--- test/base-shellescape.cpp
+++ test/base-shellescape.cpp
@@ -1,6 +1,8 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/utility.hpp"
+#include <boost/asio.hpp>
+#include <boost/asio/spawn.hpp>
#include <BoostTestTargetConfig.h>
#include <iostream>
@@ -16,6 +18,26 @@ BOOST_AUTO_TEST_CASE(escape_basic)
BOOST_CHECK(Utility::EscapeShellCmd("$PATH") == "\\$PATH");
BOOST_CHECK(Utility::EscapeShellCmd("\\$PATH") == "\\\\\\$PATH");
#endif /* _WIN32 */
+
+ auto io = new boost::asio::io_context;
+ boost::asio::spawn(*io, [io](boost::asio::yield_context yc) {
+ boost::asio::deadline_timer timer(*io, boost::posix_time::seconds(3));
+ boost::system::error_code ec;
+
+ try {
+ timer.async_wait(yc[ec]);
+ } catch (const boost::coroutines::detail::forced_unwind&) {
+ BOOST_CHECK(false); // error: in "base_shellescape/escape_basic": check false has failed
+ throw;
+ }
+ });
+ boost::asio::deadline_timer timer(*io, boost::posix_time::seconds(1));
+ timer.async_wait([io](boost::system::error_code ec) {
+ if (!ec) {
+ delete io;
+ }
+ });
+ io->run();
}
BOOST_AUTO_TEST_CASE(escape_quoted)
Unfortunately, yes – this can throw.
You can't be serious :), you deliberately deleted the I/O object with delete io;, so what do you expect to happen in that case? This is not a realistic test case; where in the Icinga 2 code do we perform a questionable operation like this? @julianbrost and I tried to trigger this exception last week but weren't able to, and reading the detailed 🙃 boost docs about it didn't help us understand it either.

If for some reason the I/O context gets deleted in Icinga 2, do you think you can ever recover from it? If something like this happens in Icinga 2, then you have far more severe problems than unreleased CPU semaphores.
If something like this happens in Icinga 2, then you have far more severe problems than unreleased CPU semaphores.
So you'd not catch it at all?
So you'd not catch it at all?
No, I didn't say that! I just want to understand under what normal circumstances such an exception would be triggered, other than by deleting the global io object. For instance, how can you explicitly trigger the stack unwinding of a coroutine? If one can force the destruction of a coroutine, then one can also verify that it enters that new catch-all handler as intended; but none of us is able to do that, and that is the puzzle that needs to be solved.
the puzzle that needs to be solved
Does it need to be resolved? To me, the bare lack of noexcept in async_wait is enough. Yes, I'm serious! If something could be thrown, I catch it and clean up after myself. As I said to Julian, if you want to test it, add a throw 1;. The most often used function that can theoretically throw an exception is operator new, but I don't know yet how to temporarily override malloc(3) locally. But I think I don't even need that, as #9990 (comment) already shows enough. (Also, I didn't test what happens across fork(2); I hope on_before_fork() (or whatever it's called in ASIO) preserves coroutine stacks.)
Though if even deleting the io object triggers this exception (also tested with IoEngine::SpawnCoroutine()), I guess it's an indicator that if the coroutine gets destroyed for whatever reason, we'll be able to intercept it and handle it accordingly.
Didn't see your intermediate comment above while sending my previous comment!
As I said to Julian, if you want to test it, add a throw 1;.

Where should I put that throw expression? Throwing an exception within the coroutine handler (the provided callback that is called within the coroutine) does not cause the coroutine to be destroyed.
Does it need to be resolved? To me, the bare lack of noexcept in async_wait is enough

If you don't want to make decisions just based on assumptions, then yes, you need to understand when this exception could be triggered. I'm not talking about just any exception, but the specific forced_unwind exception. As I said before, if the user-supplied callback throws an exception, it will never hit that new catch-all handler here, nor will it destroy the coroutine.
As I said to Julian, if you want to test it, add a throw 1;.

Where should I put that throw expression?

Just inside the try/catch, instead of cv->Wait(yc);. Seriously. Actually, I don't especially worry about a particular exception type; I just easily got forced_unwind in my comment above. Who knows, maybe a malloc(3) fails? It's just: theoretically, the method could throw – so I handle it. Especially since, in this case, the code above in CpuBoundWork#CpuBoundWork() has already deployed some pointers to stack variables into IoEngine#m_CpuBoundSemaphore.Waiting. If our super unlikely exception hits someone without my try/catch once in ten years, happy debugging!
Now "better", colleagues?
--- a/lib/base/io-engine.cpp
+++ b/lib/base/io-engine.cpp
@@ -30,2 +30,5 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_contex
if (ie.m_CpuBoundSemaphore.compare_exchange_weak(freeSlots, freeSlots - 1)) {
+ if (cv) {
+ Log(LogCritical, "LOLCAT", "Got slot via ASIO!");
+ }
return;
@@ -36,3 +39,3 @@ CpuBoundWork::CpuBoundWork(boost::asio::yield_context yc, boost::asio::io_contex
cv = Shared<AsioConditionVariable>::Make(ie.GetIoContext());
- continue;
+ //continue;
}
[2024-12-05 17:37:35 +0100] critical/LOLCAT: Got slot via ASIO!
[2024-12-05 17:37:35 +0100] critical/LOLCAT: Got slot via ASIO!
[2024-12-05 17:37:35 +0100] critical/LOLCAT: Got slot via ASIO!
I mean, it works well, a DoS even triggers the above logs, but the fair scheduling is unsurprisingly gone. :(
Also, FWIW, I gave up on df63a78 (boost::asio::async_result).
It would still make it faster compared to the status quo by preventing the busy-waiting situation where pointless CPU time can be burnt, taking away resources from useful coroutines.

In the slow path, yes. For the fast path you're suggesting to replace an atomic with a mutex. That's not a prohibitively expensive operation in our getters/setters either; still, they use atomics wherever possible, because IT people are speed junkies. (Speed as in distance/FLOPS per time. 😅)

Unlike the previous implementation, there's no spin lock. fastPath() is rather a "reverse spin lock". Given the fast code is already there, I'd take that – unless you say it's too hard to understand, despite my code comments.
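As a condensed illustration of the fast-path/slow-path split under discussion, here is a sketch using plain std primitives instead of the ASIO ones from the PR; the identifiers are simplified stand-ins, not the PR's code:

#include <atomic>
#include <condition_variable>
#include <mutex>

struct SlotSemaphore
{
	std::atomic<int> FreeSlots{12};
	std::mutex Mutex;
	std::condition_variable CV;

	bool TryAcquireFast()
	{
		// On failure, compare_exchange_weak() reloads the current value into
		// 'free', so the loop always works with fresh data.
		int free = FreeSlots.load();
		while (free > 0) {
			if (FreeSlots.compare_exchange_weak(free, free - 1)) {
				return true;
			}
		}
		return false;
	}

	void Acquire()
	{
		if (TryAcquireFast()) {
			return; // fast path: no mutex involved
		}

		// Slow path: block until a releaser signals and a slot can be taken.
		std::unique_lock<std::mutex> lock (Mutex);
		CV.wait(lock, [this] { return TryAcquireFast(); });
	}

	void Release()
	{
		FreeSlots.fetch_add(1);

		// Taking the mutex before notifying avoids a lost wake-up against a
		// waiter that has just re-checked the predicate and is about to block.
		std::lock_guard<std::mutex> lock (Mutex);
		CV.notify_one();
	}
};

The unfairness discussed in this thread follows directly from this shape: a fresh caller can win a slot via TryAcquireFast() without ever touching the mutex, overtaking coroutines already parked in the slow path.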
lib/base/io-engine.cpp
Outdated
// But even if it happens before (and gets lost), it's not a problem because now the ongoing subscriber
// will lock the mutex and re-check the semaphore which is already >0 (fast path) due to fetch_add().
The wording "ongoing subscriber" leaves room for misinterpretation, at least I first read it more like "a subscriber has already subscribed (and is now waiting)" whereas it's probably supposed to mean "coroutine currently in the process of subscribing".
Also, the comment misses a case (which I'd say is the most complicated one). The overall explanation is something like this:
It is possible that another coroutine is concurrently trying to acquire a slot and has already gotten to locking the mutex. In this case, different scenarios are possible:
- It acquires the mutex before this code reads the subscribers; then it's woken immediately.
- Otherwise, if it acquires the mutex afterwards, there are two possibilities:
a. It acquires a slot using the fast path and never subscribes.
b. It can't acquire a slot using the fast path and subscribes. This implies the slot was stolen by yet another coroutine (which can happen since the fast path runs without holding the mutex). In this case the semaphore is back to 0, and the next call to Done() will wake the coroutine that subscribed concurrently and was missed by this call.
You mean, in short, if multiple ones try to claim the last remaining slot, then only one wins and the rest, obviously, subscribes to the slow path? (Which is ok as semaphore is 0 now, leading to a future wakeup.)
What I wrote in the quote block is supposed to describe everything that can happen inside if (ie.m_CpuBoundSemaphore.fetch_add(1) < 1) (in combination with others trying to acquire a slot).

You mean, in short, if multiple ones try to claim the last remaining slot

Not just the last remaining slot, it's also about the interactions that happen when multiple coroutines try to claim the only slot that is currently in the process of being released.
For the sake of complexity, let's imagine there are 26+ coroutines and label some of them from A to Z. While there are 0 free slots, K is releasing one, the others are acquiring, and all run their atomic ops in alphabetical order. Not because they run at distinct times, but because their ops run concurrently and the atomicity implies some order. So:
- A..J see 0 free slots and subscribe to the slow path
- K changes free slots 0 -> 1 and knows it's a special situation
- L changes free slots (back) 1 -> 0, it has acquired one
- K wakes up current subscribers
- M..Z see 0 free slots and subscribe to the slow path (lost wakeup? No, because:)
- Once L (or whoever of all the slots' consumers) is Done(), it will wake up A..J and M..Z
ok?
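To make that interleaving easier to follow, here is an illustrative sketch of the release side, loosely following the fragments quoted in this review (fetch_add(1) < 1, a mutex-guarded subscriber list, wake-ups posted to each subscriber's strand). Member names like Subscribers are assumptions for the sketch, not the PR's actual identifiers:

#include <atomic>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/post.hpp>

struct AsioEventLike { void NotifyOne() { /* wake one waiter (elided) */ } };

struct CpuSemaphore
{
	std::atomic<int> Slots{12};
	std::mutex SubscribersMutex;
	std::vector<std::pair<boost::asio::io_context::strand, std::shared_ptr<AsioEventLike>>> Subscribers;
};

void ReleaseSlot(CpuSemaphore& sem)
{
	// Publish the slot first. A previous value < 1 means somebody may already
	// be subscribed (or concurrently subscribing) to the slow path.
	if (sem.Slots.fetch_add(1) < 1) {
		decltype(sem.Subscribers) subscribers;

		{
			std::unique_lock<std::mutex> lock (sem.SubscribersMutex);
			std::swap(subscribers, sem.Subscribers);
		}

		// Wake everyone registered at this point (K waking A..J above). A
		// coroutine subscribing after the swap (M..Z) only sees 0 free slots
		// if another coroutine (L) stole the slot via the fast path, and that
		// coroutine's own release will wake it in turn.
		for (auto& [strand, cv] : subscribers) {
			boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); });
		}
	}
}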
That argument is nice, but it ignores the mutex. Why can the mutex be ignored here? After all, you have to perform atomic checks again after the mutex, so they do interact which makes me doubt that they can be considered individually in isolation.
I currently don't know of a way the implementation could fail, but I also recognize its complexity, and I'm still unsure whether I understand every detail. It just shouldn't take days of looking into the code with the result being "well, I guess this probably works", and I'm still missing the argument that describes the overall synchronization and makes the complexity manageable. And the argument should not only be made as part of a discussion that will be forgotten over time, but also be reflected properly in the code comments. Hence, please update the code comments accordingly.
for (;;) {
auto availableSlots (ioEngine.m_CpuBoundSemaphore.fetch_sub(1));
auto& ie (IoEngine::Get());
Shared<AsioConditionVariable>::Ptr cv;
I would place this where it is actually utilized, just before the while loop down below.
lib/base/io-engine.cpp
Outdated
IoEngine::YieldCurrentCoroutine(yc);
continue;
while (freeSlots > 0) {
// A failure here is equivalent to loading the latest value into freeSlots
A failure sounds more serious to me than it actually is, thus I would rephrase this to something like: When the value of m_CpuBoundSemaphore was modified by some other thread after we loaded it earlier, the comparison in compare_exchange_weak will fail and write the newly loaded value into freeSlots.
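As a standalone illustration of that compare_exchange_weak() behaviour (plain standard C++, independent of this PR):

#include <atomic>
#include <cassert>

int main()
{
	std::atomic<int> slots{5};
	int expected = 3; // deliberately stale

	// The exchange fails because 3 != 5, but 'expected' is refreshed to 5.
	bool ok = slots.compare_exchange_weak(expected, expected - 1);
	assert(!ok);
	assert(expected == 5);

	// A typical retry loop: on (possibly spurious) failure, simply loop again
	// with the freshly loaded value.
	while (expected > 0 && !slots.compare_exchange_weak(expected, expected - 1)) {
	}
	assert(slots.load() == 4);
}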
lib/base/io-engine.cpp
Outdated
auto& ie (IoEngine::Get());
Shared<AsioConditionVariable>::Ptr cv;

auto fastPath = [&ie] {
How about making this a private member method instead?
lib/base/io-engine.cpp
Outdated
// But even if it happens before (and gets lost), it's not a problem because now the constructor
// will lock the mutex and re-check the semaphore which is already >0 (fast path) due to fetch_add().

for (auto& [strand, cv] : subscribers) {
	boost::asio::post(strand, [cv = std::move(cv)] { cv->NotifyOne(); });
}
I'm unclear about what you mean by "But even if it happens before" in your comments. However, if I'm interpreting this correctly, wouldn't these wake-ups become meaningless if a new coroutine constantly takes the now-available free slot from above?
Yes, that's the downside of unfair scheduling. 🤷♂️
lib/remote/httpserverconnection.cpp
Outdated
@@ -329,7 +331,8 @@ bool EnsureValidBody(
ApiUser::Ptr& authenticatedUser,
boost::beast::http::response<boost::beast::http::string_body>& response,
bool& shuttingDown,
boost::asio::yield_context& yc
boost::asio::yield_context& yc,
boost::asio::io_context::strand& strand
The strand parameter is added but never used?
As you've asked for an explicit review: please update the code comments (#9990 (comment)) and address the comments from the previous review (#9990 (review)). Apart from that, I also agree with the comments in Yonas' review (#9990 (review)).
I'm done with all of that (I think).
The current implementation is rather similar to Python's threading.Event than to a CV.
so that /v1/events doesn't have to use IoBoundWorkSlot. IoBoundWorkSlot#~IoBoundWorkSlot() will wait for a free semaphore slot which will be almost immediately released by CpuBoundWork#~CpuBoundWork(). Just releasing the already acquired slot in HttpServerConnection#StartStreaming() is more efficient.
This is inefficient and involves possible bad surprises regarding waiting durations on busy nodes. Instead, use AsioConditionVariable#Wait() if there are no free slots. It's notified by others' CpuBoundWork#~CpuBoundWork() once finished.
This is inefficient and implies
possible bad surprises regarding waiting durations on busy nodes. Instead,
use AsioConditionVariable#Wait() if there are no free slots. It's notified
by others' CpuBoundWork#~CpuBoundWork() once finished.
fixes #9988
Also, the current implementation is a spin-lock. 🙈 #10117 (comment)
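For reference on the event-like primitive referred to in these commit messages: below is a minimal sketch of how such an event can be built on top of ASIO (a timer whose expiry is moved around; Set() cancels pending waits, Wait() awaits the timer). This is an assumed illustration of the general technique, not a copy of Icinga 2's AsioConditionVariable:

#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/system/error_code.hpp>

class AsioEvent
{
public:
	explicit AsioEvent(boost::asio::io_context& io)
		: m_Timer(io)
	{
		// Never expires on its own; Wait() only returns once Set() is called.
		m_Timer.expires_at(boost::posix_time::pos_infin);
	}

	void Set()
	{
		// Moving the expiry into the past cancels pending waits and makes any
		// future Wait() return immediately.
		m_Timer.expires_at(boost::posix_time::neg_infin);
	}

	void Wait(boost::asio::yield_context yc)
	{
		// ec swallows the operation_aborted of waits interrupted by Set().
		boost::system::error_code ec;
		m_Timer.async_wait(yc[ec]);
	}

private:
	boost::asio::deadline_timer m_Timer;
};

Like threading.Event, once Set() has been called, later Wait() calls return immediately, because the timer is already expired; that matches the "Event rather than CV" observation above.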