Skip to content

Commit b99f819

Browse files
committed
Merge pull request #455 from NiteshKant/0.4.x
Disable auto-release buffers option.
2 parents 9d6d436 + 8d80c2a commit b99f819

File tree

12 files changed

+174
-6
lines changed

12 files changed

+174
-6
lines changed

build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,10 @@ configure(subprojects) {
3232

3333
testCompile 'junit:junit-dep:4.10'
3434
}
35+
36+
test {
37+
testLogging {
38+
showStandardStreams = true
39+
}
40+
}
3541
}

rxnetty/src/main/java/io/reactivex/netty/channel/ObservableConnection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.channel.ChannelFuture;
2121
import io.netty.channel.ChannelFutureListener;
2222
import io.netty.channel.ChannelHandlerContext;
23+
import io.netty.util.AttributeKey;
2324
import io.reactivex.netty.metrics.Clock;
2425
import io.reactivex.netty.metrics.MetricEventsSubject;
2526
import io.reactivex.netty.pipeline.ReadTimeoutPipelineConfigurator;
@@ -38,6 +39,8 @@
3839
*/
3940
public class ObservableConnection<I, O> extends DefaultChannelWriter<O> {
4041

42+
public static AttributeKey<Boolean> AUTO_RELEASE_BUFFERS = AttributeKey.valueOf("rxnetty_auto_release_buffers");
43+
4144
private Subject<I, I> inputSubject;
4245
@SuppressWarnings("rawtypes")private final MetricEventsSubject eventsSubject;
4346
private final ChannelMetricEventProvider metricEventProvider;

rxnetty/src/main/java/io/reactivex/netty/client/AbstractClientBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,11 @@ public B withSslEngineFactory(SSLEngineFactory sslEngineFactory) {
214214
return returnBuilder();
215215
}
216216

217+
public B disableAutoReleaseBuffers() {
218+
bootstrap.attr(ObservableConnection.AUTO_RELEASE_BUFFERS, false);
219+
return returnBuilder();
220+
}
221+
217222
public Bootstrap getBootstrap() {
218223
return bootstrap;
219224
}

rxnetty/src/main/java/io/reactivex/netty/pipeline/ObservableAdapter.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.netty.channel.ChannelInboundHandlerAdapter;
2020
import io.netty.util.ReferenceCountUtil;
2121
import io.reactivex.netty.channel.NewRxConnectionEvent;
22+
import io.reactivex.netty.channel.ObservableConnection;
2223
import io.reactivex.netty.client.ConnectionReuseEvent;
2324
import rx.Observer;
2425

@@ -32,6 +33,15 @@ public class ObservableAdapter extends ChannelInboundHandlerAdapter {
3233
@SuppressWarnings("rawtypes")
3334
/*Nullable*/ private Observer bridgedObserver; /*This actually is an Rx Subject*/
3435

36+
private boolean autoReleaseBuffers;
37+
38+
@Override
39+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
40+
Boolean autoRelease = ctx.channel().attr(ObservableConnection.AUTO_RELEASE_BUFFERS).get();
41+
autoReleaseBuffers = null == autoRelease || autoRelease;
42+
super.handlerAdded(ctx);
43+
}
44+
3545
@SuppressWarnings("unchecked")
3646
@Override
3747
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -41,7 +51,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
4151
} catch (ClassCastException cce) {
4252
bridgedObserver.onError(new RuntimeException("Mismatched message type.", cce));
4353
} finally {
44-
ReferenceCountUtil.release(msg);
54+
if (autoReleaseBuffers) {
55+
ReferenceCountUtil.release(msg);
56+
}
4557
}
4658
}
4759
}

rxnetty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,20 @@ public class ClientRequestResponseConverter extends ChannelDuplexHandler {
9090

9191
public static final IOException CONN_CLOSE_BEFORE_RESPONSE = new IOException("Connection closed by peer before sending a response.");
9292

93+
private boolean autoReleaseBuffers;
94+
9395
public ClientRequestResponseConverter(MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
9496
this.eventsSubject = eventsSubject;
9597
responseState = new ResponseState();
9698
}
9799

100+
@Override
101+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
102+
Boolean autoRelease = ctx.channel().attr(ObservableConnection.AUTO_RELEASE_BUFFERS).get();
103+
autoReleaseBuffers = null == autoRelease || autoRelease;
104+
super.handlerAdded(ctx);
105+
}
106+
98107
@Override
99108
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
100109
Class<?> recievedMsgClass = msg.getClass();
@@ -244,13 +253,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
244253
}
245254

