Skip to content

Commit 97a8022

Browse files
authored
New client pool implementation (#11736)
#8100 overhauled the connection pooling, but its primary purpose was to improve the separation of concerns, and to extend connection pooling to non-exchange methods. From an efficiency standpoint, it is not very good. In particular, it uses a central "doSomeWork" method to dispatch requests onto connections, which is shared between all threads and only dispatches in a serial fashion. The connections were also shared with no regard to which event loop they were created on. #11300 bolted on "client affinity" functionality: Requests arriving on an event loop would preferentially be dispatched to connections running on the same event loop, which can reduce context switches and improve latency. However, the actual dispatch still happens in the central `doSomeWork` method serially, now further weighed down by the client affinity algorithm. Benchmarks connected to #11704 show that this model breaks down in scenarios with many connections serving many requests on multiple event loops. `doSomeWork` becomes a bottleneck, since only one event loop can execute it at a time, and the connection selection algorithm for client affinity makes this even heavier. This PR introduces a new connection pool built with scalability in mind from the start. Instead of a global pool of connections, the pool is split into "local pools", one for each event loop. Requests that are created on one event loop can be dispatched purely within that loop, without coordination with other loops. Contention between loops is kept to a minimum. If load is uneven between event loops, requests can still spill into a global request queue as a fallback, and be picked up by less busy loops. This ensures progress, but should not be a hot path. The logic for moving between event loops or into the global queue is unfortunately quite complex.
1 parent 70c551a commit 97a8022

File tree

15 files changed

+1868
-332
lines changed

15 files changed

+1868
-332
lines changed

http-client-core/src/main/java/io/micronaut/http/client/HttpClientConfiguration.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -915,7 +915,7 @@ public static class ConnectionPoolConfiguration implements Toggleable {
915915
private int maxPendingConnections = 4;
916916

917917
private int maxConcurrentRequestsPerHttp2Connection = Integer.MAX_VALUE;
918-
private int maxConcurrentHttp1Connections = Integer.MAX_VALUE;
918+
private int maxConcurrentHttp1Connections = 1000;
919919
private int maxConcurrentHttp2Connections = 1;
920920

921921
private int maxPendingAcquires = Integer.MAX_VALUE;
@@ -927,6 +927,9 @@ public static class ConnectionPoolConfiguration implements Toggleable {
927927
@NonNull
928928
private HttpClientConfiguration.ConnectionPoolConfiguration.ConnectionLocality connectionLocality = ConnectionLocality.PREFERRED;
929929

930+
@NonNull
931+
private PoolVersion version = PoolVersion.V4_9;
932+
930933
/**
931934
* Whether connection pooling is enabled.
932935
* [available in the Netty HTTP client]
@@ -1088,6 +1091,26 @@ public void setConnectionLocality(@NonNull HttpClientConfiguration.ConnectionPoo
10881091
this.connectionLocality = connectionLocality;
10891092
}
10901093

1094+
/**
1095+
* The version of the connection pool implementation. Defaults to {@code V4_9}, can be set
1096+
* to {@code V4_0} for compatibility.
1097+
*
1098+
* @return The pool version
1099+
*/
1100+
public @NonNull PoolVersion getVersion() {
1101+
return version;
1102+
}
1103+
1104+
/**
1105+
* The version of the connection pool implementation. Defaults to {@code V4_9}, can be set
1106+
* to {@code V4_0} for compatibility.
1107+
*
1108+
* @param version The pool version
1109+
*/
1110+
public void setVersion(@NonNull PoolVersion version) {
1111+
this.version = version;
1112+
}
1113+
10911114
/**
10921115
* Options for {@link #connectionLocality}.
10931116
*
@@ -1120,6 +1143,22 @@ public enum ConnectionLocality {
11201143
*/
11211144
ENFORCED_ALWAYS,
11221145
}
1146+
1147+
/**
1148+
* Different pool implementation versions, for compatibility.
1149+
*
1150+
* @since 4.9.0
1151+
*/
1152+
public enum PoolVersion {
1153+
/**
1154+
* The connection pool introduced in micronaut-core 4.0.0.
1155+
*/
1156+
V4_0,
1157+
/**
1158+
* The connection pool introduced in micronaut-core 4.9.0.
1159+
*/
1160+
V4_9
1161+
}
11231162
}
11241163

11251164
/**

http-client-core/src/main/java/io/micronaut/http/client/loadbalance/FixedLoadBalancer.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.micronaut.http.client.loadbalance;
1717

18+
import io.micronaut.core.annotation.Internal;
1819
import io.micronaut.core.annotation.Nullable;
1920
import io.micronaut.core.async.publisher.Publishers;
2021
import io.micronaut.discovery.ServiceInstance;
@@ -36,6 +37,7 @@
3637
*/
3738
public class FixedLoadBalancer implements LoadBalancer {
3839
private final Publisher<ServiceInstance> publisher;
40+
private final ServiceInstance serviceInstance;
3941
private final URI uri;
4042

4143
/**
@@ -56,7 +58,8 @@ public FixedLoadBalancer(URL url) {
5658
*/
5759
public FixedLoadBalancer(URI uri) {
5860
this.uri = uri;
59-
this.publisher = Publishers.just(ServiceInstance.of(uri.getHost(), uri));
61+
serviceInstance = ServiceInstance.of(uri.getHost(), uri);
62+
this.publisher = Publishers.just(serviceInstance);
6063
}
6164

6265
@Override
@@ -84,6 +87,16 @@ public URI getUri() {
8487
return uri;
8588
}
8689

90+
/**
91+
* The fixed {@link ServiceInstance} returned by {@link #select}. Internal use only.
92+
*
93+
* @return The service instance
94+
*/
95+
@Internal
96+
public ServiceInstance getServiceInstance() {
97+
return serviceInstance;
98+
}
99+
87100
@Override
88101
public Optional<String> getContextPath() {
89102
return Optional.ofNullable(getUri().getPath());

http-client/src/main/java/io/micronaut/http/client/netty/BlockHint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.micronaut.core.annotation.NonNull;
2020
import io.micronaut.core.annotation.Nullable;
2121
import io.micronaut.http.client.exceptions.HttpClientException;
22-
import io.netty.channel.EventLoop;
22+
import io.netty.util.concurrent.EventExecutor;
2323

2424
/**
2525
* Information about what threads are blocked waiting for a request to complete. This is used to
@@ -64,7 +64,7 @@ static HttpClientException createException() {
6464
"https://docs.micronaut.io/latest/guide/index.html#clientConfiguration");
6565
}
6666

67-
boolean blocks(EventLoop eventLoop) {
67+
boolean blocks(EventExecutor eventLoop) {
6868
BlockHint bh = this;
6969
while (bh != null) {
7070
if (eventLoop.inEventLoop(bh.blockedThread)) {

0 commit comments

Comments
 (0)