Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion crates/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::path::Path;
use std::sync::Arc;

use error_stack::Result;
use error_stack::bail;
Expand All @@ -29,7 +30,7 @@ use thiserror::Error;

use crate::num_cpus;

const DEFAULT_MEMORY_CAPACITY_FACTOR: f64 = 0.5;
const DEFAULT_MEMORY_CAPACITY_FACTOR: f64 = 0.8;

#[derive(Debug, Error)]
#[error("{0}")]
Expand Down Expand Up @@ -69,6 +70,11 @@ impl FoyerEngine {
) as f64
* DEFAULT_MEMORY_CAPACITY_FACTOR) as usize,
)
.with_weighter(|key: &Vec<u8>, value: &Vec<u8>| {
let key_size = key.len();
let value_size = value.len();
key_size + value_size
})
.with_shards(parallelism)
.with_eviction_config(FifoConfig::default())
.storage(foyer::Engine::Large)
Expand Down Expand Up @@ -110,6 +116,10 @@ impl FoyerEngine {
pub fn capacity(&self) -> u64 {
self.capacity
}

pub fn stats(&self) -> Arc<foyer::DeviceStats> {
self.inner.stats()
}
}

#[cfg(test)]
Expand Down
35 changes: 31 additions & 4 deletions crates/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl GlobalMetrics {
pub struct StorageMetrics {
pub capacity: Gauge<u64>,
pub used: Gauge<u64>,
pub entries: Gauge<u64>,
pub io: StorageIOMetrics,
}

impl StorageMetrics {
Expand All @@ -61,12 +61,39 @@ impl StorageMetrics {
.with_description("The used capacity of the storage")
.with_unit("byte")
.build(),
entries: meter
.u64_gauge("percas.storage.entries")
.with_description("The number of entries in the storage")

io: StorageIOMetrics::new(meter),
}
}
}

pub struct StorageIOMetrics {
pub count: Counter<u64>,
pub bytes: Counter<u64>,
}

impl StorageIOMetrics {
pub fn new(meter: Meter) -> Self {
Self {
count: meter
.u64_counter("percas.storage.io.count")
.with_description("The number of IOs")
.build(),
bytes: meter
.u64_counter("percas.storage.io.bytes")
.with_description("The number of IO bytes")
.with_unit("byte")
.build(),
}
}

pub const OPERATION_READ: &str = "read";
pub const OPERATION_WRITE: &str = "write";
pub const OPERATION_FLUSH: &str = "flush";

pub fn operation_labels(operation: &str) -> [KeyValue; 1] {
[KeyValue::new("operation", operation.to_string())]
}
}

pub struct OperationMetrics {
Expand Down
22 changes: 16 additions & 6 deletions crates/server/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use mea::semaphore::Semaphore;
use percas_client::ClientBuilder;
use percas_cluster::Proxy;
use percas_cluster::RouteDest;
use percas_core::num_cpus;
use poem::Endpoint;
use poem::IntoResponse;
use poem::Middleware;
Expand Down Expand Up @@ -187,13 +188,18 @@ where
}

pub struct RateLimitMiddleware {
permit: Arc<Semaphore>,
wait_permit: Arc<Semaphore>,
run_permit: Arc<Semaphore>,
}

impl RateLimitMiddleware {
pub fn new(limit: usize) -> Self {
pub fn new() -> Self {
let run_limit = num_cpus().get() * 100;
let wait_limit = run_limit * 5;

Self {
permit: Arc::new(Semaphore::new(limit)),
wait_permit: Arc::new(Semaphore::new(wait_limit)),
run_permit: Arc::new(Semaphore::new(run_limit)),
}
}
}
Expand All @@ -207,14 +213,16 @@ where

fn transform(&self, endpoint: E) -> Self::Output {
RateLimitEndpoint {
permit: self.permit.clone(),
wait_permit: self.wait_permit.clone(),
run_permit: self.run_permit.clone(),
endpoint,
}
}
}

pub struct RateLimitEndpoint<E> {
permit: Arc<Semaphore>,
wait_permit: Arc<Semaphore>,
run_permit: Arc<Semaphore>,
endpoint: E,
}

Expand All @@ -226,11 +234,13 @@ where
type Output = Response;

async fn call(&self, req: Request) -> Result<Self::Output, poem::Error> {
let Some(_permit) = self.permit.try_acquire(1) else {
let Some(_wait_permit) = self.wait_permit.try_acquire(1) else {
return Ok(Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.body(StatusCode::TOO_MANY_REQUESTS.to_string()));
};
let _run_permit = self.run_permit.acquire(1).await;

self.endpoint
.call(req)
.await
Expand Down
43 changes: 43 additions & 0 deletions crates/server/src/scheduled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use percas_metrics::GlobalMetrics;
use percas_metrics::StorageIOMetrics;

use crate::PercasContext;

Expand All @@ -34,6 +35,48 @@ impl ReportMetricsAction {
metrics.storage.capacity.record(engine.capacity(), &[]);
// Foyer will reserve all the space in the disk, so the used space is meaningless
metrics.storage.used.record(engine.capacity(), &[]);

let stats = engine.stats();
let read_label = StorageIOMetrics::operation_labels(StorageIOMetrics::OPERATION_READ);
let write_label = StorageIOMetrics::operation_labels(StorageIOMetrics::OPERATION_WRITE);
let flush_label = StorageIOMetrics::operation_labels(StorageIOMetrics::OPERATION_FLUSH);
metrics.storage.io.bytes.add(
stats.read_bytes.load(std::sync::atomic::Ordering::Relaxed) as u64,
&read_label,
);
metrics.storage.io.bytes.add(
stats.write_bytes.load(std::sync::atomic::Ordering::Relaxed) as u64,
&write_label,
);
metrics.storage.io.count.add(
stats.read_ios.load(std::sync::atomic::Ordering::Relaxed) as u64,
&read_label,
);
metrics.storage.io.count.add(
stats.write_ios.load(std::sync::atomic::Ordering::Relaxed) as u64,
&write_label,
);
metrics.storage.io.count.add(
stats.flush_ios.load(std::sync::atomic::Ordering::Relaxed) as u64,
&flush_label,
);

// Reset the stats
stats
.read_ios
.store(0, std::sync::atomic::Ordering::Relaxed);
stats
.read_bytes
.store(0, std::sync::atomic::Ordering::Relaxed);
stats
.write_ios
.store(0, std::sync::atomic::Ordering::Relaxed);
stats
.write_bytes
.store(0, std::sync::atomic::Ordering::Relaxed);
stats
.flush_ios
.store(0, std::sync::atomic::Ordering::Relaxed);
}
}

Expand Down
3 changes: 1 addition & 2 deletions crates/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use mea::shutdown::ShutdownSend;
use mea::waitgroup::WaitGroup;
use percas_cluster::Proxy;
use percas_core::Runtime;
use percas_core::num_cpus;
use percas_core::timer;
use percas_metrics::GlobalMetrics;
use percas_metrics::OperationMetrics;
Expand Down Expand Up @@ -132,7 +131,7 @@ pub async fn start_server(
.with(ClusterProxyMiddleware::new(cluster_proxy)),
)
.data(ctx.clone())
.with(RateLimitMiddleware::new(num_cpus().get() * 100))
.with(RateLimitMiddleware::new())
.with(LoggerMiddleware);
let listen_addr = listen_addr.clone();
let signal = async move {
Expand Down
Loading