Skip to content

Commit fdfc08d

Browse files
committed
Backport 04df8b74379c9de7b20931fea1642f82569d3a2d
1 parent 89770f2 commit fdfc08d

File tree

7 files changed

+135
-28
lines changed

7 files changed

+135
-28
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,26 @@ final boolean isSubscribed() {
148148
}
149149

150150
final void setSubscription(Flow.Subscription subscription) {
151-
this.subscription = subscription;
152-
whenSubscribed.complete(subscription);
151+
Flow.Subscription sub;
152+
synchronized (this) {
153+
if ((sub = this.subscription) == null) {
154+
this.subscription = sub = subscription;
155+
}
156+
}
157+
if (sub == subscription) {
158+
whenSubscribed.complete(subscription);
159+
} else subscription.cancel();
153160
}
154161

155162
final void cancelSubscription() {
156163
try {
157-
subscription.cancel();
164+
Flow.Subscription sub;
165+
synchronized (this) {
166+
if ((sub = this.subscription) == null) {
167+
this.subscription = sub = HttpBodySubscriberWrapper.NOP;
168+
}
169+
}
170+
sub.cancel();
158171
} catch(Throwable t) {
159172
String msg = "Ignoring exception raised when canceling BodyPublisher subscription";
160173
if (debug.on()) debug.log("%s: %s", msg, t);
@@ -779,9 +792,10 @@ public void run() {
779792
return;
780793
}
781794

782-
if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing());
795+
if (debug.on()) debug.log(() -> "hasOutgoing = " + hasOutgoing() + ", demand = " + demand.get());
783796
while (hasOutgoing() && demand.tryDecrement()) {
784797
DataPair dp = getOutgoing();
798+
if (debug.on()) debug.log("outgoing: " + dp);
785799
if (dp == null)
786800
break;
787801

@@ -803,7 +817,10 @@ public void run() {
803817
// The next Subscriber will eventually take over.
804818

805819
} else {
806-
if (checkRequestCancelled()) return;
820+
if (checkRequestCancelled()) {
821+
if (debug.on()) debug.log("Request cancelled!");
822+
return;
823+
}
807824
if (debug.on())
808825
debug.log("onNext with " + Utils.remaining(data) + " bytes");
809826
subscriber.onNext(data);

src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ public void onSubscribe(Flow.Subscription subscription) {
338338
if (isSubscribed()) {
339339
Throwable t = new IllegalStateException("already subscribed");
340340
http1Exchange.appendToOutgoing(t);
341+
subscription.cancel();
341342
} else {
342343
setSubscription(subscription);
343344
}
@@ -402,6 +403,7 @@ public void onSubscribe(Flow.Subscription subscription) {
402403
if (isSubscribed()) {
403404
Throwable t = new IllegalStateException("already subscribed");
404405
http1Exchange.appendToOutgoing(t);
406+
subscription.cancel();
405407
} else {
406408
setSubscription(subscription);
407409
}

src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -704,11 +704,13 @@ final int maxConcurrentServerInitiatedStreams() {
704704

705705
void close() {
706706
Log.logTrace("Closing HTTP/2 connection: to {0}", connection.address());
707-
GoAwayFrame f = new GoAwayFrame(0,
708-
ErrorFrame.NO_ERROR,
709-
"Requested by user".getBytes(UTF_8));
710-
// TODO: set last stream. For now zero ok.
711-
sendFrame(f);
707+
if (connection.channel().isOpen()) {
708+
GoAwayFrame f = new GoAwayFrame(0,
709+
ErrorFrame.NO_ERROR,
710+
"Requested by user".getBytes(UTF_8));
711+
// TODO: set last stream. For now zero ok.
712+
sendFrame(f);
713+
}
712714
}
713715

714716
long count;

src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,18 +1032,21 @@ void eventUpdated(AsyncEvent e) throws ClosedChannelException {
10321032

10331033
// This returns immediately. So caller not allowed to send/receive
10341034
// on connection.
1035-
synchronized void register(AsyncEvent e) {
1036-
if (closed) e.abort(selectorClosedException());
1037-
registrations.add(e);
1038-
selector.wakeup();
1039-
}
1040-
1041-
synchronized void cancel(SocketChannel e) {
1042-
SelectionKey key = e.keyFor(selector);
1043-
if (key != null) {
1044-
key.cancel();
1035+
void register(AsyncEvent e) {
1036+
var closed = this.closed;
1037+
if (!closed) {
1038+
synchronized (this) {
1039+
closed = this.closed;
1040+
if (!closed) {
1041+
registrations.add(e);
1042+
}
1043+
}
1044+
}
1045+
if (closed) {
1046+
e.abort(selectorClosedException());
1047+
} else {
1048+
selector.wakeup();
10451049
}
1046-
selector.wakeup();
10471050
}
10481051

10491052
void wakeupSelector() {
@@ -1092,12 +1095,15 @@ void abort(Throwable t) {
10921095
if (!inSelectorThread) selector.wakeup();
10931096
}
10941097

1095-
synchronized void shutdown() {
1096-
Log.logTrace("{0}: shutting down", getName());
1097-
if (debug.on()) debug.log("SelectorManager shutting down");
1098-
closed = true;
1098+
// Only called by the selector manager thread
1099+
private void shutdown() {
10991100
try {
1100-
selector.close();
1101+
synchronized (this) {
1102+
Log.logTrace("{0}: shutting down", getName());
1103+
if (debug.on()) debug.log("SelectorManager shutting down");
1104+
closed = true;
1105+
selector.close();
1106+
}
11011107
} catch (IOException ignored) {
11021108
} finally {
11031109
owner.stop();

src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -795,6 +795,7 @@ private void processData() {
795795
+ hsTriggered() + ", needWrap:" + needWrap());
796796

797797
while (Utils.synchronizedRemaining(writeList) > 0 || hsTriggered() || needWrap()) {
798+
if (scheduler.isStopped()) return;
798799
ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
799800
EngineResult result = wrapBuffers(outbufs);
800801
if (debugw.on())

src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2017, 2022, Oracle and/or its affiliates. All rights reserved.
33
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
44
*
55
* This code is free software; you can redistribute it and/or modify it
@@ -344,6 +344,7 @@ final boolean hasNoOutputData() {
344344
}
345345

346346
void upstreamWindowUpdate() {
347+
if (pushScheduler.isStopped()) return;
347348
long downstreamQueueSize = outputQ.size();
348349
long upstreamWindowSize = upstreamWindow.get();
349350
long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize);
@@ -379,6 +380,7 @@ public void onNext(List<ByteBuffer> item) {
379380
}
380381

381382
private void upstreamRequest(long n) {
383+
if (pushScheduler.isStopped()) return;
382384
if (debug.on()) debug.log("requesting %d", n);
383385
upstreamWindow.getAndAdd(n);
384386
upstreamSubscription.request(n);

test/jdk/java/net/httpclient/ReferenceTracker.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import jdk.internal.net.http.common.OperationTrackers.Tracker;
2626

2727
import java.io.PrintStream;
28+
import java.lang.management.LockInfo;
2829
import java.lang.management.ManagementFactory;
30+
import java.lang.management.MonitorInfo;
31+
import java.lang.management.ThreadInfo;
2932
import java.net.http.HttpClient;
3033
import java.util.Arrays;
3134
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -89,10 +92,84 @@ public AssertionError check(long graceDelayMs) {
8992
"outstanding operations", true);
9093
}
9194

95+
// This method is copied from ThreadInfo::toString, but removes the
96+
// limit on the stack trace depth (8 frames max) that ThreadInfo::toString
97+
// forcefully implement. We want to print all frames for better diagnosis.
98+
private static String toString(ThreadInfo info) {
99+
StringBuilder sb = new StringBuilder("\"" + info.getThreadName() + "\"" +
100+
(info.isDaemon() ? " daemon" : "") +
101+
" prio=" + info.getPriority() +
102+
" Id=" + info.getThreadId() + " " +
103+
info.getThreadState());
104+
if (info.getLockName() != null) {
105+
sb.append(" on " + info.getLockName());
106+
}
107+
if (info.getLockOwnerName() != null) {
108+
sb.append(" owned by \"" + info.getLockOwnerName() +
109+
"\" Id=" + info.getLockOwnerId());
110+
}
111+
if (info.isSuspended()) {
112+
sb.append(" (suspended)");
113+
}
114+
if (info.isInNative()) {
115+
sb.append(" (in native)");
116+
}
117+
sb.append('\n');
118+
int i = 0;
119+
var stackTrace = info.getStackTrace();
120+
for (; i < stackTrace.length ; i++) {
121+
StackTraceElement ste = stackTrace[i];
122+
sb.append("\tat " + ste.toString());
123+
sb.append('\n');
124+
if (i == 0 && info.getLockInfo() != null) {
125+
Thread.State ts = info.getThreadState();
126+
switch (ts) {
127+
case BLOCKED:
128+
sb.append("\t- blocked on " + info.getLockInfo());
129+
sb.append('\n');
130+
break;
131+
case WAITING:
132+
sb.append("\t- waiting on " + info.getLockInfo());
133+
sb.append('\n');
134+
break;
135+
case TIMED_WAITING:
136+
sb.append("\t- waiting on " + info.getLockInfo());
137+
sb.append('\n');
138+
break;
139+
default:
140+
}
141+
}
142+
143+
for (MonitorInfo mi : info.getLockedMonitors()) {
144+
if (mi.getLockedStackDepth() == i) {
145+
sb.append("\t- locked " + mi);
146+
sb.append('\n');
147+
}
148+
}
149+
}
150+
if (i < stackTrace.length) {
151+
sb.append("\t...");
152+
sb.append('\n');
153+
}
154+
155+
LockInfo[] locks = info.getLockedSynchronizers();
156+
if (locks.length > 0) {
157+
sb.append("\n\tNumber of locked synchronizers = " + locks.length);
158+
sb.append('\n');
159+
for (LockInfo li : locks) {
160+
sb.append("\t- " + li);
161+
sb.append('\n');
162+
}
163+
}
164+
sb.append('\n');
165+
return sb.toString();
166+
}
167+
92168
private void printThreads(String why, PrintStream out) {
93169
out.println(why);
94170
Arrays.stream(ManagementFactory.getThreadMXBean()
95171
.dumpAllThreads(true, true))
172+
.map(ReferenceTracker::toString)
96173
.forEach(out::println);
97174
}
98175

0 commit comments

Comments
 (0)