Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions core/core/src/raw/accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,28 @@ pub trait Access: Send + Sync + Debug + Unpin + 'static {
)))
}

/// Invoke the `undelete` operation on the specified path to restore a soft-deleted object.
///
/// Require [`Capability::undelete`]
///
/// # Behavior
///
/// - `undelete` restores a soft-deleted object to its active state.
/// - `undelete` SHOULD return `Ok(())` if the path is restored successfully.
/// - `undelete` SHOULD return error if the path is not soft-deleted or doesn't exist.
fn undelete(
&self,
path: &str,
args: OpUndelete,
) -> impl Future<Output = Result<RpUndelete>> + MaybeSend {
let (_, _) = (path, args);

ready(Err(Error::new(
ErrorKind::Unsupported,
"operation is not supported",
)))
}

/// Invoke the `list` operation on the specified path.
///
/// Require [`Capability::list`]
Expand Down Expand Up @@ -286,6 +308,12 @@ pub trait AccessDyn: Send + Sync + Debug + Unpin {
) -> BoxedFuture<'a, Result<(RpWrite, oio::Writer)>>;
/// Dyn version of [`Accessor::delete`]
fn delete_dyn(&self) -> BoxedFuture<'_, Result<(RpDelete, oio::Deleter)>>;
/// Dyn version of [`Accessor::undelete`]
fn undelete_dyn<'a>(
&'a self,
path: &'a str,
args: OpUndelete,
) -> BoxedFuture<'a, Result<RpUndelete>>;
/// Dyn version of [`Accessor::list`]
fn list_dyn<'a>(
&'a self,
Expand Down Expand Up @@ -359,6 +387,14 @@ where
Box::pin(self.delete())
}

fn undelete_dyn<'a>(
&'a self,
path: &'a str,
args: OpUndelete,
) -> BoxedFuture<'a, Result<RpUndelete>> {
Box::pin(self.undelete(path, args))
}

fn list_dyn<'a>(
&'a self,
path: &'a str,
Expand Down Expand Up @@ -424,6 +460,10 @@ impl Access for dyn AccessDyn {
self.delete_dyn().await
}

