Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 10 additions & 186 deletions core/layers/timeout/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,94 +28,6 @@ use opendal_core::raw::*;
use opendal_core::*;

/// Add timeout for every operation to avoid slow or unexpected hang operations.
///
/// For example, a dead connection could hang a databases sql query. TimeoutLayer
/// will break this connection and returns an error so users can handle it by
/// retrying or print to users.
///
/// # Notes
///
/// `TimeoutLayer` treats all operations in two kinds:
///
/// - Non IO Operation like `stat`, `delete` they operate on a single file. We control
/// them by setting `timeout`.
/// - IO Operation like `read`, `Reader::read` and `Writer::write`, they operate on data directly, we
/// control them by setting `io_timeout`.
///
/// # Default
///
/// - timeout: 60 seconds
/// - io_timeout: 10 seconds
///
/// # Panics
///
/// TimeoutLayer will drop the future if the timeout is reached. This might cause the internal state
/// of the future to be broken. If underlying future moves ownership into the future, it will be
/// dropped and will neven return back.
///
/// For example, while using `TimeoutLayer` with `RetryLayer` at the same time, please make sure
/// timeout layer showed up before retry layer.
///
/// ```no_run
/// # use std::time::Duration;
/// #
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
/// # use opendal_layer_retry::RetryLayer;
/// # use opendal_layer_timeout::TimeoutLayer;
/// #
/// # fn main() -> Result<()> {
/// let op = Operator::new(services::Memory::default())?
/// // This is fine, since timeout happen during retry.
/// .layer(TimeoutLayer::default().with_io_timeout(Duration::from_nanos(1)))
/// .layer(RetryLayer::default())
/// // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state.
/// .layer(TimeoutLayer::default().with_io_timeout(Duration::from_nanos(1)))
/// .finish();
/// # Ok(())
/// # }
/// ```
///
/// # Examples
///
/// The following examples will create a timeout layer with 10 seconds timeout for all non-io
/// operations, 3 seconds timeout for all io operations.
///
/// ```no_run
/// # use std::time::Duration;
/// #
/// # use opendal_core::services;
/// # use opendal_core::Operator;
/// # use opendal_core::Result;
/// # use opendal_layer_timeout::TimeoutLayer;
/// #
/// # fn main() -> Result<()> {
/// let _ = Operator::new(services::Memory::default())?
/// .layer(
/// TimeoutLayer::default()
/// .with_timeout(Duration::from_secs(10))
/// .with_io_timeout(Duration::from_secs(3)),
/// )
/// .finish();
/// # Ok(())
/// # }
/// ```
///
/// # Implementation Notes
///
/// TimeoutLayer is using [`tokio::time::timeout`] to implement timeout for operations. And IO
/// Operations insides `reader`, `writer` will use `Pin<Box<tokio::time::Sleep>>` to track the
/// timeout.
///
/// This might introduce a bit overhead for IO operations, but it's the only way to implement
/// timeout correctly. We used to implement timeout layer in zero cost way that only stores
/// a [`Instant`] and check the timeout by comparing the instant with current time.
/// However, it doesn't work for all cases.
///
/// For examples, users TCP connection could be in [Busy ESTAB](https://blog.cloudflare.com/when-tcp-sockets-refuse-to-die) state. In this state, no IO event will be emitted. The runtime
/// will never poll our future again. From the application side, this future is hanging forever
/// until this TCP connection is closed for reaching the linux [net.ipv4.tcp_retries2](https://man7.org/linux/man-pages/man7/tcp.7.html) times.
#[derive(Clone)]
pub struct TimeoutLayer {
timeout: Duration,
Expand All @@ -137,17 +49,13 @@ impl TimeoutLayer {
Self::default()
}

/// Set timeout for TimeoutLayer with given value.
///
/// This timeout is for all non-io operations like `stat`, `delete`.
/// Set timeout for non-io operations like `stat`, `delete`.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}

/// Set io timeout for TimeoutLayer with given value.
///
/// This timeout is for all io operations like `read`, `Reader::read` and `Writer::write`.
/// Set io timeout for operations like `read`, `write`.
pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
self.io_timeout = timeout;
self
Expand All @@ -165,7 +73,6 @@ impl<A: Access> Layer<A> for TimeoutLayer {

TimeoutAccessor {
inner,

timeout: self.timeout,
io_timeout: self.io_timeout,
}
Expand All @@ -176,7 +83,6 @@ impl<A: Access> Layer<A> for TimeoutLayer {
#[derive(Debug)]
pub struct TimeoutAccessor<A: Access> {
inner: A,

timeout: Duration,
io_timeout: Duration,
}
Expand All @@ -186,7 +92,7 @@ impl<A: Access> TimeoutAccessor<A> {
tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
Error::new(ErrorKind::Unexpected, "operation timeout reached")
.with_operation(op)
.with_context("timeout", self.timeout.as_secs_f64().to_string())
.with_context("timeout", format!("{:?}", self.timeout))
.set_temporary()
})?
}
Expand All @@ -201,7 +107,7 @@ impl<A: Access> TimeoutAccessor<A> {
.map_err(|_| {
Error::new(ErrorKind::Unexpected, "io timeout reached")
.with_operation(op)
.with_context("timeout", self.io_timeout.as_secs_f64().to_string())
.with_context("timeout", format!("{:?}", self.io_timeout))
.set_temporary()
})?
}
Expand Down Expand Up @@ -292,7 +198,6 @@ impl Execute for TimeoutExecutor {
#[doc(hidden)]
pub struct TimeoutWrapper<R> {
inner: R,

timeout: Duration,
}

Expand All @@ -310,7 +215,7 @@ impl<R> TimeoutWrapper<R> {
tokio::time::timeout(timeout, fut).await.map_err(|_| {
Error::new(ErrorKind::Unexpected, "io operation timeout reached")
.with_operation(op)
.with_context("timeout", timeout.as_secs_f64().to_string())
.with_context("timeout", format!("{:?}", timeout))
.set_temporary()
})?
}
Expand Down Expand Up @@ -349,7 +254,8 @@ impl<R: oio::List> oio::List for TimeoutWrapper<R> {

impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
self.inner.delete(path, args).await
let fut = self.inner.delete(path, args);
Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
}

async fn close(&mut self) -> Result<()> {
Expand All @@ -360,13 +266,9 @@ impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {

#[cfg(test)]
mod tests {
use super::*;
use std::future::pending;

use futures::StreamExt;
use tokio::time::sleep;
use tokio::time::timeout;

use super::*;

#[derive(Debug, Clone, Default)]
struct MockService;
Expand All @@ -384,112 +286,34 @@ mod tests {
delete: true,
..Default::default()
});

am.into()
}

/// This function will build a reader that always return pending.
async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
Ok((RpRead::new(), Box::new(MockReader)))
}

/// This function will never return.
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
sleep(Duration::from_secs(u64::MAX)).await;

Ok((RpDelete::default(), Box::new(())))
}

async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
Ok((RpList::default(), Box::new(MockLister)))
}
}

