Skip to content

Commit 8fc5e4e

Browse files
committed
Implement request/response buffering properly, add CLI args
1 parent 06ab7af commit 8fc5e4e

File tree

7 files changed

+188
-55
lines changed

7 files changed

+188
-55
lines changed

Cargo.lock

Lines changed: 9 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ hostname = "0.4.0"
1717
http = "1.3.1"
1818
http-body-util = "0.1.3"
1919
humantime = "2.2.0"
20-
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "ca9b455682d8f2c855d07eeac0566c7273be8b1d", features = [
20+
ic-bn-lib = { git = "https://github.com/dfinity/ic-bn-lib", rev = "61c8e29bb5a38fed927e620ca319eab8d5aaa09d", features = [
2121
"acme-alpn",
2222
"cert-providers",
2323
"clients-hyper",

src/backend.rs

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt::Display, path::PathBuf, sync::Arc, time::Duration};
1+
use std::{cell::RefCell, fmt::Display, path::PathBuf, sync::Arc, time::Duration};
22

33
use anyhow::{Context, Error, anyhow, bail};
44
use arc_swap::ArcSwapOption;
@@ -22,6 +22,7 @@ use tokio::{
2222
fs, select,
2323
signal::unix::{SignalKind, signal},
2424
sync::{Mutex, MutexGuard},
25+
task_local,
2526
time::timeout,
2627
};
2728
use tokio_util::sync::CancellationToken;
@@ -30,6 +31,16 @@ use url::Url;
3031

3132
pub type LBBackendRouter = BackendRouter<Arc<Backend>, Request, Response, ic_bn_lib::http::Error>;
3233

34+
task_local! {
35+
pub static REQUEST_CONTEXT: RefCell<RequestContext>;
36+
}
37+
38+
/// Request context information
39+
#[derive(Debug, Clone, Default)]
40+
pub struct RequestContext {
41+
pub backend: Option<Arc<Backend>>,
42+
}
43+
3344
#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq)]
3445
pub struct Config {
3546
strategy: Strategy,
@@ -419,6 +430,11 @@ impl ExecutesRequest<Arc<Backend>> for RequestExecutor {
419430
backend: &Arc<Backend>,
420431
mut req: Self::Request,
421432
) -> Result<Self::Response, Self::Error> {
433+
// Store the selected backend in the request context
434+
let _ = REQUEST_CONTEXT.try_with(|x| {
435+
x.borrow_mut().backend = Some(backend.clone());
436+
});
437+
422438
let uri = match Uri::builder()
423439
.scheme(backend.url.scheme())
424440
.authority(backend.url.authority())
@@ -452,16 +468,6 @@ impl ExecutesRequest<Arc<Backend>> for RequestExecutor {
452468
strip_connection_headers(req.headers_mut());
453469

454470
// Execute it
455-
let result = self.client.execute(req).await.map(|mut x| {
456-
// Insert the backend into the response for observability
457-
x.extensions_mut().insert(backend.clone());
458-
x
459-
});
460-
461-
if let Err(e) = &result {
462-
warn!("Unable to execute request: {e:#}");
463-
}
464-
465-
result
471+
self.client.execute(req).await
466472
}
467473
}

src/cli.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,15 @@ pub struct Network {
9090
/// *** Dangerous *** - use only for testing.
9191
#[clap(env, long)]
9292
pub network_http_client_insecure_bypass_tls_verification: bool,
93+
94+
/// Whether to buffer request body from the client before sending it to the backend.
95+
/// If `retry_attempts` is >1 then this is implicitly enabled.
96+
#[clap(env, long)]
97+
pub network_request_body_buffer: bool,
98+
99+
/// Whether to buffer response body from the backend before sending it to the client.
100+
#[clap(env, long)]
101+
pub network_response_body_buffer: bool,
93102
}
94103

95104
#[derive(Args)]
@@ -177,8 +186,8 @@ pub struct Config {
177186
pub struct Retry {
178187
/// Number of request attempts to do.
179188
/// Only network errors are retried, not HTTP codes.
180-
/// If the number of attempts is 1 then we don't retry and don't buffer the request body.
181-
#[clap(env, long, default_value = "3", value_parser = clap::value_parser!(u8).range(1..))]
189+
/// If the number of attempts is 1 then we don't retry.
190+
#[clap(env, long, default_value = "1", value_parser = clap::value_parser!(u8).range(1..))]
182191
pub retry_attempts: u8,
183192

184193
/// Initial retry interval, with each retry it is doubled.
@@ -190,13 +199,21 @@ pub struct Retry {
190199

191200
#[derive(Args)]
192201
pub struct Limits {
193-
/// Maximum request body size.
202+
/// Maximum request body size if buffering.
194203
#[clap(env, long, default_value = "10MB", value_parser = parse_size_usize)]
195204
pub limits_request_body_size: usize,
196205

197-
/// Maximum time allowed to send the request body.
206+
/// Maximum time allowed to buffer the request body.
198207
#[clap(env, long, default_value = "30s", value_parser = parse_duration)]
199208
pub limits_request_body_timeout: Duration,
209+
210+
/// Maximum response body size if buffering.
211+
#[clap(env, long, default_value = "30MB", value_parser = parse_size_usize)]
212+
pub limits_response_body_size: usize,
213+
214+
/// Maximum time allowed to buffer the response body.
215+
#[clap(env, long, default_value = "60s", value_parser = parse_duration)]
216+
pub limits_response_body_timeout: Duration,
200217
}
201218

202219
#[derive(Args)]

src/core.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::sync::{Arc, OnceLock};
22

33
use anyhow::{Context, Error};
4-
use axum::Router;
4+
use axum::{Router, body::Body};
55
use ic_bn_lib::{
66
http::{
7-
self as bnhttp, HyperClientLeastLoaded, ReqwestClient, ServerBuilder, dns,
8-
redirect_to_https,
7+
self as bnhttp, ClientHttp, HyperClient, HyperClientLeastLoaded, ReqwestClient,
8+
ServerBuilder, dns, redirect_to_https,
99
},
1010
rustls,
1111
tasks::TaskManager,
@@ -69,12 +69,16 @@ pub async fn main(
6969
.context("unable to setup HTTP client")?,
7070
);
7171

72-
let http_client = Arc::new(HyperClientLeastLoaded::new(
73-
http_client_opts,
74-
resolver,
75-
cli.network.network_http_client_count as usize,
76-
Some(&registry),
77-
));
72+
let http_client: Arc<dyn ClientHttp<Body>> = if cli.network.network_http_client_count > 1 {
73+
Arc::new(HyperClientLeastLoaded::new(
74+
http_client_opts,
75+
resolver,
76+
cli.network.network_http_client_count as usize,
77+
Some(&registry),
78+
))
79+
} else {
80+
Arc::new(HyperClient::new(http_client_opts, resolver))
81+
};
7882

7983
// Setup Vector
8084
let vector = cli.log.vector.log_vector_url.as_ref().map(|_| {

src/middleware/metrics.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
cell::RefCell,
23
sync::Arc,
34
time::{Duration, Instant},
45
};
@@ -28,7 +29,7 @@ use tokio::sync::oneshot;
2829
use tracing::info;
2930

3031
use crate::{
31-
backend::Backend,
32+
backend::{REQUEST_CONTEXT, RequestContext},
3233
core::{ENV, HOSTNAME},
3334
middleware::request_id::RequestId,
3435
routing::Retries,
@@ -210,14 +211,19 @@ pub async fn middleware(
210211

211212
// Execute the request
212213
let start = Instant::now();
213-
let mut response = next.run(request).await;
214+
let (mut response, ctx) = REQUEST_CONTEXT
215+
.scope(RefCell::new(RequestContext::default()), async {
216+
let r = next.run(request).await;
217+
let ctx = REQUEST_CONTEXT.try_with(|x| x.clone()).unwrap_or_default();
218+
(r, ctx.into_inner())
219+
})
220+
.await;
214221
let duration = start.elapsed().as_secs_f64();
215222

216-
let backend = response
217-
.extensions_mut()
218-
.remove::<Arc<Backend>>()
223+
let backend = ctx
224+
.backend
219225
.map(|x| x.name.clone())
220-
.unwrap_or_default();
226+
.unwrap_or_else(|| "unknown".into());
221227
let response_size = response
222228
.body()
223229
.size_hint()

0 commit comments

Comments
 (0)