Faster Inbound Pipeline #80656


Closed
Changes from all commits (17 commits)
88aadd2
faster inbound pipe
original-brownbear Nov 10, 2021
19588ed
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Nov 11, 2021
9d34c01
nicer
original-brownbear Nov 11, 2021
7197812
Merge branch 'master' into faster-inbound-pipeline
elasticmachine Nov 24, 2021
6572045
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Nov 25, 2021
715d7b6
assert + simplify
original-brownbear Nov 25, 2021
b9d46d5
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Nov 25, 2021
f401799
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Nov 29, 2021
1da7f79
add test
original-brownbear Nov 29, 2021
437c7e7
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Nov 29, 2021
9c83d44
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Nov 30, 2021
718c975
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Dec 21, 2021
6f847f7
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Dec 21, 2021
439f950
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Feb 2, 2022
73713d6
Merge remote-tracking branch 'elastic/master' into faster-inbound-pip…
original-brownbear Apr 17, 2022
0cc8ac0
Merge remote-tracking branch 'elastic/main' into faster-inbound-pipeline
original-brownbear Aug 17, 2022
69665bc
Merge remote-tracking branch 'elastic/main' into faster-inbound-pipeline
original-brownbear Oct 19, 2022
server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java
@@ -78,6 +78,20 @@ public ReleasableBytesReference retain() {
return this;
}

public ReleasableBytesReference releasableSlice(int from) {
Contributor:
This seems like a "transfer ownership" operation. It seems to me that it is a bit artificial, in that its primary purpose is to avoid an incref/decref solely for the leak tracker.

Member Author:
I found the whole thing rather artificial before: we would retain the original, then slice, and then build a new releasable from the original. This way at least we're honest about the fact that this is all backed by the same ref count and all we really did was move the read offset. In the end, part of the motivation for this change is that we aren't actually transferring ownership all the time in the two spots where this method is used.

if (from == 0) {
return this;
}
return new ReleasableBytesReference(delegate.slice(from, length() - from), refCounted);
}

public ReleasableBytesReference releasableSlice(int from, int length) {
if (from == 0 && length() == length) {
return this;
}
return new ReleasableBytesReference(delegate.slice(from, length), refCounted);
}

public ReleasableBytesReference retainedSlice(int from, int length) {
if (from == 0 && length() == length) {
return retain();
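A sketch of the ownership point discussed in the thread above (illustration only, not part of the diff; the helper class and method names are invented): the old pattern retains the original, slices it, and then closes the original, while releasableSlice reuses the buffer's existing ref count and only moves the read offset. The same trade-off reappears in releasePendingBytes in InboundPipeline.java further down.

import org.elasticsearch.common.bytes.ReleasableBytesReference;

class SliceOwnershipSketch {

    // Old pattern: take an extra reference for the unconsumed remainder, then close the
    // original buffer - an incref/decref pair whose main visible effect is churn in the
    // ref count and the leak tracker.
    static ReleasableBytesReference keepRemainderOld(ReleasableBytesReference reference, int consumed) {
        try (reference) {                                                            // decref the original
            return reference.retainedSlice(consumed, reference.length() - consumed); // incref for the slice
        }
    }

    // New pattern: the slice shares the original's ref count (releasableSlice even returns
    // `this` when from == 0), so ownership simply moves to the slice and no extra
    // incref/decref happens.
    static ReleasableBytesReference keepRemainderNew(ReleasableBytesReference reference, int consumed) {
        return reference.releasableSlice(consumed);
    }
}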
server/src/main/java/org/elasticsearch/transport/InboundDecoder.java
@@ -10,6 +10,7 @@

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -18,7 +19,6 @@
import org.elasticsearch.core.Releasables;

import java.io.IOException;
import java.util.function.Consumer;

public class InboundDecoder implements Releasable {

@@ -38,23 +38,31 @@ public InboundDecoder(Version version, Recycler<BytesRef> recycler) {
this.recycler = recycler;
}

public int decode(ReleasableBytesReference reference, Consumer<Object> fragmentConsumer) throws IOException {
public int decode(
TcpChannel channel,
ReleasableBytesReference reference,
CheckedBiConsumer<TcpChannel, Object, IOException> fragmentConsumer
Contributor:
Could we not pass the TcpChannel around everywhere like this? It's always the same channel, isn't it?

This comment was marked as outdated.

Member Author:
Hiding my comment above; I misread the question initially. I pass the channel around here to avoid instantiating a capturing lambda where we previously had a non-capturing one.

) throws IOException {
ensureOpen();
try {
return internalDecode(reference, fragmentConsumer);
return internalDecode(channel, reference, fragmentConsumer);
} catch (Exception e) {
cleanDecodeState();
throw e;
}
}

public int internalDecode(ReleasableBytesReference reference, Consumer<Object> fragmentConsumer) throws IOException {
private int internalDecode(
TcpChannel channel,
ReleasableBytesReference reference,
CheckedBiConsumer<TcpChannel, Object, IOException> fragmentConsumer
) throws IOException {
if (isOnHeader()) {
int messageLength = TcpTransport.readMessageLength(reference);
if (messageLength == -1) {
return 0;
} else if (messageLength == 0) {
fragmentConsumer.accept(PING);
fragmentConsumer.accept(channel, PING);
return 6;
} else {
int headerBytesToRead = headerBytesToRead(reference);
@@ -68,10 +76,10 @@ public int internalDecode(ReleasableBytesReference reference, Consumer<Object> f
if (header.isCompressed()) {
isCompressed = true;
}
fragmentConsumer.accept(header);
fragmentConsumer.accept(channel, header);

if (isDone()) {
finishMessage(fragmentConsumer);
finishMessage(channel, fragmentConsumer);
}
return headerBytesToRead;
}
@@ -84,33 +92,39 @@ public int internalDecode(ReleasableBytesReference reference, Consumer<Object> f
return 0;
} else {
this.decompressor = decompressor;
fragmentConsumer.accept(this.decompressor.getScheme());
fragmentConsumer.accept(channel, this.decompressor.getScheme());
}
}
int remainingToConsume = totalNetworkSize - bytesConsumed;
int maxBytesToConsume = Math.min(reference.length(), remainingToConsume);
ReleasableBytesReference retainedContent;
if (maxBytesToConsume == remainingToConsume) {
retainedContent = reference.retainedSlice(0, maxBytesToConsume);
} else {
retainedContent = reference.retain();
}

int bytesConsumedThisDecode = 0;
if (decompressor != null) {
bytesConsumedThisDecode += decompress(retainedContent);
bytesConsumedThisDecode += decompress(
maxBytesToConsume == remainingToConsume ? reference.slice(0, maxBytesToConsume) : reference
);
bytesConsumed += bytesConsumedThisDecode;
ReleasableBytesReference decompressed;
while ((decompressed = decompressor.pollDecompressedPage(isDone())) != null) {
Contributor:
I wonder if a push-style interaction out of decompress would be easier, since then the ref-counting is mostly not an issue.

Member Author:
True, I guess that would be another possible simplification, and it would make decompress work more similarly to how the rest of the pipeline works with this change as well. Not sure we'd want to blow this one up by making that change here too; I'd rather push it to a follow-up to keep this somewhat tricky change small? :)

fragmentConsumer.accept(decompressed);
try {
fragmentConsumer.accept(channel, decompressed);
} finally {
decompressed.decRef();
}
}
} else {
ReleasableBytesReference contentToConsume;
if (maxBytesToConsume == remainingToConsume) {
contentToConsume = reference.releasableSlice(0, maxBytesToConsume);
} else {
contentToConsume = reference;
}
bytesConsumedThisDecode += maxBytesToConsume;
bytesConsumed += maxBytesToConsume;
fragmentConsumer.accept(retainedContent);
fragmentConsumer.accept(channel, contentToConsume);
}
if (isDone()) {
finishMessage(fragmentConsumer);
finishMessage(channel, fragmentConsumer);
}

return bytesConsumedThisDecode;
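A toy sketch of the poll-versus-push exchange above (illustration only; the Page and decompressor interfaces are invented stand-ins for the real transport classes): with the poll style used in this PR the caller owns every decompressed page and must decRef it after forwarding, exactly like the try/finally added above, whereas a push style would keep the page's ref counting inside the decompressor.

import java.util.function.Consumer;

interface Page {
    void decRef();
}

interface PollingDecompressor {
    // Poll style (what the diff uses): returns the next decompressed page, or null.
    Page pollDecompressedPage(boolean endOfStream);
}

interface PushingDecompressor {
    // Hypothetical push style (the reviewer's follow-up idea): the decompressor invokes the
    // consumer for each page it produces and releases the page itself afterwards.
    void decompress(byte[] compressed, Consumer<Page> pageConsumer);
}

class PollVsPushSketch {
    // With the poll style, ref counting leaks into the caller: it must decRef each page
    // after forwarding it, mirroring the try/finally added in InboundDecoder above.
    static void drain(PollingDecompressor decompressor, boolean done, Consumer<Page> fragmentConsumer) {
        Page page;
        while ((page = decompressor.pollDecompressedPage(done)) != null) {
            try {
                fragmentConsumer.accept(page);
            } finally {
                page.decRef();
            }
        }
    }
}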
@@ -123,9 +137,9 @@ public void close() {
cleanDecodeState();
}

private void finishMessage(Consumer<Object> fragmentConsumer) {
private void finishMessage(TcpChannel channel, CheckedBiConsumer<TcpChannel, Object, IOException> fragmentConsumer) throws IOException {
cleanDecodeState();
fragmentConsumer.accept(END_CONTENT);
fragmentConsumer.accept(channel, END_CONTENT);
}

private void cleanDecodeState() {
@@ -139,10 +153,8 @@ private void cleanDecodeState() {
}
}

private int decompress(ReleasableBytesReference content) throws IOException {
try (content) {
return decompressor.decompress(content);
}
private int decompress(BytesReference content) throws IOException {
return decompressor.decompress(content);
}

private boolean isDone() {
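Background for the capturing-lambda remark in the decode() thread above (standalone illustration; all names are invented): a lambda that captures a local such as the channel is allocated on every evaluation, while a non-capturing method reference can be reused, which is the stated reason for passing the channel as an explicit argument to the fragment consumer.

import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class LambdaCaptureSketch {

    static void handle(String channel, Object fragment) {
        // stand-in for a fragment handler such as InboundPipeline's forwardFragment
    }

    // Captures the local `channel`: every call evaluates the lambda expression anew and
    // allocates a fresh Consumer instance.
    static Consumer<Object> capturing(String channel) {
        return fragment -> handle(channel, fragment);
    }

    // Captures nothing: the channel arrives as a parameter instead, so the JVM can reuse
    // a single instance for this expression (HotSpot caches non-capturing lambdas).
    static BiConsumer<String, Object> nonCapturing() {
        return LambdaCaptureSketch::handle;
    }

    public static void main(String[] args) {
        System.out.println(capturing("c") == capturing("c"));   // false: two allocations
        System.out.println(nonCapturing() == nonCapturing());   // true on HotSpot, though not guaranteed by the JLS
    }
}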
144 changes: 70 additions & 74 deletions server/src/main/java/org/elasticsearch/transport/InboundPipeline.java
@@ -19,15 +19,13 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class InboundPipeline implements Releasable {

private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
Contributor:
👍

private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);

private final LongSupplier relativeTimeInMillis;
@@ -93,99 +91,97 @@ public void handleBytes(TcpChannel channel, ReleasableBytesReference reference)
public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
statsTracker.markBytesRead(reference.length());
pending.add(reference.retain());

final ArrayList<Object> fragments = fragmentList.get();
boolean continueHandling = true;

while (continueHandling && isClosed == false) {
boolean continueDecoding = true;
while (continueDecoding && pending.isEmpty() == false) {
try (ReleasableBytesReference toDecode = getPendingBytes()) {
final int bytesDecoded = decoder.decode(toDecode, fragments::add);
if (bytesDecoded != 0) {
releasePendingBytes(bytesDecoded);
if (fragments.isEmpty() == false && endOfMessage(fragments.get(fragments.size() - 1))) {
continueDecoding = false;
}
} else {
continueDecoding = false;
}
}
}
if (pending.isEmpty() == false) {
// we already have pending bytes, so we queue these bytes after them and then try to decode from the combined message
pending.add(reference.retain());
doHandleBytesWithPending(channel);
return;
}

if (fragments.isEmpty()) {
continueHandling = false;
while (isClosed == false && reference.length() > 0) {
final int bytesDecoded = decode(channel, reference);
if (bytesDecoded != 0) {
reference = reference.releasableSlice(bytesDecoded);
} else {
try {
forwardFragments(channel, fragments);
} finally {
for (Object fragment : fragments) {
if (fragment instanceof ReleasableBytesReference) {
((ReleasableBytesReference) fragment).close();
}
}
fragments.clear();
}
break;
}
}
// if handling the messages didn't cause the channel to get closed and we did not fully consume the buffer retain it
Contributor:
Could we have a test in InboundPipelineTests showing that we handle the isClosed case correctly here? And elsewhere I guess, but I checked that we never actually exercise the isClosed == true branch here.

Member Author:
Let me try :) Should be doable; these paths are exercised by some internal cluster tests, so there should be an obvious way to do it. On it.

Member Author:
Test added, should be good for another round now :)

if (isClosed == false && reference.length() > 0) {
pending.add(reference.retain());
}
}

private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) throws IOException {
for (Object fragment : fragments) {
if (fragment instanceof Header) {
assert aggregator.isAggregating() == false;
aggregator.headerReceived((Header) fragment);
} else if (fragment instanceof Compression.Scheme) {
assert aggregator.isAggregating();
aggregator.updateCompressionScheme((Compression.Scheme) fragment);
} else if (fragment == InboundDecoder.PING) {
assert aggregator.isAggregating() == false;
messageHandler.accept(channel, PING_MESSAGE);
} else if (fragment == InboundDecoder.END_CONTENT) {
assert aggregator.isAggregating();
try (InboundMessage aggregated = aggregator.finishAggregation()) {
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
private int decode(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
return decoder.decode(channel, reference, this::forwardFragment);
}

private void doHandleBytesWithPending(TcpChannel channel) throws IOException {
do {
final int bytesDecoded;
if (pending.size() == 1) {
bytesDecoded = decode(channel, pending.peekFirst());
Contributor:
If the channel is closed, then the bytes get released mid-decode. Is that a problem? (Possibly not, but I'd like to hear the reasoning.)

Member Author:
The channel is closed on the same thread that this runs on; we don't release mid-decode. In all cases we release only after deserializing and handling what we deserialized.

Contributor:
Yeah, I really just meant within the decode() method.

Member Author:
Ah, I see. I think we're good there because we don't have any incremental decode on a message: we decode a full message, pass it along to message handling, and then return from decode. So closing the channel will always be the last step in decode, as far as I can see.

} else {
try (ReleasableBytesReference toDecode = getPendingBytes()) {
bytesDecoded = decode(channel, toDecode);
}
}
if (bytesDecoded != 0 && isClosed == false) {
releasePendingBytes(bytesDecoded);
} else {
assert aggregator.isAggregating();
assert fragment instanceof ReleasableBytesReference;
aggregator.aggregate((ReleasableBytesReference) fragment);
assert isClosed == false || pending.isEmpty() : "pending chunks should be empty if closed but saw [" + pending + "]";
return;
Contributor:
If isClosed, can we assert that pending is now empty?

Member Author:
Yeah, that should be fine, will add.

}
}
} while (pending.isEmpty() == false);
}

private static boolean endOfMessage(Object fragment) {
return fragment == InboundDecoder.PING || fragment == InboundDecoder.END_CONTENT || fragment instanceof Exception;
private void forwardFragment(TcpChannel channel, Object fragment) throws IOException {
if (fragment instanceof Header) {
assert aggregator.isAggregating() == false;
aggregator.headerReceived((Header) fragment);
} else if (fragment instanceof Compression.Scheme) {
assert aggregator.isAggregating();
aggregator.updateCompressionScheme((Compression.Scheme) fragment);
} else if (fragment == InboundDecoder.PING) {
assert aggregator.isAggregating() == false;
messageHandler.accept(channel, PING_MESSAGE);
} else if (fragment == InboundDecoder.END_CONTENT) {
assert aggregator.isAggregating();
try (InboundMessage aggregated = aggregator.finishAggregation()) {
statsTracker.markMessageReceived();
messageHandler.accept(channel, aggregated);
}
} else {
assert aggregator.isAggregating();
assert fragment instanceof ReleasableBytesReference;
aggregator.aggregate((ReleasableBytesReference) fragment);
}
}

private ReleasableBytesReference getPendingBytes() {
if (pending.size() == 1) {
return pending.peekFirst().retain();
} else {
final ReleasableBytesReference[] bytesReferences = new ReleasableBytesReference[pending.size()];
int index = 0;
for (ReleasableBytesReference pendingReference : pending) {
bytesReferences[index] = pendingReference.retain();
++index;
}
final Releasable releasable = () -> Releasables.closeExpectNoException(bytesReferences);
return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable);
assert pending.size() > 1 : "must use this method with multiple pending references but used with " + pending;
final ReleasableBytesReference[] bytesReferences = new ReleasableBytesReference[pending.size()];
int index = 0;
for (ReleasableBytesReference pendingReference : pending) {
bytesReferences[index] = pendingReference.retain();
++index;
}
final Releasable releasable = () -> Releasables.closeExpectNoException(bytesReferences);
return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable);
}

private void releasePendingBytes(int bytesConsumed) {
int bytesToRelease = bytesConsumed;
while (bytesToRelease != 0) {
try (ReleasableBytesReference reference = pending.pollFirst()) {
assert reference != null;
if (bytesToRelease < reference.length()) {
pending.addFirst(reference.retainedSlice(bytesToRelease, reference.length() - bytesToRelease));
bytesToRelease -= bytesToRelease;
} else {
bytesToRelease -= reference.length();
}
ReleasableBytesReference reference = pending.pollFirst();
assert reference != null;
if (bytesToRelease < reference.length()) {
pending.addFirst(reference.releasableSlice(bytesToRelease));
return;
} else {
bytesToRelease -= reference.length();
reference.decRef();
}
}
}
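A toy model of the new releasePendingBytes logic above (illustration only; plain lengths stand in for ReleasableBytesReference and the class names are invented): fully consumed buffers are dropped from the head of the pending queue, and a partially consumed buffer is replaced by its unconsumed remainder, just as releasableSlice does in the real code.

import java.util.ArrayDeque;

class PendingQueueDemo {

    record Buf(int length) { }   // stand-in for ReleasableBytesReference

    static void release(ArrayDeque<Buf> pending, int bytesConsumed) {
        int toRelease = bytesConsumed;
        while (toRelease != 0) {
            Buf head = pending.pollFirst();
            assert head != null;
            if (toRelease < head.length()) {
                // partially consumed: put the remainder back (releasableSlice in the real code)
                pending.addFirst(new Buf(head.length() - toRelease));
                return;
            }
            toRelease -= head.length();   // fully consumed: drop it (decRef in the real code)
        }
    }

    public static void main(String[] args) {
        ArrayDeque<Buf> pending = new ArrayDeque<>();
        pending.add(new Buf(10));
        pending.add(new Buf(20));
        release(pending, 15);         // consumes all of the first buffer and 5 bytes of the second
        System.out.println(pending);  // [Buf[length=15]]
    }
}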