From ae4b92ef3583a1185ba2bdc3fbdf35858248327a Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 29 Jan 2025 01:05:54 +0800 Subject: [PATCH 1/3] [improve][client] Implement HTTP client using javax.ws.rs Signed-off-by: Zixuan Liu --- .../client/admin/internal/FunctionsImpl.java | 2 +- .../client/admin/internal/PackagesImpl.java | 2 +- .../admin/internal/PulsarAdminImpl.java | 9 +- .../client/admin/internal/SinksImpl.java | 2 +- .../client/admin/internal/SourcesImpl.java | 2 +- .../internal/http/AsyncHttpConnectorTest.java | 39 ++-- pulsar-client/pom.xml | 22 ++ .../apache/pulsar/client/impl/HttpClient.java | 212 ++++++------------ .../pulsar/client/impl/HttpLookupService.java | 51 +++-- .../internal/http/AsyncHttpConnector.java | 18 +- .../http/AsyncHttpConnectorProvider.java | 14 +- .../http/AsyncHttpRequestExecutor.java | 2 +- .../client}/internal/http/package-info.java | 2 +- 13 files changed, 172 insertions(+), 205 deletions(-) rename pulsar-client-admin/src/test/java/org/apache/pulsar/client/{admin => }/internal/http/AsyncHttpConnectorTest.java (88%) rename {pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin => pulsar-client/src/main/java/org/apache/pulsar/client}/internal/http/AsyncHttpConnector.java (97%) rename {pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin => pulsar-client/src/main/java/org/apache/pulsar/client}/internal/http/AsyncHttpConnectorProvider.java (80%) rename {pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin => pulsar-client/src/main/java/org/apache/pulsar/client}/internal/http/AsyncHttpRequestExecutor.java (97%) rename {pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin => pulsar-client/src/main/java/org/apache/pulsar/client}/internal/http/package-info.java (93%) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index bfcc3fe39a444..2cc67b477f0ae 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -40,8 +40,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.functions.FunctionDefinition; import org.apache.pulsar.common.functions.FunctionState; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java index 2b8efc3b97c8c..c32549b2cee6f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java @@ -35,8 +35,8 @@ import javax.ws.rs.core.Response; import org.apache.pulsar.client.admin.Packages; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.packages.management.core.common.PackageMetadata; import org.apache.pulsar.packages.management.core.common.PackageName; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index aaea8a89f8db5..cf70d52f6f77d 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -51,13 +51,14 @@ import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.admin.Transactions; import org.apache.pulsar.client.admin.Worker; -import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector; -import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.internal.http.AsyncHttpConnector; +import org.apache.pulsar.client.internal.http.AsyncHttpConnectorProvider; import org.apache.pulsar.common.net.ServiceURI; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; @@ -124,8 +125,10 @@ public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigDa clientConfigData.setServiceUrl(serviceUrl); } + PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver(); + pulsarServiceNameResolver.updateServiceUrl(serviceUrl); AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData, - clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression); + clientConfigData.getAutoCertRefreshSeconds(), acceptGzipCompression, pulsarServiceNameResolver); ClientConfig httpConfig = new ClientConfig(); httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java index bba0289d81254..5b63142bdb220 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java @@ -34,8 +34,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Sink; import org.apache.pulsar.client.admin.Sinks; -import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.functions.UpdateOptionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java index 56cf7db229b78..441f9fe8202d4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java @@ -33,8 +33,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Source; import org.apache.pulsar.client.admin.Sources; -import org.apache.pulsar.client.admin.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.internal.http.AsyncHttpRequestExecutor; import org.apache.pulsar.common.functions.UpdateOptions; import org.apache.pulsar.common.functions.UpdateOptionsImpl; import org.apache.pulsar.common.io.ConnectorDefinition; diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/internal/http/AsyncHttpConnectorTest.java similarity index 88% rename from pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java rename to pulsar-client-admin/src/test/java/org/apache/pulsar/client/internal/http/AsyncHttpConnectorTest.java index f8518b5931034..15c5ce92820a3 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/internal/http/AsyncHttpConnectorTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.admin.internal.http; +package org.apache.pulsar.client.internal.http; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.get; @@ -44,6 +44,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.Cleanup; +import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL; +import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.util.FutureUtil; import org.asynchttpclient.Request; @@ -174,9 +176,11 @@ public void testShouldStopRetriesWhenTimeoutOccurs() throws IOException, Executi Executor delayedExecutor = runnable -> { scheduledExecutor.schedule(runnable, requestTimeout, TimeUnit.MILLISECONDS); }; + PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver(); + pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl()); @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, requestTimeout, - requestTimeout, 0, conf, false) { + requestTimeout, 0, conf, false, pulsarServiceNameResolver) { @Override protected CompletableFuture oneShot(InetSocketAddress host, ClientRequest request) { // delay the response to simulate a timeout @@ -214,7 +218,7 @@ public void failure(Throwable failure) { } @Test - void testMaxRedirects() { + void testMaxRedirects() throws InvalidServiceURL { // Redirect to itself to test max redirects server.stubFor(get(urlEqualTo("/admin/v2/clusters")) .willReturn(aResponse() @@ -224,9 +228,11 @@ void testMaxRedirects() { ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl("http://localhost:" + server.port()); + PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver(); + pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl()); @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, pulsarServiceNameResolver); Request request = new RequestBuilder("GET") .setUrl("http://localhost:" + server.port() + "/admin/v2/clusters") @@ -243,21 +249,21 @@ void testMaxRedirects() { } @Test - void testRelativeRedirect() throws ExecutionException, InterruptedException { + void testRelativeRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL { doTestRedirect("path2"); } @Test - void testAbsoluteRedirect() throws ExecutionException, InterruptedException { + void testAbsoluteRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL { doTestRedirect("/path2"); } @Test - void testUrlRedirect() throws ExecutionException, InterruptedException { + void testUrlRedirect() throws ExecutionException, InterruptedException, InvalidServiceURL { doTestRedirect("http://localhost:" + server.port() + "/path2"); } - private void doTestRedirect(String location) throws InterruptedException, ExecutionException { + private void doTestRedirect(String location) throws InterruptedException, ExecutionException, InvalidServiceURL { server.stubFor(get(urlEqualTo("/path1")) .willReturn(aResponse() .withStatus(301) @@ -270,9 +276,11 @@ private void doTestRedirect(String location) throws InterruptedException, Execut ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl("http://localhost:" + server.port()); + PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver(); + pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl()); @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, pulsarServiceNameResolver); Request request = new RequestBuilder("GET") .setUrl("http://localhost:" + server.port() + "/path1") @@ -283,7 +291,7 @@ private void doTestRedirect(String location) throws InterruptedException, Execut } @Test - void testRedirectWithBody() throws ExecutionException, InterruptedException { + void testRedirectWithBody() throws ExecutionException, InterruptedException, InvalidServiceURL { server.stubFor(post(urlEqualTo("/path1")) .willReturn(aResponse() .withStatus(307) @@ -296,9 +304,12 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException { ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl("http://localhost:" + server.port()); + PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver(); + pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl()); @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, pulsarServiceNameResolver); + Request request = new RequestBuilder("POST") .setUrl("http://localhost:" + server.port() + "/path1") @@ -310,7 +321,7 @@ void testRedirectWithBody() throws ExecutionException, InterruptedException { } @Test - void testMaxConnections() throws ExecutionException, InterruptedException { + void testMaxConnections() throws ExecutionException, InterruptedException, InvalidServiceURL { server.stubFor(post(urlEqualTo("/concurrency-test")) .willReturn(aResponse() .withTransformers("concurrency-test"))); @@ -320,9 +331,11 @@ void testMaxConnections() throws ExecutionException, InterruptedException { conf.setConnectionsPerBroker(maxConnections); conf.setServiceUrl("http://localhost:" + server.port()); + PulsarServiceNameResolver pulsarServiceNameResolver = new PulsarServiceNameResolver(); + pulsarServiceNameResolver.updateServiceUrl(conf.getServiceUrl()); @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, pulsarServiceNameResolver); Request request = new RequestBuilder("POST") .setUrl("http://localhost:" + server.port() + "/concurrency-test") diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml index e1a70ed074833..1fae7b3c95e06 100644 --- a/pulsar-client/pom.xml +++ b/pulsar-client/pom.xml @@ -186,6 +186,21 @@ true + + jakarta.ws.rs + jakarta.ws.rs-api + + + + org.glassfish.jersey.core + jersey-client + + + + org.glassfish.jersey.media + jersey-media-multipart + + ${project.groupId} @@ -217,6 +232,13 @@ fastutil + + com.github.tomakehurst + wiremock-jre8 + ${wiremock.version} + test + + diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 8e448d801fa6b..512ee2284ccce 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -18,114 +18,71 @@ */ package org.apache.pulsar.client.impl; +import com.fasterxml.jackson.core.util.JacksonFeature; import io.netty.channel.EventLoopGroup; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; import java.io.Closeable; import java.io.IOException; import java.net.HttpURLConnection; import java.net.InetSocketAddress; -import java.net.URI; -import java.net.URL; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Invocation.Builder; +import javax.ws.rs.client.InvocationCallback; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.client.util.ExecutorProvider; -import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory; -import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.pulsar.common.util.PulsarSslConfiguration; -import org.apache.pulsar.common.util.PulsarSslFactory; -import org.asynchttpclient.AsyncHttpClient; -import org.asynchttpclient.AsyncHttpClientConfig; -import org.asynchttpclient.BoundRequestBuilder; -import org.asynchttpclient.DefaultAsyncHttpClient; -import org.asynchttpclient.DefaultAsyncHttpClientConfig; -import org.asynchttpclient.Request; -import org.asynchttpclient.SslEngineFactory; -import org.asynchttpclient.channel.DefaultKeepAliveStrategy; +import org.apache.pulsar.client.internal.http.AsyncHttpConnectorProvider; +import org.apache.pulsar.common.net.ServiceURI; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.media.multipart.MultiPartFeature; @Slf4j public class HttpClient implements Closeable { - protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; - protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; - - protected final AsyncHttpClient httpClient; - protected final ServiceNameResolver serviceNameResolver; + private final ServiceNameResolver serviceNameResolver; protected final Authentication authentication; - protected ScheduledExecutorService executorService; - protected PulsarSslFactory sslFactory; + private final Client client; + private volatile WebTarget root; protected HttpClient(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { this.authentication = conf.getAuthentication(); + this.serviceNameResolver = new PulsarServiceNameResolver(); this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl()); - DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); - confBuilder.setCookieStore(null); - confBuilder.setUseProxyProperties(true); - confBuilder.setFollowRedirect(true); - confBuilder.setMaxRedirects(conf.getMaxLookupRedirects()); - confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); - confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); - confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); - confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() { - @Override - public boolean keepAlive(InetSocketAddress remoteAddress, Request ahcRequest, - HttpRequest request, HttpResponse response) { - // Close connection upon a server error or per HTTP spec - return (response.status().code() / 100 != 5) - && super.keepAlive(remoteAddress, ahcRequest, request, response); - } - }); + AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(conf, + conf.getAutoCertRefreshSeconds(), true, this.serviceNameResolver); - if ("https".equals(serviceNameResolver.getServiceUri().getServiceName())) { - try { - // Set client key and certificate if available - this.executorService = Executors - .newSingleThreadScheduledExecutor(new ExecutorProvider - .ExtendedThreadFactory("httpclient-ssl-refresh")); - PulsarSslConfiguration sslConfiguration = - buildSslConfiguration(conf, serviceNameResolver.resolveHostUri().getHost()); - this.sslFactory = (PulsarSslFactory) Class.forName(conf.getSslFactoryPlugin()) - .getConstructor().newInstance(); - this.sslFactory.initialize(sslConfiguration); - this.sslFactory.createInternalSslContext(); - if (conf.getAutoCertRefreshSeconds() > 0) { - this.executorService.scheduleWithFixedDelay(this::refreshSslContext, - conf.getAutoCertRefreshSeconds(), - conf.getAutoCertRefreshSeconds(), TimeUnit.SECONDS); - } - String hostname = conf.isTlsHostnameVerificationEnable() ? null : serviceNameResolver - .resolveHostUri().getHost(); - SslEngineFactory sslEngineFactory = new PulsarHttpAsyncSslEngineFactory(this.sslFactory, hostname); - confBuilder.setSslEngineFactory(sslEngineFactory); + ClientConfig httpConfig = new ClientConfig(); + httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true); + httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8); + httpConfig.register(MultiPartFeature.class); + httpConfig.connectorProvider(asyncConnectorProvider); + ClientBuilder clientBuilder = ClientBuilder.newBuilder() + .withConfig(httpConfig) + .connectTimeout(conf.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) + .readTimeout(conf.getReadTimeoutMs(), TimeUnit.MILLISECONDS) + .register(JacksonFeature.class); + client = clientBuilder.build(); + String serviceUri = ServiceURI.create(conf.getServiceUrl()).selectOne(); + root = client.target(serviceUri); - confBuilder.setUseInsecureTrustManager(conf.isTlsAllowInsecureConnection()); - confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable()); - } catch (Exception e) { - throw new PulsarClientException.InvalidConfigurationException(e); - } - } - confBuilder.setEventLoopGroup(eventLoopGroup); - AsyncHttpClientConfig config = confBuilder.build(); - httpClient = new DefaultAsyncHttpClient(config); - - log.debug("Using HTTP url: {}", conf.getServiceUrl()); + log.debug("Using HTTP url: {}", serviceUri); } String getServiceUrl() { @@ -138,22 +95,23 @@ public InetSocketAddress resolveHost() { void setServiceUrl(String serviceUrl) throws PulsarClientException { this.serviceNameResolver.updateServiceUrl(serviceUrl); + root = client.target(serviceNameResolver.resolveHostUri()); } @Override public void close() throws IOException { - httpClient.close(); - if (executorService != null) { - executorService.shutdownNow(); + if (client != null) { + client.close(); } } - public CompletableFuture get(String path, Class clazz) { + public CompletableFuture get(Function webTargetFn, Class clazz) { + WebTarget finalWebTarget = webTargetFn.apply(root); + final CompletableFuture future = new CompletableFuture<>(); try { - URI hostUri = serviceNameResolver.resolveHostUri(); - String requestUrl = new URL(hostUri.toURL(), path).toString(); - String remoteHostName = hostUri.getHost(); + String requestUrl = finalWebTarget.getUri().toString(); + String remoteHostName = finalWebTarget.getUri().getHost(); AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); CompletableFuture> authFuture = new CompletableFuture<>(); @@ -169,14 +127,13 @@ public CompletableFuture get(String path, Class clazz) { authFuture.whenComplete((respHeaders, ex) -> { if (ex != null) { log.warn("[{}] Failed to perform http request at authentication stage: {}", - requestUrl, ex.getMessage()); + requestUrl, ex.getMessage()); future.completeExceptionally(new PulsarClientException(ex)); return; } // auth complete, use a new builder - BoundRequestBuilder builder = httpClient.prepareGet(requestUrl) - .setHeader("Accept", "application/json"); + Builder builder = finalWebTarget.request("application/json"); if (authData.hasDataForHttp()) { Set> headers; @@ -188,42 +145,42 @@ public CompletableFuture get(String path, Class clazz) { return; } if (headers != null) { - headers.forEach(entry -> builder.addHeader(entry.getKey(), entry.getValue())); + headers.forEach(entry -> builder.header(entry.getKey(), entry.getValue())); } } - builder.execute().toCompletableFuture().whenComplete((response2, t) -> { - if (t != null) { - log.warn("[{}] Failed to perform http request: {}", requestUrl, t.getMessage()); - future.completeExceptionally(new PulsarClientException(t)); - return; - } - - // request not success - if (response2.getStatusCode() != HttpURLConnection.HTTP_OK) { - log.warn("[{}] HTTP get request failed: {}", requestUrl, response2.getStatusText()); - Exception e; - if (response2.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - e = new NotFoundException("Not found: " + response2.getStatusText()); + builder.async().get(new InvocationCallback() { + @Override + public void completed(Response response) { + if (response.getStatusInfo().getFamily() == Response.Status.Family.SUCCESSFUL) { + try { + T data = response.readEntity(clazz); + future.complete(data); + } catch (Exception e) { + log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage()); + future.completeExceptionally(new PulsarClientException(e)); + } } else { - e = new PulsarClientException("HTTP get request failed: " + response2.getStatusText()); + log.warn("[{}] HTTP get request failed: {}", requestUrl, response.getStatusInfo()); + Exception e; + if (response.getStatus() == HttpURLConnection.HTTP_NOT_FOUND) { + e = new NotFoundException("Not found: " + response.getStatusInfo()); + } else { + e = new PulsarClientException("HTTP get request failed: " + response.getStatusInfo()); + } + future.completeExceptionally(e); } - future.completeExceptionally(e); - return; } - try { - T data = ObjectMapperFactory.getMapper().reader().readValue( - response2.getResponseBodyAsBytes(), clazz); - future.complete(data); - } catch (Exception e) { - log.warn("[{}] Error during HTTP get request: {}", requestUrl, e.getMessage()); - future.completeExceptionally(new PulsarClientException(e)); + @Override + public void failed(Throwable throwable) { + log.warn("[{}] Failed to perform http request: {}", requestUrl, throwable.getMessage()); + future.completeExceptionally(new PulsarClientException(throwable)); } }); }); } catch (Exception e) { - log.warn("[{}]PulsarClientImpl: {}", path, e.getMessage()); + log.warn("[{}]PulsarClientImpl: {}", finalWebTarget.getUri(), e.getMessage()); if (e instanceof PulsarClientException) { future.completeExceptionally(e); } else { @@ -233,37 +190,4 @@ public CompletableFuture get(String path, Class clazz) { return future; } - - protected PulsarSslConfiguration buildSslConfiguration(ClientConfigurationData config, String host) - throws PulsarClientException { - return PulsarSslConfiguration.builder() - .tlsProvider(config.getSslProvider()) - .tlsKeyStoreType(config.getTlsKeyStoreType()) - .tlsKeyStorePath(config.getTlsKeyStorePath()) - .tlsKeyStorePassword(config.getTlsKeyStorePassword()) - .tlsTrustStoreType(config.getTlsTrustStoreType()) - .tlsTrustStorePath(config.getTlsTrustStorePath()) - .tlsTrustStorePassword(config.getTlsTrustStorePassword()) - .tlsCiphers(config.getTlsCiphers()) - .tlsProtocols(config.getTlsProtocols()) - .tlsTrustCertsFilePath(config.getTlsTrustCertsFilePath()) - .tlsCertificateFilePath(config.getTlsCertificateFilePath()) - .tlsKeyFilePath(config.getTlsKeyFilePath()) - .allowInsecureConnection(config.isTlsAllowInsecureConnection()) - .requireTrustedClientCertOnConnect(false) - .tlsEnabledWithKeystore(config.isUseKeyStoreTls()) - .tlsCustomParams(config.getSslFactoryPluginParams()) - .authData(config.getAuthentication().getAuthData(host)) - .serverMode(false) - .isHttps(true) - .build(); - } - - protected void refreshSslContext() { - try { - this.sslFactory.update(); - } catch (Exception e) { - log.error("Failed to refresh SSL context", e); - } - } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 4a5557fa869e4..47ba74e855562 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -27,6 +27,7 @@ import java.util.Base64; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import javax.ws.rs.client.WebTarget; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.NotFoundException; @@ -46,7 +47,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,11 +98,14 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException { public CompletableFuture getBroker(TopicName topicName) { String basePath = topicName.isV2() ? BasePathV2 : BasePathV1; String path = basePath + topicName.getLookupName(); - path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName); - long startTime = System.nanoTime(); - CompletableFuture httpFuture = httpClient.get(path, LookupData.class); - + CompletableFuture httpFuture = httpClient.get(n -> { + WebTarget webTarget = n.path(path); + if (!StringUtils.isBlank(listenerName)) { + webTarget = webTarget.queryParam("listenerName", listenerName); + } + return webTarget; + }, LookupData.class); httpFuture.thenRun(() -> { histoGetBroker.recordSuccess(System.nanoTime() - startTime); }).exceptionally(x -> { @@ -145,11 +148,10 @@ public CompletableFuture getPartitionedTopicMetadata( long startTime = System.nanoTime(); String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; - CompletableFuture httpFuture = httpClient.get( - String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=" - + metadataAutoCreationEnabled, - PartitionedTopicMetadata.class); - + CompletableFuture httpFuture = + httpClient.get(n -> n.path(String.format(format, topicName.getLookupName())) + .queryParam("checkAllowAutoCreation", metadataAutoCreationEnabled), + PartitionedTopicMetadata.class); httpFuture.thenRun(() -> { histoGetTopicMetadata.recordSuccess(System.nanoTime() - startTime); }).exceptionally(x -> { @@ -177,18 +179,17 @@ public CompletableFuture getTopicsUnderNamespace(NamespaceName CompletableFuture future = new CompletableFuture<>(); - String format = namespace.isV2() - ? "admin/v2/namespaces/%s/topics?mode=%s" : "admin/namespaces/%s/destinations?mode=%s"; - httpClient - .get(String.format(format, namespace, mode.toString()), String[].class) - .thenAccept(topics -> { - future.complete(new GetTopicsResult(topics)); - }).exceptionally(ex -> { - Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage()); - future.completeExceptionally(cause); - return null; - }); + String format = namespace.isV2() ? "admin/v2/namespaces/%s/topics" : "admin/namespaces/%s/destinations"; + httpClient.get(n -> n.path(String.format(format, namespace)).queryParam("mode", mode), + String[].class) + .thenAccept(topics -> { + future.complete(new GetTopicsResult(topics)); + }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage()); + future.completeExceptionally(cause); + return null; + }); future.thenRun(() -> { histoListTopics.recordSuccess(System.nanoTime() - startTime); @@ -211,7 +212,7 @@ public CompletableFuture> getSchema(TopicName topicName, by CompletableFuture> future = new CompletableFuture<>(); String schemaName = topicName.getSchemaName(); - String path = String.format("admin/v2/schemas/%s/schema", schemaName); + String path; if (version != null) { if (version.length == 0) { future.completeExceptionally(new SchemaSerializationException("Empty schema version")); @@ -220,8 +221,10 @@ public CompletableFuture> getSchema(TopicName topicName, by path = String.format("admin/v2/schemas/%s/schema/%s", schemaName, ByteBuffer.wrap(version).getLong()); + } else { + path = String.format("admin/v2/schemas/%s/schema", schemaName); } - httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> { + httpClient.get(n -> n.path(path), GetSchemaResponse.class).thenAccept(response -> { if (response.getType() == SchemaType.KEY_VALUE) { SchemaData data = SchemaData .builder() diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpConnector.java similarity index 97% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpConnector.java index fb11d9e46d34c..3c0ecac6f08a9 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpConnector.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.admin.internal.http; +package org.apache.pulsar.client.internal.http; import static org.asynchttpclient.util.HttpConstants.Methods.GET; import static org.asynchttpclient.util.HttpConstants.Methods.HEAD; @@ -58,9 +58,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.Validate; import org.apache.pulsar.PulsarVersion; -import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.ServiceNameResolver; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory; @@ -105,25 +103,25 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { private final boolean acceptGzipCompression; private final Map> concurrencyReducers = new ConcurrentHashMap<>(); private PulsarSslFactory sslFactory; + public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300; public AsyncHttpConnector(Client client, ClientConfigurationData conf, int autoCertRefreshTimeSeconds, - boolean acceptGzipCompression) { + boolean acceptGzipCompression, ServiceNameResolver serviceNameResolver) { this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT), (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT), - PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000, + DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000, autoCertRefreshTimeSeconds, - conf, acceptGzipCompression); + conf, acceptGzipCompression, serviceNameResolver); } @SneakyThrows public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs, int autoCertRefreshTimeSeconds, ClientConfigurationData conf, - boolean acceptGzipCompression) { + boolean acceptGzipCompression, + ServiceNameResolver serviceNameResolver) { Validate.notEmpty(conf.getServiceUrl(), "Service URL is not provided"); - serviceNameResolver = new PulsarServiceNameResolver(); - String serviceUrl = conf.getServiceUrl(); - serviceNameResolver.updateServiceUrl(serviceUrl); + this.serviceNameResolver = serviceNameResolver; this.acceptGzipCompression = acceptGzipCompression; AsyncHttpClientConfig asyncHttpClientConfig = createAsyncHttpClientConfig(conf, connectTimeoutMs, readTimeoutMs, requestTimeoutMs, diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpConnectorProvider.java similarity index 80% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpConnectorProvider.java index d20dc84849458..6d87ed490c6e6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpConnectorProvider.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.admin.internal.http; +package org.apache.pulsar.client.internal.http; import javax.ws.rs.client.Client; import javax.ws.rs.core.Configuration; +import org.apache.pulsar.client.impl.ServiceNameResolver; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.glassfish.jersey.client.spi.Connector; import org.glassfish.jersey.client.spi.ConnectorProvider; @@ -30,29 +31,32 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider { private final ClientConfigurationData conf; + private final ServiceNameResolver serviceNameResolver; private Connector connector; private final int autoCertRefreshTimeSeconds; private final boolean acceptGzipCompression; public AsyncHttpConnectorProvider(ClientConfigurationData conf, int autoCertRefreshTimeSeconds, - boolean acceptGzipCompression) { + boolean acceptGzipCompression, ServiceNameResolver serviceNameResolver) { this.conf = conf; this.autoCertRefreshTimeSeconds = autoCertRefreshTimeSeconds; this.acceptGzipCompression = acceptGzipCompression; + this.serviceNameResolver = serviceNameResolver; } @Override public Connector getConnector(Client client, Configuration runtimeConfig) { if (connector == null) { - connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds, acceptGzipCompression); + connector = new AsyncHttpConnector(client, conf, autoCertRefreshTimeSeconds, acceptGzipCompression, + serviceNameResolver); } return connector; } public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs, - int autoCertRefreshTimeSeconds) { + int autoCertRefreshTimeSeconds) { return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, autoCertRefreshTimeSeconds, - conf, acceptGzipCompression); + conf, acceptGzipCompression, this.serviceNameResolver); } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpRequestExecutor.java similarity index 97% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpRequestExecutor.java index d3c7a653b36b4..5f530c1bcff68 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpRequestExecutor.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/AsyncHttpRequestExecutor.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.admin.internal.http; +package org.apache.pulsar.client.internal.http; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/package-info.java similarity index 93% rename from pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/package-info.java rename to pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/package-info.java index aec70be74dfb1..10b36b1e54259 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/package-info.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/internal/http/package-info.java @@ -16,4 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.admin.internal.http; \ No newline at end of file +package org.apache.pulsar.client.internal.http; \ No newline at end of file From e1abde283cce501f0da1b70835e9defade941176 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 5 Feb 2025 09:43:30 +0800 Subject: [PATCH 2/3] [fix][client] Fix JSON deserialization for LookupData Signed-off-by: Zixuan Liu --- .../java/org/apache/pulsar/common/lookup/data/LookupData.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java index e743208f4d8fa..036c6abaadec7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/lookup/data/LookupData.java @@ -18,11 +18,13 @@ */ package org.apache.pulsar.common.lookup.data; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.google.common.base.MoreObjects; /** * This class encapsulates lookup data. */ +@JsonIgnoreProperties(ignoreUnknown = true) public class LookupData { private String brokerUrl; private String brokerUrlTls; From f6728e3529577860d5d5155e8e79b5485b4d1ecc Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Wed, 5 Feb 2025 09:42:24 +0800 Subject: [PATCH 3/3] [fix][client] Replace String with WebTarget in authenticationStage Signed-off-by: Zixuan Liu --- .../authentication/SaslAuthenticateTest.java | 55 ++++++++++++++++++- .../client/admin/internal/BaseResource.java | 2 +- .../admin/internal/ComponentResource.java | 2 +- pulsar-client-api/pom.xml | 6 ++ .../pulsar/client/api/Authentication.java | 14 +++++ .../client/impl/auth/AuthenticationSasl.java | 7 +-- .../apache/pulsar/client/impl/HttpClient.java | 2 +- 7 files changed, 80 insertions(+), 8 deletions(-) diff --git a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java index 226ec15d33afe..be1cc7705bb5d 100644 --- a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java +++ b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -80,6 +81,18 @@ public class SaslAuthenticateTest extends ProducerConsumerBase { private static String localHostname = "localhost"; private Authentication authSasl; + private Map clientSaslConfig; + + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setWebServicePortTls(Optional.of(0)); + conf.setBrokerServicePortTls(Optional.of(0)); + conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH); + conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH); + conf.setTlsTrustCertsFilePath(CA_CERT_FILE_PATH); + } @BeforeClass public static void startMiniKdc() throws Exception { @@ -172,7 +185,7 @@ protected void setup() throws Exception { isTcpLookup = false; // Client config - Map clientSaslConfig = new HashMap<>(); + clientSaslConfig = new HashMap<>(); clientSaslConfig.put("saslJaasClientSectionName", "PulsarClient"); clientSaslConfig.put("serverType", "broker"); log.info("set client jaas section name: PulsarClient"); @@ -223,6 +236,46 @@ protected void cleanup() throws Exception { super.internalCleanup(); } + @Test + public void testClientWithTLSTransportAndSaslAuth() throws Exception { + @Cleanup + PulsarAdmin pulsarAdmin = PulsarAdmin.builder() + .serviceHttpUrl(pulsar.getWebServiceAddressTls()) + .authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig)) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build(); + pulsarAdmin.tenants().getTenants(); + + try (PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsar.getWebServiceAddressTls()) + .authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig)) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build()) { + @Cleanup + Producer ignoredProducer = + pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic-sasl") + .create(); + + @Cleanup + Consumer ignoredConsumer = + pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic-sasl") + .subscriptionName("my-sub").subscribe(); + } + + try (PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrlTls()) + .authentication(AuthenticationFactory.create(AuthenticationSasl.class.getName(), clientSaslConfig)) + .tlsTrustCertsFilePath(CA_CERT_FILE_PATH).build()) { + @Cleanup + Producer ignoredProducer = + pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic-sasl") + .create(); + + @Cleanup + Consumer ignoredConsumer = + pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic-sasl") + .subscriptionName("my-sub").subscribe(); + } + } + // Test could verify with kerberos configured. @Test public void testProducerAndConsumerPassed() throws Exception { diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java index ea39053c2ceeb..20898b6c499c6 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java @@ -85,7 +85,7 @@ public CompletableFuture requestAsync(final WebTarget target) { AuthenticationDataProvider authData = auth.getAuthData(target.getUri().getHost()); if (authData.hasDataForHttp()) { - auth.authenticationStage(target.getUri().toString(), authData, null, authFuture); + auth.authenticationStage(target, authData, null, authFuture); } else { authFuture.complete(null); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java index 0301f0fc2ee2b..74a3097081f47 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java @@ -61,7 +61,7 @@ private Set> getAuthHeaders(WebTarget target) throws Excep String targetUrl = target.getUri().toString(); if (auth.getAuthMethodName().equalsIgnoreCase(SaslConstants.AUTH_METHOD_NAME)) { CompletableFuture> authFuture = new CompletableFuture<>(); - auth.authenticationStage(targetUrl, authData, null, authFuture); + auth.authenticationStage(target, authData, null, authFuture); return auth.newRequestHeader(targetUrl, authData, authFuture.get()); } else if (authData.hasDataForHttp()) { return auth.newRequestHeader(targetUrl, authData, null); diff --git a/pulsar-client-api/pom.xml b/pulsar-client-api/pom.xml index 63b9c444983f0..7ba7c26074adb 100644 --- a/pulsar-client-api/pom.xml +++ b/pulsar-client-api/pom.xml @@ -51,6 +51,12 @@ opentelemetry-api provided + + + jakarta.ws.rs + jakarta.ws.rs-api + provided + diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java index 48d9e3e230701..47f2dd16e9cd6 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; +import javax.ws.rs.client.WebTarget; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -85,7 +86,9 @@ default AuthenticationDataProvider getAuthData(String brokerHostName) throws Pul /** * An authentication Stage. * when authentication complete, passed-in authFuture will contains authentication related http request headers. + * @deprecated Use {@link #authenticationStage(WebTarget, AuthenticationDataProvider, Map, CompletableFuture)} instead. */ + @Deprecated default void authenticationStage(String requestUrl, AuthenticationDataProvider authData, Map previousResHeaders, @@ -93,6 +96,17 @@ default void authenticationStage(String requestUrl, authFuture.complete(null); } + /** + * An authentication Stage. + * when authentication complete, passed-in authFuture will contains authentication related http request headers. + */ + default void authenticationStage(WebTarget webTarget, + AuthenticationDataProvider authData, + Map previousResHeaders, + CompletableFuture> authFuture) { + authenticationStage(webTarget.getUri().toString(), authData, previousResHeaders, authFuture); + } + /** * Add an authenticationStage that will complete along with authFuture. */ diff --git a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java index f7ec9b964c6df..14f03175b51c5 100644 --- a/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java +++ b/pulsar-client-auth-sasl/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationSasl.java @@ -302,18 +302,17 @@ private Map getHeaders(Response response) { } @Override - public void authenticationStage(String requestUrl, - AuthenticationDataProvider authData, + public void authenticationStage(WebTarget webTarget, AuthenticationDataProvider authData, Map previousResHeaders, CompletableFuture> authFuture) { // a new request for sasl auth - Builder builder = newRequestBuilder(client.target(requestUrl), authData, previousResHeaders); + Builder builder = newRequestBuilder(webTarget, authData, previousResHeaders); builder.async().get(new InvocationCallback() { @Override public void completed(Response response) { if (response.getStatus() == HTTP_UNAUTHORIZED) { // sasl auth on going - authenticationStage(requestUrl, authData, getHeaders(response), authFuture); + authenticationStage(webTarget, authData, getHeaders(response), authFuture); return; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 512ee2284ccce..82f56f2427c30 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -118,7 +118,7 @@ public CompletableFuture get(Function webTargetFn, // bring a authenticationStage for sasl auth. if (authData.hasDataForHttp()) { - authentication.authenticationStage(requestUrl, authData, null, authFuture); + authentication.authenticationStage(finalWebTarget, authData, null, authFuture); } else { authFuture.complete(null); }