Skip to content

Commit 2964b15

Browse files
Remove lots of redundant ref-counting from transport pipeline (elastic#123390)
We can do with a whole lot less in ref-counting, avoiding lots of contention and speeding up the logic in general by only incrementing ref-counts where ownership is unclear while avoiding count changes on obvious "moves".
1 parent 6b1a6f7 commit 2964b15

File tree

6 files changed

+97
-130
lines changed

6 files changed

+97
-130
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageInboundHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import org.elasticsearch.ElasticsearchException;
1616
import org.elasticsearch.ExceptionsHelper;
17-
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1817
import org.elasticsearch.common.network.ThreadWatchdog;
1918
import org.elasticsearch.core.Releasables;
2019
import org.elasticsearch.transport.InboundPipeline;
@@ -51,8 +50,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
5150
final ByteBuf buffer = (ByteBuf) msg;
5251
Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
5352
activityTracker.startActivity();
54-
try (ReleasableBytesReference reference = Netty4Utils.toReleasableBytesReference(buffer)) {
55-
pipeline.handleBytes(channel, reference);
53+
try {
54+
pipeline.handleBytes(channel, Netty4Utils.toReleasableBytesReference(buffer));
5655
} finally {
5756
activityTracker.stopActivity();
5857
}

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyByteBufSizer.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,10 @@
1212
import io.netty.buffer.ByteBuf;
1313
import io.netty.channel.ChannelHandler;
1414
import io.netty.channel.ChannelHandlerContext;
15-
import io.netty.handler.codec.MessageToMessageDecoder;
16-
17-
import java.util.List;
15+
import io.netty.channel.ChannelInboundHandlerAdapter;
1816

1917
@ChannelHandler.Sharable
20-
public class NettyByteBufSizer extends MessageToMessageDecoder<ByteBuf> {
18+
public class NettyByteBufSizer extends ChannelInboundHandlerAdapter {
2119

2220
public static final NettyByteBufSizer INSTANCE = new NettyByteBufSizer();
2321

@@ -26,14 +24,12 @@ private NettyByteBufSizer() {
2624
}
2725

2826
@Override
29-
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
30-
int readableBytes = buf.readableBytes();
31-
if (buf.capacity() >= 1024) {
32-
ByteBuf resized = buf.discardReadBytes().capacity(readableBytes);
33-
assert resized.readableBytes() == readableBytes;
34-
out.add(resized.retain());
35-
} else {
36-
out.add(buf.retain());
27+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
28+
if (msg instanceof ByteBuf buf && buf.capacity() >= 1024) {
29+
int readableBytes = buf.readableBytes();
30+
buf = buf.discardReadBytes().capacity(readableBytes);
31+
assert buf.readableBytes() == readableBytes;
3732
}
33+
ctx.fireChannelRead(msg);
3834
}
3935
}

server/src/main/java/org/elasticsearch/transport/InboundDecoder.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
import org.elasticsearch.common.recycler.Recycler;
1919
import org.elasticsearch.common.unit.ByteSizeUnit;
2020
import org.elasticsearch.common.unit.ByteSizeValue;
21+
import org.elasticsearch.core.CheckedConsumer;
2122
import org.elasticsearch.core.Releasable;
2223
import org.elasticsearch.core.Releasables;
2324

2425
import java.io.IOException;
2526
import java.io.StreamCorruptedException;
26-
import java.util.function.Consumer;
2727

2828
public class InboundDecoder implements Releasable {
2929

@@ -53,7 +53,7 @@ public InboundDecoder(Recycler<BytesRef> recycler, ByteSizeValue maxHeaderSize,
5353
this.channelType = channelType;
5454
}
5555

56-
public int decode(ReleasableBytesReference reference, Consumer<Object> fragmentConsumer) throws IOException {
56+
public int decode(ReleasableBytesReference reference, CheckedConsumer<Object, IOException> fragmentConsumer) throws IOException {
5757
ensureOpen();
5858
try {
5959
return internalDecode(reference, fragmentConsumer);
@@ -63,7 +63,8 @@ public int decode(ReleasableBytesReference reference, Consumer<Object> fragmentC
6363
}
6464
}
6565

66-
public int internalDecode(ReleasableBytesReference reference, Consumer<Object> fragmentConsumer) throws IOException {
66+
public int internalDecode(ReleasableBytesReference reference, CheckedConsumer<Object, IOException> fragmentConsumer)
67+
throws IOException {
6768
if (isOnHeader()) {
6869
int messageLength = TcpTransport.readMessageLength(reference);
6970
if (messageLength == -1) {
@@ -104,25 +105,28 @@ public int internalDecode(ReleasableBytesReference reference, Consumer<Object> f
104105
}
105106
int remainingToConsume = totalNetworkSize - bytesConsumed;
106107
int maxBytesToConsume = Math.min(reference.length(), remainingToConsume);
107-
ReleasableBytesReference retainedContent;
108-
if (maxBytesToConsume == remainingToConsume) {
109-
retainedContent = reference.retainedSlice(0, maxBytesToConsume);
110-
} else {
111-
retainedContent = reference.retain();
112-
}
113-
114108
int bytesConsumedThisDecode = 0;
115109
if (decompressor != null) {
116-
bytesConsumedThisDecode += decompress(retainedContent);
110+
bytesConsumedThisDecode += decompressor.decompress(
111+
maxBytesToConsume == remainingToConsume ? reference.slice(0, maxBytesToConsume) : reference
112+
);
117113
bytesConsumed += bytesConsumedThisDecode;
118114
ReleasableBytesReference decompressed;
119115
while ((decompressed = decompressor.pollDecompressedPage(isDone())) != null) {
120-
fragmentConsumer.accept(decompressed);
116+
try (var buf = decompressed) {
117+
fragmentConsumer.accept(buf);
118+
}
121119
}
122120
} else {
123121
bytesConsumedThisDecode += maxBytesToConsume;
124122
bytesConsumed += maxBytesToConsume;
125-
fragmentConsumer.accept(retainedContent);
123+
if (maxBytesToConsume == remainingToConsume) {
124+
try (ReleasableBytesReference retained = reference.retainedSlice(0, maxBytesToConsume)) {
125+
fragmentConsumer.accept(retained);
126+
}
127+
} else {
128+
fragmentConsumer.accept(reference);
129+
}
126130
}
127131
if (isDone()) {
128132
finishMessage(fragmentConsumer);
@@ -138,7 +142,7 @@ public void close() {
138142
cleanDecodeState();
139143
}
140144

141-
private void finishMessage(Consumer<Object> fragmentConsumer) {
145+
private void finishMessage(CheckedConsumer<Object, IOException> fragmentConsumer) throws IOException {
142146
cleanDecodeState();
143147
fragmentConsumer.accept(END_CONTENT);
144148
}
@@ -154,12 +158,6 @@ private void cleanDecodeState() {
154158
}
155159
}
156160

157-
private int decompress(ReleasableBytesReference content) throws IOException {
158-
try (content) {
159-
return decompressor.decompress(content);
160-
}
161-
}
162-
163161
private boolean isDone() {
164162
return bytesConsumed == totalNetworkSize;
165163
}

server/src/main/java/org/elasticsearch/transport/InboundPipeline.java

Lines changed: 53 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,17 @@
1111

1212
import org.elasticsearch.common.bytes.CompositeBytesReference;
1313
import org.elasticsearch.common.bytes.ReleasableBytesReference;
14+
import org.elasticsearch.core.CheckedConsumer;
1415
import org.elasticsearch.core.Releasable;
1516
import org.elasticsearch.core.Releasables;
1617

1718
import java.io.IOException;
1819
import java.util.ArrayDeque;
19-
import java.util.ArrayList;
2020
import java.util.function.BiConsumer;
2121
import java.util.function.LongSupplier;
2222

2323
public class InboundPipeline implements Releasable {
2424

25-
private static final ThreadLocal<ArrayList<Object>> fragmentList = ThreadLocal.withInitial(ArrayList::new);
2625
private static final InboundMessage PING_MESSAGE = new InboundMessage(null, true);
2726

2827
private final LongSupplier relativeTimeInMillis;
@@ -56,81 +55,74 @@ public void close() {
5655

5756
public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
5857
if (uncaughtException != null) {
58+
reference.close();
5959
throw new IllegalStateException("Pipeline state corrupted by uncaught exception", uncaughtException);
6060
}
6161
try {
62-
doHandleBytes(channel, reference);
62+
channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
63+
statsTracker.markBytesRead(reference.length());
64+
if (isClosed) {
65+
reference.close();
66+
return;
67+
}
68+
pending.add(reference);
69+
doHandleBytes(channel);
6370
} catch (Exception e) {
6471
uncaughtException = e;
6572
throw e;
6673
}
6774
}
6875

69-
public void doHandleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
70-
channel.getChannelStats().markAccessed(relativeTimeInMillis.getAsLong());
71-
statsTracker.markBytesRead(reference.length());
72-
pending.add(reference.retain());
73-
74-
final ArrayList<Object> fragments = fragmentList.get();
75-
boolean continueHandling = true;
76-
77-
while (continueHandling && isClosed == false) {
78-
boolean continueDecoding = true;
79-
while (continueDecoding && pending.isEmpty() == false) {
80-
try (ReleasableBytesReference toDecode = getPendingBytes()) {
81-
final int bytesDecoded = decoder.decode(toDecode, fragments::add);
82-
if (bytesDecoded != 0) {
83-
releasePendingBytes(bytesDecoded);
84-
if (fragments.isEmpty() == false && endOfMessage(fragments.get(fragments.size() - 1))) {
85-
continueDecoding = false;
86-
}
87-
} else {
88-
continueDecoding = false;
89-
}
76+
private void doHandleBytes(TcpChannel channel) throws IOException {
77+
do {
78+
CheckedConsumer<Object, IOException> decodeConsumer = f -> forwardFragment(channel, f);
79+
int bytesDecoded = decoder.decode(pending.peekFirst(), decodeConsumer);
80+
if (bytesDecoded == 0 && pending.size() > 1) {
81+
final ReleasableBytesReference[] bytesReferences = new ReleasableBytesReference[pending.size()];
82+
int index = 0;
83+
for (ReleasableBytesReference pendingReference : pending) {
84+
bytesReferences[index] = pendingReference.retain();
85+
++index;
86+
}
87+
try (
88+
ReleasableBytesReference toDecode = new ReleasableBytesReference(
89+
CompositeBytesReference.of(bytesReferences),
90+
() -> Releasables.closeExpectNoException(bytesReferences)
91+
)
92+
) {
93+
bytesDecoded = decoder.decode(toDecode, decodeConsumer);
9094
}
9195
}
92-
93-
if (fragments.isEmpty()) {
94-
continueHandling = false;
96+
if (bytesDecoded != 0) {
97+
releasePendingBytes(bytesDecoded);
9598
} else {
96-
try {
97-
forwardFragments(channel, fragments);
98-
} finally {
99-
for (Object fragment : fragments) {
100-
if (fragment instanceof ReleasableBytesReference) {
101-
((ReleasableBytesReference) fragment).close();
102-
}
103-
}
104-
fragments.clear();
105-
}
99+
break;
106100
}
107-
}
101+
} while (pending.isEmpty() == false);
108102
}
109103

110-
private void forwardFragments(TcpChannel channel, ArrayList<Object> fragments) throws IOException {
111-
for (Object fragment : fragments) {
112-
if (fragment instanceof Header) {
113-
headerReceived((Header) fragment);
114-
} else if (fragment instanceof Compression.Scheme) {
115-
assert aggregator.isAggregating();
116-
aggregator.updateCompressionScheme((Compression.Scheme) fragment);
117-
} else if (fragment == InboundDecoder.PING) {
118-
assert aggregator.isAggregating() == false;
119-
messageHandler.accept(channel, PING_MESSAGE);
120-
} else if (fragment == InboundDecoder.END_CONTENT) {
121-
assert aggregator.isAggregating();
122-
InboundMessage aggregated = aggregator.finishAggregation();
123-
try {
124-
statsTracker.markMessageReceived();
125-
messageHandler.accept(channel, aggregated);
126-
} finally {
127-
aggregated.decRef();
128-
}
129-
} else {
130-
assert aggregator.isAggregating();
131-
assert fragment instanceof ReleasableBytesReference;
132-
aggregator.aggregate((ReleasableBytesReference) fragment);
104+
private void forwardFragment(TcpChannel channel, Object fragment) throws IOException {
105+
if (fragment instanceof Header) {
106+
headerReceived((Header) fragment);
107+
} else if (fragment instanceof Compression.Scheme) {
108+
assert aggregator.isAggregating();
109+
aggregator.updateCompressionScheme((Compression.Scheme) fragment);
110+
} else if (fragment == InboundDecoder.PING) {
111+
assert aggregator.isAggregating() == false;
112+
messageHandler.accept(channel, PING_MESSAGE);
113+
} else if (fragment == InboundDecoder.END_CONTENT) {
114+
assert aggregator.isAggregating();
115+
InboundMessage aggregated = aggregator.finishAggregation();
116+
try {
117+
statsTracker.markMessageReceived();
118+
messageHandler.accept(channel, aggregated);
119+
} finally {
120+
aggregated.decRef();
133121
}
122+
} else {
123+
assert aggregator.isAggregating();
124+
assert fragment instanceof ReleasableBytesReference;
125+
aggregator.aggregate((ReleasableBytesReference) fragment);
134126
}
135127
}
136128

@@ -139,25 +131,6 @@ protected void headerReceived(Header header) {
139131
aggregator.headerReceived(header);
140132
}
141133

142-
private static boolean endOfMessage(Object fragment) {
143-
return fragment == InboundDecoder.PING || fragment == InboundDecoder.END_CONTENT || fragment instanceof Exception;
144-
}
145-
146-
private ReleasableBytesReference getPendingBytes() {
147-
if (pending.size() == 1) {
148-
return pending.peekFirst().retain();
149-
} else {
150-
final ReleasableBytesReference[] bytesReferences = new ReleasableBytesReference[pending.size()];
151-
int index = 0;
152-
for (ReleasableBytesReference pendingReference : pending) {
153-
bytesReferences[index] = pendingReference.retain();
154-
++index;
155-
}
156-
final Releasable releasable = () -> Releasables.closeExpectNoException(bytesReferences);
157-
return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable);
158-
}
159-
}
160-
161134
private void releasePendingBytes(int bytesConsumed) {
162135
int bytesToRelease = bytesConsumed;
163136
while (bytesToRelease != 0) {

server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ public void testDecode() throws IOException {
117117
assertEquals(messageBytes, content);
118118
// Ref count is incremented since the bytes are forwarded as a fragment
119119
assertTrue(releasable2.hasReferences());
120-
releasable2.decRef();
121-
assertTrue(releasable2.hasReferences());
122120
assertTrue(releasable2.decRef());
123121
assertEquals(InboundDecoder.END_CONTENT, endMarker);
124122
}
@@ -433,7 +431,12 @@ public void testCompressedDecode() throws IOException {
433431

434432
final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
435433
final ReleasableBytesReference releasable2 = wrapAsReleasable(bytes2);
436-
int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
434+
int bytesConsumed2 = decoder.decode(releasable2, e -> {
435+
fragments.add(e);
436+
if (e instanceof ReleasableBytesReference reference) {
437+
reference.retain();
438+
}
439+
});
437440
assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2);
438441

439442
final Object compressionScheme = fragments.get(0);

server/src/test/java/org/elasticsearch/transport/InboundPipelineTests.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,11 @@ public void testPipelineHandling() throws IOException {
159159
final int remainingBytes = networkBytes.length() - currentOffset;
160160
final int bytesToRead = Math.min(randomIntBetween(1, 32 * 1024), remainingBytes);
161161
final BytesReference slice = networkBytes.slice(currentOffset, bytesToRead);
162-
try (ReleasableBytesReference reference = new ReleasableBytesReference(slice, () -> {})) {
163-
toRelease.add(reference);
164-
bytesReceived += reference.length();
165-
pipeline.handleBytes(channel, reference);
166-
currentOffset += bytesToRead;
167-
}
162+
ReleasableBytesReference reference = new ReleasableBytesReference(slice, () -> {});
163+
toRelease.add(reference);
164+
bytesReceived += reference.length();
165+
pipeline.handleBytes(channel, reference);
166+
currentOffset += bytesToRead;
168167
}
169168

170169
final int messages = expected.size();
@@ -288,13 +287,12 @@ public void testEnsureBodyIsNotPrematurelyReleased() throws IOException {
288287
final Releasable releasable = () -> bodyReleased.set(true);
289288
final int from = totalHeaderSize - 1;
290289
final BytesReference partHeaderPartBody = reference.slice(from, reference.length() - from - 1);
291-
try (ReleasableBytesReference slice = new ReleasableBytesReference(partHeaderPartBody, releasable)) {
292-
pipeline.handleBytes(new FakeTcpChannel(), slice);
293-
}
290+
pipeline.handleBytes(new FakeTcpChannel(), new ReleasableBytesReference(partHeaderPartBody, releasable));
294291
assertFalse(bodyReleased.get());
295-
try (ReleasableBytesReference slice = new ReleasableBytesReference(reference.slice(reference.length() - 1, 1), releasable)) {
296-
pipeline.handleBytes(new FakeTcpChannel(), slice);
297-
}
292+
pipeline.handleBytes(
293+
new FakeTcpChannel(),
294+
new ReleasableBytesReference(reference.slice(reference.length() - 1, 1), releasable)
295+
);
298296
assertTrue(bodyReleased.get());
299297
}
300298
}

0 commit comments

Comments
 (0)