Skip to content

Commit 07d201a

Browse files
committed
Merge pull request #429 from NiteshKant/0.4.x
Fixes #419
2 parents 2c55dc4 + de09d99 commit 07d201a

File tree

4 files changed

+49
-3
lines changed

4 files changed

+49
-3
lines changed

rxnetty/src/main/java/io/reactivex/netty/protocol/http/client/ClientRequestResponseConverter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
15+
*
1516
*/
1617

1718
package io.reactivex.netty.protocol.http.client;
@@ -276,7 +277,7 @@ public void onCompleted() {
276277
public void onError(Throwable e) {
277278
eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_CONTENT_SOURCE_ERROR, e);
278279
promise.tryFailure(e);
279-
rxRequest.onWriteComplete();
280+
rxRequest.onWriteFailed(e);
280281
}
281282

282283
@Override

rxnetty/src/main/java/io/reactivex/netty/protocol/http/client/HttpClientRequest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014 Netflix, Inc.
2+
* Copyright 2015 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -12,6 +12,7 @@
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
15+
*
1516
*/
1617

1718
package io.reactivex.netty.protocol.http.client;
@@ -29,6 +30,7 @@
2930
import io.reactivex.netty.channel.ContentTransformer;
3031
import rx.Observable;
3132
import rx.functions.Action0;
33+
import rx.functions.Action1;
3234
import rx.functions.Func1;
3335

3436
import java.nio.charset.Charset;
@@ -44,6 +46,7 @@ public class HttpClientRequest<T> {
4446
private Observable<ByteBuf> rawContentSource;
4547
private String absoluteUri;
4648
private Action0 onWriteCompleteAction;
49+
private Action1<Throwable> onWriteFailedAction;
4750

4851
HttpClientRequest(HttpRequest nettyRequest) {
4952
this.nettyRequest = nettyRequest;
@@ -196,12 +199,22 @@ void doOnWriteComplete(Action0 onWriteCompleteAction) {
196199
this.onWriteCompleteAction = onWriteCompleteAction;
197200
}
198201

202+
void doOnWriteFailed(Action1<Throwable> onWriteFailedAction) {
203+
this.onWriteFailedAction = onWriteFailedAction;
204+
}
205+
199206
void onWriteComplete() {
200207
if (null != onWriteCompleteAction) {
201208
onWriteCompleteAction.call();
202209
}
203210
}
204211

212+
void onWriteFailed(Throwable throwable) {
213+
if (null != onWriteFailedAction) {
214+
onWriteFailedAction.call(throwable);
215+
}
216+
}
217+
205218
/*Set by HttpClient*/void setDynamicUriParts(String host, int port, boolean secure) {
206219
absoluteUri = secure ? "https" : "http" + "://" + host + ':' + port; // Uri in netty always starts with a slash
207220
}

rxnetty/src/main/java/io/reactivex/netty/protocol/http/client/RequestProcessingOperator.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014 Netflix, Inc.
2+
* Copyright 2015 Netflix, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -12,6 +12,7 @@
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
15+
*
1516
*/
1617

1718
package io.reactivex.netty.protocol.http.client;
@@ -89,6 +90,16 @@ public void call(Throwable throwable) {
8990
}
9091
})
9192
.subscribe(child)); //subscribe the child for response.
93+
94+
request.doOnWriteFailed(new Action1<Throwable>() {
95+
@Override
96+
public void call(Throwable throwable) {
97+
eventsSubject.onEvent(HttpClientMetricsEvent.REQUEST_WRITE_FAILED,
98+
Clock.onEndMillis(startTimeMillis), throwable);
99+
child.onError(throwable);
100+
}
101+
});
102+
92103
request.doOnWriteComplete(new Action0() {
93104
@Override
94105
public void call() {

rxnetty/src/test/java/io/reactivex/netty/protocol/http/client/HttpClientTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
15+
*
1516
*/
1617

1718
package io.reactivex.netty.protocol.http.client;
@@ -45,6 +46,7 @@
4546
import rx.functions.Action0;
4647
import rx.functions.Action1;
4748
import rx.functions.Func1;
49+
import rx.observers.TestSubscriber;
4850

4951
import java.net.ConnectException;
5052
import java.nio.charset.Charset;
@@ -119,6 +121,25 @@ public void testChunkedStreaming() throws Exception {
119121
assertEquals(RequestProcessor.smallStreamContent, result);
120122
}
121123

124+
@Test(timeout = 60000)
125+
public void testWriteWithErrorContent() throws Exception {
126+
HttpClient<ByteBuf, ByteBuf> client = RxNetty.createHttpClient("localhost", port);
127+
Observable<ByteBuf> errSource = Observable.error(new NullPointerException());
128+
129+
HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost("/")
130+
.withContentSource(errSource);
131+
132+
Observable<HttpClientResponse<ByteBuf>> response = client.submit(request);
133+
134+
final List<String> result = new ArrayList<String>();
135+
136+
TestSubscriber<HttpClientResponse<ByteBuf>> testSub = new TestSubscriber<HttpClientResponse<ByteBuf>>();
137+
response.subscribe(testSub);
138+
139+
testSub.awaitTerminalEvent();
140+
testSub.assertError(NullPointerException.class);
141+
}
142+
122143
@Test
123144
public void testMultipleChunks() throws Exception {
124145
HttpClient<ByteBuf, ServerSentEvent> client = RxNetty.createHttpClient("localhost", port,

0 commit comments

Comments
 (0)