From 74069a140737603a0a59435de308401c4351a80b Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Sat, 11 Nov 2023 10:30:39 -0500 Subject: [PATCH] CXF-8911:Allow wrapping AsyncHTTPConduit response processing (using new AsyncHttpResponseWrapperFactory bus extension) (#1510) --- .../http/asyncclient/AsyncHTTPConduit.java | 32 ++++++++--- .../AsyncHttpResponseWrapperFactory.java | 55 +++++++++++++++++++ .../asyncclient/AsyncHTTPConduitTest.java | 38 ++++++++++++- .../asyncclient/hc5/AsyncHTTPConduit.java | 21 ++++++- .../hc5/AsyncHttpResponseWrapperFactory.java | 55 +++++++++++++++++++ .../asyncclient/hc5/AsyncHTTPConduitTest.java | 38 ++++++++++++- 6 files changed, 227 insertions(+), 12 deletions(-) create mode 100644 rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java create mode 100644 rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java index 848e98b8732..098008039b5 100755 --- a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java +++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduit.java @@ -66,6 +66,7 @@ import org.apache.cxf.transport.http.Headers; import org.apache.cxf.transport.http.HttpClientHTTPConduit; import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduitFactory.UseAsyncPolicy; +import org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transport.https.HttpsURLConnectionInfo; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.version.Version; @@ -99,13 +100,15 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit { public static final String USE_ASYNC = "use.async.http.conduit"; - final AsyncHTTPConduitFactory factory; - volatile int lastTlsHash = -1; - volatile Object sslState; - volatile URI sslURL; - volatile SSLContext sslContext; - volatile SSLSession session; - volatile CloseableHttpAsyncClient client; + private final AsyncHTTPConduitFactory factory; + private final AsyncHttpResponseWrapperFactory asyncHttpResponseWrapperFactory; + + private volatile int lastTlsHash = -1; + private volatile Object sslState; + private volatile URI sslURL; + private volatile SSLContext sslContext; + private volatile SSLSession session; + private volatile CloseableHttpAsyncClient client; public AsyncHTTPConduit(Bus b, @@ -114,6 +117,7 @@ public AsyncHTTPConduit(Bus b, AsyncHTTPConduitFactory factory) throws IOException { super(b, ei, t); this.factory = factory; + this.asyncHttpResponseWrapperFactory = bus.getExtension(AsyncHttpResponseWrapperFactory.class); } public synchronized CloseableHttpAsyncClient getHttpAsyncClient() throws IOException { @@ -478,7 +482,7 @@ protected void connect(boolean output) throws IOException { return; } - CXFResponseCallback responseCallback = new CXFResponseCallback() { + CXFResponseCallback delegate = new CXFResponseCallback() { @Override public void responseReceived(HttpResponse response) { setHttpResponse(response); @@ -486,6 +490,18 @@ public void responseReceived(HttpResponse response) { }; + CXFResponseCallback responseCallback = delegate; + if (asyncHttpResponseWrapperFactory != null) { + final AsyncHttpResponseWrapper wrapper = asyncHttpResponseWrapperFactory.create(); + if (wrapper != null) { + responseCallback = new CXFResponseCallback() { + @Override + public void responseReceived(HttpResponse response) { + wrapper.responseReceived(response, delegate::responseReceived); + } + }; + } + } FutureCallback callback = new FutureCallback() { public void completed(Boolean result) { diff --git a/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java new file mode 100644 index 00000000000..bec9e695fed --- /dev/null +++ b/rt/transports/http-hc/src/main/java/org/apache/cxf/transport/http/asyncclient/AsyncHttpResponseWrapperFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.http.asyncclient; + +import java.util.function.Consumer; + +import org.apache.cxf.Bus; +import org.apache.http.HttpResponse; + +/** + * The {@link Bus} extension to allow wrapping up the response processing of + * the {@link AsyncHTTPConduit} instance. + */ +@FunctionalInterface +public interface AsyncHttpResponseWrapperFactory { + /** + * Creates new instance of the {@link AsyncHttpResponseWrapper} + * @return new instance of the {@link AsyncHttpResponseWrapper} (or null) + */ + AsyncHttpResponseWrapper create(); + + /** + * The wrapper around the response that will be called by the {@link AsyncHTTPConduit} + * instance once the response is received. + */ + interface AsyncHttpResponseWrapper { + /** + * The callback which is called by the {@link AsyncHTTPConduit} instance once + * the response is received. The delegating response handler is passed as the + * an argument and has to be called. + * @param response the response received + * @param delegate delegating response handler + */ + default void responseReceived(HttpResponse response, Consumer delegate) { + delegate.accept(response); + } + } +} diff --git a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java index 29bcca1ef1c..8911ab29259 100644 --- a/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java +++ b/rt/transports/http-hc/src/test/java/org/apache/cxf/transport/http/asyncclient/AsyncHTTPConduitTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import jakarta.xml.ws.AsyncHandler; import jakarta.xml.ws.Endpoint; @@ -43,6 +44,7 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.apache.cxf.transport.http.HTTPConduit; import org.apache.cxf.transport.http.HTTPConduitFactory; +import org.apache.cxf.transport.http.asyncclient.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.workqueue.AutomaticWorkQueueImpl; import org.apache.cxf.workqueue.WorkQueueManager; @@ -50,12 +52,15 @@ import org.apache.hello_world_soap_http.SOAPService; import org.apache.hello_world_soap_http.types.GreetMeLaterResponse; import org.apache.hello_world_soap_http.types.GreetMeResponse; +import org.apache.http.HttpResponse; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -242,7 +247,7 @@ public void testConnectIssue() throws Exception { } @Test - public void testInovationWithHCAddress() throws Exception { + public void testInvocationWithHCAddress() throws Exception { String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort"; JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); factory.setServiceClass(Greeter.class); @@ -263,6 +268,7 @@ public void testInvocationWithTransportId() throws Exception { String response = greeter.greetMe("test"); assertEquals("Get a wrong response", "Hello test", response); } + @Test public void testCall() throws Exception { updateAddressPort(g, PORT); @@ -273,6 +279,7 @@ public void testCall() throws Exception { c.setClient(cp); assertEquals("Hello " + request, g.greetMe(request)); } + @Test public void testCallAsync() throws Exception { updateAddressPort(g, PORT); @@ -293,6 +300,35 @@ public void handleResponse(Response res) { }).get(); } + @Test + public void testCallAsyncWithResponseWrapper() throws Exception { + try { + final CountDownLatch latch = new CountDownLatch(1); + final AsyncHttpResponseWrapper wrapper = new AsyncHttpResponseWrapper() { + @Override + public void responseReceived(HttpResponse response, Consumer delegate) { + delegate.accept(response); + latch.countDown(); + } + }; + + getStaticBus().setExtension(() -> wrapper, AsyncHttpResponseWrapperFactory.class); + + final String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort"; + final Greeter greeter = new SOAPService().getSoapPort(); + setAddress(greeter, address); + + greeter.greetMeLaterAsync(1000, new AsyncHandler() { + public void handleResponse(Response res) { + } + }).get(); + + assertThat(latch.await(5, TimeUnit.SECONDS), is(true)); + } finally { + getStaticBus().setExtension(null, AsyncHttpResponseWrapperFactory.class); + } + } + @Test public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception { // This test is especially targeted for RHEL 6.8 diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java index e888cfd27b0..780a51772d4 100644 --- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java +++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java @@ -66,6 +66,7 @@ import org.apache.cxf.transport.http.Headers; import org.apache.cxf.transport.http.HttpClientHTTPConduit; import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduitFactory.UseAsyncPolicy; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transport.https.HttpsURLConnectionInfo; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.version.Version; @@ -105,6 +106,7 @@ public class AsyncHTTPConduit extends HttpClientHTTPConduit { public static final String USE_ASYNC = "use.async.http.conduit"; private final AsyncHTTPConduitFactory factory; + private final AsyncHttpResponseWrapperFactory asyncHttpResponseWrapperFactory; private volatile int lastTlsHash = -1; private volatile Object sslState; private volatile URI sslURL; @@ -116,6 +118,7 @@ public AsyncHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t, AsyncHT throws IOException { super(b, ei, t); this.factory = factory; + this.asyncHttpResponseWrapperFactory = bus.getExtension(AsyncHttpResponseWrapperFactory.class); } public synchronized CloseableHttpAsyncClient getHttpAsyncClient(final TlsStrategy tlsStrategy) @@ -491,8 +494,8 @@ protected void connect(boolean output) throws IOException { if (connectionFuture != null) { return; } - - CXFResponseCallback responseCallback = new CXFResponseCallback() { + + final CXFResponseCallback delegate = new CXFResponseCallback() { @Override public void responseReceived(HttpResponse response) { setHttpResponse(response); @@ -500,6 +503,20 @@ public void responseReceived(HttpResponse response) { }; + CXFResponseCallback responseCallback = delegate; + if (asyncHttpResponseWrapperFactory != null) { + final AsyncHttpResponseWrapper wrapper = asyncHttpResponseWrapperFactory.create(); + if (wrapper != null) { + responseCallback = new CXFResponseCallback() { + @Override + public void responseReceived(HttpResponse response) { + wrapper.responseReceived(response, delegate::responseReceived); + } + }; + } + } + + FutureCallback callback = new FutureCallback() { public void completed(Boolean result) { diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java new file mode 100644 index 00000000000..eb0725c4c8a --- /dev/null +++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHttpResponseWrapperFactory.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.transport.http.asyncclient.hc5; + +import java.util.function.Consumer; + +import org.apache.cxf.Bus; +import org.apache.hc.core5.http.HttpResponse; + +/** + * The {@link Bus} extension to allow wrapping up the response processing of + * the {@link AsyncHTTPConduit} instance. + */ +@FunctionalInterface +public interface AsyncHttpResponseWrapperFactory { + /** + * Creates new instance of the {@link AsyncHttpResponseWrapper} + * @return new instance of the {@link AsyncHttpResponseWrapper} (or null) + */ + AsyncHttpResponseWrapper create(); + + /** + * The wrapper around the response that will be called by the {@link AsyncHTTPConduit} + * instance once the response is received. + */ + interface AsyncHttpResponseWrapper { + /** + * The callback which is called by the {@link AsyncHTTPConduit} instance once + * the response is received. The delegating response handler is passed as the + * an argument and has to be called. + * @param response the response received + * @param delegate delegating response handler + */ + default void responseReceived(HttpResponse response, Consumer delegate) { + delegate.accept(response); + } + } +} diff --git a/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java index 8805ab1386e..d6f1ec1d9cc 100644 --- a/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java +++ b/rt/transports/http-hc5/src/test/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduitTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import jakarta.xml.ws.AsyncHandler; import jakarta.xml.ws.Endpoint; @@ -44,9 +45,11 @@ import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.apache.cxf.transport.http.HTTPConduit; import org.apache.cxf.transport.http.HTTPConduitFactory; +import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHttpResponseWrapperFactory.AsyncHttpResponseWrapper; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; import org.apache.cxf.workqueue.AutomaticWorkQueueImpl; import org.apache.cxf.workqueue.WorkQueueManager; +import org.apache.hc.core5.http.HttpResponse; import org.apache.hello_world_soap_http.Greeter; import org.apache.hello_world_soap_http.SOAPService; import org.apache.hello_world_soap_http.types.GreetMeLaterResponse; @@ -56,6 +59,8 @@ import org.junit.BeforeClass; import org.junit.Test; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -213,7 +218,7 @@ public void testConnectIssue() throws Exception { } @Test - public void testInovationWithHCAddress() throws Exception { + public void testInvocationWithHCAddress() throws Exception { String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort"; JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); factory.setServiceClass(Greeter.class); @@ -234,6 +239,7 @@ public void testInvocationWithTransportId() throws Exception { String response = greeter.greetMe("test"); assertEquals("Get a wrong response", "Hello test", response); } + @Test public void testCall() throws Exception { updateAddressPort(g, PORT); @@ -244,6 +250,7 @@ public void testCall() throws Exception { c.setClient(cp); assertEquals("Hello " + request, g.greetMe(request)); } + @Test public void testCallAsync() throws Exception { updateAddressPort(g, PORT); @@ -264,6 +271,35 @@ public void handleResponse(Response res) { }).get(); } + @Test + public void testCallAsyncWithResponseWrapper() throws Exception { + try { + final CountDownLatch latch = new CountDownLatch(1); + final AsyncHttpResponseWrapper wrapper = new AsyncHttpResponseWrapper() { + @Override + public void responseReceived(HttpResponse response, Consumer delegate) { + delegate.accept(response); + latch.countDown(); + } + }; + + getStaticBus().setExtension(() -> wrapper, AsyncHttpResponseWrapperFactory.class); + + final String address = "hc://http://localhost:" + PORT + "/SoapContext/SoapPort"; + final Greeter greeter = new SOAPService().getSoapPort(); + setAddress(greeter, address); + + greeter.greetMeLaterAsync(1000, new AsyncHandler() { + public void handleResponse(Response res) { + } + }).get(); + + assertThat(latch.await(5, TimeUnit.SECONDS), is(true)); + } finally { + getStaticBus().setExtension(null, AsyncHttpResponseWrapperFactory.class); + } + } + @Test public void testCallAsyncCallbackInvokedOnlyOnce() throws Exception { // This test is especially targeted for RHEL 6.8