Skip to content

Commit 7f67a1b

Browse files
authored
refactor: special handle TooManyRequests (#58)
Signed-off-by: tison <wander4096@gmail.com>
1 parent 652a002 commit 7f67a1b

File tree

6 files changed

+80
-43
lines changed

6 files changed

+80
-43
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All significant changes to this project will be documented in this file.
44

55
## Unreleased
66

7+
### Breaking Changes
8+
9+
* `ClientBuilder` is now `ClientFactory` for reusing the underlying reqwest client.
10+
711
## v0.1.4
812

913
### Bug Fixes

api/client/src/client.rs

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,24 @@ use reqwest::IntoUrl;
1616
use reqwest::StatusCode;
1717
use reqwest::Url;
1818

19-
#[derive(Debug, thiserror::Error)]
20-
#[error("{0}")]
21-
pub enum Error {
22-
#[from(std::io::Error)]
23-
IO(std::io::Error),
24-
#[from(reqwest::Error)]
25-
Http(reqwest::Error),
26-
27-
Other(String),
28-
}
19+
use crate::Error;
2920

30-
pub struct ClientBuilder {
31-
endpoint: String,
21+
#[derive(Debug, Clone)]
22+
pub struct ClientFactory {
23+
client: reqwest::Client,
3224
}
3325

34-
impl ClientBuilder {
35-
pub fn new(endpoint: String) -> Self {
36-
Self { endpoint }
26+
impl ClientFactory {
27+
pub fn new() -> Result<Self, Error> {
28+
let client = reqwest::ClientBuilder::new()
29+
.no_proxy()
30+
.build()
31+
.map_err(Error::Http)?;
32+
Ok(Self { client })
3733
}
3834

39-
pub fn build(self) -> Result<Client, Error> {
40-
let builder = reqwest::ClientBuilder::new().no_proxy();
41-
Client::new(self.endpoint, builder)
35+
pub fn make_client(&self, endpoint: String) -> Result<Client, Error> {
36+
Client::new(endpoint, self.client.clone())
4237
}
4338
}
4439

@@ -60,8 +55,7 @@ impl Client {
6055
do_delete(self, key).await
6156
}
6257

63-
fn new(base_url: impl IntoUrl, builder: reqwest::ClientBuilder) -> Result<Self, Error> {
64-
let client = builder.build().map_err(Error::Http)?;
58+
fn new(base_url: impl IntoUrl, client: reqwest::Client) -> Result<Self, Error> {
6559
let base_url = base_url.into_url().map_err(Error::Http)?;
6660
Ok(Client { client, base_url })
6761
}
@@ -72,14 +66,16 @@ async fn do_get(client: &Client, key: &str) -> Result<Option<Vec<u8>>, Error> {
7266
.base_url
7367
.join(key)
7468
.map_err(|e| Error::Other(e.to_string()))?;
69+
7570
let resp = client.client.get(url).send().await.map_err(Error::Http)?;
7671

7772
match resp.status() {
78-
StatusCode::NOT_FOUND | StatusCode::TOO_MANY_REQUESTS => Ok(None),
73+
StatusCode::NOT_FOUND => Ok(None),
7974
StatusCode::OK => {
8075
let body = resp.bytes().await.map_err(Error::Http)?;
8176
Ok(Some(body.to_vec()))
8277
}
78+
StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
8379
_ => Err(Error::Other(resp.status().to_string())),
8480
}
8581
}
@@ -89,6 +85,7 @@ async fn do_put(client: &Client, key: &str, value: &[u8]) -> Result<(), Error> {
8985
.base_url
9086
.join(key)
9187
.map_err(|e| Error::Other(e.to_string()))?;
88+
9289
let resp = client
9390
.client
9491
.put(url)
@@ -98,11 +95,9 @@ async fn do_put(client: &Client, key: &str, value: &[u8]) -> Result<(), Error> {
9895
.map_err(Error::Http)?;
9996

10097
match resp.status() {
101-
StatusCode::OK
102-
| StatusCode::CREATED
103-
| StatusCode::NO_CONTENT
104-
| StatusCode::TOO_MANY_REQUESTS => Ok(()),
105-
_ => Err(Error::Other(resp.status().to_string())),
98+
StatusCode::OK | StatusCode::CREATED => Ok(()),
99+
StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
100+
status => Err(Error::Other(status.to_string())),
106101
}
107102
}
108103

@@ -111,6 +106,7 @@ async fn do_delete(client: &Client, key: &str) -> Result<(), Error> {
111106
.base_url
112107
.join(key)
113108
.map_err(|e| Error::Other(e.to_string()))?;
109+
114110
let resp = client
115111
.client
116112
.delete(url)
@@ -119,7 +115,8 @@ async fn do_delete(client: &Client, key: &str) -> Result<(), Error> {
119115
.map_err(Error::Http)?;
120116

121117
match resp.status() {
122-
StatusCode::OK | StatusCode::NO_CONTENT | StatusCode::TOO_MANY_REQUESTS => Ok(()),
123-
_ => Err(Error::Other(resp.status().to_string())),
118+
StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
119+
StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
120+
status => Err(Error::Other(status.to_string())),
124121
}
125122
}

api/client/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,16 @@
1515
mod client;
1616

1717
pub use client::Client;
18-
pub use client::ClientBuilder;
18+
pub use client::ClientFactory;
19+
20+
#[derive(Debug, thiserror::Error)]
21+
#[error("{0}")]
22+
pub enum Error {
23+
#[from(std::io::Error)]
24+
IO(std::io::Error),
25+
#[from(reqwest::Error)]
26+
Http(reqwest::Error),
27+
#[error("Too Many Requests")]
28+
TooManyRequests,
29+
Other(String),
30+
}

crates/server/src/middleware.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::sync::Arc;
1616

1717
use mea::semaphore::Semaphore;
18-
use percas_client::ClientBuilder;
18+
use percas_client::ClientFactory;
1919
use percas_cluster::Proxy;
2020
use percas_cluster::RouteDest;
2121
use percas_core::num_cpus;
@@ -31,6 +31,7 @@ use crate::server::delete_success;
3131
use crate::server::get_not_found;
3232
use crate::server::get_success;
3333
use crate::server::put_success;
34+
use crate::server::too_many_requests;
3435

3536
pub struct LoggerMiddleware;
3637

@@ -72,11 +73,12 @@ where
7273

7374
pub struct ClusterProxyMiddleware {
7475
proxy: Option<Proxy>,
76+
factory: ClientFactory,
7577
}
7678

7779
impl ClusterProxyMiddleware {
78-
pub fn new(proxy: Option<Proxy>) -> Self {
79-
Self { proxy }
80+
pub fn new(proxy: Option<Proxy>, factory: ClientFactory) -> Self {
81+
Self { proxy, factory }
8082
}
8183
}
8284

@@ -90,13 +92,15 @@ where
9092
fn transform(&self, endpoint: E) -> Self::Output {
9193
ClusterProxyEndpoint {
9294
proxy: self.proxy.clone(),
95+
factory: self.factory.clone(),
9396
endpoint,
9497
}
9598
}
9699
}
97100

98101
pub struct ClusterProxyEndpoint<E> {
99102
proxy: Option<Proxy>,
103+
factory: ClientFactory,
100104
endpoint: E,
101105
}
102106

@@ -118,9 +122,11 @@ where
118122
.await
119123
.map(IntoResponse::into_response),
120124
RouteDest::RemoteAddr(addr) => {
121-
let client = ClientBuilder::new(format!("http://{addr}"))
122-
.build()
123-
.unwrap();
125+
let client = self
126+
.factory
127+
.make_client(format!("http://{addr}"))
128+
.map_err(|err| poem::Error::new(err, StatusCode::INTERNAL_SERVER_ERROR))?;
129+
124130
match *req.method() {
125131
Method::GET => {
126132
let resp = client.get(&key).await;
@@ -132,6 +138,9 @@ where
132138
Ok(get_not_found())
133139
}
134140
}
141+
Err(percas_client::Error::TooManyRequests) => {
142+
Ok(too_many_requests())
143+
}
135144
Err(err) => {
136145
log::error!("failed to get from remote: {err}");
137146
self.endpoint
@@ -146,8 +155,11 @@ where
146155
let resp = client.put(&key, &body).await;
147156
match resp {
148157
Ok(()) => Ok(put_success()),
158+
Err(percas_client::Error::TooManyRequests) => {
159+
Ok(too_many_requests())
160+
}
149161
Err(err) => {
150-
log::error!("failed to put from remote: {err}");
162+
log::error!("failed to put to remote: {err}");
151163
req.set_body(body);
152164
self.endpoint
153165
.call(req)
@@ -160,8 +172,11 @@ where
160172
let resp = client.delete(&key).await;
161173
match resp {
162174
Ok(()) => Ok(delete_success()),
175+
Err(percas_client::Error::TooManyRequests) => {
176+
Ok(too_many_requests())
177+
}
163178
Err(err) => {
164-
log::error!("failed to delete from remote: {err}");
179+
log::error!("failed to delete at remote: {err}");
165180
self.endpoint
166181
.call(req)
167182
.await
@@ -235,9 +250,7 @@ where
235250

236251
async fn call(&self, req: Request) -> Result<Self::Output, poem::Error> {
237252
let Some(_wait_permit) = self.wait_permit.try_acquire(1) else {
238-
return Ok(Response::builder()
239-
.status(StatusCode::TOO_MANY_REQUESTS)
240-
.body(StatusCode::TOO_MANY_REQUESTS.to_string()));
253+
return Ok(too_many_requests());
241254
};
242255
let _run_permit = self.run_permit.acquire(1).await;
243256

crates/server/src/server.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use fastimer::schedule::SimpleActionExt;
2121
use mea::shutdown::ShutdownRecv;
2222
use mea::shutdown::ShutdownSend;
2323
use mea::waitgroup::WaitGroup;
24+
use percas_client::ClientFactory;
2425
use percas_cluster::GossipFuture;
2526
use percas_cluster::Proxy;
2627
use percas_core::Runtime;
@@ -140,13 +141,15 @@ pub async fn start_server(
140141
let shutdown_clone = shutdown_rx_server.clone();
141142
let wg_clone = wg.clone();
142143

144+
let client_factory = ClientFactory::new().map_err(io::Error::other)?;
145+
let proxy_middleware = ClusterProxyMiddleware::new(cluster_proxy, client_factory);
143146
let route = Route::new()
144147
.at(
145148
"/*key",
146149
poem::get(get)
147150
.put(put)
148151
.delete(delete)
149-
.with(ClusterProxyMiddleware::new(cluster_proxy)),
152+
.with(proxy_middleware),
150153
)
151154
.data(ctx.clone())
152155
.with(RateLimitMiddleware::new())
@@ -190,6 +193,13 @@ pub async fn start_server(
190193
})
191194
}
192195

196+
pub fn too_many_requests() -> Response {
197+
Response::builder()
198+
.status(StatusCode::TOO_MANY_REQUESTS)
199+
.typed_header(ContentType::text())
200+
.body(StatusCode::TOO_MANY_REQUESTS.to_string())
201+
}
202+
193203
pub fn get_success(body: impl Into<Body>) -> Response {
194204
Response::builder()
195205
.status(StatusCode::OK)

tests/behavior/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::process::ExitCode;
1616

1717
use percas_client::Client;
18-
use percas_client::ClientBuilder;
18+
use percas_client::ClientFactory;
1919
use percas_core::make_runtime;
2020
use tests_toolkit::make_test_name;
2121

@@ -37,7 +37,8 @@ where
3737

3838
rt.block_on(async move {
3939
let server_addr = format!("http://{}", state.server_state.advertise_addr());
40-
let client = ClientBuilder::new(server_addr).build().unwrap();
40+
let factory = ClientFactory::new().unwrap();
41+
let client = factory.make_client(server_addr).unwrap();
4142

4243
let exit_code = test(Testkit { client }).await.report();
4344

0 commit comments

Comments
 (0)