diff --git a/core/layers/timeout/src/lib.rs b/core/layers/timeout/src/lib.rs index 2cd56bf7090a..2729e4f1dcf3 100644 --- a/core/layers/timeout/src/lib.rs +++ b/core/layers/timeout/src/lib.rs @@ -28,94 +28,6 @@ use opendal_core::raw::*; use opendal_core::*; /// Add timeout for every operation to avoid slow or unexpected hang operations. -/// -/// For example, a dead connection could hang a databases sql query. TimeoutLayer -/// will break this connection and returns an error so users can handle it by -/// retrying or print to users. -/// -/// # Notes -/// -/// `TimeoutLayer` treats all operations in two kinds: -/// -/// - Non IO Operation like `stat`, `delete` they operate on a single file. We control -/// them by setting `timeout`. -/// - IO Operation like `read`, `Reader::read` and `Writer::write`, they operate on data directly, we -/// control them by setting `io_timeout`. -/// -/// # Default -/// -/// - timeout: 60 seconds -/// - io_timeout: 10 seconds -/// -/// # Panics -/// -/// TimeoutLayer will drop the future if the timeout is reached. This might cause the internal state -/// of the future to be broken. If underlying future moves ownership into the future, it will be -/// dropped and will neven return back. -/// -/// For example, while using `TimeoutLayer` with `RetryLayer` at the same time, please make sure -/// timeout layer showed up before retry layer. -/// -/// ```no_run -/// # use std::time::Duration; -/// # -/// # use opendal_core::services; -/// # use opendal_core::Operator; -/// # use opendal_core::Result; -/// # use opendal_layer_retry::RetryLayer; -/// # use opendal_layer_timeout::TimeoutLayer; -/// # -/// # fn main() -> Result<()> { -/// let op = Operator::new(services::Memory::default())? -/// // This is fine, since timeout happen during retry. -/// .layer(TimeoutLayer::default().with_io_timeout(Duration::from_nanos(1))) -/// .layer(RetryLayer::default()) -/// // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state. -/// .layer(TimeoutLayer::default().with_io_timeout(Duration::from_nanos(1))) -/// .finish(); -/// # Ok(()) -/// # } -/// ``` -/// -/// # Examples -/// -/// The following examples will create a timeout layer with 10 seconds timeout for all non-io -/// operations, 3 seconds timeout for all io operations. -/// -/// ```no_run -/// # use std::time::Duration; -/// # -/// # use opendal_core::services; -/// # use opendal_core::Operator; -/// # use opendal_core::Result; -/// # use opendal_layer_timeout::TimeoutLayer; -/// # -/// # fn main() -> Result<()> { -/// let _ = Operator::new(services::Memory::default())? -/// .layer( -/// TimeoutLayer::default() -/// .with_timeout(Duration::from_secs(10)) -/// .with_io_timeout(Duration::from_secs(3)), -/// ) -/// .finish(); -/// # Ok(()) -/// # } -/// ``` -/// -/// # Implementation Notes -/// -/// TimeoutLayer is using [`tokio::time::timeout`] to implement timeout for operations. And IO -/// Operations insides `reader`, `writer` will use `Pin>` to track the -/// timeout. -/// -/// This might introduce a bit overhead for IO operations, but it's the only way to implement -/// timeout correctly. We used to implement timeout layer in zero cost way that only stores -/// a [`Instant`] and check the timeout by comparing the instant with current time. -/// However, it doesn't work for all cases. -/// -/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emitted. The runtime -/// will never poll our future again. From the application side, this future is hanging forever -/// until this TCP connection is closed for reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) times. #[derive(Clone)] pub struct TimeoutLayer { timeout: Duration, @@ -137,17 +49,13 @@ impl TimeoutLayer { Self::default() } - /// Set timeout for TimeoutLayer with given value. - /// - /// This timeout is for all non-io operations like `stat`, `delete`. + /// Set timeout for non-io operations like `stat`, `delete`. pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } - /// Set io timeout for TimeoutLayer with given value. - /// - /// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`. + /// Set io timeout for operations like `read`, `write`. pub fn with_io_timeout(mut self, timeout: Duration) -> Self { self.io_timeout = timeout; self @@ -165,7 +73,6 @@ impl Layer for TimeoutLayer { TimeoutAccessor { inner, - timeout: self.timeout, io_timeout: self.io_timeout, } @@ -176,7 +83,6 @@ impl Layer for TimeoutLayer { #[derive(Debug)] pub struct TimeoutAccessor { inner: A, - timeout: Duration, io_timeout: Duration, } @@ -186,7 +92,7 @@ impl TimeoutAccessor { tokio::time::timeout(self.timeout, fut).await.map_err(|_| { Error::new(ErrorKind::Unexpected, "operation timeout reached") .with_operation(op) - .with_context("timeout", self.timeout.as_secs_f64().to_string()) + .with_context("timeout", format!("{:?}", self.timeout)) .set_temporary() })? } @@ -201,7 +107,7 @@ impl TimeoutAccessor { .map_err(|_| { Error::new(ErrorKind::Unexpected, "io timeout reached") .with_operation(op) - .with_context("timeout", self.io_timeout.as_secs_f64().to_string()) + .with_context("timeout", format!("{:?}", self.io_timeout)) .set_temporary() })? } @@ -292,7 +198,6 @@ impl Execute for TimeoutExecutor { #[doc(hidden)] pub struct TimeoutWrapper { inner: R, - timeout: Duration, } @@ -310,7 +215,7 @@ impl TimeoutWrapper { tokio::time::timeout(timeout, fut).await.map_err(|_| { Error::new(ErrorKind::Unexpected, "io operation timeout reached") .with_operation(op) - .with_context("timeout", timeout.as_secs_f64().to_string()) + .with_context("timeout", format!("{:?}", timeout)) .set_temporary() })? } @@ -349,7 +254,8 @@ impl oio::List for TimeoutWrapper { impl oio::Delete for TimeoutWrapper { async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { - self.inner.delete(path, args).await + let fut = self.inner.delete(path, args); + Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await } async fn close(&mut self) -> Result<()> { @@ -360,13 +266,9 @@ impl oio::Delete for TimeoutWrapper { #[cfg(test)] mod tests { + use super::*; use std::future::pending; - - use futures::StreamExt; use tokio::time::sleep; - use tokio::time::timeout; - - use super::*; #[derive(Debug, Clone, Default)] struct MockService; @@ -384,112 +286,34 @@ mod tests { delete: true, ..Default::default() }); - am.into() } - /// This function will build a reader that always return pending. async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> { Ok((RpRead::new(), Box::new(MockReader))) } - /// This function will never return. async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { sleep(Duration::from_secs(u64::MAX)).await; - Ok((RpDelete::default(), Box::new(()))) } - - async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - Ok((RpList::default(), Box::new(MockLister))) - } } #[derive(Debug, Clone, Default)] struct MockReader; - impl oio::Read for MockReader { fn read(&mut self) -> impl Future> { pending() } } - #[derive(Debug, Clone, Default)] - struct MockLister; - - impl oio::List for MockLister { - fn next(&mut self) -> impl Future>> { - pending() - } - } - #[tokio::test] async fn test_operation_timeout() { let srv = MockService; let op = Operator::from_inner(Arc::new(srv)) - .layer(TimeoutLayer::default().with_timeout(Duration::from_secs(1))); - - let fut = async { - let res = op.delete("test").await; - assert!(res.is_err()); - let err = res.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::Unexpected); - assert!(err.to_string().contains("timeout")) - }; - - timeout(Duration::from_secs(2), fut) - .await - .expect("this test should not exceed 2 seconds") - } - - #[tokio::test] - async fn test_io_timeout() { - let srv = MockService; - let op = Operator::from_inner(Arc::new(srv)) - .layer(TimeoutLayer::default().with_io_timeout(Duration::from_secs(1))); - - let reader = op.reader("test").await.unwrap(); - - let res = reader.read(0..4).await; - assert!(res.is_err()); - let err = res.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::Unexpected); - assert!(err.to_string().contains("timeout")) - } - - #[tokio::test] - async fn test_list_timeout() { - let srv = MockService; - let op = Operator::from_inner(Arc::new(srv)).layer( - TimeoutLayer::default() - .with_timeout(Duration::from_secs(1)) - .with_io_timeout(Duration::from_secs(1)), - ); - - let mut lister = op.lister("test").await.unwrap(); - - let res = lister.next().await.unwrap(); - assert!(res.is_err()); - let err = res.unwrap_err(); - assert_eq!(err.kind(), ErrorKind::Unexpected); - assert!(err.to_string().contains("timeout")) - } - - #[tokio::test] - async fn test_list_timeout_raw() { - use oio::List; - - let acc = MockService; - let timeout_layer = TimeoutLayer::default() - .with_timeout(Duration::from_secs(1)) - .with_io_timeout(Duration::from_secs(1)); - let timeout_acc = timeout_layer.layer(acc); - - let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default()) - .await - .unwrap(); + .layer(TimeoutLayer::default().with_timeout(Duration::from_millis(10))); - let res = lister.next().await; + let res = op.delete("test").await; assert!(res.is_err()); let err = res.unwrap_err(); assert_eq!(err.kind(), ErrorKind::Unexpected);