246255
@SuppressWarnings("unchecked")
247-
private static void invokeContentOnNext(Object nextObject, ResponseState stateToUse) {
256+
private void invokeContentOnNext(Object nextObject, ResponseState stateToUse) {
248257
try {
249258
stateToUse.contentSubject.onNext(nextObject);
250259
} catch (ClassCastException e) {
251260
stateToUse.contentSubject.onError(e);
252261
} finally {
253-
ReferenceCountUtil.release(nextObject);
262+
if (autoReleaseBuffers) {
263+
ReferenceCountUtil.release(nextObject);
264+
}
254265
}
255266
}
256267

rxnetty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.netty.handler.codec.http.HttpResponseStatus;
3636
import io.netty.handler.codec.http.LastHttpContent;
3737
import io.netty.util.ReferenceCountUtil;
38+
import io.reactivex.netty.channel.ObservableConnection;
3839
import io.reactivex.netty.metrics.Clock;
3940
import io.reactivex.netty.metrics.MetricEventsSubject;
4041
import io.reactivex.netty.protocol.http.UnicastContentSubject;
@@ -76,6 +77,7 @@ public class ServerRequestResponseConverter extends ChannelDuplexHandler {
7677
private final MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject;
7778
private final long requestContentSubscriptionTimeoutMs;
7879
private RequestState currentRequestState;
80+
private boolean autoReleaseBuffers = true;
7981

8082
public ServerRequestResponseConverter(MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject,
8183
long requestContentSubscriptionTimeoutMs) {
@@ -84,6 +86,13 @@ public ServerRequestResponseConverter(MetricEventsSubject<ServerMetricsEvent<?>>
8486
currentRequestState = new RequestState();
8587
}
8688

89+
@Override
90+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
91+
Boolean autoRelease = ctx.channel().attr(ObservableConnection.AUTO_RELEASE_BUFFERS).get();
92+
autoReleaseBuffers = null == autoRelease || autoRelease;
93+
super.handlerAdded(ctx);
94+
}
95+
8796
@Override
8897
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
8998
Class<?> recievedMsgClass = msg.getClass();
@@ -188,13 +197,15 @@ public void operationComplete(ChannelFuture future) throws Exception {
188197
}
189198

190199
@SuppressWarnings({"unchecked", "rawtypes"})
191-
private static void invokeContentOnNext(Object nextObject, UnicastContentSubject contentSubject) {
200+
private void invokeContentOnNext(Object nextObject, UnicastContentSubject contentSubject) {
192201
try {
193202
contentSubject.onNext(nextObject);
194203
} catch (ClassCastException e) {
195204
contentSubject.onError(e);
196205
} finally {
197-
ReferenceCountUtil.release(nextObject);
206+
if (autoReleaseBuffers) {
207+
ReferenceCountUtil.release(nextObject);
208+
}
198209
}
199210
}
200211

rxnetty/src/main/java/io/reactivex/netty/server/AbstractServerBuilder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.netty.util.concurrent.EventExecutorGroup;
2727
import io.reactivex.netty.RxNetty;
2828
import io.reactivex.netty.channel.ConnectionHandler;
29+
import io.reactivex.netty.channel.ObservableConnection;
2930
import io.reactivex.netty.metrics.MetricEventsListener;
3031
import io.reactivex.netty.metrics.MetricEventsListenerFactory;
3132
import io.reactivex.netty.pipeline.PipelineConfigurator;
@@ -138,6 +139,11 @@ public B withEventExecutorGroup(EventExecutorGroup eventExecutorGroup) {
138139
return returnBuilder();
139140
}
140141

142+
public B disableAutoReleaseBuffers() {
143+
serverBootstrap.attr(ObservableConnection.AUTO_RELEASE_BUFFERS, false);
144+
return returnBuilder();
145+
}
146+
141147
public PipelineConfigurator<I, O> getPipelineConfigurator() {
142148
return pipelineConfigurator;
143149
}

rxnetty/src/main/java/io/reactivex/netty/server/ConnectionBasedServerBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.channel.socket.nio.NioServerSocketChannel;
2626
import io.reactivex.netty.RxNetty;
2727
import io.reactivex.netty.channel.ConnectionHandler;
28+
import io.reactivex.netty.channel.ObservableConnection;
2829

2930
/**
3031
* @author Nitesh Kant
@@ -73,4 +74,10 @@ public B defaultChannelOptions() {
7374
childChannelOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
7475
return super.defaultChannelOptions();
7576
}
77+
78+
@Override
79+
public B disableAutoReleaseBuffers() {
80+
serverBootstrap.childAttr(ObservableConnection.AUTO_RELEASE_BUFFERS, false);
81+
return returnBuilder();
82+
}
7683
}

rxnetty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,33 @@ public void call(String t1) {
461461
});
462462
}
463463

464+
@Test(timeout = 60000)
465+
public void testDisableAutoRelease() throws Exception {
466+
HttpClient<ByteBuf, ByteBuf> client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder("localhost", port)
467+
.disableAutoReleaseBuffers()
468+
.enableWireLogging(LogLevel.ERROR).build();
469+
Observable<HttpClientResponse<ByteBuf>> response = client.submit(HttpClientRequest.createGet("test/singleEntity"));
470+
final List<ByteBuf> result = new ArrayList<ByteBuf>();
471+
response.flatMap(new Func1<HttpClientResponse<ByteBuf>, Observable<ByteBuf>>() {
472+
@Override
473+
public Observable<ByteBuf> call(HttpClientResponse<ByteBuf> response) {
474+
return response.getContent();
475+
}
476+
}).toBlocking().forEach(new Action1<ByteBuf>() {
477+
478+
@Override
479+
public void call(ByteBuf t1) {
480+
result.add(t1);
481+
}
482+
});
483+
484+
assertEquals("Response not found.", 1, result.size());
485+
assertEquals("Hello world", result.get(0).toString(Charset.defaultCharset()));
486+
487+
assertEquals("Byte buf auto-released", 1, result.get(0).refCnt());
488+
result.get(0).release();
489+
}
490+
464491
private static void readResponseContent(Observable<HttpClientResponse<ServerSentEvent>> response,
465492
final List<String> result) {
466493
response.flatMap(
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.reactivex.netty.protocol.http.server;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.handler.codec.http.HttpResponseStatus;
5+
import io.netty.handler.logging.LogLevel;
6+
import io.reactivex.netty.RxNetty;
7+
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
8+
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
9+
import org.junit.Assert;
10+
import org.junit.Test;
11+
import rx.Observable;
12+
import rx.functions.Action0;
13+
import rx.functions.Func1;
14+
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.TimeUnit;
19+
20+
public class DisbleAutoReleaseTest {
21+
22+
@Test(timeout = 60000)
23+
public void testDisableAutoRelease() throws Exception {
24+
25+
final List<ByteBuf> requestBufs = new ArrayList<ByteBuf>();
26+
27+
HttpServer<ByteBuf, ByteBuf> server = RxNetty.newHttpServerBuilder(0, new RequestHandler<ByteBuf, ByteBuf>() {
28+
@Override
29+
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
30+
return request.getContent()
31+
.map(new Func1<ByteBuf, Void>() {
32+
@Override
33+
public Void call(ByteBuf byteBuf) {
34+
requestBufs.add(byteBuf);
35+
return null;
36+
}
37+
})
38+
.ignoreElements()
39+
.cast(Void.class)
40+
.concatWith(response.writeStringAndFlush("Welcome!"));
41+
}
42+
}).disableAutoReleaseBuffers().enableWireLogging(LogLevel.DEBUG).build().start();
43+
44+
final CountDownLatch finishLatch = new CountDownLatch(1);
45+
HttpClientResponse<ByteBuf> response = RxNetty.createHttpClient("localhost", server.getServerPort())
46+
.submit(HttpClientRequest.createPost("/").withContent("Hello"))
47+
.finallyDo(new Action0() {
48+
@Override
49+
public void call() {
50+
finishLatch.countDown();
51+
}
52+
}).toBlocking().toFuture().get(10, TimeUnit.SECONDS);
53+
Assert.assertTrue("The returned observable did not finish.", finishLatch.await(1, TimeUnit.MINUTES));
54+
Assert.assertEquals("Request failed.", response.getStatus(), HttpResponseStatus.OK);
55+
56+
Assert.assertEquals("Unexpected request content on server.", 1, requestBufs.size());
57+
Assert.assertEquals("Unexpected request content buffer ref count.", 1, requestBufs.get(0).refCnt());
58+
59+
requestBufs.get(0).release();
60+
}
61+
}

0 commit comments

Comments
 (0)