Skip to content

Commit 68735dc

Browse files
authored
Fixes #12646 - CompleteListener may be invoked twice. (#12647)
Fixed by capturing the HttpChannel before other code could disassociate it from the HttpExchange. Signed-off-by: Simone Bordet <[email protected]>
1 parent cd4a09b commit 68735dc

File tree

2 files changed

+128
-78
lines changed

2 files changed

+128
-78
lines changed

jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpExchange.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public HttpRequest getRequest()
6969

7070
public Throwable getRequestFailure()
7171
{
72-
try (AutoLock l = lock.lock())
72+
try (AutoLock ignored = lock.lock())
7373
{
7474
return requestFailure;
7575
}
@@ -87,7 +87,7 @@ public HttpResponse getResponse()
8787

8888
public Throwable getResponseFailure()
8989
{
90-
try (AutoLock l = lock.lock())
90+
try (AutoLock ignored = lock.lock())
9191
{
9292
return responseFailure;
9393
}
@@ -110,7 +110,7 @@ boolean associate(HttpChannel channel)
110110
{
111111
boolean result = false;
112112
boolean abort = false;
113-
try (AutoLock l = lock.lock())
113+
try (AutoLock ignored = lock.lock())
114114
{
115115
// Only associate if the exchange state is initial,
116116
// as the exchange could be already failed.
@@ -134,7 +134,7 @@ boolean associate(HttpChannel channel)
134134
void disassociate(HttpChannel channel)
135135
{
136136
boolean abort = false;
137-
try (AutoLock l = lock.lock())
137+
try (AutoLock ignored = lock.lock())
138138
{
139139
if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
140140
abort = true;
@@ -147,22 +147,23 @@ void disassociate(HttpChannel channel)
147147

148148
private HttpChannel getHttpChannel()
149149
{
150-
try (AutoLock l = lock.lock())
150+
try (AutoLock ignored = lock.lock())
151151
{
152152
return _channel;
153153
}
154154
}
155155

156156
public boolean requestComplete(Throwable failure)
157157
{
158-
try (AutoLock l = lock.lock())
158+
try (AutoLock ignored = lock.lock())
159159
{
160-
return completeRequest(failure);
160+
return lockedCompleteRequest(failure);
161161
}
162162
}
163163

164-
private boolean completeRequest(Throwable failure)
164+
private boolean lockedCompleteRequest(Throwable failure)
165165
{
166+
assert lock.isHeldByCurrentThread();
166167
if (requestState == State.PENDING)
167168
{
168169
requestState = State.COMPLETED;
@@ -174,22 +175,23 @@ private boolean completeRequest(Throwable failure)
174175

175176
public boolean isResponseComplete()
176177
{
177-
try (AutoLock l = lock.lock())
178+
try (AutoLock ignored = lock.lock())
178179
{
179180
return responseState == State.COMPLETED;
180181
}
181182
}
182183

183184
public boolean responseComplete(Throwable failure)
184185
{
185-
try (AutoLock l = lock.lock())
186+
try (AutoLock ignored = lock.lock())
186187
{
187-
return completeResponse(failure);
188+
return lockedCompleteResponse(failure);
188189
}
189190
}
190191

191-
private boolean completeResponse(Throwable failure)
192+
private boolean lockedCompleteResponse(Throwable failure)
192193
{
194+
assert lock.isHeldByCurrentThread();
193195
if (responseState == State.PENDING)
194196
{
195197
responseState = State.COMPLETED;
@@ -202,7 +204,7 @@ private boolean completeResponse(Throwable failure)
202204
public Result terminateRequest()
203205
{
204206
Result result = null;
205-
try (AutoLock l = lock.lock())
207+
try (AutoLock ignored = lock.lock())
206208
{
207209
if (requestState == State.COMPLETED)
208210
requestState = State.TERMINATED;
@@ -219,7 +221,7 @@ public Result terminateRequest()
219221
public Result terminateResponse()
220222
{
221223
Result result = null;
222-
try (AutoLock l = lock.lock())
224+
try (AutoLock ignored = lock.lock())
223225
{
224226
if (responseState == State.COMPLETED)
225227
responseState = State.TERMINATED;
@@ -235,7 +237,7 @@ public Result terminateResponse()
235237

236238
boolean isResponseCompleteOrTerminated()
237239
{
238-
try (AutoLock l = lock.lock())
240+
try (AutoLock ignored = lock.lock())
239241
{
240242
return responseState == State.COMPLETED || responseState == State.TERMINATED;
241243
}
@@ -245,12 +247,14 @@ public void abort(Throwable failure, Promise<Boolean> promise)
245247
{
246248
// Atomically change the state of this exchange to be completed.
247249
// This will avoid that this exchange can be associated to a channel.
250+
HttpChannel channel;
248251
boolean abortRequest;
249252
boolean abortResponse;
250-
try (AutoLock l = lock.lock())
253+
try (AutoLock ignored = lock.lock())
251254
{
252-
abortRequest = completeRequest(failure);
253-
abortResponse = completeResponse(failure);
255+
channel = _channel;
256+
abortRequest = lockedCompleteRequest(failure);
257+
abortResponse = lockedCompleteResponse(failure);
254258
}
255259

256260
if (!abortRequest && !abortResponse)
@@ -268,7 +272,12 @@ public void abort(Throwable failure, Promise<Boolean> promise)
268272
// request content, notify them of the failure.
269273
Request.Content body = request.getBody();
270274
if (abortRequest && body != null)
275+
{
276+
// This may eventually complete the request,
277+
// and if the response is already completed
278+
// also invoke the Response.CompleteListeners.
271279
body.fail(failure);
280+
}
272281

273282
// Case #1: exchange was in the destination queue.
274283
if (destination.remove(this))
@@ -280,8 +289,7 @@ public void abort(Throwable failure, Promise<Boolean> promise)
280289
return;
281290
}
282291

283-
HttpChannel channel = getHttpChannel();
284-
if (channel == null)
292+
if (channel == null && abortRequest)
285293
{
286294
// Case #2: exchange was not yet associated.
287295
// Because this exchange is failed, when associate() is called
@@ -309,7 +317,7 @@ private void notifyFailureComplete(Throwable failure)
309317

310318
public void resetResponse()
311319
{
312-
try (AutoLock l = lock.lock())
320+
try (AutoLock ignored = lock.lock())
313321
{
314322
responseState = State.PENDING;
315323
responseFailure = null;
@@ -327,7 +335,7 @@ public void proceed(Runnable proceedAction, Throwable failure)
327335
@Override
328336
public String toString()
329337
{
330-
try (AutoLock l = lock.lock())
338+
try (AutoLock ignored = lock.lock())
331339
{
332340
return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}",
333341
HttpExchange.class.getSimpleName(),

jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java

Lines changed: 98 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,29 @@
1919
import java.util.concurrent.CountDownLatch;
2020
import java.util.concurrent.ExecutionException;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
2526
import org.eclipse.jetty.client.transport.HttpDestination;
2627
import org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP;
28+
import org.eclipse.jetty.http.HttpMethod;
29+
import org.eclipse.jetty.http.HttpStatus;
2730
import org.eclipse.jetty.io.EndPoint;
2831
import org.eclipse.jetty.server.Handler;
2932
import org.eclipse.jetty.server.Server;
3033
import org.eclipse.jetty.server.ServerConnector;
3134
import org.eclipse.jetty.util.Callback;
35+
import org.eclipse.jetty.util.component.LifeCycle;
3236
import org.eclipse.jetty.util.thread.QueuedThreadPool;
3337
import org.junit.jupiter.api.AfterEach;
3438
import org.junit.jupiter.api.Test;
3539

40+
import static org.awaitility.Awaitility.await;
41+
import static org.hamcrest.MatcherAssert.assertThat;
42+
import static org.hamcrest.Matchers.is;
43+
import static org.hamcrest.Matchers.notNullValue;
44+
import static org.hamcrest.Matchers.nullValue;
3645
import static org.junit.jupiter.api.Assertions.assertEquals;
3746
import static org.junit.jupiter.api.Assertions.assertThrows;
3847
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,7 +50,6 @@ public class HttpClientFailureTest
4150
{
4251
private Server server;
4352
private ServerConnector connector;
44-
private HttpClient client;
4553

4654
private void startServer(Handler handler) throws Exception
4755
{
@@ -57,30 +65,29 @@ private void startServer(Handler handler) throws Exception
5765
@AfterEach
5866
public void dispose() throws Exception
5967
{
60-
if (server != null)
61-
server.stop();
62-
if (client != null)
63-
client.stop();
68+
LifeCycle.stop(server);
6469
}
6570

6671
@Test
6772
public void testFailureBeforeRequestCommit() throws Exception
6873
{
6974
startServer(new EmptyServerHandler());
7075

71-
client = new HttpClient(new HttpClientTransportOverHTTP(1));
72-
client.start();
73-
74-
Request request = client.newRequest("localhost", connector.getLocalPort())
75-
.onRequestHeaders(r -> r.getConnection().close())
76-
.timeout(5, TimeUnit.SECONDS);
77-
assertThrows(ExecutionException.class, request::send);
78-
79-
HttpDestination destination = (HttpDestination)client.resolveDestination(request);
80-
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
81-
assertEquals(0, connectionPool.getConnectionCount());
82-
assertEquals(0, connectionPool.getActiveConnections().size());
83-
assertEquals(0, connectionPool.getIdleConnections().size());
76+
try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1)))
77+
{
78+
client.start();
79+
80+
Request request = client.newRequest("localhost", connector.getLocalPort())
81+
.onRequestHeaders(r -> r.getConnection().close())
82+
.timeout(5, TimeUnit.SECONDS);
83+
assertThrows(ExecutionException.class, request::send);
84+
85+
HttpDestination destination = (HttpDestination)client.resolveDestination(request);
86+
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
87+
assertEquals(0, connectionPool.getConnectionCount());
88+
assertEquals(0, connectionPool.getActiveConnections().size());
89+
assertEquals(0, connectionPool.getIdleConnections().size());
90+
}
8491
}
8592

8693
@Test
@@ -89,7 +96,7 @@ public void testFailureAfterRequestCommit() throws Exception
8996
startServer(new EmptyServerHandler());
9097

9198
AtomicReference<HttpConnectionOverHTTP> connectionRef = new AtomicReference<>();
92-
client = new HttpClient(new HttpClientTransportOverHTTP(1)
99+
try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1)
93100
{
94101
@Override
95102
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
@@ -98,48 +105,83 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<Stri
98105
connectionRef.set(connection);
99106
return connection;
100107
}
101-
});
102-
client.start();
103-
104-
CountDownLatch commitLatch = new CountDownLatch(1);
105-
CountDownLatch completeLatch = new CountDownLatch(1);
106-
AsyncRequestContent content = new AsyncRequestContent();
107-
client.newRequest("localhost", connector.getLocalPort())
108-
.onRequestCommit(request ->
109-
{
110-
connectionRef.get().getEndPoint().close();
111-
commitLatch.countDown();
112-
})
113-
.body(content)
114-
.idleTimeout(2, TimeUnit.SECONDS)
115-
.send(result ->
108+
}))
109+
{
110+
client.start();
111+
112+
CountDownLatch commitLatch = new CountDownLatch(1);
113+
CountDownLatch completeLatch = new CountDownLatch(1);
114+
AsyncRequestContent content = new AsyncRequestContent();
115+
client.newRequest("localhost", connector.getLocalPort())
116+
.onRequestCommit(request ->
117+
{
118+
connectionRef.get().getEndPoint().close();
119+
commitLatch.countDown();
120+
})
121+
.body(content)
122+
.idleTimeout(2, TimeUnit.SECONDS)
123+
.send(result ->
124+
{
125+
if (result.isFailed())
126+
completeLatch.countDown();
127+
});
128+
129+
assertTrue(commitLatch.await(5, TimeUnit.SECONDS));
130+
131+
// The first chunk will be read but its write will fail.
132+
content.write(ByteBuffer.allocate(1024), Callback.NOOP);
133+
134+
// The second chunk is failed because the content is failed.
135+
CountDownLatch contentLatch = new CountDownLatch(1);
136+
content.write(ByteBuffer.allocate(1024), new Callback()
116137
{
117-
if (result.isFailed())
118-
completeLatch.countDown();
138+
@Override
139+
public void failed(Throwable x)
140+
{
141+
contentLatch.countDown();
142+
}
119143
});
120144

121-
assertTrue(commitLatch.await(5, TimeUnit.SECONDS));
122-
123-
// The first chunk will be read but its write will fail.
124-
content.write(ByteBuffer.allocate(1024), Callback.NOOP);
145+
assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
146+
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
125147

126-
// The second chunk is failed because the content is failed.
127-
CountDownLatch contentLatch = new CountDownLatch(1);
128-
content.write(ByteBuffer.allocate(1024), new Callback()
129-
{
130-
@Override
131-
public void failed(Throwable x)
132-
{
133-
contentLatch.countDown();
134-
}
135-
});
148+
DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
149+
assertEquals(0, connectionPool.getConnectionCount());
150+
assertEquals(0, connectionPool.getActiveConnections().size());
151+
assertEquals(0, connectionPool.getIdleConnections().size());
152+
}
153+
}
136154

137-
assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
138-
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
155+
@Test
156+
public void testPendingRequestContentThenTotalTimeout() throws Exception
157+
{
158+
startServer(new EmptyServerHandler());
139159

140-
DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
141-
assertEquals(0, connectionPool.getConnectionCount());
142-
assertEquals(0, connectionPool.getActiveConnections().size());
143-
assertEquals(0, connectionPool.getIdleConnections().size());
160+
try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1)))
161+
{
162+
client.start();
163+
164+
long timeout = 1000;
165+
AsyncRequestContent content = new AsyncRequestContent();
166+
AtomicInteger completed = new AtomicInteger();
167+
CountDownLatch resultLatch = new CountDownLatch(1);
168+
client.newRequest("localhost", connector.getLocalPort())
169+
.method(HttpMethod.POST)
170+
.body(content)
171+
.timeout(timeout, TimeUnit.MILLISECONDS)
172+
.send(result ->
173+
{
174+
// This is invoked only when the total timeout elapses.
175+
completed.incrementAndGet();
176+
assertThat(result.getRequestFailure(), notNullValue());
177+
assertThat(result.getResponseFailure(), nullValue());
178+
assertThat(result.getResponse().getStatus(), is(HttpStatus.OK_200));
179+
resultLatch.countDown();
180+
});
181+
182+
assertTrue(resultLatch.await(2 * timeout, TimeUnit.MILLISECONDS));
183+
// Verify that the CompleteListener is invoked only once.
184+
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(completed::get, is(1));
185+
}
144186
}
145187
}

0 commit comments

Comments
 (0)