Skip to content

Limit concurrent HTTP requests #922

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/buck2_common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub struct HttpConfig {
write_timeout_ms: Option<u64>,
pub http2: bool,
pub max_redirects: Option<usize>,
pub max_concurrent_requests: Option<usize>,
}

impl HttpConfig {
Expand All @@ -84,13 +85,18 @@ impl HttpConfig {
property: "http2",
})?
.unwrap_or(true);
let max_concurrent_requests = config.parse(BuckconfigKeyRef {
section: "http",
property: "max_concurrent_requests",
})?;

Ok(Self {
connect_timeout_ms,
read_timeout_ms,
write_timeout_ms,
max_redirects,
http2,
max_concurrent_requests,
})
}

Expand Down
52 changes: 47 additions & 5 deletions app/buck2_http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use hyper::Response;
use hyper::client::ResponseFuture;
use hyper::client::connect::Connect;
use tokio::io::AsyncReadExt;
use tokio::sync::Semaphore;
use tokio_util::io::StreamReader;

use crate::HttpError;
Expand All @@ -49,6 +50,9 @@ pub struct HttpClient {
supports_vpnless: bool,
http2: bool,
stats: HttpNetworkStats,
// tokio::sync::Semaphore doesn't impl Allocative
#[allocative(skip)]
concurrent_requests_budget: Arc<Semaphore>,
}

impl HttpClient {
Expand Down Expand Up @@ -124,6 +128,7 @@ impl HttpClient {
);
change_scheme_to_http(&mut request)?;
}
let semaphore_guard = self.concurrent_requests_budget.acquire().await.unwrap();
let resp = self.inner.request(request).await.map_err(|e| {
if is_hyper_error_due_to_timeout(&e) {
HttpError::Timeout {
Expand All @@ -134,11 +139,14 @@ impl HttpClient {
HttpError::SendRequest { uri, source: e }
}
})?;
Ok(
resp.map(|body| {
CountingStream::new(body, self.stats.downloaded_bytes().dupe()).boxed()
}),
)
Ok(resp.map(move |body| {
CountingStream::new(body, self.stats.downloaded_bytes().dupe())
.inspect(move |_| {
// Ensure we keep a concurrent request permit alive until the stream is consumed
let _guard = &semaphore_guard;
})
Comment on lines +144 to +147
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little cheeky and might be clearer as a stream transformer struct, but that would probably take about 4x as much code. Let me know if that'd be preferred.

.boxed()
}))
}

/// Send a generic request.
Expand Down Expand Up @@ -768,6 +776,40 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_concurrency_limit() -> buck2_error::Result<()> {
let test_server = httptest::Server::run();
test_server.expect(
Expectation::matching(request::method_path("GET", "/foo"))
.times(3)
.respond_with(responders::status_code(200)),
);

let client = HttpClientBuilder::https_with_system_roots()
.await?
.with_max_concurrent_requests(2)
.build();
let url = test_server.url_str("/foo");
let req1 = client.get(&url).await?;
let req2 = client.get(&url).await?;
assert_eq!(client.concurrent_requests_budget.available_permits(), 0);
let mut req3 = std::pin::pin!(client.get(&url));
// TODO: Use `tokio::time::pause` to make this faster and deterministic. Blocked by
// https://github.com/ggriffiniii/httptest/issues/29.
assert!(
tokio::time::timeout(tokio::time::Duration::from_millis(100), &mut req3)
.await
.is_err()
);
drop(req1);
req3.await?;
assert_eq!(client.concurrent_requests_budget.available_permits(), 1);
drop(req2);
assert_eq!(client.concurrent_requests_budget.available_permits(), 2);

Ok(())
}
}

// TODO(skarlage, T160529958): Debug why these tests fail on CircleCI
Expand Down
12 changes: 12 additions & 0 deletions app/buck2_http/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use hyper_timeout::TimeoutConnector;
use rustls::ClientConfig;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::Semaphore;
use tokio_rustls::TlsConnector;

use super::HttpClient;
Expand Down Expand Up @@ -66,6 +67,7 @@ pub struct HttpClientBuilder {
supports_vpnless: bool,
http2: bool,
timeout_config: Option<TimeoutConfig>,
max_concurrent_requests: usize,
}

impl HttpClientBuilder {
Expand Down Expand Up @@ -104,6 +106,10 @@ impl HttpClientBuilder {
supports_vpnless: false,
http2: true,
timeout_config: None,
// Semi-arbitrary sensible default: big enough to ensure good utilization of a typical
// internet link, small enough that we won't run out of FDs or make a server think we're
// DoSing them.
max_concurrent_requests: 32,
})
}

Expand Down Expand Up @@ -214,6 +220,11 @@ impl HttpClientBuilder {
self.supports_vpnless
}

pub fn with_max_concurrent_requests(&mut self, max_concurrent_requests: usize) -> &mut Self {
self.max_concurrent_requests = max_concurrent_requests;
self
}

fn build_inner(&self) -> Arc<dyn RequestClient> {
match (self.proxies.as_slice(), &self.timeout_config) {
// Construct x2p unix socket client.
Expand Down Expand Up @@ -294,6 +305,7 @@ impl HttpClientBuilder {
supports_vpnless: self.supports_vpnless,
http2: self.http2,
stats: HttpNetworkStats::new(),
concurrent_requests_budget: Arc::new(Semaphore::new(self.max_concurrent_requests)),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions app/buck2_server/src/daemon/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,9 @@ async fn http_client_from_startup_config(
}
_ => {}
}
if let Some(n) = config.http.max_concurrent_requests {
builder.with_max_concurrent_requests(n);
}

Ok(builder)
}
Expand Down
Loading