Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All significant changes to this project will be documented in this file.

## Unreleased

### Breaking Changes

* `ClientBuilder` is now `ClientFactory` for reusing the underlying reqwest client.

## v0.1.4

### Bug Fixes
Expand Down
53 changes: 25 additions & 28 deletions api/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,24 @@ use reqwest::IntoUrl;
use reqwest::StatusCode;
use reqwest::Url;

#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub enum Error {
#[from(std::io::Error)]
IO(std::io::Error),
#[from(reqwest::Error)]
Http(reqwest::Error),

Other(String),
}
use crate::Error;

pub struct ClientBuilder {
endpoint: String,
#[derive(Debug, Clone)]
pub struct ClientFactory {
client: reqwest::Client,
}

impl ClientBuilder {
pub fn new(endpoint: String) -> Self {
Self { endpoint }
impl ClientFactory {
pub fn new() -> Result<Self, Error> {
let client = reqwest::ClientBuilder::new()
.no_proxy()
.build()
.map_err(Error::Http)?;
Ok(Self { client })
}

pub fn build(self) -> Result<Client, Error> {
let builder = reqwest::ClientBuilder::new().no_proxy();
Client::new(self.endpoint, builder)
pub fn make_client(&self, endpoint: String) -> Result<Client, Error> {
Client::new(endpoint, self.client.clone())
}
}

Expand All @@ -60,8 +55,7 @@ impl Client {
do_delete(self, key).await
}

fn new(base_url: impl IntoUrl, builder: reqwest::ClientBuilder) -> Result<Self, Error> {
let client = builder.build().map_err(Error::Http)?;
fn new(base_url: impl IntoUrl, client: reqwest::Client) -> Result<Self, Error> {
let base_url = base_url.into_url().map_err(Error::Http)?;
Ok(Client { client, base_url })
}
Expand All @@ -72,14 +66,16 @@ async fn do_get(client: &Client, key: &str) -> Result<Option<Vec<u8>>, Error> {
.base_url
.join(key)
.map_err(|e| Error::Other(e.to_string()))?;

let resp = client.client.get(url).send().await.map_err(Error::Http)?;

match resp.status() {
StatusCode::NOT_FOUND | StatusCode::TOO_MANY_REQUESTS => Ok(None),
StatusCode::NOT_FOUND => Ok(None),
StatusCode::OK => {
let body = resp.bytes().await.map_err(Error::Http)?;
Ok(Some(body.to_vec()))
}
StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
_ => Err(Error::Other(resp.status().to_string())),
}
}
Expand All @@ -89,6 +85,7 @@ async fn do_put(client: &Client, key: &str, value: &[u8]) -> Result<(), Error> {
.base_url
.join(key)
.map_err(|e| Error::Other(e.to_string()))?;

let resp = client
.client
.put(url)
Expand All @@ -98,11 +95,9 @@ async fn do_put(client: &Client, key: &str, value: &[u8]) -> Result<(), Error> {
.map_err(Error::Http)?;

match resp.status() {
StatusCode::OK
| StatusCode::CREATED
| StatusCode::NO_CONTENT
| StatusCode::TOO_MANY_REQUESTS => Ok(()),
_ => Err(Error::Other(resp.status().to_string())),
StatusCode::OK | StatusCode::CREATED => Ok(()),
StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
status => Err(Error::Other(status.to_string())),
}
}

Expand All @@ -111,6 +106,7 @@ async fn do_delete(client: &Client, key: &str) -> Result<(), Error> {
.base_url
.join(key)
.map_err(|e| Error::Other(e.to_string()))?;

let resp = client
.client
.delete(url)
Expand All @@ -119,7 +115,8 @@ async fn do_delete(client: &Client, key: &str) -> Result<(), Error> {
.map_err(Error::Http)?;

match resp.status() {
StatusCode::OK | StatusCode::NO_CONTENT | StatusCode::TOO_MANY_REQUESTS => Ok(()),
_ => Err(Error::Other(resp.status().to_string())),
StatusCode::OK | StatusCode::NO_CONTENT => Ok(()),
StatusCode::TOO_MANY_REQUESTS => Err(Error::TooManyRequests),
status => Err(Error::Other(status.to_string())),
}
}
14 changes: 13 additions & 1 deletion api/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,16 @@
mod client;

pub use client::Client;
pub use client::ClientBuilder;
pub use client::ClientFactory;

#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub enum Error {
#[from(std::io::Error)]
IO(std::io::Error),
#[from(reqwest::Error)]
Http(reqwest::Error),
#[error("Too Many Requests")]
TooManyRequests,
Other(String),
}
35 changes: 24 additions & 11 deletions crates/server/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use mea::semaphore::Semaphore;
use percas_client::ClientBuilder;
use percas_client::ClientFactory;
use percas_cluster::Proxy;
use percas_cluster::RouteDest;
use percas_core::num_cpus;
Expand All @@ -31,6 +31,7 @@ use crate::server::delete_success;
use crate::server::get_not_found;
use crate::server::get_success;
use crate::server::put_success;
use crate::server::too_many_requests;

pub struct LoggerMiddleware;

Expand Down Expand Up @@ -72,11 +73,12 @@ where

pub struct ClusterProxyMiddleware {
proxy: Option<Proxy>,
factory: ClientFactory,
}

impl ClusterProxyMiddleware {
pub fn new(proxy: Option<Proxy>) -> Self {
Self { proxy }
pub fn new(proxy: Option<Proxy>, factory: ClientFactory) -> Self {
Self { proxy, factory }
}
}

Expand All @@ -90,13 +92,15 @@ where
fn transform(&self, endpoint: E) -> Self::Output {
ClusterProxyEndpoint {
proxy: self.proxy.clone(),
factory: self.factory.clone(),
endpoint,
}
}
}

pub struct ClusterProxyEndpoint<E> {
proxy: Option<Proxy>,
factory: ClientFactory,
endpoint: E,
}

Expand All @@ -118,9 +122,11 @@ where
.await
.map(IntoResponse::into_response),
RouteDest::RemoteAddr(addr) => {
let client = ClientBuilder::new(format!("http://{addr}"))
.build()
.unwrap();
let client = self
.factory
.make_client(format!("http://{addr}"))
.map_err(|err| poem::Error::new(err, StatusCode::INTERNAL_SERVER_ERROR))?;

match *req.method() {
Method::GET => {
let resp = client.get(&key).await;
Expand All @@ -132,6 +138,9 @@ where
Ok(get_not_found())
}
}
Err(percas_client::Error::TooManyRequests) => {
Ok(too_many_requests())
}
Err(err) => {
log::error!("failed to get from remote: {err}");
self.endpoint
Expand All @@ -146,8 +155,11 @@ where
let resp = client.put(&key, &body).await;
match resp {
Ok(()) => Ok(put_success()),
Err(percas_client::Error::TooManyRequests) => {
Ok(too_many_requests())
}
Err(err) => {
log::error!("failed to put from remote: {err}");
log::error!("failed to put to remote: {err}");
req.set_body(body);
self.endpoint
.call(req)
Expand All @@ -160,8 +172,11 @@ where
let resp = client.delete(&key).await;
match resp {
Ok(()) => Ok(delete_success()),
Err(percas_client::Error::TooManyRequests) => {
Ok(too_many_requests())
}
Err(err) => {
log::error!("failed to delete from remote: {err}");
log::error!("failed to delete at remote: {err}");
self.endpoint
.call(req)
.await
Expand Down Expand Up @@ -235,9 +250,7 @@ where

async fn call(&self, req: Request) -> Result<Self::Output, poem::Error> {
let Some(_wait_permit) = self.wait_permit.try_acquire(1) else {
return Ok(Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.body(StatusCode::TOO_MANY_REQUESTS.to_string()));
return Ok(too_many_requests());
};
let _run_permit = self.run_permit.acquire(1).await;

Expand Down
12 changes: 11 additions & 1 deletion crates/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use fastimer::schedule::SimpleActionExt;
use mea::shutdown::ShutdownRecv;
use mea::shutdown::ShutdownSend;
use mea::waitgroup::WaitGroup;
use percas_client::ClientFactory;
use percas_cluster::GossipFuture;
use percas_cluster::Proxy;
use percas_core::Runtime;
Expand Down Expand Up @@ -140,13 +141,15 @@ pub async fn start_server(
let shutdown_clone = shutdown_rx_server.clone();
let wg_clone = wg.clone();

let client_factory = ClientFactory::new().map_err(io::Error::other)?;
let proxy_middleware = ClusterProxyMiddleware::new(cluster_proxy, client_factory);
let route = Route::new()
.at(
"/*key",
poem::get(get)
.put(put)
.delete(delete)
.with(ClusterProxyMiddleware::new(cluster_proxy)),
.with(proxy_middleware),
)
.data(ctx.clone())
.with(RateLimitMiddleware::new())
Expand Down Expand Up @@ -190,6 +193,13 @@ pub async fn start_server(
})
}

pub fn too_many_requests() -> Response {
Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.typed_header(ContentType::text())
.body(StatusCode::TOO_MANY_REQUESTS.to_string())
}

pub fn get_success(body: impl Into<Body>) -> Response {
Response::builder()
.status(StatusCode::OK)
Expand Down
5 changes: 3 additions & 2 deletions tests/behavior/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::process::ExitCode;

use percas_client::Client;
use percas_client::ClientBuilder;
use percas_client::ClientFactory;
use percas_core::make_runtime;
use tests_toolkit::make_test_name;

Expand All @@ -37,7 +37,8 @@ where

rt.block_on(async move {
let server_addr = format!("http://{}", state.server_state.advertise_addr());
let client = ClientBuilder::new(server_addr).build().unwrap();
let factory = ClientFactory::new().unwrap();
let client = factory.make_client(server_addr).unwrap();

let exit_code = test(Testkit { client }).await.report();

Expand Down
Loading