async fn undelete(&self, path: &str, args: OpUndelete) -> Result<RpUndelete> {
self.undelete_dyn(path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.list_dyn(path, args).await
}
Expand Down Expand Up @@ -505,6 +545,14 @@ impl<T: Access + ?Sized> Access for Arc<T> {
async move { self.as_ref().delete().await }
}

fn undelete(
&self,
path: &str,
args: OpUndelete,
) -> impl Future<Output = Result<RpUndelete>> + MaybeSend {
async move { self.as_ref().undelete(path, args).await }
}

fn list(
&self,
path: &str,
Expand Down
12 changes: 12 additions & 0 deletions core/core/src/raw/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@ pub trait LayeredAccess: Send + Sync + Debug + Unpin + 'static {

fn delete(&self) -> impl Future<Output = Result<(RpDelete, Self::Deleter)>> + MaybeSend;

fn undelete(
&self,
path: &str,
args: OpUndelete,
) -> impl Future<Output = Result<RpUndelete>> + MaybeSend {
self.inner().undelete(path, args)
}

fn list(
&self,
path: &str,
Expand Down Expand Up @@ -217,6 +225,10 @@ impl<L: LayeredAccess> Access for L {
LayeredAccess::delete(self).await
}

async fn undelete(&self, path: &str, args: OpUndelete) -> Result<RpUndelete> {
LayeredAccess::undelete(self, path, args).await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
LayeredAccess::list(self, path, args).await
}
Expand Down
3 changes: 3 additions & 0 deletions core/core/src/raw/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub enum Operation {
Stat,
/// Operation to delete files.
Delete,
/// Operation to restore soft-deleted files.
Undelete,
/// Operation to get the next file from the list.
List,
/// Operation to generate a presigned URL.
Expand Down Expand Up @@ -75,6 +77,7 @@ impl From<Operation> for &'static str {
Operation::Rename => "rename",
Operation::Stat => "stat",
Operation::Delete => "delete",
Operation::Undelete => "undelete",
Operation::List => "list",
Operation::Presign => "presign",
}
Expand Down
13 changes: 13 additions & 0 deletions core/core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,19 @@ impl OpDeleter {
}
}

/// Args for `undelete` operation.
///
/// The path must be normalized.
#[derive(Debug, Clone, Default)]
pub struct OpUndelete {}

impl OpUndelete {
/// Create a new `OpUndelete`.
pub fn new() -> Self {
Self::default()
}
}

/// Args for `list` operation.
#[derive(Debug, Clone, Default)]
pub struct OpList {
Expand Down
4 changes: 4 additions & 0 deletions core/core/src/raw/rps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ pub struct RpCreateDir {}
#[derive(Debug, Clone, Default)]
pub struct RpDelete {}

/// Reply for `undelete` operation
#[derive(Debug, Clone, Default)]
pub struct RpUndelete {}

/// Reply for `list` operation.
#[derive(Debug, Clone, Default)]
pub struct RpList {}
Expand Down
3 changes: 3 additions & 0 deletions core/core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ pub struct Capability {
/// Maximum size supported for single delete operations.
pub delete_max_size: Option<usize>,

/// Indicates if undelete (restore) operations are supported for soft-deleted objects.
pub undelete: bool,

/// Indicates if copy operations are supported.
pub copy: bool,
/// Indicates if conditional copy operations with if-not-exists are supported.
Expand Down
28 changes: 28 additions & 0 deletions core/core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,34 @@ impl Operator {
self.delete_with(path).recursive(true).await
}

/// Restore a soft-deleted object.
///
/// # Notes
///
/// - This operation requires soft delete to be enabled on the storage backend.
/// - Restoring a non-existent or permanently deleted object will return an error.
///
/// # Examples
///
/// ```
/// # use anyhow::Result;
/// # use opendal_core::Operator;
/// #
/// # async fn test(op: Operator) -> Result<()> {
/// // Delete a file (with soft delete enabled, it's not permanently deleted)
/// op.delete("path/to/file").await?;
///
/// // Restore the soft-deleted file
/// op.undelete("path/to/file").await?;
/// # Ok(())
/// # }
/// ```
pub async fn undelete(&self, path: &str) -> Result<()> {
let path = normalize_path(path);
self.inner().undelete(&path, OpUndelete::new()).await?;
Ok(())
}

/// List entries whose paths start with the given prefix `path`.
///
/// # Semantics
Expand Down
32 changes: 32 additions & 0 deletions core/layers/logging/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,38 @@ impl<A: Access, I: LoggingInterceptor> LayeredAccess for LoggingAccessor<A, I> {
})
}

async fn undelete(&self, path: &str, args: OpUndelete) -> Result<RpUndelete> {
self.logger.log(
&self.info,
Operation::Undelete,
&[("path", path)],
"started",
None,
);

self.inner
.undelete(path, args)
.await
.inspect(|_| {
self.logger.log(
&self.info,
Operation::Undelete,
&[("path", path)],
"finished",
None,
);
})
.inspect_err(|err| {
self.logger.log(
&self.info,
Operation::Undelete,
&[("path", path)],
"failed",
Some(err),
);
})
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.logger.log(
&self.info,
Expand Down
9 changes: 9 additions & 0 deletions core/layers/retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,15 @@ impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> {
.map_err(|e| e.set_persistent())
}

async fn undelete(&self, path: &str, args: OpUndelete) -> Result<RpUndelete> {
{ || self.inner.undelete(path, args.clone()) }
.retry(self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| self.notify.intercept(err, dur))
.await
.map_err(|e| e.set_persistent())
}

async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
{ || self.inner.copy(from, to, args.clone()) }
.retry(self.builder)
Expand Down
5 changes: 5 additions & 0 deletions core/layers/timeout/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
.map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
}

async fn undelete(&self, path: &str, args: OpUndelete) -> Result<RpUndelete> {
self.timeout(Operation::Undelete, self.inner.undelete(path, args))
.await
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
self.io_timeout(Operation::List, self.inner.list(path, args))
.await
Expand Down
24 changes: 24 additions & 0 deletions core/services/azblob/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ impl AzblobBuilder {
self
}

/// Set the soft delete feature for this backend.
///
/// If enabled, deleted blobs will be retained for the configured retention period
/// and can be listed using list_with_deleted.
pub fn enable_soft_deletes(mut self, enabled: bool) -> Self {
self.config.enable_soft_deletes = enabled;
self
}

/// from_connection_string will make a builder from connection string
///
/// connection string looks like:
Expand Down Expand Up @@ -402,6 +411,9 @@ impl Builder for AzblobBuilder {

list: true,
list_with_recursive: true,
list_with_deleted: self.config.enable_soft_deletes,

undelete: self.config.enable_soft_deletes,

presign: self.config.sas_token.is_some(),
presign_stat: self.config.sas_token.is_some(),
Expand Down Expand Up @@ -508,12 +520,24 @@ impl Access for AzblobBackend {
))
}

async fn undelete(&self, path: &str, _args: OpUndelete) -> Result<RpUndelete> {
let resp = self.core.azblob_undelete_blob(path).await?;

let status = resp.status();

match status {
StatusCode::OK => Ok(RpUndelete::default()),
_ => Err(parse_error(resp)),
}
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = AzblobLister::new(
self.core.clone(),
path.to_string(),
args.recursive(),
args.limit(),
args.deleted(),
);

Ok((RpList::default(), oio::PageLister::new(l)))
Expand Down
5 changes: 5 additions & 0 deletions core/services/azblob/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ pub struct AzblobConfig {

/// The maximum batch operations of Azblob service backend.
pub batch_max_operations: Option<usize>,

/// Enable soft deletes for this storage account.
#[serde(default)]
pub enable_soft_deletes: bool,
}

impl Debug for AzblobConfig {
Expand All @@ -84,6 +88,7 @@ impl Debug for AzblobConfig {
.field("root", &self.root)
.field("container", &self.container)
.field("endpoint", &self.endpoint)
.field("enable_soft_deletes", &self.enable_soft_deletes)
.finish_non_exhaustive()
}
}
Expand Down
13 changes: 13 additions & 0 deletions core/services/azblob/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,19 @@ impl AzblobCore {
self.send(req).await
}

pub async fn azblob_undelete_blob(&self, path: &str) -> Result<Response<Buffer>> {
let url = format!("{}?comp=undelete", self.build_path_url(path));

let mut req = Request::put(&url)
.header(CONTENT_LENGTH, 0)
.extension(Operation::Undelete)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
self.send(req).await
}

pub async fn azblob_copy_blob(
&self,
from: &str,
Expand Down
Loading
Loading