Skip to content

Commit 490dc4e

Browse files
committed
Limit concurrent HTTP requests to 32 by default
1 parent 59a6cc1 commit 490dc4e

File tree

2 files changed

+25
-5
lines changed

2 files changed

+25
-5
lines changed

app/buck2_http/src/client.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use hyper::Response;
2626
use hyper::client::ResponseFuture;
2727
use hyper::client::connect::Connect;
2828
use tokio::io::AsyncReadExt;
29+
use tokio::sync::Semaphore;
2930
use tokio_util::io::StreamReader;
3031

3132
use crate::HttpError;
@@ -49,6 +50,9 @@ pub struct HttpClient {
4950
supports_vpnless: bool,
5051
http2: bool,
5152
stats: HttpNetworkStats,
53+
// tokio::sync::Semaphore doesn't impl Allocative
54+
#[allocative(skip)]
55+
concurrent_requests_budget: Arc<Semaphore>,
5256
}
5357

5458
impl HttpClient {
@@ -124,6 +128,7 @@ impl HttpClient {
124128
);
125129
change_scheme_to_http(&mut request)?;
126130
}
131+
let semaphore_guard = self.concurrent_requests_budget.acquire().await.unwrap();
127132
let resp = self.inner.request(request).await.map_err(|e| {
128133
if is_hyper_error_due_to_timeout(&e) {
129134
HttpError::Timeout {
@@ -134,11 +139,14 @@ impl HttpClient {
134139
HttpError::SendRequest { uri, source: e }
135140
}
136141
})?;
137-
Ok(
138-
resp.map(|body| {
139-
CountingStream::new(body, self.stats.downloaded_bytes().dupe()).boxed()
140-
}),
141-
)
142+
Ok(resp.map(|body| {
143+
CountingStream::new(body, self.stats.downloaded_bytes().dupe())
144+
.inspect(|_| {
145+
// Ensure we keep a concurrent request permit alive until the stream is consumed
146+
_ = semaphore_guard;
147+
})
148+
.boxed()
149+
}))
142150
}
143151

144152
/// Send a generic request.

app/buck2_http/src/client/builder.rs

+12
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use hyper_timeout::TimeoutConnector;
2828
use rustls::ClientConfig;
2929
use tokio::io::AsyncRead;
3030
use tokio::io::AsyncWrite;
31+
use tokio::sync::Semaphore;
3132
use tokio_rustls::TlsConnector;
3233

3334
use super::HttpClient;
@@ -66,6 +67,7 @@ pub struct HttpClientBuilder {
6667
supports_vpnless: bool,
6768
http2: bool,
6869
timeout_config: Option<TimeoutConfig>,
70+
max_concurrent_requests: usize,
6971
}
7072

7173
impl HttpClientBuilder {
@@ -104,6 +106,10 @@ impl HttpClientBuilder {
104106
supports_vpnless: false,
105107
http2: true,
106108
timeout_config: None,
109+
// Semi-arbitrary sensible default: big enough to ensure good utilization of a typical
110+
// internet link, small enough that we won't run out of FDs or make a server think we're
111+
// DoSing them.
112+
max_concurrent_requests: 32,
107113
})
108114
}
109115

@@ -214,6 +220,11 @@ impl HttpClientBuilder {
214220
self.supports_vpnless
215221
}
216222

223+
pub fn with_max_concurrent_requests(&mut self, max_concurrent_requests: usize) -> &mut Self {
224+
self.max_concurrent_requests = max_concurrent_requests;
225+
self
226+
}
227+
217228
fn build_inner(&self) -> Arc<dyn RequestClient> {
218229
match (self.proxies.as_slice(), &self.timeout_config) {
219230
// Construct x2p unix socket client.
@@ -294,6 +305,7 @@ impl HttpClientBuilder {
294305
supports_vpnless: self.supports_vpnless,
295306
http2: self.http2,
296307
stats: HttpNetworkStats::new(),
308+
concurrent_requests_budget: Arc::new(Semaphore::new(self.max_concurrent_requests)),
297309
}
298310
}
299311
}

0 commit comments

Comments
 (0)