Skip to content

Commit fc6975c

Browse files
committed
Limit concurrent HTTP requests to 32 by default
1 parent 3e7f4fe commit fc6975c

File tree

2 files changed

+65
-10
lines changed

2 files changed

+65
-10
lines changed

app/buck2_http/src/client.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use hyper::Request;
2626
use hyper::Response;
2727
use hyper::client::connect::Connect;
2828
use tokio::io::AsyncReadExt;
29+
use tokio::sync::Semaphore;
2930
use tokio_util::io::StreamReader;
3031

3132
use crate::HttpError;
@@ -209,12 +210,31 @@ pub(super) trait RequestClient: Send + Sync {
209210

210211
type ResponseFuture<'a> = BoxFuture<'a, hyper::Result<Response<Body>>>;
211212

212-
impl<C> RequestClient for hyper::Client<C>
213+
// We implement throttling inside the `RequestClient` impl so that it's automatically shared between
214+
// `HttpClient` clones
215+
struct ThrottledClient<C> {
216+
semaphore: Semaphore,
217+
inner: hyper::Client<C>,
218+
}
219+
220+
impl<C> ThrottledClient<C> {
221+
fn new(inner: hyper::Client<C>, max_concurrent_requests: usize) -> Self {
222+
Self {
223+
semaphore: Semaphore::new(max_concurrent_requests),
224+
inner,
225+
}
226+
}
227+
}
228+
229+
impl<C> RequestClient for ThrottledClient<C>
213230
where
214231
C: Connect + Clone + Send + Sync + 'static,
215232
{
216233
fn request(&self, request: Request<Bytes>) -> ResponseFuture<'_> {
217-
Box::pin(self.request(request.map(Body::from)))
234+
Box::pin(async {
235+
let _permit = self.semaphore.acquire().await.unwrap();
236+
self.inner.request(request.map(Body::from)).await
237+
})
218238
}
219239
}
220240

app/buck2_http/src/client/builder.rs

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use tokio_rustls::TlsConnector;
3232

3333
use super::HttpClient;
3434
use super::RequestClient;
35+
use super::ThrottledClient;
3536
use crate::proxy;
3637
use crate::stats::HttpNetworkStats;
3738
use crate::x2p;
@@ -66,6 +67,7 @@ pub struct HttpClientBuilder {
6667
supports_vpnless: bool,
6768
http2: bool,
6869
timeout_config: Option<TimeoutConfig>,
70+
max_concurrent_requests: usize,
6971
}
7072

7173
impl HttpClientBuilder {
@@ -104,6 +106,10 @@ impl HttpClientBuilder {
104106
supports_vpnless: false,
105107
http2: true,
106108
timeout_config: None,
109+
// Semi-arbitrary sensible default: big enough to ensure good utilization of a typical
110+
// internet link, small enough that we won't run out of FDs or make a server think we're
111+
// DoSing them.
112+
max_concurrent_requests: 32,
107113
})
108114
}
109115

@@ -214,6 +220,11 @@ impl HttpClientBuilder {
214220
self.supports_vpnless
215221
}
216222

