Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
917 changes: 266 additions & 651 deletions Cargo.lock

Large diffs are not rendered by default.

34 changes: 22 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ ctrlc = { version = "3.5", features = ["termination"] }
exn = { version = "0.2.0" }
fastimer = { version = "0.10.0" }
fastrace = { version = "0.7.9" }
fastrace-opentelemetry = { version = "0.14.0" }
fastrace-reqwest = { version = "0.2.0" }
foyer = { version = "0.21.0", features = ["nightly"] }
fastrace-opentelemetry = { version = "0.15.1" }
fastrace-reqwest = { version = "0.3.0" }
foyer = { version = "0.22.2", features = ["nightly"] }
futures-util = { version = "0.3.31" }
gix-discover = { version = "0.43.0" }
gix-discover = { version = "0.45.0" }
googletest = { version = "0.14.0" }
indent = { version = "0.1.1" }
insta = { version = "1.42.2", features = ["json", "toml", "redactions"] }
Expand All @@ -76,19 +76,29 @@ logforth = { version = "0.29.1", features = [
"diagnostic-fastrace",
"starter-log",
] }
mea = { version = "0.5.2" }
mea = { version = "0.6.1" }
mixtrics = { version = "0.2.2", features = ["opentelemetry_0_31"] }
murmur3 = { version = "0.5.2" }
opentelemetry = { version = "0.31.0", features = ["trace", "metrics"] }
opentelemetry-otlp = { version = "0.31.0", features = [
mur3 = { version = "0.1.0" }
opentelemetry = { version = "0.31.0", default-features = false, features = [
"trace",
"metrics",
"logs",
"internal-logs",
] }
opentelemetry-otlp = { version = "0.31.0", default-features = false, features = [
"trace",
"metrics",
"logs",
"grpc-tonic",
"http-proto",
"internal-logs",
] }
opentelemetry_sdk = { version = "0.31.0", features = [
opentelemetry_sdk = { version = "0.31.0", default-features = false, features = [
"trace",
"metrics",
"logs",
"rt-tokio",
"internal-logs",
] }
parse-display = { version = "0.10.0" }
pin-project = { version = "1.1" }
Expand All @@ -97,7 +107,7 @@ pretty-hex = { version = "0.4.1" }
pretty_assertions = { version = "1.4.1" }
rand = { version = "0.9.2" }
regex = { version = "1.11.1" }
reqwest = { version = "0.12.15", default-features = false, features = ["json"] }
reqwest = { version = "0.13.1", default-features = false, features = ["json"] }
schemars = { version = "1.0.4", features = ["jiff02", "url2"] }
scopeguard = { version = "1.2.0" }
sealed_test = { version = "1.1" }
Expand All @@ -108,9 +118,9 @@ sysinfo = { version = "0.37.0" }
tempfile = { version = "3.19.1" }
test-harness = { version = "0.3.0" }
tokio = { version = "1.44.2" }
toml_edit = { version = "0.23.2" }
toml_edit = { version = "0.24.0+spec-1.1.0" }
unindent = { version = "0.2.4" }
url = { version = "2.5.7" }
url = { version = "2.5.7", features = ["serde"] }
uuid = { version = "1.16.0", features = ["v7", "serde"] }

[workspace.lints.rust]
Expand Down
2 changes: 2 additions & 0 deletions client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All significant changes to the `percase-client` crate will be documented in this

## Unreleased

## v0.3.1 (2026-01-13)

### New features

* Added `Client::put_owned` to avoid an extra copy when the caller has ownership of the data.
Expand Down
4 changes: 2 additions & 2 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

[package]
name = "percas-client"
version = "0.3.0"
version = "0.3.1"

description = "A client for talking to the Percas server"
publish = true
Expand All @@ -32,7 +32,7 @@ tag-message = "chore: Release {{crate_name}} version {{version}}"

[dependencies]
fastrace-reqwest = { workspace = true }
murmur3 = { workspace = true }
mur3 = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
uuid = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion client/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl RouteTable {
}

pub(crate) fn lookup(&self, key: &str) -> Option<(&Uuid, &Url)> {
let hash = murmur3::murmur3_32(&mut key.as_bytes(), 0).unwrap();
let hash = mur3::murmurhash3_x86_32(key.as_bytes(), 0);
self.ring
.range(hash..)
.next()
Expand Down
12 changes: 12 additions & 0 deletions cmd/percas/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ impl CommandStart {
}
log::info!("Percas is starting with loaded config: {config:#?}");

let io_runtime = make_io_runtime();
let server_runtime = make_server_runtime();
let gossip_runtime = make_gossip_runtime();
server_runtime.block_on(run_server(
&io_runtime,
&server_runtime,
&gossip_runtime,
node_id,
Expand All @@ -75,6 +77,14 @@ impl CommandStart {
}
}

fn make_io_runtime() -> Runtime {
percas_core::Builder::new("foyer_io_runtime", "foyer_io_thread")
.worker_threads(4)
.max_blocking_threads(num_cpus().get() * 2)
.build()
.unwrap()
}

fn make_telemetry_runtime() -> Runtime {
make_runtime("telemetry_runtime", "telemetry_thread", 1)
}
Expand All @@ -89,6 +99,7 @@ fn make_gossip_runtime() -> Runtime {
}

async fn run_server(
io_rt: &Runtime,
server_rt: &Runtime,
gossip_rt: &Runtime,
node_id: Uuid,
Expand All @@ -105,6 +116,7 @@ async fn run_server(
})?;

let engine = FoyerEngine::try_new(
io_rt,
config.storage.data_dir.as_path(),
config.storage.memory_capacity.into(),
config.storage.disk_capacity.into(),
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ jemallocator = { workspace = true }
criterion = { workspace = true }
googletest = { workspace = true }
insta = { workspace = true }
mea = { workspace = true }
schemars = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }
Expand Down
5 changes: 5 additions & 0 deletions crates/core/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ fn foyer_engine(c: &mut Criterion) {
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let test_rt = percas_core::make_runtime("test_runtime", "test_thread", 2);

{
let dir = tempdir_in("/tmp").unwrap();
let engine = runtime.block_on(async {
FoyerEngine::try_new(
&test_rt,
dir.path(),
ByteSize::default(),
ByteSize::gib(4),
Expand Down Expand Up @@ -63,6 +67,7 @@ fn foyer_engine(c: &mut Criterion) {
let dir = tempdir_in("/tmp").unwrap();
let engine = runtime.block_on(async {
FoyerEngine::try_new(
&test_rt,
dir.path(),
ByteSize::default(),
ByteSize::gib(4),
Expand Down
4 changes: 0 additions & 4 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,17 +534,13 @@ mod tests {
filter = 'INFO'
dir = 'logs'
max_files = 64

[telemetry.logs.stderr]
filter = 'INFO'

[telemetry.logs.opentelemetry]
filter = 'INFO'
otlp_endpoint = 'http://127.0.0.1:4317'

[telemetry.traces]
capture_log_filter = 'INFO'

[telemetry.traces.opentelemetry]
otlp_endpoint = 'http://127.0.0.1:4317'
[telemetry.metrics.opentelemetry]
Expand Down
92 changes: 40 additions & 52 deletions crates/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,24 @@ use std::sync::Arc;
use bytesize::ByteSize;
use exn::Result;
use exn::bail;
use foyer::BlockEngineBuilder;
use foyer::BlockEngineConfig;
use foyer::DeviceBuilder;
use foyer::FsDeviceBuilder;
use foyer::HybridCache;
use foyer::HybridCacheBuilder;
use foyer::HybridCachePolicy;
use foyer::IoEngineBuilder;
use foyer::IoEngineConfig;
use foyer::IopsCounter;
use foyer::LfuConfig;
use foyer::PsyncIoEngineBuilder;
use foyer::RecoverMode;
use foyer::RuntimeOptions;
use foyer::TokioRuntimeOptions;
use foyer::Spawner;
use mixtrics::registry::noop::NoopMetricsRegistry;
use mixtrics::registry::opentelemetry_0_31::OpenTelemetryMetricsRegistry;
use parse_display::Display;

use crate::newtype::DiskThrottle;
use crate::num_cpus;
use crate::runtime;

const DEFAULT_MEMORY_CAPACITY_FACTOR: f64 = 0.5; // 50% of available memory
const DEFAULT_BLOCK_SIZE: ByteSize = ByteSize::mib(64);
Expand All @@ -48,12 +47,13 @@ pub struct EngineError(String);
impl std::error::Error for EngineError {}

pub struct FoyerEngine {
capacity: ByteSize,
inner: HybridCache<Vec<u8>, Vec<u8>>,
capacity: ByteSize,
}

impl FoyerEngine {
pub async fn try_new(
io_runtime: &runtime::Runtime,
data_dir: &Path,
memory_capacity: ByteSize,
disk_capacity: ByteSize,
Expand Down Expand Up @@ -93,31 +93,17 @@ impl FoyerEngine {
.build()
.map_err(|err| EngineError(format!("failed to create device: {err}")))?;

let psync_io_engine = PsyncIoEngineBuilder::new()
.build()
.await
.map_err(|err| EngineError(err.to_string()))?;

let io_engine = {
let io_engine: Box<dyn IoEngineConfig> = {
#[cfg(target_os = "linux")]
{
use foyer::UringIoEngineBuilder;

UringIoEngineBuilder::new()
.with_sqpoll(true)
.build()
.await
.inspect_err(|e| {
log::warn!(
"failed to build io_uring engine, fallback to psync engine: {e}"
);
})
.unwrap_or(psync_io_engine)
use foyer::UringIoEngineConfig;
Box::new(UringIoEngineConfig::new().with_sqpoll(true))
}

#[cfg(not(target_os = "linux"))]
{
psync_io_engine
use foyer::PsyncIoEngineConfig;
Box::new(PsyncIoEngineConfig::new())
}
};

Expand All @@ -138,24 +124,21 @@ impl FoyerEngine {
.with_eviction_config(LfuConfig::default())
.storage()
.with_engine_config(
BlockEngineBuilder::new(dev)
BlockEngineConfig::new(dev)
.with_recover_concurrency(parallelism)
.with_block_size(DEFAULT_BLOCK_SIZE.0 as usize)
.with_flushers(DEFAULT_FLUSHERS),
)
.with_io_engine(io_engine)
.with_io_engine_config(io_engine)
.with_recover_mode(RecoverMode::Quiet)
.with_runtime_options(RuntimeOptions::Unified(TokioRuntimeOptions {
worker_threads: 4,
max_blocking_threads: num_cpus().get() * 2,
}))
.with_spawner(io_runtime.spawn_blocking(Spawner::current).await)
.build()
.await
.map_err(|err| EngineError(err.to_string()))?;

Ok(FoyerEngine {
capacity: disk_capacity,
inner: cache,
capacity: disk_capacity,
})
}

Expand Down Expand Up @@ -194,25 +177,30 @@ mod tests {

use super::*;

#[tokio::test]
async fn test_get() {
let temp_dir = tempfile::tempdir().unwrap();

let engine = FoyerEngine::try_new(
temp_dir.path(),
ByteSize::kib(512),
ByteSize::mib(1),
None,
None,
)
.await
.unwrap();

engine.put(b"foo".to_vec().as_ref(), b"bar".to_vec().as_ref());

assert_compact_debug_snapshot!(
engine.get(b"foo".to_vec().as_ref()).await,
@"Some([98, 97, 114])"
);
#[test]
fn test_get() {
let runtime = runtime::make_runtime("test_runtime", "test_thread", 2);

runtime.block_on(async {
let temp_dir = tempfile::tempdir().unwrap();

let engine = FoyerEngine::try_new(
&runtime,
temp_dir.path(),
ByteSize::kib(512),
ByteSize::mib(1),
None,
None,
)
.await
.unwrap();

engine.put(b"foo".to_vec().as_ref(), b"bar".to_vec().as_ref());

assert_compact_debug_snapshot!(
engine.get(b"foo".to_vec().as_ref()).await,
@"Some([98, 97, 114])"
);
});
}
}
Loading