Skip to content

Commit 0a268c0

Browse files
committed
Resolves
1 parent 8c7152d commit 0a268c0

File tree

5 files changed

+24
-119
lines changed

5 files changed

+24
-119
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,12 @@ public String toString() {
120120
* concrete implementations: {@link Http1Request.StreamSubscriber}, and
121121
* {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
122122
* fixed length bodies, respectively. */
123-
static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
123+
abstract static class Http1RequestBodySubscriber implements Flow.Subscriber<ByteBuffer> {
124124
final MinimalFuture<Flow.Subscription> whenSubscribed = new MinimalFuture<>();
125125
private volatile Flow.Subscription subscription;
126126
volatile boolean complete;
127127
private final Logger debug;
128-
Http1BodySubscriber(Logger debug) {
128+
Http1RequestBodySubscriber(Logger debug) {
129129
assert debug != null;
130130
this.debug = debug;
131131
}

src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it

src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java

Lines changed: 1 addition & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -275,119 +275,6 @@ public void nullBody(HttpResponse<T> resp, Throwable t) {
275275
}
276276
}
277277

278-
static final Flow.Subscription NOP = new Flow.Subscription() {
279-
@Override
280-
public void request(long n) { }
281-
public void cancel() { }
282-
};
283-
284-
/**
285-
* The Http1AsyncReceiver ensures that all calls to
286-
* the subscriber, including onSubscribe, occur sequentially.
287-
* There could however be some race conditions that could happen
288-
* in case of unexpected errors thrown at unexpected places, which
289-
* may cause onError to be called multiple times.
290-
* The Http1BodySubscriber will ensure that the user subscriber
291-
* is actually completed only once - and only after it is
292-
* subscribed.
293-
* @param <U> The type of response.
294-
*/
295-
final static class Http1BodySubscriber<U> implements TrustedSubscriber<U> {
296-
final HttpResponse.BodySubscriber<U> userSubscriber;
297-
final AtomicBoolean completed = new AtomicBoolean();
298-
volatile Throwable withError;
299-
volatile boolean subscribed;
300-
Http1BodySubscriber(HttpResponse.BodySubscriber<U> userSubscriber) {
301-
this.userSubscriber = userSubscriber;
302-
}
303-
304-
@Override
305-
public boolean needsExecutor() {
306-
return TrustedSubscriber.needsExecutor(userSubscriber);
307-
}
308-
309-
// propagate the error to the user subscriber, even if not
310-
// subscribed yet.
311-
private void propagateError(Throwable t) {
312-
assert t != null;
313-
try {
314-
// if unsubscribed at this point, it will not
315-
// get subscribed later - so do it now and
316-
// propagate the error
317-
if (subscribed == false) {
318-
subscribed = true;
319-
userSubscriber.onSubscribe(NOP);
320-
}
321-
} finally {
322-
// if onError throws then there is nothing to do
323-
// here: let the caller deal with it by logging
324-
// and closing the connection.
325-
userSubscriber.onError(t);
326-
}
327-
}
328-
329-
// complete the subscriber, either normally or exceptionally
330-
// ensure that the subscriber is completed only once.
331-
private void complete(Throwable t) {
332-
if (completed.compareAndSet(false, true)) {
333-
t = withError = Utils.getCompletionCause(t);
334-
if (t == null) {
335-
assert subscribed;
336-
try {
337-
userSubscriber.onComplete();
338-
} catch (Throwable x) {
339-
// Simply propagate the error by calling
340-
// onError on the user subscriber, and let the
341-
// connection be reused since we should have received
342-
// and parsed all the bytes when we reach here.
343-
// If onError throws in turn, then we will simply
344-
// let that new exception flow up to the caller
345-
// and let it deal with it.
346-
// (i.e: log and close the connection)
347-
// Note that rethrowing here could introduce a
348-
// race that might cause the next send() operation to
349-
// fail as the connection has already been put back
350-
// into the cache when we reach here.
351-
propagateError(t = withError = Utils.getCompletionCause(x));
352-
}
353-
} else {
354-
propagateError(t);
355-
}
356-
}
357-
}
358-
359-
@Override
360-
public CompletionStage<U> getBody() {
361-
return userSubscriber.getBody();
362-
}
363-
364-
@Override
365-
public void onSubscribe(Flow.Subscription subscription) {
366-
if (!subscribed) {
367-
subscribed = true;
368-
userSubscriber.onSubscribe(subscription);
369-
} else {
370-
// could be already subscribed and completed
371-
// if an unexpected error occurred before the actual
372-
// subscription - though that's not supposed
373-
// happen.
374-
assert completed.get();
375-
}
376-
}
377-
@Override
378-
public void onNext(List<ByteBuffer> item) {
379-
assert !completed.get();
380-
userSubscriber.onNext(item);
381-
}
382-
@Override
383-
public void onError(Throwable throwable) {
384-
complete(throwable);
385-
}
386-
@Override
387-
public void onComplete() {
388-
complete(null);
389-
}
390-
}
391278

392279
public <U> CompletableFuture<U> readBody(HttpResponse.BodySubscriber<U> p,
393280
boolean return2Cache,

src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,19 @@
6565
import java.util.Set;
6666
import java.util.TreeSet;
6767
import java.util.concurrent.CompletableFuture;
68+
import java.util.concurrent.ConcurrentSkipListSet;
6869
import java.util.concurrent.CompletionException;
6970
import java.util.concurrent.ExecutionException;
7071
import java.util.concurrent.Executor;
7172
import java.util.concurrent.ExecutorService;
7273
import java.util.concurrent.Executors;
74+
import java.util.concurrent.RejectedExecutionException;
7375
import java.util.concurrent.ThreadFactory;
76+
import java.util.concurrent.atomic.AtomicBoolean;
7477
import java.util.concurrent.atomic.AtomicInteger;
7578
import java.util.concurrent.atomic.AtomicLong;
79+
import java.util.concurrent.atomic.AtomicReference;
80+
import java.util.function.BiConsumer;
7681
import java.util.function.BooleanSupplier;
7782
import java.util.stream.Stream;
7883
import java.net.http.HttpClient;
@@ -659,28 +664,41 @@ final long webSocketClose() {
659664
}
660665

661666
// Returns the pendingOperationCount.
667+
// Incremented with any operation, whether it's HTTP/1.1, HTTP/2, or WebSocket
662668
final long referenceCount() {
663669
return pendingOperationCount.get();
664670
}
665671

672+
// Trackers are used in test to verify that an instance of
673+
// HttpClient has shutdown correctly, and that all operations
674+
// have terminated.
666675
final static class HttpClientTracker implements Tracker {
676+
final AtomicLong requestCount;
667677
final AtomicLong httpCount;
668678
final AtomicLong http2Count;
669679
final AtomicLong websocketCount;
670680
final AtomicLong operationsCount;
681+
final AtomicLong connnectionsCount;
671682
final Reference<?> reference;
683+
final AtomicBoolean isAlive;
672684
final String name;
673-
HttpClientTracker(AtomicLong http,
685+
HttpClientTracker(AtomicLong request,
686+
AtomicLong http,
674687
AtomicLong http2,
675688
AtomicLong ws,
676689
AtomicLong ops,
690+
AtomicLong conns,
677691
Reference<?> ref,
692+
AtomicBoolean isAlive,
678693
String name) {
694+
this.requestCount = request;
679695
this.httpCount = http;
680696
this.http2Count = http2;
681697
this.websocketCount = ws;
682698
this.operationsCount = ops;
699+
this.connnectionsCount = conns;
683700
this.reference = ref;
701+
this.isAlive = isAlive;
684702
this.name = name;
685703
}
686704
@Override

src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it

0 commit comments

Comments
 (0)