Skip to content

Commit 5a067fc

Browse files
committed
Fixes #474 (disable auto release leaks buffers when content not subscribed) (#497)
When auto-release it turned off and the HTTP content is not subscribed, the buffers which are disposed from `UnicastContentSubject` are not released (which otherwise would have been released by the subscriber). This change properly release the disposed buffers when auto-release is turned off.
1 parent a279f29 commit 5a067fc

File tree

4 files changed

+58
-6
lines changed

4 files changed

+58
-6
lines changed

rxnetty/src/main/java/io/reactivex/netty/protocol/http/UnicastContentSubject.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private UnicastContentSubject(final State<T> state, long noSubscriptionTimeout,
9292
* @return The new instance of {@link UnicastContentSubject}
9393
*/
9494
public static <T> UnicastContentSubject<T> createWithoutNoSubscriptionTimeout(Action0 onUnsubscribe) {
95-
State<T> state = new State<T>(onUnsubscribe);
95+
State<T> state = new State<T>(onUnsubscribe, null);
9696
return new UnicastContentSubject<T>(state);
9797
}
9898

@@ -125,7 +125,13 @@ public static <T> UnicastContentSubject<T> create(long noSubscriptionTimeout, Ti
125125

126126
public static <T> UnicastContentSubject<T> create(long noSubscriptionTimeout, TimeUnit timeUnit,
127127
Scheduler timeoutScheduler, Action0 onUnsubscribe) {
128-
State<T> state = new State<T>(onUnsubscribe);
128+
return create(noSubscriptionTimeout, timeUnit, timeoutScheduler, onUnsubscribe, null);
129+
}
130+
131+
public static <T> UnicastContentSubject<T> create(long noSubscriptionTimeout, TimeUnit timeUnit,
132+
Scheduler timeoutScheduler, Action0 onUnsubscribe,
133+
Action1<T> onDispose) {
134+
State<T> state = new State<T>(onUnsubscribe, onDispose);
129135
return new UnicastContentSubject<T>(state, noSubscriptionTimeout, timeUnit, timeoutScheduler);
130136
}
131137

@@ -140,7 +146,8 @@ public static <T> UnicastContentSubject<T> create(long noSubscriptionTimeout, Ti
140146
*/
141147
public boolean disposeIfNotSubscribed() {
142148
if (state.casState(State.STATES.UNSUBSCRIBED, State.STATES.DISPOSED)) {
143-
state.bufferedObservable.subscribe(Subscribers.empty()); // Drain all items so that ByteBuf gets released.
149+
Subscriber<T> sub = null == state.onDispose ? Subscribers.<T>empty() : Subscribers.create(state.onDispose);
150+
state.bufferedObservable.subscribe(sub); // Drain all items so that ByteBuf gets released.
144151
return true;
145152
}
146153
return false;
@@ -156,10 +163,12 @@ public void updateTimeoutIfNotScheduled(long noSubscriptionTimeout, TimeUnit tim
156163
private static final class State<T> {
157164

158165
private final Action0 onUnsubscribe;
166+
private final Action1<T> onDispose;
159167
private volatile Subscription releaseSubscription;
160168

161-
private State(Action0 onUnsubscribe) {
169+
private State(Action0 onUnsubscribe, Action1<T> onDispose) {
162170
this.onUnsubscribe = onUnsubscribe;
171+
this.onDispose = onDispose;
163172
final BufferUntilSubscriber<T> bufferedSubject = BufferUntilSubscriber.create();
164173
bufferedObservable = bufferedSubject.lift(new AutoReleaseByteBufOperator<T>()); // Always auto-release
165174
bufferedObserver = bufferedSubject;

rxnetty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import rx.Observer;
5353
import rx.Subscriber;
5454
import rx.functions.Action0;
55+
import rx.functions.Action1;
56+
import rx.schedulers.Schedulers;
5557

5658
/**
5759
* A channel handler for {@link HttpClient} to convert netty's http request/response objects to {@link HttpClient}'s
@@ -371,7 +373,15 @@ public void call() {
371373
};
372374

373375
if (subscriptionTimeout > 0) {
374-
contentSubject = UnicastContentSubject.create(subscriptionTimeout, subscriptionTimeoutUnit, onUnsubscribe);
376+
contentSubject = UnicastContentSubject.create(subscriptionTimeout, subscriptionTimeoutUnit,
377+
Schedulers.computation(), onUnsubscribe, new Action1() {
378+
@Override
379+
public void call(Object o) {
380+
if (!autoReleaseBuffers) {
381+
ReferenceCountUtil.release(o);
382+
}
383+
}
384+
});
375385
} else {
376386
contentSubject = UnicastContentSubject.createWithoutNoSubscriptionTimeout(onUnsubscribe);
377387
}

rxnetty/src/main/java/io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
import io.reactivex.netty.server.ServerMetricsEvent;
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
45+
import rx.functions.Action0;
46+
import rx.functions.Action1;
47+
import rx.functions.Actions;
48+
import rx.schedulers.Schedulers;
4549

4650
import java.io.IOException;
4751
import java.util.concurrent.TimeUnit;
@@ -217,7 +221,16 @@ private final class RequestState {
217221

218222
@SuppressWarnings({"rawtypes", "unchecked"})
219223
private void createRxRequest(ChannelHandlerContext ctx, HttpRequest httpRequest) {
220-
contentSubject = UnicastContentSubject.create(requestContentSubscriptionTimeoutMs, TimeUnit.MILLISECONDS);
224+
contentSubject = UnicastContentSubject.create(requestContentSubscriptionTimeoutMs, TimeUnit.MILLISECONDS,
225+
Schedulers.computation(), Actions.empty(), new Action1() {
226+
@Override
227+
public void call(Object o) {
228+
if (!autoReleaseBuffers) {
229+
ReferenceCountUtil.release(o);
230+
}
231+
232+
}
233+
});
221234
rxRequest = new HttpServerRequest(ctx.channel(), httpRequest, contentSubject);
222235
}
223236

rxnetty/src/test/java/io/reactivex/netty/protocol/http/UnicastContentSubjectTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,26 @@ public void call() {
212212
Assert.assertTrue("Inner subscriber did not unsubscribe on inner completion.", innerUnsubscribe.get());
213213
}
214214

215+
@Test
216+
public void testDispose() throws Exception {
217+
TestScheduler scheduler = Schedulers.test();
218+
UnicastContentSubject<ByteBuf> sub = UnicastContentSubject.create(1, TimeUnit.HOURS, scheduler, null,
219+
new Action1<ByteBuf>() {
220+
@Override
221+
public void call(ByteBuf bb) {
222+
bb.release();
223+
}
224+
});
225+
ByteBuf bb = Unpooled.buffer();
226+
sub.onNext(bb);
227+
228+
Assert.assertEquals("Unexpected ref count for buffer before dispose.", 2, bb.refCnt());
229+
230+
scheduler.advanceTimeBy(1, TimeUnit.HOURS);
231+
232+
Assert.assertEquals("Unexpected ref count for buffer post dispose.", 0, bb.refCnt());
233+
}
234+
215235
private static class OnUnsubscribeAction implements Action0 {
216236

217237
private volatile boolean called;

0 commit comments

Comments
 (0)