Skip to content
This repository was archived by the owner on May 28, 2018. It is now read-only.

Jersey-3214 #267

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -64,7 +63,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;

import javax.ws.rs.core.Response.StatusType;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
Expand Down Expand Up @@ -100,6 +99,7 @@
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ConnectionReleaseTrigger;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.ManagedHttpClientConnection;
import org.apache.http.conn.routing.HttpRoute;
Expand Down Expand Up @@ -430,8 +430,8 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing
final HttpUriRequest request = getUriHttpRequest(clientRequest);
final Map<String, String> clientHeadersSnapshot = writeOutBoundHeaders(clientRequest.getHeaders(), request);

CloseableHttpResponse response = null;
try {
final CloseableHttpResponse response;
final HttpClientContext context = HttpClientContext.create();
if (preemptiveBasicAuth) {
final AuthCache authCache = new BasicAuthCache();
Expand All @@ -442,11 +442,14 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing
response = client.execute(getHost(request), request, context);
HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, clientRequest.getHeaders(), this.getClass().getName());

final HttpEntity entity = response.getEntity();
final InputStream entityContent = entity != null ? entity.getContent() : null;

final Response.StatusType status = response.getStatusLine().getReasonPhrase() == null
? Statuses.from(response.getStatusLine().getStatusCode())
: Statuses.from(response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase());

final ClientResponse responseContext = new ClientResponse(status, clientRequest);
final ClientResponse responseContext = new ApacheClientResponse(status, clientRequest, response, entityContent);
final List<URI> redirectLocations = context.getRedirectLocations();
if (redirectLocations != null && !redirectLocations.isEmpty()) {
responseContext.setResolvedRequestUri(redirectLocations.get(redirectLocations.size() - 1));
Expand All @@ -464,8 +467,6 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing
headers.put(headerName, list);
}

final HttpEntity entity = response.getEntity();

if (entity != null) {
if (headers.get(HttpHeaders.CONTENT_LENGTH) == null) {
headers.add(HttpHeaders.CONTENT_LENGTH, String.valueOf(entity.getContentLength()));
Expand All @@ -477,15 +478,18 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing
}
}

try {
responseContext.setEntityStream(new HttpClientResponseInputStream(getInputStream(response)));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
responseContext.setEntityStream(bufferedStream(entityContent));

// prevent response-close on correct return
response = null;

return responseContext;
} catch (final Exception e) {
throw new ProcessingException(e);
} finally {
if (response != null) {
ReaderWriter.safelyClose(response);
}
}
}

Expand Down Expand Up @@ -617,40 +621,60 @@ private static Map<String, String> writeOutBoundHeaders(final MultivaluedMap<Str
return stringHeaders;
}