#[derive(Debug, Clone, Default)]
struct MockReader;

impl oio::Read for MockReader {
fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
pending()
}
}

#[derive(Debug, Clone, Default)]
struct MockLister;

impl oio::List for MockLister {
fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
pending()
}
}

#[tokio::test]
async fn test_operation_timeout() {
let srv = MockService;
let op = Operator::from_inner(Arc::new(srv))
.layer(TimeoutLayer::default().with_timeout(Duration::from_secs(1)));

let fut = async {
let res = op.delete("test").await;
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err.kind(), ErrorKind::Unexpected);
assert!(err.to_string().contains("timeout"))
};

timeout(Duration::from_secs(2), fut)
.await
.expect("this test should not exceed 2 seconds")
}

#[tokio::test]
async fn test_io_timeout() {
let srv = MockService;
let op = Operator::from_inner(Arc::new(srv))
.layer(TimeoutLayer::default().with_io_timeout(Duration::from_secs(1)));

let reader = op.reader("test").await.unwrap();

let res = reader.read(0..4).await;
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err.kind(), ErrorKind::Unexpected);
assert!(err.to_string().contains("timeout"))
}

#[tokio::test]
async fn test_list_timeout() {
let srv = MockService;
let op = Operator::from_inner(Arc::new(srv)).layer(
TimeoutLayer::default()
.with_timeout(Duration::from_secs(1))
.with_io_timeout(Duration::from_secs(1)),
);

let mut lister = op.lister("test").await.unwrap();

let res = lister.next().await.unwrap();
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err.kind(), ErrorKind::Unexpected);
assert!(err.to_string().contains("timeout"))
}

#[tokio::test]
async fn test_list_timeout_raw() {
use oio::List;

let acc = MockService;
let timeout_layer = TimeoutLayer::default()
.with_timeout(Duration::from_secs(1))
.with_io_timeout(Duration::from_secs(1));
let timeout_acc = timeout_layer.layer(acc);

let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default())
.await
.unwrap();
.layer(TimeoutLayer::default().with_timeout(Duration::from_millis(10)));

let res = lister.next().await;
let res = op.delete("test").await;
assert!(res.is_err());
let err = res.unwrap_err();
assert_eq!(err.kind(), ErrorKind::Unexpected);
Expand Down
Loading