Skip to content

Commit d96ae40

Browse files
authored
Merge pull request #14 from dfinity/igornovg/improvements
add inflight req metric, cleanups, update some deps
2 parents ab5130f + 55627c5 commit d96ae40

File tree

5 files changed

+60
-219
lines changed

5 files changed

+60
-219
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
steps:
1717
- uses: actions/checkout@v4
1818

19-
- uses: Swatinem/rust-cache@23bce251a8cd2ffc3c1075eaa2367cf899916d84
19+
- uses: Swatinem/rust-cache@9d47c6ad4b02e050fd481d890b2ea34778fd09d6
2020

2121
- name: Run tests
2222
run: cargo test

Cargo.toml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,26 @@ derive-new = "0.7.0"
2525
fqdn = "0.4.1"
2626
futures = "0.3.31"
2727
futures-util = "0.3.31"
28-
hickory-proto = "0.24.1"
29-
hickory-resolver = { version = "0.24.1", features = [
30-
"dns-over-https-rustls",
31-
"webpki-roots",
28+
hickory-proto = "0.25.1"
29+
hickory-resolver = { version = "0.25.1", features = [
30+
"tls-ring",
31+
"https-ring",
3232
"dnssec-ring",
33+
"webpki-roots",
3334
] }
3435
http = "1.3.1"
3536
http-body = "1.0.1"
3637
http-body-util = "0.1.2"
3738
humantime = "2.2.0"
38-
hyper = "1.5.0"
39+
hyper = "1.6.0"
3940
hyper-util = { version = "0.1.10", features = ["full"] }
4041
instant-acme = { version = "0.7.2", default-features = false, features = [
4142
"ring",
4243
"hyper-rustls",
4344
] }
4445
moka = { version = "0.12.8", features = ["sync", "future"] }
4546
parse-size = { version = "1.1.0", features = ["std"] }
46-
prometheus = "0.13.4"
47+
prometheus = "0.14.0"
4748
prost = "0.13.3"
4849
prost-types = "0.13.3"
4950
rand = "0.8.5"
@@ -86,8 +87,8 @@ tower = { version = "0.5.1", features = ["util"] }
8687
tower-service = "0.3.3"
8788
tracing = "0.1.40"
8889
url = "2.5.3"
89-
uuid = { version = "1.15.0", features = ["v7"] }
90-
vrl = { version = "0.22.0", default-features = false, features = ["value"] }
90+
uuid = { version = "1.16.0", features = ["v7"] }
91+
vrl = { version = "0.23.0", default-features = false, features = ["value"] }
9192
webpki-root-certs = "0.26.6"
9293
x509-parser = "0.17.0"
9394
zeroize = { version = "1.8.1", features = ["derive"] }

src/http/client/mod.rs

Lines changed: 4 additions & 197 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@ pub mod cli;
33
use std::{
44
fmt,
55
sync::{
6+
Arc,
67
atomic::{AtomicUsize, Ordering},
7-
Arc, RwLock, RwLockWriteGuard,
88
},
9-
time::{Duration, Instant},
9+
time::Duration,
1010
};
1111

1212
use anyhow::Context;
1313
use async_trait::async_trait;
1414
use http::header::HeaderValue;
15-
use rand::{rngs::OsRng, seq::IteratorRandom};
16-
use reqwest::{dns::Resolve, Request, Response};
15+
use reqwest::{Request, Response, dns::Resolve};
1716
use scopeguard::defer;
1817

1918
use super::Error;
@@ -160,6 +159,7 @@ impl ReqwestClientLeastLoaded {
160159
#[async_trait]
161160
impl Client for ReqwestClientLeastLoaded {
162161
async fn execute(&self, req: Request) -> Result<Response, reqwest::Error> {
162+
// Select the client with least outstanding requests
163163
let cli = self
164164
.inner
165165
.iter()
@@ -176,14 +176,6 @@ impl Client for ReqwestClientLeastLoaded {
176176
}
177177
}
178178

179-
pub trait GeneratesClients: Send + Sync + fmt::Debug + 'static {
180-
fn generate(&self) -> Result<Arc<dyn Client>, Error>;
181-
}
182-
183-
pub trait GeneratesClientsWithStats: Send + Sync + fmt::Debug + 'static {
184-
fn generate(&self) -> Result<Arc<dyn ClientWithStats>, Error>;
185-
}
186-
187179
#[derive(Debug, Clone)]
188180
pub struct ClientStats {
189181
pub pool_size: usize,
@@ -194,138 +186,6 @@ pub trait Stats {
194186
fn stats(&self) -> ClientStats;
195187
}
196188

197-
#[derive(Debug)]
198-
pub struct ReqwestClientDynamic<G: GeneratesClients> {
199-
generator: G,
200-
min_clients: usize,
201-
max_clients: usize,
202-
max_outstanding: usize,
203-
idle_timeout: Duration,
204-
pool: RwLock<Vec<Arc<ReqwestClientDynamicInner>>>,
205-
}
206-
207-
impl<G: GeneratesClients> ClientWithStats for ReqwestClientDynamic<G> {
208-
fn to_client(self: Arc<Self>) -> Arc<dyn Client> {
209-
self
210-
}
211-
}
212-
213-
#[derive(Debug)]
214-
struct ReqwestClientDynamicInner {
215-
cli: Arc<dyn Client>,
216-
outstanding: AtomicUsize,
217-
last_request: RwLock<Instant>,
218-
}
219-
220-
impl ReqwestClientDynamicInner {
221-
fn new(cli: Arc<dyn Client>) -> Self {
222-
Self {
223-
cli,
224-
outstanding: AtomicUsize::new(0),
225-
last_request: RwLock::new(Instant::now()),
226-
}
227-
}
228-
}
229-
230-
impl<G: GeneratesClients> ReqwestClientDynamic<G> {
231-
pub fn new(
232-
generator: G,
233-
min_clients: usize,
234-
max_clients: usize,
235-
max_outstanding: usize,
236-
idle_timeout: Duration,
237-
) -> Result<Self, Error> {
238-
let mut pool = Vec::with_capacity(max_clients);
239-
240-
for _ in 0..min_clients {
241-
let inner = Arc::new(ReqwestClientDynamicInner::new(generator.generate()?));
242-
pool.push(inner);
243-
}
244-
245-
Ok(Self {
246-
generator,
247-
min_clients,
248-
max_clients,
249-
max_outstanding,
250-
idle_timeout,
251-
pool: RwLock::new(pool),
252-
})
253-
}
254-
255-
/// Drop unused clients while leaving min_clients always available.
256-
/// Algo mimics Vec::retain().
257-
/// TODO find a better way?
258-
fn cleanup(&self, pool: &mut RwLockWriteGuard<'_, Vec<Arc<ReqwestClientDynamicInner>>>) {
259-
let mut j = self.min_clients;
260-
for i in self.min_clients..pool.len() {
261-
if !(pool[i].outstanding.load(Ordering::SeqCst) == 0
262-
&& pool[i].last_request.read().unwrap().elapsed() > self.idle_timeout)
263-
{
264-
pool.swap(i, j);
265-
j += 1
266-
}
267-
}
268-
pool.truncate(j);
269-
}
270-
271-
fn get_client(&self) -> Arc<ReqwestClientDynamicInner> {
272-
let mut pool = self.pool.write().unwrap();
273-
self.cleanup(&mut pool);
274-
275-
pool.iter()
276-
// First try to find an existing client with spare capacity
277-
.find_map(|x| {
278-
(x.outstanding.load(Ordering::SeqCst) < self.max_outstanding).then(|| x.clone())
279-
})
280-
// Otherwise see if we have spare space in the pool
281-
.unwrap_or_else(|| {
282-
// If not - just pick a random client
283-
if pool.len() >= self.max_clients {
284-
pool.iter().choose(&mut OsRng).unwrap().clone()
285-
} else {
286-
// Otherwise generate a new client and use it
287-
// The error is checked only in new() for now.
288-
let cli = self.generator.generate().unwrap();
289-
let inner = Arc::new(ReqwestClientDynamicInner::new(cli));
290-
pool.push(inner.clone());
291-
inner
292-
}
293-
})
294-
}
295-
}
296-
297-
impl<G: GeneratesClients> Stats for ReqwestClientDynamic<G> {
298-
fn stats(&self) -> ClientStats {
299-
let pool = self.pool.read().unwrap();
300-
301-
let outstanding: usize = pool
302-
.iter()
303-
.map(|x| x.outstanding.load(Ordering::SeqCst))
304-
.sum();
305-
306-
ClientStats {
307-
pool_size: pool.len(),
308-
outstanding,
309-
}
310-
}
311-
}
312-
313-
#[async_trait]
314-
impl<G: GeneratesClients> Client for ReqwestClientDynamic<G> {
315-
async fn execute(&self, req: Request) -> Result<Response, reqwest::Error> {
316-
let inner = self.get_client();
317-
318-
// The future can be cancelled so we have to use defer to make sure the counter is decreased
319-
defer! {
320-
inner.outstanding.fetch_sub(1, Ordering::SeqCst);
321-
}
322-
323-
*inner.last_request.write().unwrap() = Instant::now();
324-
inner.outstanding.fetch_add(1, Ordering::SeqCst);
325-
inner.cli.execute(req).await
326-
}
327-
}
328-
329189
pub fn basic_auth<U, P>(username: U, password: Option<P>) -> HeaderValue
330190
where
331191
U: fmt::Display,
@@ -348,56 +208,3 @@ where
348208
header.set_sensitive(true);
349209
header
350210
}
351-
352-
#[cfg(test)]
353-
mod test {
354-
use futures::future::join_all;
355-
356-
use super::*;
357-
358-
#[derive(Debug)]
359-
struct TestClient;
360-
361-
#[async_trait]
362-
impl Client for TestClient {
363-
async fn execute(&self, _req: Request) -> Result<Response, reqwest::Error> {
364-
let resp = http::Response::new(vec![]);
365-
tokio::time::sleep(Duration::from_millis(100)).await;
366-
Ok(resp.into())
367-
}
368-
}
369-
370-
#[derive(Debug)]
371-
struct TestClientGenerator;
372-
impl GeneratesClients for TestClientGenerator {
373-
fn generate(&self) -> Result<Arc<dyn Client>, Error> {
374-
Ok(Arc::new(TestClient))
375-
}
376-
}
377-
378-
#[tokio::test]
379-
async fn test_dynamic_client() {
380-
let cli = Arc::new(
381-
ReqwestClientDynamic::new(TestClientGenerator, 1, 10, 10, Duration::ZERO).unwrap(),
382-
);
383-
384-
let mut futs = vec![];
385-
for _ in 0..200 {
386-
let req = Request::new(reqwest::Method::GET, url::Url::parse("http://foo").unwrap());
387-
388-
let cli = cli.clone();
389-
futs.push(async move { cli.execute(req).await });
390-
}
391-
392-
join_all(futs).await;
393-
let mut pool = cli.pool.write().unwrap();
394-
assert_eq!(pool.len(), 10);
395-
396-
for x in pool.iter() {
397-
assert_eq!(x.outstanding.load(Ordering::SeqCst), 0);
398-
}
399-
400-
cli.cleanup(&mut pool);
401-
assert_eq!(pool.len(), 1);
402-
}
403-
}

src/http/dns.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ use anyhow::Context;
88
use async_trait::async_trait;
99
use hickory_proto::rr::RecordType;
1010
use hickory_resolver::{
11-
config::{NameServerConfigGroup, ResolverConfig, ResolverOpts},
11+
TokioResolver,
12+
config::{NameServerConfigGroup, ResolveHosts, ResolverConfig, ResolverOpts},
1213
lookup_ip::LookupIpIntoIter,
13-
TokioAsyncResolver,
14+
name_server::TokioConnectionProvider,
1415
};
1516
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
1617
use strum_macros::EnumString;
1718

18-
use super::{client::CloneableDnsResolver, Error};
19+
use super::{Error, client::CloneableDnsResolver};
1920

2021
#[derive(Clone, Copy, Debug, EnumString)]
2122
#[strum(serialize_all = "snake_case")]
@@ -39,7 +40,7 @@ pub struct Options {
3940
}
4041

4142
#[derive(Debug, Clone)]
42-
pub struct Resolver(Arc<TokioAsyncResolver>);
43+
pub struct Resolver(Arc<TokioResolver>);
4344
impl CloneableDnsResolver for Resolver {}
4445

4546
// new() must be called in Tokio context
@@ -56,14 +57,16 @@ impl Resolver {
5657
let cfg = ResolverConfig::from_parts(None, vec![], name_servers);
5758

5859
let mut opts = ResolverOpts::default();
59-
opts.rotate = true;
6060
opts.cache_size = o.cache_size;
61-
opts.use_hosts_file = false;
61+
opts.use_hosts_file = ResolveHosts::Never;
6262
opts.preserve_intermediates = false;
6363
opts.try_tcp_on_error = true;
6464

65-
let resolver = TokioAsyncResolver::tokio(cfg, opts);
66-
Self(Arc::new(resolver))
65+
let mut builder =
66+
TokioResolver::builder_with_config(cfg, TokioConnectionProvider::default());
67+
*builder.options_mut() = opts;
68+
69+
Self(Arc::new(builder.build()))
6770
}
6871
}
6972

0 commit comments

Comments
 (0)