Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ error-stack = { version = "0.5", default-features = false, features = [
fastimer = { version = "0.9.0" }
fastrace = { version = "0.7.9" }
fastrace-opentelemetry = { version = "0.10.0" }
foyer = { version = "0.17", features = ["nightly"] }
foyer = { version = "0.17.1", features = ["nightly", "serde"] }
futures-util = { version = "0.3.31" }
gix-discover = { version = "0.40.1" }
googletest = { version = "0.14.0" }
Expand Down
1 change: 1 addition & 0 deletions cmd/percas/src/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ async fn run_server(server_rt: &Runtime, gossip_rt: &Runtime, config: Config) ->
&config.storage.data_dir,
config.storage.memory_capacity,
config.storage.disk_capacity,
config.storage.disk_throttle,
)
.await
.change_context_lazy(make_error)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ fn foyer_engine(c: &mut Criterion) {
{
let dir = tempdir_in("/tmp").unwrap();
let engine = runtime.block_on(async {
FoyerEngine::try_new(dir.path(), Some(0), 4 * 1024 * 1024 * 1024)
FoyerEngine::try_new(dir.path(), Some(0), 4 * 1024 * 1024 * 1024, None)
.await
.unwrap()
});
Expand All @@ -59,7 +59,7 @@ fn foyer_engine(c: &mut Criterion) {
.for_each(|len| {
let dir = tempdir_in("/tmp").unwrap();
let engine = runtime.block_on(async {
FoyerEngine::try_new(dir.path(), Some(0), 4 * 1024 * 1024 * 1024)
FoyerEngine::try_new(dir.path(), Some(0), 4 * 1024 * 1024 * 1024, None)
.await
.unwrap()
});
Expand Down
76 changes: 76 additions & 0 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,54 @@ pub enum ServerConfig {
},
}

#[cfg(test)]
pub(crate) mod defs {

use serde::Deserialize;
use serde::Serialize;

#[derive(schemars::JsonSchema, Serialize, Deserialize)]
#[serde(remote = "foyer::IopsCounter")]
#[serde(deny_unknown_fields)]
#[serde(tag = "mode")]
pub enum IopsCounterDef {
/// Count 1 iops for each read/write.
#[serde(rename = "per_io")]
PerIo,
/// Count 1 iops for each read/write with the size of the i/o.
#[serde(rename = "per_io_size")]
PerIoSize(std::num::NonZeroUsize),
}

#[derive(schemars::JsonSchema, Serialize, Deserialize)]
#[serde(remote = "foyer::Throttle")]
#[serde(deny_unknown_fields)]
pub struct ThrottleDef {
/// The maximum write iops for the device.
pub write_iops: Option<std::num::NonZeroUsize>,
/// The maximum read iops for the device.
pub read_iops: Option<std::num::NonZeroUsize>,
/// The maximum write throughput for the device.
pub write_throughput: Option<std::num::NonZeroUsize>,
/// The maximum read throughput for the device.
pub read_throughput: Option<std::num::NonZeroUsize>,
/// The iops counter for the device.
#[serde(with = "IopsCounterDef")]
pub iops_counter: foyer::IopsCounter,
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(test, derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
#[repr(transparent)]
pub struct DiskThrottle(#[cfg_attr(test, serde(with = "defs::ThrottleDef"))] foyer::Throttle);

impl From<DiskThrottle> for foyer::Throttle {
fn from(value: DiskThrottle) -> Self {
value.0
}
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(test, derive(schemars::JsonSchema))]
#[serde(deny_unknown_fields)]
Expand All @@ -70,6 +118,8 @@ pub struct StorageConfig {
pub data_dir: PathBuf,
pub disk_capacity: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub disk_throttle: Option<DiskThrottle>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_capacity: Option<u64>,
}

Expand Down Expand Up @@ -203,6 +253,7 @@ impl Default for Config {
storage: StorageConfig {
data_dir: default_data_dir(),
disk_capacity: 512 * 1024 * 1024,
disk_throttle: None,
memory_capacity: None,
},
telemetry: TelemetryConfig {
Expand Down Expand Up @@ -299,6 +350,31 @@ pub const fn known_option_entries() -> &'static [OptionEntry] {
ent_path: "storage.disk_capacity",
ent_type: "integer",
},
OptionEntry {
env_name: "PERCAS_CONFIG_STORAGE_DISK_THROTTLE_IOPS_COUNTER_MODE",
ent_path: "storage.disk_throttle.iops_counter.mode",
ent_type: "string",
},
OptionEntry {
env_name: "PERCAS_CONFIG_STORAGE_DISK_THROTTLE_READ_IOPS",
ent_path: "storage.disk_throttle.read_iops",
ent_type: "integer",
},
OptionEntry {
env_name: "PERCAS_CONFIG_STORAGE_DISK_THROTTLE_READ_THROUGHPUT",
ent_path: "storage.disk_throttle.read_throughput",
ent_type: "integer",
},
OptionEntry {
env_name: "PERCAS_CONFIG_STORAGE_DISK_THROTTLE_WRITE_IOPS",
ent_path: "storage.disk_throttle.write_iops",
ent_type: "integer",
},
OptionEntry {
env_name: "PERCAS_CONFIG_STORAGE_DISK_THROTTLE_WRITE_THROUGHPUT",
ent_path: "storage.disk_throttle.write_throughput",
ent_type: "integer",
},
OptionEntry {
env_name: "PERCAS_CONFIG_STORAGE_MEMORY_CAPACITY",
ent_path: "storage.memory_capacity",
Expand Down
17 changes: 11 additions & 6 deletions crates/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use foyer::RuntimeOptions;
use sysinfo::Pid;
use thiserror::Error;

use crate::DiskThrottle;
use crate::num_cpus;

const DEFAULT_MEMORY_CAPACITY_FACTOR: f64 = 0.8;
Expand All @@ -46,6 +47,7 @@ impl FoyerEngine {
data_dir: &Path,
memory_capacity: Option<u64>,
disk_capacity: u64,
disk_throttle: Option<DiskThrottle>,
) -> Result<Self, EngineError> {
let _ = std::fs::create_dir_all(data_dir);
if !data_dir.exists() {
Expand All @@ -55,6 +57,13 @@ impl FoyerEngine {
)));
}

let mut dev = DirectFsDeviceOptions::new(data_dir)
.with_capacity(disk_capacity as usize)
.with_file_size(64 * 1024 * 1024);
if let Some(throttle) = disk_throttle {
dev = dev.with_throttle(throttle.into());
}

let parallelism = num_cpus().get();
let cache = HybridCacheBuilder::new()
.with_policy(HybridCachePolicy::WriteOnInsertion)
Expand All @@ -78,11 +87,7 @@ impl FoyerEngine {
.with_shards(parallelism)
.with_eviction_config(FifoConfig::default())
.storage(foyer::Engine::Large)
.with_device_options(
DirectFsDeviceOptions::new(data_dir)
.with_capacity(disk_capacity as usize)
.with_file_size(64 * 1024 * 1024),
)
.with_device_options(dev)
.with_recover_mode(RecoverMode::Quiet)
.with_runtime_options(RuntimeOptions::Unified(Default::default()))
.build()
Expand Down Expand Up @@ -132,7 +137,7 @@ mod tests {
async fn test_get() {
let temp_dir = tempfile::tempdir().unwrap();

let engine = FoyerEngine::try_new(temp_dir.path(), Some(512 * 1024), 1024 * 1024)
let engine = FoyerEngine::try_new(temp_dir.path(), Some(512 * 1024), 1024 * 1024, None)
.await
.unwrap();
engine.put(b"foo".to_vec().as_ref(), b"bar".to_vec().as_ref());
Expand Down
1 change: 1 addition & 0 deletions tests/toolkit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub fn start_test_server(_test_name: &str, rt: &Runtime) -> Option<TestServerSta
&config.storage.data_dir,
config.storage.memory_capacity,
config.storage.disk_capacity,
config.storage.disk_throttle,
)
.await
.unwrap();
Expand Down