Faster Inbound Pipeline #80656
Conversation
Pinging @elastic/es-distributed (Team:Distributed)
@elasticmachine update branch
I'm not a big fan of this change, I'd prefer to keep the ref counting more obviously correct and deal with the leak-detector test slowness directly: perhaps stop collecting stack traces on the platforms on which it's too slow to cope while we're not chasing any particular leak there, or else just extend timeouts as needed. I left a few inline comments too.
}
}
// if handling the messages didn't cause the channel to get closed and we did not fully consume the buffer retain it
Could we have a test in `InboundPipelineTests` showing that we handle the `isClosed` case correctly here? And elsewhere I guess, but I checked that we never actually exercise the `isClosed == true` branch here.
Let me try :) Should be doable, these paths are exercised by some internal cluster tests so there should be an obvious way to do it. On it
Test added, should be good for another round now :)
assert aggregator.isAggregating();
assert fragment instanceof ReleasableBytesReference;
aggregator.aggregate((ReleasableBytesReference) fragment);
return;
If `isClosed`, can we assert that `pending` is now empty?
Yea that should be fine, will add
do {
    final int bytesDecoded;
    if (pending.size() == 1) {
        bytesDecoded = decode(channel, pending.peekFirst());
If the channel is closed then the bytes get released mid-decode. Is that a problem? (possibly not, but I'd like to hear the reasoning)
The channel is closed on the same thread that this runs on, we don't release mid-decode. We release after deserializing and handling what we deserialized in all cases.
Yeah I really just meant within the `decode()` method.
Ah I see. I think there we're good because we don't have any incremental decode on a message. We decode a full message and pass it along to message handling, then return from decode. So closing the channel will always be the last step in `decode` as far as I can see.
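To spell out that ordering argument, here is a minimal single-threaded sketch with simplified stand-in types (not the real decoder or `TcpChannel`): decode reads one complete message, hands it off inline, and only then returns, so a close triggered by the handler is necessarily the last thing that happens inside it.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiConsumer;

public class DecodeOrderingSketch {

    // Simplified stand-in for a channel that can be closed by a message handler.
    static final class Channel {
        private boolean closed;
        void close() { closed = true; }
        boolean isClosed() { return closed; }
    }

    // decode() reads one complete message and hands it off before returning; a close
    // triggered by the handler runs inline on this same thread, so it is necessarily
    // the last thing that happens inside decode().
    static int decode(Channel channel, byte[] buffer, BiConsumer<Channel, String> handler) {
        String message = new String(buffer, StandardCharsets.UTF_8); // one complete message, no incremental decode
        handler.accept(channel, message);                            // may call channel.close() inline
        return buffer.length;                                        // by now any close has already happened
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        int decoded = decode(channel, "close-me".getBytes(StandardCharsets.UTF_8), (c, message) -> c.close());
        System.out.println("decoded " + decoded + " bytes, channel closed: " + channel.isClosed());
    }
}
```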
        }
    }
} while (isClosed == false && pending.isEmpty() == false);
If `isClosed`, didn't we already return?
++ right
public int decode(
    TcpChannel channel,
    ReleasableBytesReference reference,
    CheckedBiConsumer<TcpChannel, Object, IOException> fragmentConsumer
Could we not pass the `TcpChannel` around everywhere like this? It's always the same channel, isn't it?
This comment was marked as outdated.
Hiding my above comment, I misread the question initially. I added passing the channel around here to avoid instantiating a capturing lambda where previously we had a non-capturing one.
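To illustrate the allocation difference (hypothetical `Channel` type, not the Elasticsearch classes): a lambda that captures the channel typically allocates a fresh instance per call, whereas threading the channel through as an argument allows a single shared, non-capturing lambda.

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class LambdaCaptureExample {

    // Hypothetical channel type for the sake of the example.
    interface Channel {
        void send(String message);
    }

    // Variant 1: the fragment consumer captures `channel`, so a fresh (capturing)
    // lambda instance is typically allocated on every call.
    static void decodeWithCapture(Channel channel, String fragment) {
        Consumer<String> fragmentConsumer = f -> channel.send(f); // captures `channel`
        fragmentConsumer.accept(fragment);
    }

    // Variant 2: the channel is threaded through as an argument, so one shared,
    // non-capturing lambda constant can be reused for every message.
    private static final BiConsumer<Channel, String> FRAGMENT_CONSUMER = Channel::send;

    static void decodeWithoutCapture(Channel channel, String fragment) {
        FRAGMENT_CONSUMER.accept(channel, fragment);
    }

    public static void main(String[] args) {
        Channel channel = System.out::println;
        decodeWithCapture(channel, "fragment-1");
        decodeWithoutCapture(channel, "fragment-2");
    }
}
```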
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class InboundPipeline implements Releasable {

    private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
👍
IMO this is exactly what this PR does. We shouldn't be ref-count incrementing and decrementing left and right when we don't need it. We should increment when we need to hold on to something outside of the current execution stack, and only then; anything else just makes the code needlessly hard to follow IMO. To me, incrementing a reference always means we fork off or need something later, and this wasn't the case here at all.
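To make that concrete, here is a minimal sketch with simplified stand-in types (not the real `ReleasableBytesReference`): the only place the reference count grows is where a buffer has to outlive the current execution, e.g. because it is queued for a later decode round.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountStyles {

    // Simplified stand-in for a ref-counted buffer.
    static final class Buf {
        private final AtomicInteger refs = new AtomicInteger(1);
        Buf retain() { refs.incrementAndGet(); return this; }
        void release() {
            if (refs.decrementAndGet() == 0) {
                // free the underlying bytes here
            }
        }
    }

    private final ArrayDeque<Buf> pending = new ArrayDeque<>();

    // Style 1: every layer retains and releases defensively, even though the buffer
    // never escapes the current call stack - safe, but adds ref-count churn per message.
    void handleDefensively(Buf buf) {
        Buf retained = buf.retain();
        try {
            process(retained);
        } finally {
            retained.release();
        }
    }

    // Style 2 (the principle argued for above): the caller keeps ownership of the
    // reference it passed in; an extra reference is taken only when the buffer must
    // outlive this call, i.e. when it is queued for a later decode round.
    void handleAndMaybeQueue(Buf buf, boolean fullyConsumed) {
        process(buf);
        if (fullyConsumed == false) {
            pending.addLast(buf.retain()); // the only point where the ref count grows
        }
    }

    private void process(Buf buf) {
        // decode + dispatch happens synchronously on this thread
    }
}
```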
I sort of see what you mean but also this change means that refcounting is different depending on whether there are multiple fragments to combine or not. I don't see a way to avoid the extra refcounting in the multi-fragment case, but it isn't as simple as it was and it took some thought.
I looked at this briefly yesterday. I was also opposed to the change. I don't think it fixes the underlying issue. If normal ref count incs and decs kill the test framework, I think that is an issue, and removing incs/decs used in production does not seem like a good solution.

Additionally, this code was structured similarly to Netty specifically to separate decoding from aggregation, from handling. I didn't want the decoder wrapping further pipeline steps, and always attempted to step back to the top (Pipeline) before moving to the next step. I did not want the decoder state to be concerned about what happened further down the pipeline. The Consumer interface was just for testing; if I did it again I would probably pass the list in, similar to Netty.

I also don't agree on the correctness point. Try-with-resources for each scope and passing a new retain into another scope guarantees an uncaught exception does not lead to a leak.

I'm not rejecting the PR or anything. You all can find whatever approach you want. Those were just my thoughts looking at it.
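For comparison, here is a compact sketch of the correctness argument above, again with simplified stand-in types rather than the actual `Releasable` classes: each scope holds its own reference inside try-with-resources and hands a fresh retain to the next scope, so an uncaught exception only unwinds scopes that release exactly what they own.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ScopedRefCountSketch {

    // Simplified ref-counted buffer; close() releases one reference.
    static final class Buf implements AutoCloseable {
        private final AtomicInteger refs = new AtomicInteger(1);
        Buf retain() { refs.incrementAndGet(); return this; }
        @Override
        public void close() {
            if (refs.decrementAndGet() == 0) {
                // free the underlying bytes here
            }
        }
    }

    static void outerScope(Buf buf) {
        try (Buf mine = buf.retain()) {  // this scope's own reference
            innerScope(mine.retain());   // the next scope gets a fresh reference of its own
        }                                // released even if innerScope throws
    }

    static void innerScope(Buf buf) {
        try (Buf mine = buf) {           // takes ownership of the reference it was handed
            // decode / aggregate / handle ...
        }
    }

    public static void main(String[] args) {
        try (Buf buf = new Buf()) {
            outerScope(buf);
        }
    }
}
```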
Today we use a leaky `NON_RECYCLING_INSTANCE` in `InboundPipelineTests#testPipelineHandling`. It's actually fine, we don't use it for anything important, but it looks suspicious and it means that a little bit of harmless-looking reordering would seriously affect the coverage of these tests. This commit gets rid of it to ensure that we're always watching for leaks. Noticed when reviewing elastic#80656.
One important point I'd make here is that the problem is not necessarily just that we collect stack traces for every message. It's simply how many we have to collect with the redundant ref counting we do. These are the traces we get for a tiny message (so no aggregation or anything in here) once it hits
After my change it's this:
... IMO this is logically much closer to the reality of what gets retained where and makes it far easier to understand where a leak may be coming from when debugging. I agree that many of the redundant ref-count changes we have right now are trivial to reason about because they're all try-with-resources, but they also don't add anything because of their simplicity. They burn cycles now (though not that many in production) and make debugging a leak needlessly hard, because you have to go through all of them and make sure each of the 7 (instead of the 2 that are needed) recorded touch points is correct. Also, without this change we are not actually operating the way Netty operates either, I'd say, because we'd just deserialize messages one by one anyway and then pass them along the pipeline as soon as we had one full message, whereas Netty at least optionally batches things in decoding here and there via a flag.
True, but that simplicity also meant a lot of needless cycles went into wrapping trivial messages multiple times. In a world where we cache 3-element lists thread-local because we think this code is hot enough to justify it, I find this to be an acceptable bit of complexity to add.
Yeah it's neater for sure, but that's still fundamentally a bug in the test-only leak tracker for which we're proposing a production code change. If we properly paired every acquire with its corresponding release then we could clean up the stack traces for the acquires that weren't released. Have we considered not separating the parts of the
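A rough sketch of what such pairing could look like (illustrative only, not the actual Elasticsearch leak tracker): capture a stack trace per acquire and drop it as soon as the matching release arrives, so only genuinely unreleased acquires keep their expensive traces.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PairedLeakTracker {

    private final Map<Long, Throwable> openAcquires = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();

    public long onAcquire() {
        long id = ids.incrementAndGet();
        openAcquires.put(id, new Throwable("acquired here")); // expensive: fills in a stack trace
        return id;
    }

    public void onRelease(long acquireId) {
        openAcquires.remove(acquireId); // the trace is no longer needed once paired with a release
    }

    public void reportLeaks() {
        openAcquires.values().forEach(Throwable::printStackTrace); // whatever is left was never released
    }
}
```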
I'd be all for it. Technically, there is no reason for the code to be spread out like this in our case IMO. Netty allows for things like multiple layers of decoding and aggregating messages where tracking a list makes sense to neatly do stuff like read a chunked HTTP message step by step and whatnot. We don't do anything like that and IMO could just make the code as flat as the ref counting is now, rather than have redundant complicated ref-counting to wrap around all the various steps in the current code.
Not sure I fully agree with this. I'd say yes, I wouldn't have proposed this without the leak-tracker causing us trouble, but I think it's worthwhile in isolation to not have needless wrapping of buffers and ref counting in code this hot. Also, I still don't see how redundant ref count incrementing and releasing without forking adds anything but confusion to this code (the case where we handle both aggregated and single-buffer messages is the only exception to this where it maybe does).
Build failure is just a Jenkins issue, the build (part-2) went through fine.
Pinging @elastic/es-distributed-obsolete (Team:Distributed (Obsolete))
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
Closing in favor of #123390
Optimize away a number of ref-counting operations in the inbound pipeline and remove the indirection of collecting pieces of a message into an `ArrayList` before passing them to the aggregator, which does its own collecting of these pieces anyway. The production impact of this change is expected to be relatively minor, though I think it does save a little indirection and might allow for further optimisations down the line. Also, it makes the code more obviously correct by only incrementing the reference count of buffers that are queued for later use.
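Schematically, the indirection being removed looks something like the following sketch (simplified stand-ins, not the actual `InboundPipeline` or aggregator code): instead of collecting decoded fragments into a thread-local `ArrayList` and then draining that list into the aggregator, each fragment goes straight to the aggregator as it is decoded.

```java
import java.util.ArrayList;
import java.util.List;

public class FragmentForwardingSketch {

    // Stand-in for the aggregator, which collects fragments itself anyway.
    static final class Aggregator {
        private final List<String> collected = new ArrayList<>();
        void aggregate(String fragment) { collected.add(fragment); }
        int size() { return collected.size(); }
    }

    private static final ThreadLocal<ArrayList<String>> FRAGMENT_LIST = ThreadLocal.withInitial(ArrayList::new);

    // Before: decode into an intermediate thread-local list, then drain it into the aggregator.
    static void forwardViaList(String[] decodedFragments, Aggregator aggregator) {
        ArrayList<String> fragments = FRAGMENT_LIST.get();
        try {
            for (String fragment : decodedFragments) {
                fragments.add(fragment);
            }
            for (String fragment : fragments) {
                aggregator.aggregate(fragment);
            }
        } finally {
            fragments.clear();
        }
    }

    // After: no intermediate list, fragments go straight to the aggregator.
    static void forwardDirectly(String[] decodedFragments, Aggregator aggregator) {
        for (String fragment : decodedFragments) {
            aggregator.aggregate(fragment);
        }
    }

    public static void main(String[] args) {
        Aggregator viaList = new Aggregator();
        forwardViaList(new String[] { "header", "payload" }, viaList);
        Aggregator direct = new Aggregator();
        forwardDirectly(new String[] { "header", "payload" }, direct);
        System.out.println("via list: " + viaList.size() + ", direct: " + direct.size());
    }
}
```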
I found this while looking into why #79718 breaks on OSX CI while logging an endless stream of slow transport warnings. These are in large part caused by instantiating lots and lots of leak-tracking exceptions with stack traces in tests. This change reduces the number of increments and decrements massively, thus speeding up tests (on all platforms, not just OSX, for what it's worth).
Rough illustration (no real benchmark, just some profiling from tests, but you can see the right-hand side of the profile going down even for an expensive transport action like get-snapshots with many pending snapshots). I don't have the charts for other tests with me right now, but the effect is obviously much bigger when dealing with smaller transport messages in tests:
before: (profiling screenshot)
after: (profiling screenshot)
closes #79718 (I assume this will be enough to stabilize the test now combined with the other fix already applied to the issue)