223+
pub fn with_max_concurrent_requests(&mut self, max_concurrent_requests: usize) -> &mut Self {
224+
self.max_concurrent_requests = max_concurrent_requests;
225+
self
226+
}
227+
217228
fn build_inner(&self) -> Arc<dyn RequestClient> {
218229
match (self.proxies.as_slice(), &self.timeout_config) {
219230
// Construct x2p unix socket client.
@@ -226,7 +237,10 @@ impl HttpClientBuilder {
226237
timeout_config.to_connector(hyper_unix_connector::UnixClient);
227238
let proxy_connector =
228239
build_proxy_connector(&[unix_socket.clone()], timeout_connector, None);
229-
Arc::new(hyper::Client::builder().build::<_, Body>(proxy_connector))
240+
Arc::new(ThrottledClient::new(
241+
hyper::Client::builder().build::<_, Body>(proxy_connector),
242+
self.max_concurrent_requests,
243+
))
230244
}
231245
#[cfg(unix)]
232246
(proxies @ [_, ..], None) if let Some(unix_socket) = find_unix_proxy(proxies) => {
@@ -235,7 +249,10 @@ impl HttpClientBuilder {
235249
hyper_unix_connector::UnixClient,
236250
None,
237251
);
238-
Arc::new(hyper::Client::builder().build::<_, Body>(proxy_connector))
252+
Arc::new(ThrottledClient::new(
253+
hyper::Client::builder().build::<_, Body>(proxy_connector),
254+
self.max_concurrent_requests,
255+
))
239256
}
240257

241258
// Construct x2p http proxy client.
@@ -245,14 +262,20 @@ impl HttpClientBuilder {
245262
http_connector.enforce_http(true);
246263
let timeout_connector = timeout_config.to_connector(http_connector);
247264
let proxy_connector = build_proxy_connector(proxies, timeout_connector, None);
248-
Arc::new(hyper::Client::builder().build::<_, Body>(proxy_connector))
265+
Arc::new(ThrottledClient::new(
266+
hyper::Client::builder().build::<_, Body>(proxy_connector),
267+
self.max_concurrent_requests,
268+
))
249269
}
250270
(proxies @ [_, ..], None) if self.supports_vpnless => {
251271
let mut http_connector = HttpConnector::new();
252272
// When talking to local x2pagent proxy, only http is supported.
253273
http_connector.enforce_http(true);
254274
let proxy_connector = build_proxy_connector(proxies, http_connector, None);
255-
Arc::new(hyper::Client::builder().build::<_, Body>(proxy_connector))
275+
Arc::new(ThrottledClient::new(
276+
hyper::Client::builder().build::<_, Body>(proxy_connector),
277+
self.max_concurrent_requests,
278+
))
256279
}
257280

258281
// Proxied http client with TLS.
@@ -265,24 +288,36 @@ impl HttpClientBuilder {
265288
timeout_connector,
266289
Some(self.tls_config.clone()),
267290
);
268-
Arc::new(hyper::Client::builder().build::<_, Body>(proxy_connector))
291+
Arc::new(ThrottledClient::new(
292+
hyper::Client::builder().build::<_, Body>(proxy_connector),
293+
self.max_concurrent_requests,
294+
))
269295
}
270296
(proxies @ [_, ..], None) => {
271297
let https_connector = build_https_connector(self.tls_config.clone(), self.http2);
272298
let proxy_connector =
273299
build_proxy_connector(proxies, https_connector, Some(self.tls_config.clone()));
274-
Arc::new(hyper::Client::builder().build::<_, Body>(proxy_connector))
300+
Arc::new(ThrottledClient::new(
301+
hyper::Client::builder().build::<_, Body>(proxy_connector),
302+
self.max_concurrent_requests,
303+
))
275304
}
276305

277306
// Client with TLS only.
278307
([], Some(timeout_config)) => {
279308
let https_connector = build_https_connector(self.tls_config.clone(), self.http2);
280309
let timeout_connector = timeout_config.to_connector(https_connector);
281-
Arc::new(hyper::Client::builder().build::<_, Body>(timeout_connector))
310+
Arc::new(ThrottledClient::new(
311+
hyper::Client::builder().build::<_, Body>(timeout_connector),
312+
self.max_concurrent_requests,
313+
))
282314
}
283315
([], None) => {
284316
let https_connector = build_https_connector(self.tls_config.clone(), self.http2);
285-
Arc::new(hyper::Client::builder().build::<_, Body>(https_connector))
317+
Arc::new(ThrottledClient::new(
318+
hyper::Client::builder().build::<_, Body>(https_connector),
319+
self.max_concurrent_requests,
320+
))
286321
}
287322
}
288323
}

0 commit comments

Comments
 (0)