private static final class HttpClientResponseInputStream extends FilterInputStream {
/**
* Overrides Response-close() to release the connection without consuming it.
*
* From <a href="http://hc.apache.org/httpcomponents-client-4.5.x/tutorial/html/fundamentals.html#d5e145">Apache HttpClient
* documentation</a>:
*
* <q>The difference between closing the content stream and closing the response is that the former will attempt to keep
* the underlying connection alive by consuming the entity content while the latter immediately shuts down and discards
* the connection.</q>
*
* JAX-RS spec is silent whether closing the content stream consumes the response or closes the connection. This
* ApacheConnector follows apache-behaviour.
*/
private static final class ApacheClientResponse extends ClientResponse {

private final CloseableHttpResponse httpResponse;

HttpClientResponseInputStream(final InputStream inputStream) throws IOException {
super(inputStream);
private final InputStream entityContent;

public ApacheClientResponse(StatusType status, ClientRequest requestContext, CloseableHttpResponse httpResponse,
InputStream entityContent) {
super(status, requestContext);
this.httpResponse = httpResponse;
this.entityContent = entityContent;
}

@Override
public void close() throws IOException {
super.close();
public void close() {
try {
if (entityContent instanceof ConnectionReleaseTrigger) {
// necessary to prevent an exception during stream-close in apache httpclient 4.5.1+
((ConnectionReleaseTrigger) entityContent).abortConnection();
}
httpResponse.close();
} catch (IOException e) {
// Cannot happen according to ConnectionHolder#releaseConnection
throw new ProcessingException(e);
} finally {
super.close();
}
}
}

private static InputStream getInputStream(final CloseableHttpResponse response) throws IOException {
private static InputStream bufferedStream(final InputStream entityContent) {

final InputStream inputStream;

if (response.getEntity() == null) {
if (entityContent == null) {
inputStream = new ByteArrayInputStream(new byte[0]);
} else {
final InputStream i = response.getEntity().getContent();
if (i.markSupported()) {
inputStream = i;
} else {
inputStream = new BufferedInputStream(i, ReaderWriter.BUFFER_SIZE);
}
inputStream = new BufferedInputStream(entityContent, ReaderWriter.BUFFER_SIZE);
}

return new FilterInputStream(inputStream) {
@Override
public void close() throws IOException {
response.close();
super.close();
}
};
return inputStream;
}

private static class ConnectionFactory extends ManagedHttpClientConnectionFactory {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package org.glassfish.jersey.apache.connector;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Test;

public class ClosingTest extends JerseyTest {

private PoolingHttpClientConnectionManager connectionManager;

@Override
protected Application configure() {
ResourceConfig config = new ResourceConfig();
config.register(TestResource.class);
return config;
}

@Override
protected void configureClient(ClientConfig config) {
this.connectionManager = new PoolingHttpClientConnectionManager(60, TimeUnit.SECONDS);
config.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager);
config.connectorProvider(new ApacheConnectorProvider());
}

private int getLeasedConnections() {
return connectionManager.getTotalStats().getLeased();
}

private int getAvailableConnections() {
return connectionManager.getTotalStats().getAvailable();
}

private int getOpenConnections() {
return getLeasedConnections() + getAvailableConnections();
}

@Test
public void testClosingUnconsumedResponseAbortsConnection() throws Exception {
assertEquals(0, getOpenConnections());

Response response = target().path("productInfo")
.request(MediaType.TEXT_PLAIN_TYPE)
.get();
assertEquals(200, response.getStatus());

assertEquals(1, getLeasedConnections());
InputStream entityStream = response.readEntity(InputStream.class);

// should close the connection without consuming it. must not throw here
response.close();
assertEquals(0, getOpenConnections());

// must not throw here
entityStream.close();
assertEquals(0, getOpenConnections());
}

@Test
public void testClosingUnconsumedStreamConsumesConnection() throws Exception {
assertEquals(0, getOpenConnections());

Response response = target().path("productInfo")
.request(MediaType.TEXT_PLAIN_TYPE)
.get();
assertEquals(200, response.getStatus());

InputStream entityStream = response.readEntity(InputStream.class);

// should consume the stream. must not throw here
entityStream.close();
// connection should be kept alive after consume
assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());

// must not throw here
response.close();
assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());
}

@Test
public void testClosingConsumedStream() throws Exception {
assertEquals(0, getOpenConnections());

Response response = target().path("productInfo")
.request(MediaType.TEXT_PLAIN_TYPE)
.get();
assertEquals(200, response.getStatus());

InputStream entityStream = response.readEntity(InputStream.class);

consume(entityStream);

// connection should be kept alive after consume
assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());

entityStream.close();
response.close();

assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());
}

@Test
public void testClosingConsumedResponse() throws Exception {
assertEquals(0, getOpenConnections());

Response response = target().path("productInfo")
.request(MediaType.TEXT_PLAIN_TYPE)
.get();
assertEquals(200, response.getStatus());

InputStream entityStream = response.readEntity(InputStream.class);

consume(entityStream);

// connection should be kept alive after consume
assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());

response.close();
entityStream.close();

assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());
}

@Test
public void testBufferedMultipleReadEntity() throws Exception {
assertEquals(0, getOpenConnections());

Response response = target().path("productInfo")
.request(MediaType.TEXT_PLAIN_TYPE)
.get();

response.bufferEntity();
assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());

assertEquals("foo\n", new String(response.readEntity(byte[].class), "us-ascii"));
assertEquals("foo\n", response.readEntity(String.class));

response.close();

assertEquals(0, getLeasedConnections());
assertEquals(1, getAvailableConnections());
}

private static void consume(InputStream in) throws IOException {
byte[] buffer = new byte[1024];
for (int readden = in.read(buffer); readden >= 0; readden = in.read(buffer)) {
}
}

@Path("/")
public static class TestResource {
@GET
@Path("/productInfo")
@Produces(MediaType.TEXT_PLAIN)
public String getProductInfo() {
return "foo\n";
}
}
}
Loading