11/*
2- * Copyright (c) 2015, 2021 , Oracle and/or its affiliates. All rights reserved.
2+ * Copyright (c) 2015, 2022 , Oracle and/or its affiliates. All rights reserved.
33 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44 *
55 * This code is free software; you can redistribute it and/or modify it
2727
2828import java .io .IOException ;
2929import java .net .InetSocketAddress ;
30- import java .net .http .HttpClient ;
3130import java .net .http .HttpResponse ;
3231import java .net .http .HttpResponse .BodyHandler ;
3332import java .net .http .HttpResponse .BodySubscriber ;
33+ import java .net .http .HttpResponse .ResponseInfo ;
3434import java .nio .ByteBuffer ;
3535import java .util .Objects ;
3636import java .util .concurrent .CompletableFuture ;
3939import java .util .concurrent .ConcurrentLinkedDeque ;
4040import java .util .concurrent .Executor ;
4141import java .util .concurrent .Flow ;
42+
4243import jdk .internal .net .http .common .Demand ;
44+ import jdk .internal .net .http .common .HttpBodySubscriberWrapper ;
4345import jdk .internal .net .http .common .Log ;
4446import jdk .internal .net .http .common .FlowTube ;
4547import jdk .internal .net .http .common .Logger ;
4648import jdk .internal .net .http .common .SequentialScheduler ;
4749import jdk .internal .net .http .common .MinimalFuture ;
4850import jdk .internal .net .http .common .Utils ;
4951import static java .net .http .HttpClient .Version .HTTP_1_1 ;
50- import static jdk .internal .net .http .common .Utils .wrapWithExtraDetail ;
5152
5253/**
5354 * Encapsulates one HTTP/1.1 request/response exchange.
@@ -78,16 +79,18 @@ class Http1Exchange<T> extends ExchangeImpl<T> {
7879 final ConcurrentLinkedDeque <DataPair > outgoing = new ConcurrentLinkedDeque <>();
7980
8081 /** The write publisher, responsible for writing the complete request ( both
81- * headers and body ( if any ). */
82+ * headers and body ( if any )) . */
8283 private final Http1Publisher writePublisher = new Http1Publisher ();
8384
8485 /** Completed when the header have been published, or there is an error */
8586 private final CompletableFuture <ExchangeImpl <T >> headersSentCF = new MinimalFuture <>();
8687 /** Completed when the body has been published, or there is an error */
8788 private final CompletableFuture <ExchangeImpl <T >> bodySentCF = new MinimalFuture <>();
8889
89- /** The subscriber to the request's body published. Maybe null. */
90- private volatile Http1BodySubscriber bodySubscriber ;
90+ /** The subscriber to the request's body published. May be null. */
91+ private volatile Http1RequestBodySubscriber bodySubscriber ;
92+ /** The subscriber to the response's body received. May be null. */
93+ private volatile BodySubscriber <T > responseSubscriber ;
9194
9295 enum State { INITIAL ,
9396 HEADERS ,
@@ -117,12 +120,12 @@ public String toString() {
117120 * concrete implementations: {@link Http1Request.StreamSubscriber}, and
118121 * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
119122 * fixed length bodies, respectively. */
120- static abstract class Http1BodySubscriber implements Flow .Subscriber <ByteBuffer > {
123+ abstract static class Http1RequestBodySubscriber implements Flow .Subscriber <ByteBuffer > {
121124 final MinimalFuture <Flow .Subscription > whenSubscribed = new MinimalFuture <>();
122125 private volatile Flow .Subscription subscription ;
123126 volatile boolean complete ;
124127 private final Logger debug ;
125- Http1BodySubscriber (Logger debug ) {
128+ Http1RequestBodySubscriber (Logger debug ) {
126129 assert debug != null ;
127130 this .debug = debug ;
128131 }
@@ -159,8 +162,8 @@ final void cancelSubscription() {
159162 }
160163 }
161164
162- static Http1BodySubscriber completeSubscriber (Logger debug ) {
163- return new Http1BodySubscriber (debug ) {
165+ static Http1RequestBodySubscriber completeSubscriber (Logger debug ) {
166+ return new Http1RequestBodySubscriber (debug ) {
164167 @ Override public void onSubscribe (Flow .Subscription subscription ) { error (); }
165168 @ Override public void onNext (ByteBuffer item ) { error (); }
166169 @ Override public void onError (Throwable throwable ) { error (); }
@@ -173,6 +176,34 @@ private void error() {
173176 }
174177 }
175178
179+ /**
180+ * The Http1AsyncReceiver ensures that all calls to
181+ * the subscriber, including onSubscribe, occur sequentially.
182+ * There could however be some race conditions that could happen
183+ * in case of unexpected errors thrown at unexpected places, which
184+ * may cause onError to be called multiple times.
185+ * The Http1BodySubscriber will ensure that the user subscriber
186+ * is actually completed only once - and only after it is
187+ * subscribed.
188+ * @param <U> The type of response.
189+ */
190+ static final class Http1ResponseBodySubscriber <U > extends HttpBodySubscriberWrapper <U > {
191+ final Http1Exchange <U > exchange ;
192+ Http1ResponseBodySubscriber (BodySubscriber <U > userSubscriber , Http1Exchange <U > exchange ) {
193+ super (userSubscriber );
194+ this .exchange = exchange ;
195+ }
196+
197+ @ Override
198+ protected void complete (Throwable t ) {
199+ try {
200+ exchange .responseSubscriberCompleted (this );
201+ } finally {
202+ super .complete (t );
203+ }
204+ }
205+ }
206+
176207 @ Override
177208 public String toString () {
178209 return "HTTP/1.1 " + request .toString ();
@@ -217,6 +248,28 @@ private void connectFlows(HttpConnection connection) {
217248 asyncReceiver .subscriber ());
218249 }
219250
251+ // The Http1ResponseBodySubscriber is registered with the HttpClient
252+ // to ensure that it gets completed if the SelectorManager aborts due
253+ // to unexpected exceptions.
254+ void registerResponseSubscriber (Http1ResponseBodySubscriber <T > subscriber ) {
255+ Throwable failed = null ;
256+ synchronized (lock ) {
257+ failed = this .failed ;
258+ if (failed == null ) {
259+ this .responseSubscriber = subscriber ;
260+ }
261+ }
262+ if (failed != null ) {
263+ subscriber .onError (failed );
264+ } else {
265+ client .registerSubscriber (subscriber );
266+ }
267+ }
268+
269+ void responseSubscriberCompleted (HttpBodySubscriberWrapper <T > subscriber ) {
270+ client .subscriberCompleted (subscriber );
271+ }
272+
220273 @ Override
221274 CompletableFuture <ExchangeImpl <T >> sendHeadersAsync () {
222275 // create the response before sending the request headers, so that
@@ -321,12 +374,12 @@ CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
321374 if (debug .on ()) debug .log ("bodySubscriber is %s" ,
322375 bodySubscriber == null ? null : bodySubscriber .getClass ());
323376 if (bodySubscriber == null ) {
324- bodySubscriber = Http1BodySubscriber .completeSubscriber (debug );
325- appendToOutgoing (Http1BodySubscriber .COMPLETED );
377+ bodySubscriber = Http1RequestBodySubscriber .completeSubscriber (debug );
378+ appendToOutgoing (Http1RequestBodySubscriber .COMPLETED );
326379 } else {
327380 // start
328381 bodySubscriber .whenSubscribed
329- .thenAccept (( s ) -> cancelIfFailed ( s ) )
382+ .thenAccept (this :: cancelIfFailed )
330383 .thenAccept ((s ) -> requestMoreBody ());
331384 }
332385 } catch (Throwable t ) {
@@ -370,15 +423,24 @@ CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
370423 boolean returnConnectionToPool ,
371424 Executor executor )
372425 {
373- BodySubscriber < T > bs = handler . apply ( new ResponseInfoImpl (response .responseCode (),
374- response .responseHeaders (),
375- HTTP_1_1 ) );
426+ var responseInfo = new ResponseInfoImpl (response .responseCode (),
427+ response .responseHeaders (), HTTP_1_1 );
428+ BodySubscriber < T > bs = createResponseSubscriber ( handler , responseInfo );
376429 CompletableFuture <T > bodyCF = response .readBody (bs ,
377430 returnConnectionToPool ,
378431 executor );
379432 return bodyCF ;
380433 }
381434
435+ @ Override
436+ Http1ResponseBodySubscriber <T > createResponseSubscriber (BodyHandler <T > handler , ResponseInfo response ) {
437+ BodySubscriber <T > subscriber = handler .apply (response );
438+ Http1ResponseBodySubscriber <T > bs =
439+ new Http1ResponseBodySubscriber <T >(subscriber , this );
440+ registerResponseSubscriber (bs );
441+ return bs ;
442+ }
443+
382444 @ Override
383445 CompletableFuture <Void > ignoreBody () {
384446 return response .ignoreBody (executor );
@@ -439,8 +501,10 @@ void onProtocolError(final IOException cause) {
439501 private void cancelImpl (Throwable cause ) {
440502 LinkedList <CompletableFuture <?>> toComplete = null ;
441503 int count = 0 ;
442- Throwable error ;
504+ Throwable error = null ;
505+ BodySubscriber <?> subscriber ;
443506 synchronized (lock ) {
507+ subscriber = responseSubscriber ;
444508 if ((error = failed ) == null ) {
445509 failed = error = cause ;
446510 }
@@ -473,6 +537,15 @@ private void cancelImpl(Throwable cause) {
473537 operations .clear ();
474538 }
475539 }
540+
541+ // complete subscriber if needed
542+ if (subscriber != null && error != null ) {
543+ var failure = error ;
544+ if (client .isSelectorThread ()) {
545+ executor .execute (() -> subscriber .onError (failure ));
546+ } else subscriber .onError (failure );
547+ }
548+
476549 try {
477550 Log .logError ("Http1Exchange.cancel: count=" + count );
478551 if (toComplete != null ) {
@@ -607,7 +680,7 @@ private DataPair getOutgoing() {
607680 headersSentCF .completeAsync (() -> this , exec );
608681 break ;
609682 case BODY :
610- if (dp .data == Http1BodySubscriber .COMPLETED ) {
683+ if (dp .data == Http1RequestBodySubscriber .COMPLETED ) {
611684 synchronized (lock ) {
612685 state = State .COMPLETING ;
613686 }
@@ -718,7 +791,7 @@ public void run() {
718791 writeScheduler .stop ();
719792 } else {
720793 List <ByteBuffer > data = dp .data ;
721- if (data == Http1BodySubscriber .COMPLETED ) {
794+ if (data == Http1RequestBodySubscriber .COMPLETED ) {
722795 synchronized (lock ) {
723796 assert state == State .COMPLETING : "Unexpected state:" + state ;
724797 state = State .COMPLETED ;
@@ -763,7 +836,8 @@ public void cancel() {
763836 }
764837 }
765838
766- HttpClient client () {
839+ @ Override
840+ final HttpClientImpl client () {
767841 return client ;
768842 }
769843
0 commit comments