Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#7689](https://github.com/apache/incubator-seata/pull/7689)] optimize source release
- [[#7711](https://github.com/apache/incubator-seata/pull/7711)] add fastjson support for serialization and deserialization of PostgreSQL array types
- [[#7722](https://github.com/apache/incubator-seata/pull/7722)] optimize serializer type meaning
- [[#7736](https://github.com/apache/incubator-seata/pull/7736)] Enhance the support for http2 on both the client and server sides



Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
- [[#7689](https://github.com/apache/incubator-seata/pull/7689)] 优化 source release
- [[#7711](https://github.com/apache/incubator-seata/pull/7711)] 添加 fastjson 对 PostgreSQL 数组类型的序列化和反序列化的支持
- [[#7722](https://github.com/apache/incubator-seata/pull/7722)] 优化 SerializerType 枚举含义
- [[#7736](https://github.com/apache/incubator-seata/pull/7736)] 增强客户端和服务端对于h2c协议的支持


### security:
Expand Down
1 change: 1 addition & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
155 changes: 0 additions & 155 deletions common/src/main/java/org/apache/seata/common/util/Http5ClientUtil.java

This file was deleted.

159 changes: 149 additions & 10 deletions common/src/main/java/org/apache/seata/common/util/HttpClientUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,18 @@
*/
package org.apache.seata.common.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.FormBody;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
Expand All @@ -31,6 +42,7 @@
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.seata.common.executor.HttpCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,16 +51,20 @@
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class HttpClientUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtil.class);

private static final Map<Integer /*timeout*/, CloseableHttpClient> HTTP_CLIENT_MAP = new ConcurrentHashMap<>();

private static final Map<Integer /*timeout*/, OkHttpClient> HTTP2_CLIENT_MAP = new ConcurrentHashMap<>();

private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER =
new PoolingHttpClientConnectionManager();

Expand All @@ -57,18 +73,31 @@ public class HttpClientUtil {
static {
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10);
Runtime.getRuntime().addShutdownHook(new Thread(() -> HTTP_CLIENT_MAP.values().parallelStream()
.forEach(client -> {
try {
// delay 3s, make sure unregister http request send successfully
Thread.sleep(3000);
client.close();
} catch (IOException | InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
})));
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> {
try {
// delay 3s, make sure unregister http request send successfully
Thread.sleep(3000);
client.close();
} catch (IOException | InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
});

HTTP2_CLIENT_MAP.values().parallelStream().forEach(client -> {
try {
client.dispatcher().executorService().shutdown();
client.connectionPool().evictAll();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
});
}));
}

public static final MediaType MEDIA_TYPE_JSON = MediaType.parse("application/json");
public static final MediaType MEDIA_TYPE_FORM_URLENCODED = MediaType.parse("application/x-www-form-urlencoded");

// post request
public static CloseableHttpResponse doPost(
String url, Map<String, String> params, Map<String, String> header, int timeout) throws IOException {
Expand Down Expand Up @@ -195,4 +224,114 @@ public static CloseableHttpResponse doPostJson(
CloseableHttpClient client = HttpClients.createDefault();
return client.execute(post);
}

public static void doPostWithHttp2(
String url, Map<String, String> params, Map<String, String> headers, HttpCallback<Response> callback) {
doPostWithHttp2(url, params, headers, callback, 10);
}

public static void doPostWithHttp2(
String url,
Map<String, String> params,
Map<String, String> headers,
HttpCallback<Response> callback,
int timeoutSeconds) {
try {
String contentType = headers != null ? headers.get("Content-Type") : "";
RequestBody requestBody = createRequestBody(params, contentType);
Request request = buildHttp2Request(url, headers, requestBody, "POST");
OkHttpClient client = createHttp2ClientWithTimeout(timeoutSeconds);
executeAsync(client, request, callback);
} catch (JsonProcessingException e) {
LOGGER.error(e.getMessage(), e);
callback.onFailure(e);
}
}

public static void doPostWithHttp2(
String url, String body, Map<String, String> headers, HttpCallback<Response> callback) {
// default timeout 10 seconds
doPostWithHttp2(url, body, headers, callback, 10);
}

public static void doPostWithHttp2(
String url, String body, Map<String, String> headers, HttpCallback<Response> callback, int timeoutSeconds) {
RequestBody requestBody = RequestBody.create(body, MEDIA_TYPE_JSON);
Request request = buildHttp2Request(url, headers, requestBody, "POST");
OkHttpClient client = createHttp2ClientWithTimeout(timeoutSeconds);
executeAsync(client, request, callback);
}

public static void doGetWithHttp2(
String url, Map<String, String> headers, final HttpCallback<Response> callback, int timeoutSeconds) {
Request request = buildHttp2Request(url, headers, null, "GET");
OkHttpClient client = createHttp2ClientWithTimeout(timeoutSeconds);
executeAsync(client, request, callback);
}

private static RequestBody createRequestBody(Map<String, String> params, String contentType)
throws JsonProcessingException {
if (params == null || params.isEmpty()) {
return RequestBody.create(new byte[0]);
}

if (MEDIA_TYPE_FORM_URLENCODED.toString().equals(contentType)) {
FormBody.Builder formBuilder = new FormBody.Builder();
params.forEach(formBuilder::add);
return formBuilder.build();
} else {
String json = OBJECT_MAPPER.writeValueAsString(params);
return RequestBody.create(json, MEDIA_TYPE_JSON);
}
}

private static OkHttpClient createHttp2ClientWithTimeout(int timeoutSeconds) {
return HTTP2_CLIENT_MAP.computeIfAbsent(timeoutSeconds, k -> new OkHttpClient.Builder()
// Use HTTP/2 prior knowledge to directly use HTTP/2 without an initial HTTP/1.1 upgrade
.protocols(Collections.singletonList(Protocol.H2_PRIOR_KNOWLEDGE))
.connectTimeout(timeoutSeconds, TimeUnit.SECONDS)
.readTimeout(timeoutSeconds, TimeUnit.SECONDS)
.writeTimeout(timeoutSeconds, TimeUnit.SECONDS)
.build());
}

private static Request buildHttp2Request(
String url, Map<String, String> headers, RequestBody requestBody, String method) {
Headers.Builder headerBuilder = new Headers.Builder();
if (headers != null) {
headers.forEach(headerBuilder::add);
}

Request.Builder requestBuilder = new Request.Builder().url(url).headers(headerBuilder.build());

if ("POST".equals(method) && requestBody != null) {
requestBuilder.post(requestBody);
} else if ("GET".equals(method)) {
requestBuilder.get();
}

return requestBuilder.build();
}

private static void executeAsync(OkHttpClient client, Request request, final HttpCallback<Response> callback) {
client.newCall(request).enqueue(new Callback() {
@Override
public void onResponse(Call call, Response response) {
try {
callback.onSuccess(response);
} finally {
response.close();
}
}

@Override
public void onFailure(Call call, IOException e) {
if (call.isCanceled()) {
callback.onCancelled();
} else {
callback.onFailure(e);
}
}
});
}
}
Loading
Loading