diff --git a/Cargo.toml b/Cargo.toml index 09aff2a9..5d1fc10c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,6 +80,7 @@ rand_distr = { version = "0.5" } serde = { version = "1", features = ["derive"] } serde_bytes = "0.11" serde_json = "1" +smol = "2" tempfile = "3" test-log = "0.2" thiserror = "2" diff --git a/foyer-common/Cargo.toml b/foyer-common/Cargo.toml index efd95bda..82fafe2a 100644 --- a/foyer-common/Cargo.toml +++ b/foyer-common/Cargo.toml @@ -17,44 +17,33 @@ exclude.workspace = true development = ["criterion", "serde_bytes"] [features] +default = ["executor-tokio"] serde = ["dep:serde", "dep:bincode"] strict_assertions = [] tracing = ["dep:fastrace"] +executor-tokio = ["dep:tokio", "tokio/rt", "tokio/rt-multi-thread"] [dependencies] bincode = { workspace = true, optional = true } bytes = { workspace = true } cfg-if = { workspace = true } fastrace = { workspace = true, optional = true } +futures-core = { workspace = true } +futures-util = { workspace = true } itertools = { workspace = true } mixtrics = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } serde = { workspace = true, optional = true } +smol = { workspace = true, optional = true } thiserror = { workspace = true } twox-hash = { workspace = true } [target.'cfg(madsim)'.dependencies] -tokio = { package = "madsim-tokio", version = "0.2", features = [ - "rt", - "rt-multi-thread", - "sync", - "macros", - "time", - "signal", - "fs", -] } +tokio = { package = "madsim-tokio", version = "0.2", optional = true } [target.'cfg(not(madsim))'.dependencies] -tokio = { workspace = true, features = [ - "rt", - "rt-multi-thread", - "sync", - "macros", - "time", - "signal", - "fs", -] } +tokio = { workspace = true, optional = true } [dev-dependencies] bytes = { workspace = true, features = ["serde"] } diff --git a/foyer-common/src/asyncify.rs b/foyer-common/src/asyncify.rs deleted file mode 100644 index 414e6bab..00000000 --- a/foyer-common/src/asyncify.rs +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2025 foyer Project Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crate::runtime::SingletonHandle; - -/// Convert the block call to async call with given runtime handle. -#[cfg(not(madsim))] -pub async fn asyncify_with_runtime(runtime: &SingletonHandle, f: F) -> T -where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, -{ - runtime.spawn_blocking(f).await.unwrap() -} - -#[cfg(madsim)] -/// Convert the block call to async call with given runtime. -/// -/// madsim compatible mode. -pub async fn asyncify_with_runtime(_: &SingletonHandle, f: F) -> T -where - F: FnOnce() -> T + Send + 'static, - T: Send + 'static, -{ - f() -} diff --git a/foyer-common/src/executor/mod.rs b/foyer-common/src/executor/mod.rs new file mode 100644 index 00000000..bc3a12bf --- /dev/null +++ b/foyer-common/src/executor/mod.rs @@ -0,0 +1,182 @@ +// Copyright 2025 foyer Project Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// `tokio` executor implementation. +#[cfg(feature = "executor-tokio")] +pub mod tokio; + +use std::{ + fmt::Debug, + future::Future, + marker::PhantomData, + panic::{RefUnwindSafe, UnwindSafe}, +}; + +#[cfg(feature = "executor-tokio")] +use futures_util::FutureExt; +use pin_project::pin_project; + +/// A phantom join handle that does nothing. +#[doc(hidden)] +pub struct PhantomJoinHandle(PhantomData); + +unsafe impl Send for PhantomJoinHandle {} +unsafe impl Sync for PhantomJoinHandle {} +impl UnwindSafe for PhantomJoinHandle {} +impl RefUnwindSafe for PhantomJoinHandle {} + +/// A deatched join handle returned by an executor spawn. +pub trait JoinHandle: + Debug + + Future>> + + Send + + Sync + + UnwindSafe + + RefUnwindSafe + + 'static +{ +} + +/// An executor that can spawn asynchronous tasks or blocking functions. +pub trait Executor: Debug + Send + Sync + 'static + Clone { + /// The join handle type returned by this executor. + type JoinHandle: JoinHandle + where + T: Send + 'static; + + /// Spawn a future onto the executor. + fn spawn(&self, future: F) -> Self::JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static; + + /// Spawn a blocking function onto the executor. + fn spawn_blocking(&self, func: F) -> Self::JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static; +} + +/// An enum wrapper for different join handle implementations. +#[pin_project(project = JoinHandleEnumProj)] +pub enum JoinHandleEnum { + /// Tokio join handle. + #[cfg(feature = "executor-tokio")] + Tokio(crate::executor::tokio::TokioJoinHandle), + #[doc(hidden)] + Phantom(PhantomJoinHandle), +} + +impl Debug for JoinHandleEnum { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + #[cfg(feature = "executor-tokio")] + Self::Tokio(handle) => f.debug_tuple("Tokio").field(handle).finish(), + JoinHandleEnum::Phantom(_) => f.debug_tuple("Phantom").finish(), + } + } +} + +impl Future for JoinHandleEnum +where + T: Send + 'static, +{ + type Output = std::result::Result>; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + let this = self.project(); + match this { + #[cfg(feature = "executor-tokio")] + JoinHandleEnumProj::Tokio(handle) => handle.poll_unpin(cx), + JoinHandleEnumProj::Phantom(_) => panic!("polling a phantom join handle, cx: {:?}", cx), + } + } +} + +impl JoinHandle for JoinHandleEnum where T: Send + 'static {} + +#[cfg(feature = "executor-tokio")] +impl From> for JoinHandleEnum { + fn from(inner: crate::executor::tokio::TokioJoinHandle) -> Self { + Self::Tokio(inner) + } +} + +/// An enum wrapper for different executor implementations. +#[derive(Debug, Clone)] +pub enum ExecutorEnum { + /// Tokio runtime executor. + #[cfg(feature = "executor-tokio")] + TokioRuntime(crate::executor::tokio::TokioRuntimeExecutor), + /// Tokio handle executor. + #[cfg(feature = "executor-tokio")] + TokioHandle(crate::executor::tokio::TokioHandleExecutor), + #[doc(hidden)] + Phantom, +} + +impl Executor for ExecutorEnum { + type JoinHandle + = JoinHandleEnum + where + T: Send + 'static; + + fn spawn(&self, future: F) -> Self::JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + #[cfg(feature = "executor-tokio")] + ExecutorEnum::TokioRuntime(executor) => JoinHandleEnum::Tokio(executor.spawn(future)), + #[cfg(feature = "executor-tokio")] + ExecutorEnum::TokioHandle(executor) => JoinHandleEnum::Tokio(executor.spawn(future)), + ExecutorEnum::Phantom => { + drop(future); + panic!("spawning a future on a phantom executor") + } + } + } + + fn spawn_blocking(&self, func: F) -> Self::JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + match self { + #[cfg(feature = "executor-tokio")] + ExecutorEnum::TokioRuntime(executor) => JoinHandleEnum::Tokio(executor.spawn_blocking(func)), + #[cfg(feature = "executor-tokio")] + ExecutorEnum::TokioHandle(executor) => JoinHandleEnum::Tokio(executor.spawn_blocking(func)), + ExecutorEnum::Phantom => { + drop(func); + panic!("spawning a future on a phantom executor"); + } + } + } +} + +#[cfg(feature = "executor-tokio")] +impl From for ExecutorEnum { + fn from(inner: crate::executor::tokio::TokioRuntimeExecutor) -> Self { + Self::TokioRuntime(inner) + } +} + +#[cfg(feature = "executor-tokio")] +impl From for ExecutorEnum { + fn from(inner: crate::executor::tokio::TokioHandleExecutor) -> Self { + Self::TokioHandle(inner) + } +} diff --git a/foyer-common/src/runtime.rs b/foyer-common/src/executor/tokio.rs similarity index 54% rename from foyer-common/src/runtime.rs rename to foyer-common/src/executor/tokio.rs index c16da2c9..9b8125bd 100644 --- a/foyer-common/src/runtime.rs +++ b/foyer-common/src/executor/tokio.rs @@ -17,8 +17,10 @@ use std::{ future::Future, mem::ManuallyDrop, ops::{Deref, DerefMut}, + sync::Arc, }; +use futures_util::FutureExt; use tokio::{ runtime::{Handle, Runtime}, task::JoinHandle, @@ -27,6 +29,8 @@ use tokio::{ /// A wrapper around [`Runtime`] that shuts down the runtime in the background when dropped. /// /// This is necessary because directly dropping a nested runtime is not allowed in a parent runtime. +/// +/// FYI: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#method.shutdown_background pub struct BackgroundShutdownRuntime(ManuallyDrop); impl Debug for BackgroundShutdownRuntime { @@ -95,7 +99,7 @@ impl SingletonHandle { /// # Examples /// /// ``` - /// use tokio::runtime::Runtime; + /// use Runtime; /// /// # fn dox() { /// // Create the runtime @@ -123,7 +127,7 @@ impl SingletonHandle { /// # Examples /// /// ``` - /// use tokio::runtime::Runtime; + /// use Runtime; /// /// # fn dox() { /// // Create the runtime @@ -143,85 +147,130 @@ impl SingletonHandle { { self.0.spawn_blocking(func) } +} - /// Runs a future to completion on this `Handle`'s associated `Runtime`. - /// - /// This runs the given future on the current thread, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// When this is used on a `current_thread` runtime, only the - /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the - /// `Handle::block_on` method cannot drive them. This means that, when using - /// this method on a `current_thread` runtime, anything that relies on IO or - /// timers will not work unless there is another thread currently calling - /// [`Runtime::block_on`] on the same runtime. - /// - /// # If the runtime has been shut down - /// - /// If the `Handle`'s associated `Runtime` has been shut down (through - /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by - /// dropping it) and `Handle::block_on` is used it might return an error or - /// panic. Specifically IO resources will return an error and timers will - /// panic. Runtime independent futures will run as normal. - /// - /// # Panics - /// - /// This function panics if the provided future panics, if called within an - /// asynchronous execution context, or if a timer future is executed on a - /// runtime that has been shut down. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// // Create the runtime - /// let rt = Runtime::new().unwrap(); - /// - /// // Get a handle from this runtime - /// let handle = rt.handle(); - /// - /// // Execute the future, blocking the current thread until completion - /// handle.block_on(async { - /// println!("hello"); - /// }); - /// ``` - /// - /// Or using `Handle::current`: - /// - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main () { - /// let handle = Handle::current(); - /// std::thread::spawn(move || { - /// // Using Handle::block_on to run async code in the new thread. - /// handle.block_on(async { - /// println!("hello"); - /// }); - /// }); - /// } - /// ``` - /// - /// [`JoinError`]: struct@tokio::task::JoinError - /// [`JoinHandle`]: struct@tokio::task::JoinHandle - /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on - /// [`Runtime::shutdown_background`]: fn@tokio::runtime::Runtime::shutdown_background - /// [`Runtime::shutdown_timeout`]: fn@tokio::runtime::Runtime::shutdown_timeout - /// [`spawn_blocking`]: tokio::task::spawn_blocking - /// [`tokio::fs`]: tokio::fs - /// [`tokio::net`]: tokio::net - /// [`tokio::time`]: tokio::time - #[cfg(not(madsim))] - pub fn block_on(&self, future: F) -> F::Output { - self.0.block_on(future) - } - - #[cfg(madsim)] - /// Dummy implementation for madsim. - pub fn block_on(&self, _: F) -> F::Output { - unimplemented!("`block_on()` is not supported with madsim") +/// A join handle for Tokio executor. +pub struct TokioJoinHandle { + inner: JoinHandle, +} + +impl Debug for TokioJoinHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TokioJoinHandle").finish() + } +} + +impl From> for TokioJoinHandle { + fn from(inner: JoinHandle) -> Self { + Self { inner } + } +} + +impl Future for TokioJoinHandle +where + T: Send + 'static, +{ + type Output = std::result::Result>; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + let this = self.get_mut(); + this.inner.poll_unpin(cx).map_err(|e| e.into()) + } +} + +impl super::JoinHandle for TokioJoinHandle where T: Send + 'static {} + +/// An executor implementation for Tokio runtime. +/// +/// This executor owns a `Runtime` and spawns tasks onto it. +#[derive(Debug, Clone)] +pub struct TokioRuntimeExecutor { + inner: Arc, +} + +impl TokioRuntimeExecutor { + /// Creates a new `TokioRuntimeExecutor` from a `Runtime`. + pub fn new(inner: Runtime) -> Self { + Self { + inner: Arc::new(inner.into()), + } + } +} + +impl From for TokioRuntimeExecutor { + fn from(inner: Runtime) -> Self { + Self::new(inner) + } +} + +impl super::Executor for TokioRuntimeExecutor { + type JoinHandle + = TokioJoinHandle + where + T: Send + 'static; + + fn spawn(&self, future: F) -> Self::JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.inner.spawn(future).into() + } + + fn spawn_blocking(&self, func: F) -> Self::JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.inner.spawn_blocking(func).into() + } +} + +/// An executor implementation for Tokio runtime. +/// +/// This executor uses a `Handle` to spawn tasks onto an existing runtime. +#[derive(Debug, Clone)] +pub struct TokioHandleExecutor { + inner: Handle, +} + +impl TokioHandleExecutor { + /// Creates a new `TokioHandleExecutor` from a `Handle`. + pub fn new(inner: Handle) -> Self { + Self { inner } + } + + /// Creates a new `TokioHandleExecutor` from the current runtime handle. + pub fn current() -> Self { + Self::new(Handle::current()) + } +} + +impl From for TokioHandleExecutor { + fn from(inner: Handle) -> Self { + Self::new(inner) + } +} + +impl super::Executor for TokioHandleExecutor { + type JoinHandle + = TokioJoinHandle + where + T: Send + 'static; + + fn spawn(&self, future: F) -> Self::JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.inner.spawn(future).into() + } + + fn spawn_blocking(&self, func: F) -> Self::JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.inner.spawn_blocking(func).into() } } diff --git a/foyer-common/src/lib.rs b/foyer-common/src/lib.rs index 6f3dbc87..fd7c501e 100644 --- a/foyer-common/src/lib.rs +++ b/foyer-common/src/lib.rs @@ -16,8 +16,6 @@ /// Allow to enable debug assertions in release profile with feature "strict_assertion". pub mod assert; -/// The util that convert the blocking call to async call. -pub mod asyncify; /// The bitwise utils. pub mod bits; /// The [`bytes::Buf`] and [`bytes::BufMut`] extensions. @@ -37,7 +35,7 @@ pub mod rate; /// A ticket-based rate limiter. pub mod rated_ticket; /// A runtime that automatically shutdown itself on drop. -pub mod runtime; +pub mod executor; /// Tracing related components. #[cfg(feature = "tracing")] pub mod tracing; diff --git a/foyer-memory/src/cache.rs b/foyer-memory/src/cache.rs index d90cf160..573a8f01 100644 --- a/foyer-memory/src/cache.rs +++ b/foyer-memory/src/cache.rs @@ -18,9 +18,9 @@ use equivalent::Equivalent; use foyer_common::{ code::{DefaultHasher, HashBuilder, Key, Value}, event::EventListener, + executor::ExecutorEnum, metrics::Metrics, properties::{Age, Hint, Location, Properties, Source}, - runtime::SingletonHandle, }; use mixtrics::{metrics::BoxedRegistry, registry::noop::NoopMetricsRegistry}; use pin_project::pin_project; @@ -1114,7 +1114,7 @@ where optional_fetch_builder: Option>, required_fetch_builder: Option>, ctx: C, - runtime: &SingletonHandle, + runtime: &ExecutorEnum, ) -> GetOrFetch where Q: Hash + Equivalent + ?Sized + ToOwned, diff --git a/foyer-memory/src/raw.rs b/foyer-memory/src/raw.rs index a32137be..1215c13a 100644 --- a/foyer-memory/src/raw.rs +++ b/foyer-memory/src/raw.rs @@ -31,9 +31,9 @@ use equivalent::Equivalent; use foyer_common::{ code::HashBuilder, event::{Event, EventListener}, + executor::{tokio::TokioHandleExecutor, Executor, ExecutorEnum}, metrics::Metrics, properties::{Location, Properties, Source}, - runtime::SingletonHandle, strict_assert, utils::scope::Scope, }; @@ -950,7 +950,7 @@ where .boxed() })), (), - &tokio::runtime::Handle::current().into(), + &ExecutorEnum::from(TokioHandleExecutor::current()), ) } @@ -969,7 +969,7 @@ where optional_fetch_builder: Option>, required_fetch_builder: Option>, ctx: C, - runtime: &SingletonHandle, + executor: &ExecutorEnum, ) -> RawGetOrFetch where Q: Hash + Equivalent + ?Sized + ToOwned, @@ -1006,7 +1006,7 @@ where inflights: inflights.clone(), close, }; - runtime.spawn(fetch); + executor.spawn(fetch); RawGetOrFetch::Miss(RawWait { waiter }) } Enqueue::Wait(waiter) => RawGetOrFetch::Miss(RawWait { waiter }), diff --git a/foyer-storage/src/engine/block/engine.rs b/foyer-storage/src/engine/block/engine.rs index 76c57e83..8ace8785 100644 --- a/foyer-storage/src/engine/block/engine.rs +++ b/foyer-storage/src/engine/block/engine.rs @@ -28,6 +28,7 @@ use fastrace::prelude::*; use foyer_common::{ bits, code::{StorageKey, StorageValue}, + executor::{Executor, ExecutorEnum}, metrics::Metrics, properties::{Age, Properties}, }; @@ -61,7 +62,6 @@ use crate::{ filter::conditions::IoThrottle, io::{bytes::IoSliceMut, PAGE}, keeper::PieceRef, - runtime::Runtime, serde::EntryDeserializer, Device, Load, RejectAll, StorageFilter, StorageFilterResult, }; @@ -325,7 +325,7 @@ where EngineBuildContext { io_engine, metrics, - runtime, + executor: runtime, recover_mode, }: EngineBuildContext, ) -> Result>> { @@ -437,7 +437,7 @@ where submit_queue_size, submit_queue_size_threshold: self.submit_queue_size_threshold, sequence, - runtime, + executor: runtime, active: AtomicBool::new(true), metrics, #[cfg(any(test, feature = "test_utils"))] @@ -515,7 +515,7 @@ where sequence: AtomicSequence, - runtime: Runtime, + executor: ExecutorEnum, active: AtomicBool, @@ -739,7 +739,7 @@ where }); let this = self.clone(); - self.inner.runtime.write().spawn(async move { + self.inner.executor.spawn(async move { this.inner.flushers[hash as usize % this.inner.flushers.len()].submit(Submission::Tombstone { tombstone: Tombstone { hash, sequence }, stats, @@ -927,7 +927,7 @@ mod tests { .build(EngineBuildContext { io_engine, metrics, - runtime, + executor: runtime, recover_mode: RecoverMode::Strict, }) .await @@ -969,7 +969,7 @@ mod tests { .build(EngineBuildContext { io_engine, metrics, - runtime, + executor: runtime, recover_mode: RecoverMode::Strict, }) .await @@ -1397,7 +1397,7 @@ mod tests { .build(EngineBuildContext { io_engine, metrics: Arc::new(Metrics::noop()), - runtime, + executor, recover_mode: RecoverMode::None, }) .await diff --git a/foyer-storage/src/engine/block/flusher.rs b/foyer-storage/src/engine/block/flusher.rs index de203681..ce2ee462 100644 --- a/foyer-storage/src/engine/block/flusher.rs +++ b/foyer-storage/src/engine/block/flusher.rs @@ -27,6 +27,7 @@ use std::{ use foyer_common::{ bits, code::{StorageKey, StorageValue}, + executor::{Executor, ExecutorEnum}, metrics::Metrics, properties::Properties, }; @@ -55,7 +56,6 @@ use crate::{ PAGE, }, keeper::PieceRef, - runtime::Runtime, Compression, }; @@ -176,7 +176,7 @@ where block_manager: BlockManager, tombstone_log: Option, metrics: Arc, - runtime: &Runtime, + executor: &ExecutorEnum, #[cfg(any(test, feature = "test_utils"))] flush_switch: Switch, ) -> Result<()> { let id = self.id; @@ -213,7 +213,7 @@ where indexer, tombstone_log, compression, - runtime: runtime.clone(), + executor: executor.clone(), metrics: metrics.clone(), io_tasks: VecDeque::with_capacity(1), current_block_handle, @@ -222,7 +222,7 @@ where flush_switch, }; - runtime.write().spawn(async move { + executor.spawn(async move { if let Err(e) = runner.run().await { tracing::error!(id, "[flusher]: flusher exit with error: {e}"); } @@ -312,7 +312,7 @@ where compression: Compression, - runtime: Runtime, + executor: ExecutorEnum, metrics: Arc, @@ -611,8 +611,7 @@ where let f: BoxFuture<'_, Result<(Vec, ())>> = try_join(try_join_all(futures), future).boxed(); let handle = self - .runtime - .write() + .executor .spawn(f) .map(move |jres| match jres { Ok(Ok((mut states, ()))) => IoTaskCtx { diff --git a/foyer-storage/src/engine/block/manager.rs b/foyer-storage/src/engine/block/manager.rs index 9f9baa8c..d32edfe7 100644 --- a/foyer-storage/src/engine/block/manager.rs +++ b/foyer-storage/src/engine/block/manager.rs @@ -22,7 +22,10 @@ use std::{ }, }; -use foyer_common::metrics::Metrics; +use foyer_common::{ + executor::{Executor, ExecutorEnum}, + metrics::Metrics, +}; use futures_core::future::BoxFuture; use futures_util::{ future::{ready, Shared}, @@ -43,7 +46,7 @@ use crate::{ device::Partition, engine::IoEngine, }, - Device, IoError, Runtime, + Device, IoError, }; pub type BlockId = u32; @@ -159,7 +162,7 @@ struct Inner { reclaim_concurrency: usize, clean_block_threshold: usize, metrics: Arc, - runtime: Runtime, + executor: ExecutorEnum, } #[derive(Debug, Clone)] @@ -178,7 +181,7 @@ impl BlockManager { reclaim_concurrency: usize, clean_block_threshold: usize, metrics: Arc, - runtime: Runtime, + executor: ExecutorEnum, ) -> Result { let mut blocks = vec![]; @@ -223,7 +226,7 @@ impl BlockManager { reclaim_concurrency, clean_block_threshold, metrics, - runtime, + executor, }; let inner = Arc::new(inner); let this = Self { inner }; @@ -381,7 +384,7 @@ impl BlockManager { block, }; let future = self.inner.reclaimer.reclaim(block); - self.inner.runtime.write().spawn(future); + self.inner.executor.spawn(future); } } } diff --git a/foyer-storage/src/engine/block/reclaimer.rs b/foyer-storage/src/engine/block/reclaimer.rs index 7d98d841..c0e9af99 100644 --- a/foyer-storage/src/engine/block/reclaimer.rs +++ b/foyer-storage/src/engine/block/reclaimer.rs @@ -17,6 +17,7 @@ use std::{fmt::Debug, sync::Arc}; use foyer_common::{ bits, code::{StorageKey, StorageValue}, + executor::{Executor, ExecutorEnum}, properties::Properties, }; use futures_core::future::BoxFuture; @@ -36,7 +37,6 @@ use crate::{ bytes::{IoSlice, IoSliceMut}, PAGE, }, - runtime::Runtime, Statistics, StorageFilter, }; @@ -55,7 +55,7 @@ where reinsertion_filter: Arc, blob_index_size: usize, statistics: Arc, - runtime: Runtime, + executor: ExecutorEnum, } impl Debug for Reclaimer @@ -81,7 +81,7 @@ where reinsertion_filter: Arc, blob_index_size: usize, statistics: Arc, - runtime: Runtime, + executor: ExecutorEnum, ) -> Self { Self { indexer, @@ -89,7 +89,7 @@ where reinsertion_filter, blob_index_size, statistics, - runtime, + executor, } } } @@ -105,7 +105,7 @@ where let statistics = self.statistics.clone(); let blob_index_size = self.blob_index_size; let flushers = self.flushers.clone(); - let runtime = self.runtime.clone(); + let runtime = self.executor.clone(); let indexer = self.indexer.clone(); async move { let id = block.id(); @@ -166,7 +166,7 @@ where let unpicked_count = unpicked.len(); let waits = flushers.iter().map(|flusher| flusher.wait()).collect_vec(); - runtime.write().spawn(async move { + runtime.spawn(async move { join_all(waits).await; }); indexer.remove_batch(unpicked); diff --git a/foyer-storage/src/engine/block/recover.rs b/foyer-storage/src/engine/block/recover.rs index b7a268dd..18c7bb98 100644 --- a/foyer-storage/src/engine/block/recover.rs +++ b/foyer-storage/src/engine/block/recover.rs @@ -19,7 +19,10 @@ use std::{ time::Instant, }; -use foyer_common::metrics::Metrics; +use foyer_common::{ + executor::{Executor, ExecutorEnum}, + metrics::Metrics, +}; use futures_util::{stream, StreamExt, TryStreamExt}; use itertools::Itertools; @@ -36,7 +39,6 @@ use crate::{ RecoverMode, }, error::{Error, Result}, - runtime::Runtime, }; #[derive(Debug)] @@ -53,7 +55,7 @@ impl RecoverRunner { indexer: &Indexer, block_manager: &BlockManager, tombstones: &[Tombstone], - runtime: Runtime, + executor: ExecutorEnum, metrics: Arc, ) -> Result<()> { let now = Instant::now(); @@ -62,9 +64,7 @@ impl RecoverRunner { let mode = recover_mode; let total = stream::iter(blocks.into_iter().map(|id| { let block = block_manager.block(id).clone(); - runtime - .user() - .spawn(async move { BlockRecoverRunner::run(mode, block, blob_index_size).await }) + executor.spawn(async move { BlockRecoverRunner::run(mode, block, blob_index_size).await }) })) .buffered(recover_concurrency) .try_collect::>() diff --git a/foyer-storage/src/engine/mod.rs b/foyer-storage/src/engine/mod.rs index 81015159..34d1f617 100644 --- a/foyer-storage/src/engine/mod.rs +++ b/foyer-storage/src/engine/mod.rs @@ -16,13 +16,14 @@ use std::{any::Any, fmt::Debug, sync::Arc}; use foyer_common::{ code::{StorageKey, StorageValue}, + executor::ExecutorEnum, metrics::Metrics, properties::{Age, Properties}, }; use foyer_memory::Piece; use futures_core::future::BoxFuture; -use crate::{error::Result, filter::StorageFilterResult, io::engine::IoEngine, keeper::PieceRef, Device, Runtime}; +use crate::{error::Result, filter::StorageFilterResult, io::engine::IoEngine, keeper::PieceRef, Device}; /// Source context for populated entry. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -122,10 +123,10 @@ pub enum RecoverMode { pub struct EngineBuildContext { /// IO engine for the disk cache engine. pub io_engine: Arc, + /// The executor for the disk cache engine. + pub executor: ExecutorEnum, /// Shared metrics for all components. pub metrics: Arc, - /// The runtime for the disk cache engine. - pub runtime: Runtime, /// The recover mode of the disk cache engine. pub recover_mode: RecoverMode, } diff --git a/foyer-storage/src/lib.rs b/foyer-storage/src/lib.rs index 4153039b..1c6a9e79 100644 --- a/foyer-storage/src/lib.rs +++ b/foyer-storage/src/lib.rs @@ -23,7 +23,6 @@ mod error; mod filter; mod io; mod keeper; -mod runtime; mod serde; mod store; diff --git a/foyer-storage/src/prelude.rs b/foyer-storage/src/prelude.rs index 11ea7378..a9f5377f 100644 --- a/foyer-storage/src/prelude.rs +++ b/foyer-storage/src/prelude.rs @@ -47,6 +47,5 @@ pub use crate::{ }, error::{IoError, IoResult}, }, - runtime::Runtime, - store::{RuntimeOptions, Store, StoreBuilder, TokioRuntimeOptions}, + store::{Store, StoreBuilder, TokioRuntimeOptions}, }; diff --git a/foyer-storage/src/runtime.rs b/foyer-storage/src/runtime.rs deleted file mode 100644 index 887d633f..00000000 --- a/foyer-storage/src/runtime.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2025 foyer Project Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use foyer_common::runtime::{BackgroundShutdownRuntime, SingletonHandle}; -use tokio::runtime::Handle; - -#[derive(Debug)] -struct RuntimeInner { - _read_runtime: Option>, - _write_runtime: Option>, - - read_runtime_handle: SingletonHandle, - write_runtime_handle: SingletonHandle, - user_runtime_handle: SingletonHandle, -} - -/// [`Runtime`] holds the runtime reference and non-cloneable handles to prevent handle usage after runtime shutdown. -#[derive(Debug, Clone)] -pub struct Runtime { - inner: Arc, -} - -impl Runtime { - /// Create a new runtime with runtimes if given. - pub fn new( - read_runtime: Option>, - write_runtime: Option>, - user_runtime_handle: Handle, - ) -> Self { - let read_runtime_handle = read_runtime - .as_ref() - .map(|rt| rt.handle().clone()) - .unwrap_or(user_runtime_handle.clone()); - let write_runtime_handle = write_runtime - .as_ref() - .map(|rt| rt.handle().clone()) - .unwrap_or(user_runtime_handle.clone()); - Self { - inner: Arc::new(RuntimeInner { - _read_runtime: read_runtime, - _write_runtime: write_runtime, - read_runtime_handle: read_runtime_handle.into(), - write_runtime_handle: write_runtime_handle.into(), - user_runtime_handle: user_runtime_handle.into(), - }), - } - } - - /// Create a new runtime with current runtime env only. - pub fn current() -> Self { - Self { - inner: Arc::new(RuntimeInner { - _read_runtime: None, - _write_runtime: None, - read_runtime_handle: Handle::current().into(), - write_runtime_handle: Handle::current().into(), - user_runtime_handle: Handle::current().into(), - }), - } - } - - /// Get the non-cloneable read runtime handle. - pub fn read(&self) -> &SingletonHandle { - &self.inner.read_runtime_handle - } - - /// Get the non-cloneable write runtime handle. - pub fn write(&self) -> &SingletonHandle { - &self.inner.write_runtime_handle - } - - /// Get the non-cloneable user runtime handle. - pub fn user(&self) -> &SingletonHandle { - &self.inner.user_runtime_handle - } -} diff --git a/foyer-storage/src/store.rs b/foyer-storage/src/store.rs index c53afa5d..48880477 100644 --- a/foyer-storage/src/store.rs +++ b/foyer-storage/src/store.rs @@ -24,12 +24,11 @@ use std::{ use equivalent::Equivalent; use foyer_common::{ code::{HashBuilder, StorageKey, StorageValue}, + executor::{Executor, ExecutorEnum}, metrics::Metrics, properties::{Age, Properties}, - runtime::BackgroundShutdownRuntime, }; use foyer_memory::{Cache, Piece}; -use tokio::runtime::Handle; #[cfg(feature = "test_utils")] use crate::test_utils::*; @@ -39,13 +38,12 @@ use crate::{ noop::{NoopEngine, NoopEngineBuilder}, Engine, EngineBuildContext, EngineConfig, Load, Populated, RecoverMode, }, - error::{Error, Result}, + error::Result, io::{ device::{statistics::Statistics, throttle::Throttle, Device}, engine::{monitor::MonitoredIoEngine, psync::PsyncIoEngineBuilder, IoEngine, IoEngineBuilder}, }, keeper::Keeper, - runtime::Runtime, serde::EntrySerializer, StorageFilterResult, }; @@ -75,7 +73,7 @@ where compression: Compression, - runtime: Runtime, + executor: ExecutorEnum, metrics: Arc, @@ -95,7 +93,7 @@ where .field("keeper", &self.inner.keeper) .field("engine", &self.inner.engine) .field("compression", &self.inner.compression) - .field("runtimes", &self.inner.runtime) + .field("executor", &self.inner.executor) .finish() } } @@ -186,7 +184,7 @@ where } let future = self.inner.engine.load(hash); - match self.inner.runtime.read().spawn(future).await.unwrap() { + match self.inner.executor.spawn(future).await.unwrap() { Ok(Load::Entry { key: k, value: v, @@ -282,11 +280,6 @@ where self.inner.engine.device().statistics().throttle() } - /// Get the runtime. - pub fn runtime(&self) -> &Runtime { - &self.inner.runtime - } - /// Wait for the ongoing flush and reclaim tasks to finish. pub async fn wait(&self) { self.inner.engine.wait().await @@ -329,23 +322,6 @@ pub struct TokioRuntimeOptions { pub max_blocking_threads: usize, } -/// Options for the dedicated runtime. -#[derive(Debug, Clone)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub enum RuntimeOptions { - /// Disable dedicated runtime. The runtime which foyer is built on will be used. - Disabled, - /// Use unified dedicated runtime for both reads and writes. - Unified(TokioRuntimeOptions), - /// Use separated dedicated runtime for reads or writes. - Separated { - /// Dedicated runtime for reads. - read_runtime_options: TokioRuntimeOptions, - /// Dedicated runtime for both foreground and background writes - write_runtime_options: TokioRuntimeOptions, - }, -} - /// The builder of the disk cache. pub struct StoreBuilder where @@ -356,13 +332,12 @@ where { name: Cow<'static, str>, memory: Cache, + executor: ExecutorEnum, metrics: Arc, io_engine: Option>, engine_builder: Option>>, - runtime_config: RuntimeOptions, - compression: Compression, recover_mode: RecoverMode, @@ -381,10 +356,10 @@ where f.debug_struct("StoreBuilder") .field("name", &self.name) .field("memory", &self.memory) + .field("executor", &self.executor) .field("metrics", &self.metrics) .field("io_engine", &self.io_engine) .field("engine_builder", &self.engine_builder) - .field("runtime_config", &self.runtime_config) .field("compression", &self.compression) .field("recover_mode", &self.recover_mode) .finish() @@ -399,17 +374,21 @@ where P: Properties, { /// Setup disk cache store for the given in-memory cache. - pub fn new(name: impl Into>, memory: Cache, metrics: Arc) -> Self { + pub fn new( + name: impl Into>, + memory: Cache, + executor: ExecutorEnum, + metrics: Arc, + ) -> Self { Self { name: name.into(), memory, + executor, metrics, io_engine: None, engine_builder: None, - runtime_config: RuntimeOptions::Disabled, - compression: Compression::default(), recover_mode: RecoverMode::default(), #[cfg(any(test, feature = "test_utils"))] @@ -449,12 +428,6 @@ where self } - /// Configure the dedicated runtime for the disk cache store. - pub fn with_runtime_options(mut self, runtime_options: RuntimeOptions) -> Self { - self.runtime_config = runtime_options; - self - } - /// Set the load throttle switch for the disk cache store. #[cfg(any(test, feature = "test_utils"))] pub fn with_load_throttle_switch(mut self, switch: LoadThrottleSwitch) -> Self { @@ -471,50 +444,10 @@ where pub async fn build(self) -> Result> { let memory = self.memory.clone(); let metrics = self.metrics.clone(); + let executor = self.executor; let compression = self.compression; - let build_runtime = |config: &TokioRuntimeOptions, suffix: &str| { - let mut builder = tokio::runtime::Builder::new_multi_thread(); - #[cfg(madsim)] - let _ = config; - #[cfg(not(madsim))] - if config.worker_threads != 0 { - builder.worker_threads(config.worker_threads); - } - #[cfg(not(madsim))] - if config.max_blocking_threads != 0 { - builder.max_blocking_threads(config.max_blocking_threads); - } - builder.thread_name(format!("{}-{}", &self.name, suffix)); - let runtime = builder.enable_all().build().map_err(anyhow::Error::from)?; - let runtime = BackgroundShutdownRuntime::from(runtime); - Ok::<_, Error>(Arc::new(runtime)) - }; - - let user_runtime_handle = Handle::current(); - let (read_runtime, write_runtime) = match self.runtime_config { - RuntimeOptions::Disabled => { - tracing::info!( - "[store]: Dedicated runtime is disabled. This may lead to spikes in latency under high load. Hint: Consider configuring a dedicated runtime." - ); - (None, None) - } - RuntimeOptions::Unified(runtime_config) => { - let runtime = build_runtime(&runtime_config, "unified")?; - (Some(runtime.clone()), Some(runtime.clone())) - } - RuntimeOptions::Separated { - read_runtime_options: read_runtime_config, - write_runtime_options: write_runtime_config, - } => { - let read_runtime = build_runtime(&read_runtime_config, "read")?; - let write_runtime = build_runtime(&write_runtime_config, "write")?; - (Some(read_runtime), Some(write_runtime)) - } - }; - let runtime = Runtime::new(read_runtime, write_runtime, user_runtime_handle); - let io_engine = match self.io_engine { Some(ie) => ie, None => { @@ -538,8 +471,8 @@ where let engine = engine_builder .build(EngineBuildContext { io_engine, + executor: executor.clone(), metrics: metrics.clone(), - runtime: runtime.clone(), recover_mode: self.recover_mode, }) .await?; @@ -553,7 +486,7 @@ where keeper, engine, compression, - runtime, + executor, metrics, #[cfg(any(test, feature = "test_utils"))] load_throttle_switch, @@ -567,7 +500,7 @@ where #[cfg(test)] mod tests { - use foyer_common::hasher::ModHasher; + use foyer_common::{executor::tokio::TokioHandleExecutor, hasher::ModHasher}; use foyer_memory::CacheBuilder; use super::*; @@ -582,7 +515,8 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let metrics = Arc::new(Metrics::noop()); let memory: Cache = CacheBuilder::new(10).build(); - let _ = StoreBuilder::new("test", memory, metrics) + let executor = TokioHandleExecutor::current().into(); + let _ = StoreBuilder::new("test", memory, executor, metrics) .with_io_engine(PsyncIoEngineBuilder::new().build().await.unwrap()) .with_engine_config( BlockEngineBuilder::new( @@ -606,13 +540,14 @@ mod tests { let metrics = Arc::new(Metrics::noop()); let memory: Cache = CacheBuilder::new(10).with_hash_builder(ModHasher::default()).build(); + let executor = TokioHandleExecutor::current().into(); let e1 = memory.insert(1, "foo".to_string()); let e2 = memory.insert(1 + 1 + u64::MAX as u128, "bar".to_string()); assert_eq!(memory.hash(e1.key()), memory.hash(e2.key())); - let store = StoreBuilder::new("test", memory, metrics) + let store = StoreBuilder::new("test", memory, executor, metrics) .with_io_engine(PsyncIoEngineBuilder::new().build().await.unwrap()) .with_engine_config( BlockEngineBuilder::new(