Remove lots of redundant ref-counting from transport pipeline #123390
File: `NettyByteBufSizer`

```diff
@@ -12,12 +12,10 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToMessageDecoder;
-
-import java.util.List;
+import io.netty.channel.ChannelInboundHandlerAdapter;

 @ChannelHandler.Sharable
-public class NettyByteBufSizer extends MessageToMessageDecoder<ByteBuf> {
+public class NettyByteBufSizer extends ChannelInboundHandlerAdapter {

     public static final NettyByteBufSizer INSTANCE = new NettyByteBufSizer();
```

**Review comment:** This just isn't message-to-message, so there is no need to mess with the ref-counts when purely filtering.
```diff
@@ -26,14 +24,12 @@ private NettyByteBufSizer() {
     }

     @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
-        int readableBytes = buf.readableBytes();
-        if (buf.capacity() >= 1024) {
-            ByteBuf resized = buf.discardReadBytes().capacity(readableBytes);
-            assert resized.readableBytes() == readableBytes;
-            out.add(resized.retain());
-        } else {
-            out.add(buf.retain());
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        if (msg instanceof ByteBuf buf && buf.capacity() >= 1024) {
+            int readableBytes = buf.readableBytes();
+            buf = buf.discardReadBytes().capacity(readableBytes);
+            assert buf.readableBytes() == readableBytes;
         }
+        ctx.fireChannelRead(msg);
     }
 }
```

**Review comment:** This should be moved down, or the message delineation moved up into Netty; it is really only necessary to resize the buffer if we plan to retain it beyond the current read.

**Review comment:** I think we should go back to the `AdaptiveRecvByteBufAllocator` with min=4kb, init=8kb, max=64kb and remove the resizer. Allocating 64kb chunks and then trimming them, likely copying, looks wasteful. We should not see GC pressure with HTTP content streams for bulk. Different story in transport, I guess.

**Review comment:** Yea, that sounds like a good idea to me. The problem might be in those tiny requests in some way; I think we have a lot of potential for batching things together to save latency and bytes end-to-end, in search in particular.

**Review comment:** We can look into that in a follow-up? We have to be a tiny bit careful here to test the situation that motivated this thing in the first place, where degenerate cases of slow/flaky networks created series of ~2k-sized buffers that then assembled into 100M transport messages with enormous overhead (stuff like CCR was the problem here).

**Review comment:** Of course, I was thinking aloud.
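The trimming behaviour in the hunk above can be illustrated with plain `java.nio.ByteBuffer` instead of Netty's `ByteBuf`. This is a hypothetical stand-in, not the Netty API: the names `BufferTrimmer`, `maybeTrim`, and `TRIM_THRESHOLD` are invented for the sketch. The idea is the same, though: a mostly-drained buffer with a large backing allocation gets copied down to exactly its readable bytes so the large chunk is not pinned.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for NettyByteBufSizer's trimming logic, using plain
// java.nio.ByteBuffer instead of Netty's ByteBuf. Buffers with a backing
// capacity >= 1024 are copied down to exactly their readable bytes,
// mirroring buf.discardReadBytes().capacity(readableBytes); smaller ones
// pass through untouched.
public final class BufferTrimmer {
    static final int TRIM_THRESHOLD = 1024; // assumed, matches the diff's constant

    static ByteBuffer maybeTrim(ByteBuffer buf) {
        if (buf.capacity() < TRIM_THRESHOLD) {
            return buf; // small buffer: not worth the copy
        }
        // Copy only the readable region into an exact-size buffer.
        ByteBuffer trimmed = ByteBuffer.allocate(buf.remaining());
        trimmed.put(buf.duplicate());
        trimmed.flip();
        return trimmed;
    }

    public static void main(String[] args) {
        ByteBuffer big = ByteBuffer.allocate(65536);
        big.put(new byte[100]).flip();
        big.position(40); // 40 bytes already consumed, 60 readable
        ByteBuffer t = maybeTrim(big);
        System.out.println(t.capacity() + " " + t.remaining()); // prints: 60 60
    }
}
```

Note that the trim is a copy, which is exactly the waste the `AdaptiveRecvByteBufAllocator` suggestion above is trying to avoid by allocating right-sized buffers in the first place.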
File: `InboundDecoder`

```diff
@@ -18,12 +18,12 @@
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;

 import java.io.IOException;
 import java.io.StreamCorruptedException;
-import java.util.function.Consumer;

 public class InboundDecoder implements Releasable {
```

```diff
@@ -53,7 +53,7 @@ public InboundDecoder(Recycler<BytesRef> recycler, ByteSizeValue maxHeaderSize,
         this.channelType = channelType;
     }

-    public int decode(ReleasableBytesReference reference, Consumer<Object> fragmentConsumer) throws IOException {
+    public int decode(ReleasableBytesReference reference, CheckedConsumer<Object, IOException> fragmentConsumer) throws IOException {
         ensureOpen();
         try {
             return internalDecode(reference, fragmentConsumer);
```

```diff
@@ -63,7 +63,8 @@ public int decode(ReleasableBytesReference reference, Consumer<Object> fragmentC
         }
     }

-    public int internalDecode(ReleasableBytesReference reference, Consumer<Object> fragmentConsumer) throws IOException {
+    public int internalDecode(ReleasableBytesReference reference, CheckedConsumer<Object, IOException> fragmentConsumer)
+        throws IOException {
         if (isOnHeader()) {
             int messageLength = TcpTransport.readMessageLength(reference);
             if (messageLength == -1) {
```
```diff
@@ -104,25 +105,30 @@ public int internalDecode(ReleasableBytesReference reference, Consumer<Object> f
         }
         int remainingToConsume = totalNetworkSize - bytesConsumed;
         int maxBytesToConsume = Math.min(reference.length(), remainingToConsume);
-        ReleasableBytesReference retainedContent;
-        if (maxBytesToConsume == remainingToConsume) {
-            retainedContent = reference.retainedSlice(0, maxBytesToConsume);
-        } else {
-            retainedContent = reference.retain();
-        }
-
         int bytesConsumedThisDecode = 0;
         if (decompressor != null) {
-            bytesConsumedThisDecode += decompress(retainedContent);
+            bytesConsumedThisDecode += decompressor.decompress(
+                maxBytesToConsume == remainingToConsume ? reference.slice(0, maxBytesToConsume) : reference
+            );
             bytesConsumed += bytesConsumedThisDecode;
             ReleasableBytesReference decompressed;
             while ((decompressed = decompressor.pollDecompressedPage(isDone())) != null) {
-                fragmentConsumer.accept(decompressed);
+                try {
+                    fragmentConsumer.accept(decompressed);
+                } finally {
+                    decompressed.close();
+                }
             }
         } else {
             bytesConsumedThisDecode += maxBytesToConsume;
             bytesConsumed += maxBytesToConsume;
-            fragmentConsumer.accept(retainedContent);
+            if (maxBytesToConsume == remainingToConsume) {
+                try (ReleasableBytesReference retained = reference.retainedSlice(0, maxBytesToConsume)) {
+                    fragmentConsumer.accept(retained);
+                }
+            } else {
+                fragmentConsumer.accept(reference);
+            }
         }
         if (isDone()) {
             finishMessage(fragmentConsumer);
```

**Review comment:** This would love a follow-up to remove the retaining on the slice; we should have a slice mechanism that doesn't retain, and only retain if we need to in the accumulator.

**Review comment:** Where is the original reference released?

**Review comment:** This all goes to
```diff
@@ -138,7 +144,7 @@ public void close() {
         cleanDecodeState();
     }

-    private void finishMessage(Consumer<Object> fragmentConsumer) {
+    private void finishMessage(CheckedConsumer<Object, IOException> fragmentConsumer) throws IOException {
         cleanDecodeState();
         fragmentConsumer.accept(END_CONTENT);
     }
```

```diff
@@ -154,12 +160,6 @@ private void cleanDecodeState() {
         }
     }

-    private int decompress(ReleasableBytesReference content) throws IOException {
-        try (content) {
-            return decompressor.decompress(content);
-        }
-    }
-
     private boolean isDone() {
         return bytesConsumed == totalNetworkSize;
     }
```
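The ownership change in `InboundDecoder` can be sketched with plain JDK types. This is a stand-in, not the Elasticsearch classes: `RefCounted`, `decodeRetained`, and `decodeBorrowed` are invented names. The point is the before/after contract: fragments used to escape the decoder retained, leaving the caller to close them later; now the consumer only borrows the reference for the duration of the callback and the decoder closes it in a `try`/`finally`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the ownership change in InboundDecoder, using invented stand-in
// types. Old style: hand the consumer a retained fragment it must remember
// to close (an extra increment/decrement pair). New style: the decoder
// scopes the reference itself; the consumer borrows it inside the callback.
public final class BorrowedFragments {

    // Minimal stand-in for ReleasableBytesReference's ref-counting.
    static final class RefCounted implements AutoCloseable {
        final AtomicInteger refs = new AtomicInteger(1);
        void retain() { refs.incrementAndGet(); }
        @Override public void close() { refs.decrementAndGet(); }
        boolean released() { return refs.get() == 0; }
    }

    interface CheckedConsumer<T> { void accept(T t) throws Exception; }

    // Old style: escape a retained fragment; someone must close it later.
    static RefCounted decodeRetained(RefCounted fragment) {
        fragment.retain();
        return fragment; // caller now owns one extra reference
    }

    // New style: borrowed for the callback only, closed by the decoder.
    static void decodeBorrowed(RefCounted fragment, CheckedConsumer<RefCounted> consumer) throws Exception {
        try (fragment) {
            consumer.accept(fragment);
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> seen = new ArrayList<>();
        RefCounted frag = new RefCounted();
        decodeBorrowed(frag, f -> seen.add("got fragment, refs=" + f.refs.get()));
        System.out.println(seen.get(0) + ", released=" + frag.released());
        // prints: got fragment, refs=1, released=true
    }
}
```

The borrowed style is what makes the `CheckedConsumer<Object, IOException>` signature change necessary: the consumer now runs inline inside the decoder, so its checked exceptions propagate through `decode` rather than being deferred.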
File: `InboundPipeline`

```diff
@@ -11,18 +11,17 @@
 import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;

 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;

 public class InboundPipeline implements Releasable {

-    private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
     private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);

     private final LongSupplier relativeTimeInMillis;
```
```diff
@@ -56,81 +55,74 @@ public void close() {

     public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
         if (uncaughtException != null) {
+            reference.close();
             throw new IllegalStateException("Pipeline state corrupted by uncaught exception", uncaughtException);
         }
         try {
-            doHandleBytes(channel, reference);
+            channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
+            statsTracker.markBytesRead(reference.length());
+            if (isClosed) {
+                reference.close();
+                return;
+            }
+            pending.add(reference);
+            doHandleBytes(channel);
         } catch (Exception e) {
             uncaughtException = e;
             throw e;
         }
     }

-    public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
-        channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
-        statsTracker.markBytesRead(reference.length());
-        pending.add(reference.retain());
-
-        final ArrayList<Object> fragments = fragmentList.get();
-        boolean continueHandling = true;
-
-        while (continueHandling && isClosed == false) {
-            boolean continueDecoding = true;
-            while (continueDecoding && pending.isEmpty() == false) {
-                try (ReleasableBytesReference toDecode = getPendingBytes()) {
-                    final int bytesDecoded = decoder.decode(toDecode, fragments::add);
-                    if (bytesDecoded != 0) {
-                        releasePendingBytes(bytesDecoded);
-                        if (fragments.isEmpty() == false && endOfMessage(fragments.get(fragments.size() - 1))) {
-                            continueDecoding = false;
-                        }
-                    } else {
-                        continueDecoding = false;
-                    }
-                }
-            }
-
-            if (fragments.isEmpty()) {
-                continueHandling = false;
-            } else {
-                try {
-                    forwardFragments(channel, fragments);
-                } finally {
-                    for (Object fragment : fragments) {
-                        if (fragment instanceof ReleasableBytesReference) {
-                            ((ReleasableBytesReference) fragment).close();
-                        }
-                    }
-                    fragments.clear();
-                }
-            }
-        }
-    }
+    private void doHandleBytes(TcpChannel channel) throws IOException {
+        do {
+            CheckedConsumer<Object, IOException> decodeConsumer = f -> forwardFragment(channel, f);
+            int bytesDecoded = decoder.decode(pending.peekFirst(), decodeConsumer);
+            if (bytesDecoded == 0 && pending.size() > 1) {
+                final ReleasableBytesReference[] bytesReferences = new ReleasableBytesReference[pending.size()];
+                int index = 0;
+                for (ReleasableBytesReference pendingReference : pending) {
+                    bytesReferences[index] = pendingReference.retain();
+                    ++index;
+                }
+                try (
+                    ReleasableBytesReference toDecode = new ReleasableBytesReference(
+                        CompositeBytesReference.of(bytesReferences),
+                        () -> Releasables.closeExpectNoException(bytesReferences)
+                    )
+                ) {
+                    bytesDecoded = decoder.decode(toDecode, decodeConsumer);
+                }
+            }
+            if (bytesDecoded != 0) {
+                releasePendingBytes(bytesDecoded);
+            } else {
+                break;
+            }
+        } while (pending.isEmpty() == false);
+    }
```

**Review comment:** If we either close outright or put the buffer in the list before doing anything, we're always safe, I think.

**Review comment:** Always try a single buffer first. Otherwise, if a message was stuck/flushed behind the second half of a larger message in a pending chunk, all the slicing etc. will reference the full buffer needlessly, which is uncool for contention reasons, especially when we do a retained slice out of the second message.
```diff
-    private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) throws IOException {
-        for (Object fragment : fragments) {
-            if (fragment instanceof Header) {
-                headerReceived((Header) fragment);
-            } else if (fragment instanceof Compression.Scheme) {
-                assert aggregator.isAggregating();
-                aggregator.updateCompressionScheme((Compression.Scheme) fragment);
-            } else if (fragment == InboundDecoder.PING) {
-                assert aggregator.isAggregating() == false;
-                messageHandler.accept(channel, PING_MESSAGE);
-            } else if (fragment == InboundDecoder.END_CONTENT) {
-                assert aggregator.isAggregating();
-                InboundMessage aggregated = aggregator.finishAggregation();
-                try {
-                    statsTracker.markMessageReceived();
-                    messageHandler.accept(channel, aggregated);
-                } finally {
-                    aggregated.decRef();
-                }
-            } else {
-                assert aggregator.isAggregating();
-                assert fragment instanceof ReleasableBytesReference;
-                aggregator.aggregate((ReleasableBytesReference) fragment);
-            }
-        }
-    }
+    private void forwardFragment(TcpChannel channel, Object fragment) throws IOException {
+        if (fragment instanceof Header) {
+            headerReceived((Header) fragment);
+        } else if (fragment instanceof Compression.Scheme) {
+            assert aggregator.isAggregating();
+            aggregator.updateCompressionScheme((Compression.Scheme) fragment);
+        } else if (fragment == InboundDecoder.PING) {
+            assert aggregator.isAggregating() == false;
+            messageHandler.accept(channel, PING_MESSAGE);
+        } else if (fragment == InboundDecoder.END_CONTENT) {
+            assert aggregator.isAggregating();
+            InboundMessage aggregated = aggregator.finishAggregation();
+            try {
+                statsTracker.markMessageReceived();
+                messageHandler.accept(channel, aggregated);
+            } finally {
+                aggregated.decRef();
+            }
+        } else {
+            assert aggregator.isAggregating();
+            assert fragment instanceof ReleasableBytesReference;
+            aggregator.aggregate((ReleasableBytesReference) fragment);
+        }
+    }
```
```diff
@@ -139,25 +131,6 @@ protected void headerReceived(Header header) {
         aggregator.headerReceived(header);
     }

-    private static boolean endOfMessage(Object fragment) {
-        return fragment == InboundDecoder.PING || fragment == InboundDecoder.END_CONTENT || fragment instanceof Exception;
-    }
-
-    private ReleasableBytesReference getPendingBytes() {
-        if (pending.size() == 1) {
-            return pending.peekFirst().retain();
-        } else {
-            final ReleasableBytesReference[] bytesReferences = new ReleasableBytesReference[pending.size()];
-            int index = 0;
-            for (ReleasableBytesReference pendingReference : pending) {
-                bytesReferences[index] = pendingReference.retain();
-                ++index;
-            }
-            final Releasable releasable = () -> Releasables.closeExpectNoException(bytesReferences);
-            return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable);
-        }
-    }
-
     private void releasePendingBytes(int bytesConsumed) {
         int bytesToRelease = bytesConsumed;
         while (bytesToRelease != 0) {
```
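The new "single buffer first, composite only on demand" decode loop can be sketched with plain `ByteBuffer`s and an `ArrayDeque`. This is a stand-in, not the Elasticsearch code: `DecodeLoop`, `handle`, `composite`, and `release` are invented names, and the composite here is a copy for simplicity where the real code uses `CompositeBytesReference` to avoid copying and retaining.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.function.ToIntFunction;

// Sketch of the new doHandleBytes loop with invented stand-in types: first
// try to decode from the head buffer alone; only if that makes no progress
// and more bytes are pending, assemble a composite of everything pending
// and retry. Fully consumed buffers are then dropped from the queue.
public final class DecodeLoop {
    final ArrayDeque<ByteBuffer> pending = new ArrayDeque<>();

    // The decoder returns how many bytes it consumed from the front of its input.
    int handle(ToIntFunction<ByteBuffer> decoder) {
        int total = 0;
        while (!pending.isEmpty()) {
            // Always try the single head buffer first (cheap, no composite).
            int decoded = decoder.applyAsInt(pending.peekFirst().duplicate());
            if (decoded == 0 && pending.size() > 1) {
                decoded = decoder.applyAsInt(composite()); // fall back to all pending bytes
            }
            if (decoded == 0) break; // need more bytes from the network
            release(decoded);
            total += decoded;
        }
        return total;
    }

    ByteBuffer composite() { // copying stand-in for CompositeBytesReference
        int size = pending.stream().mapToInt(ByteBuffer::remaining).sum();
        ByteBuffer all = ByteBuffer.allocate(size);
        pending.forEach(b -> all.put(b.duplicate()));
        return all.flip();
    }

    void release(int consumed) { // drop fully consumed buffers from the queue
        while (consumed > 0) {
            ByteBuffer head = pending.peekFirst();
            if (head.remaining() <= consumed) {
                consumed -= head.remaining();
                pending.pollFirst();
            } else {
                head.position(head.position() + consumed);
                consumed = 0;
            }
        }
    }
}
```

This mirrors the review comment above: a message whose bytes sit entirely in the head buffer never touches the other pending chunks, so slices taken from it never pin the whole composite.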
**Review comment:** I remember talking with @DaveCTurner about reference acquisition. It's easier to trace references when we acquire and release them next to each other, visually. I think the previous version is preferable: it's clear that at the end of the `try` the reference will be released, and when we pass a reference to a different thread or buffer chunks, we explicitly acquire/release the ref, which makes the "move" explicit. I can't say I prefer slightly faster but hard-to-trace references :) Otherwise it's not trivial: every normal and exceptional termination needs to release refs explicitly. I tried to follow the HTTP path, and there are places where a ref is released "out of the blue" because some exception is going on.
**Review comment:** I know that discussion, the two of us have had it many times over :D

What it comes down to is performance vs. ease of use, pretty much. I fully agree that having each block of code keep the ref count constant when it's not escaping an object, and increment it by one when escaping, makes the code much easier to read. There is a considerable performance downside to the pattern though. We escape these buffers to another thread (and do so quite often, actually). The difference between incrementing on the transport thread, decrementing on whatever pool we fork to, and then decrementing the original reference on the transport thread afterwards, versus just "moving" the buffer from one thread to another without the extra counting, is quite relevant. It's still costly to free on another thread, but considerably cheaper (and, more importantly, considerably more scalable) without the contention of decrementing on two separate threads.

For other ref-counted code, around search hits and so on, where there's a wider user base, I think we should keep things simple for now as David suggests. But in the core transport logic I think it's quite worthwhile to save some ref-counting to keep latencies and scaling behavior as predictable as possible, shouldn't we? (Long story short: I'd rather have an easier time understanding the performance of the approach conceptually than trade that for slightly easier-to-read code, if that makes any sense? :))
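The two hand-off styles being debated can be sketched with a stand-in ref-counted buffer (`HandoffStyles`, `Buf`, `retainAndFork`, and `move` are invented names for illustration). "Retain and fork" has the transport thread increment, the worker decrement, and the transport thread decrement again afterwards, so two threads hammer the same counter; "move" transfers sole ownership with the task and touches the counter once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the two hand-off styles with an invented stand-in buffer type.
public final class HandoffStyles {
    static final class Buf {
        final AtomicInteger refs = new AtomicInteger(1);
        void retain() { refs.incrementAndGet(); }
        void release() { refs.decrementAndGet(); }
    }

    // "Retain and fork": three atomic ops, with two threads contending on
    // the same counter (and the same cache line).
    static void retainAndFork(Buf buf) {
        buf.retain();                                // +1 on the submitting thread
        CompletableFuture.runAsync(buf::release)     // -1 on a pool thread
            .join();
        buf.release();                               // -1 back on the submitting thread
    }

    // "Move": ownership transfers with the task; one atomic op, no contention.
    static void move(Buf buf) {
        CompletableFuture.runAsync(buf::release).join();
    }

    public static void main(String[] args) {
        Buf a = new Buf();
        retainAndFork(a);
        Buf b = new Buf();
        move(b);
        System.out.println(a.refs.get() + " " + b.refs.get()); // prints: 0 0
    }
}
```

Both styles end with the buffer released; the difference is purely in how many cross-thread atomic operations it takes to get there, which is the contention cost the comment above is weighing against readability.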
**Review comment:** Do you know how much performance we gain from this, roughly?
**Review comment:** That's impossible to quantify, I'm afraid. It matters more the more cores/threads you run, and the effect is more visible in throughput under heavy load than in lightly loaded end-to-end latency benchmarks. We are eliminating some negative cache effects here, but there are a lot more of them all over the codebase; the best we can do is deal with this stuff one by one, I guess :) Ideally we keep simplifying the counting like this PR does and move away from ref-counting (towards a single close and a well-understood lifecycle) wherever possible, for both performance and, eventually, conformance with Netty 5's approach to buffers.
**Review comment:** That said, I can see ref-counting take up O(1-5%) of transport thread time in some heavy search throughput benchmarks, as well as a visible amount of CPU cycles from contention on releasing buffers on search threads. I'd expect this change may already show a visible improvement in throughput benchmarks, or at least get us very close to one in a follow-up.
**Review comment:** Nice, I hope it will show up in the nightlies :)
**Review comment:** Yeah, that's where we disagree :) IMO, in the long run and given the overall team structure, "easier to read" is a close proxy for "correct", and that overrules any performance considerations (we could have both if only we had a borrow-checker 😉).

I'm OK with this in the heart of the transport code though, especially since the ref-counting discipline in the rest of the codebase is in much better shape than it used to be. Indeed, I wonder if we can further simplify and flatten all the inbound pipeline stuff.