Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/core/src/types/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl Reader {
let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64;
// We don't care about the order of range with same start, they
// will be merged in the next step.
ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start));
ranges.sort_unstable_by_key(|a| a.start);

// We know that this vector will have at most element
let mut merged = Vec::with_capacity(ranges.len());
Expand Down
15 changes: 10 additions & 5 deletions core/services/onedrive/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,9 @@ impl Access for OnedriveBackend {
return Ok(RpCreateDir::default());
}

let response = self.core.onedrive_create_dir(path).await?;
match response.status() {
StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()),
_ => Err(parse_error(response)),
}
self.core.ensure_directory(path).await?;

Ok(RpCreateDir::default())
}

async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
Expand Down Expand Up @@ -96,6 +94,13 @@ impl Access for OnedriveBackend {
}

async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> {
if _args.if_not_exists() {
return Err(Error::new(
ErrorKind::Unsupported,
"copy with if_not_exists is not supported by OneDrive",
));
}

let monitor_url = self.core.initialize_copy(from, to).await?;
self.core.wait_until_complete(monitor_url).await?;
Ok(RpCopy::default())
Expand Down
4 changes: 3 additions & 1 deletion core/services/onedrive/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ impl Builder for OnedriveBuilder {

stat: true,
stat_with_if_none_match: true,
stat_with_version: self.config.enable_versioning,
// OneDrive's version API doesn't preserve complete metadata for historical versions,
// so we only advertise list-based version discovery for now.
stat_with_version: false,

delete: true,
create_dir: true,
Expand Down
121 changes: 77 additions & 44 deletions core/services/onedrive/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,28 +108,55 @@ impl OneDriveCore {
///
/// * `path` - a relative folder path
pub(crate) async fn ensure_directory(&self, path: &str) -> Result<OneDriveItem> {
let response = self.onedrive_get_stat_plain(path).await?;
let item: OneDriveItem = match response.status() {
StatusCode::OK => {
let bytes = response.into_body();
serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)?
}
StatusCode::NOT_FOUND => {
// We must create directory for the destination
let response = self.onedrive_create_dir(path).await?;
match response.status() {
StatusCode::CREATED | StatusCode::OK => {
let bytes = response.into_body();
serde_json::from_reader(bytes.reader())
.map_err(new_json_deserialize_error)?
let mut pending: Vec<String> = Vec::new();
let mut current = path;

loop {
let response = self.onedrive_get_stat_plain(current).await?;
match response.status() {
StatusCode::OK => {
let bytes = response.into_body();
let item: OneDriveItem = serde_json::from_reader(bytes.reader())
.map_err(new_json_deserialize_error)?;

if !matches!(item.item_type, ItemType::Folder { .. }) {
return Err(Error::new(
ErrorKind::NotADirectory,
"path exists but is not a directory",
)
.with_context("path", current));
}
_ => return Err(parse_error(response)),

let mut ensured = item;
for missing in pending.into_iter().rev() {
let response = self.onedrive_create_dir(&missing).await?;
ensured = match response.status() {
StatusCode::CREATED | StatusCode::OK => {
let bytes = response.into_body();
serde_json::from_reader(bytes.reader())
.map_err(new_json_deserialize_error)?
}
_ => return Err(parse_error(response)),
};
}

return Ok(ensured);
}
StatusCode::NOT_FOUND => {
pending.push(current.to_string());
let parent = get_parent(current);
if parent == current {
return Err(Error::new(
ErrorKind::Unexpected,
"failed to resolve parent while creating directory",
)
.with_context("path", path));
}
current = parent;
}
_ => return Err(parse_error(response)),
}
_ => return Err(parse_error(response)),
};

Ok(item)
}
}

pub(crate) async fn sign<T>(&self, request: &mut Request<T>) -> Result<()> {
Expand Down Expand Up @@ -158,13 +185,15 @@ impl OneDriveCore {
///
/// See also [`onedrive_get_stat_plain()`].
pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result<Metadata> {
let mut url: String = self.onedrive_item_url(path, true);
if args.version().is_some() {
url += "?$expand=versions(";
url += VERSION_SELECT_PARAM;
url += ")";
return Err(Error::new(
ErrorKind::Unsupported,
"stat with version is not supported by OneDrive",
));
}

let url: String = self.onedrive_item_url(path, true);

let mut request = Request::get(&url);
if let Some(etag) = args.if_none_match() {
request = request.header(header::IF_NONE_MATCH, etag);
Expand Down Expand Up @@ -195,22 +224,6 @@ impl OneDriveCore {
.with_etag(decoded_response.e_tag)
.with_content_length(decoded_response.size.max(0) as u64);

if let Some(version) = args.version() {
for item_version in decoded_response.versions.as_deref().unwrap_or_default() {
if item_version.id == version {
meta.set_version(version);
break; // early exit
}
}

if meta.version().is_none() {
return Err(Error::new(
ErrorKind::NotFound,
"cannot find this version of the item",
));
}
}

let last_modified = decoded_response.last_modified_date_time;
let date_utc_last_modified = last_modified.parse::<Timestamp>()?;
meta.set_last_modified(date_utc_last_modified);
Expand Down Expand Up @@ -242,6 +255,10 @@ impl OneDriveCore {
self.sign(&mut request).await?;

let response = self.info.http_client().send(request).await?;
if !response.status().is_success() {
return Err(parse_error(response));
}

let decoded_response: GraphApiOneDriveVersionsResponse =
serde_json::from_reader(response.into_body().reader())
.map_err(new_json_deserialize_error)?;
Expand Down Expand Up @@ -273,6 +290,13 @@ impl OneDriveCore {
path: &str,
args: &OpRead,
) -> Result<Response<HttpBody>> {
if args.version().is_some() {
return Err(Error::new(
ErrorKind::Unsupported,
"read with version is not supported by OneDrive",
));
}

// We can't "select" the OneDrive API response fields when reading because "select" shadows not found error
let url: String = format!("{}:/content", self.onedrive_item_url(path, true));

Expand Down Expand Up @@ -528,20 +552,29 @@ impl OneDriveCore {

pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> {
for _attempt in 0..MAX_MONITOR_ATTEMPT {
let mut request = Request::get(monitor_url.to_string())
let request = Request::get(monitor_url.as_str())
.header(header::CONTENT_TYPE, "application/json")
.extension(Operation::Copy)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut request).await?;

let response = self.info.http_client().send(request).await?;
if !response.status().is_success() {
return Err(parse_error(response));
}

let status: OneDriveMonitorStatus =
serde_json::from_reader(response.into_body().reader())
.map_err(new_json_deserialize_error)?;
if status.status == "completed" {
return Ok(());
match status.status.as_str() {
"completed" => return Ok(()),
"notStarted" | "inProgress" => {}
_ => {
return Err(Error::new(
ErrorKind::Unexpected,
format!("OneDrive copy operation returned status {}", status.status),
));
}
}

tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await;
Expand Down
9 changes: 8 additions & 1 deletion core/services/onedrive/src/deleter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ impl OneDriveDeleter {
}

impl oio::OneShotDelete for OneDriveDeleter {
async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> {
if args.version().is_some() {
return Err(Error::new(
ErrorKind::Unsupported,
"delete with version is not supported by OneDrive",
));
}

let response = self.core.onedrive_delete(&path).await?;
match response.status() {
StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()),
Expand Down
5 changes: 4 additions & 1 deletion core/services/onedrive/src/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ You should consider [`RetryLayer`] and monitor your operations carefully.
- `client_id`: Set the client ID for a Microsoft Graph API application (available though Azure's registration portal)
- `client_secret`: Set the client secret for a Microsoft Graph API application
- `root`: Set the work directory for OneDrive backend
- `enable_versioning`: Enable versioning support for OneDrive items
- `enable_versioning`: Enable version listing support for OneDrive items

Historical OneDrive versions don't preserve the full item metadata surface, so OpenDAL only
advertises `list_with_versions` when versioning is enabled.

The configuration for tokens is one of the following:

Expand Down
24 changes: 18 additions & 6 deletions core/services/onedrive/src/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,27 @@ impl oio::PageList for OneDriveLister {
// Thus, `list_with_versions` induces N+1 requests. This N+1 is intentional.
// N+1 is horrendous but we can't do any better without OneDrive's API support.
// When OneDrive supports listing with versions API, remove this.
if list_with_versions {
if list_with_versions && entry_mode == EntryMode::FILE {
meta.set_is_current(true);
let current_entry = oio::Entry::new(&normalized_path, meta);
ctx.entries.push_back(current_entry);

let versions = self.core.onedrive_list_versions(&path).await?;
if let Some(version) = versions.first() {
meta.set_version(&version.id);
for version in versions {
let mut version_meta =
Metadata::new(entry_mode).with_content_length(version.size.max(0) as u64);
version_meta
.set_last_modified(version.last_modified_date_time.parse::<Timestamp>()?);
version_meta.set_version(&version.id);
version_meta.set_is_current(false);

let entry = oio::Entry::new(&normalized_path, version_meta);
ctx.entries.push_back(entry);
}
} else {
let entry = oio::Entry::new(&normalized_path, meta);
ctx.entries.push_back(entry)
}

let entry = oio::Entry::new(&normalized_path, meta);
ctx.entries.push_back(entry)
}

Ok(())
Expand Down
61 changes: 29 additions & 32 deletions core/services/onedrive/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ impl OneDriveWriter {
// use `OneShotWrite` instead of `MultipartWrite`.
impl oio::OneShotWrite for OneDriveWriter {
async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
if self.op.if_none_match().is_some() || self.op.if_not_exists() {
return Err(Error::new(
ErrorKind::Unsupported,
"write with if_none_match/if_not_exists is not supported by OneDrive",
));
}

let size = bs.len();

let meta = if size <= Self::MAX_SIMPLE_SIZE {
Expand All @@ -72,20 +79,7 @@ impl OneDriveWriter {
.await?;

match response.status() {
StatusCode::CREATED | StatusCode::OK => {
let item: OneDriveItem = serde_json::from_reader(response.into_body().reader())
.map_err(new_json_deserialize_error)?;

let mut meta = Metadata::new(EntryMode::FILE)
.with_etag(item.e_tag)
.with_content_length(item.size.max(0) as u64);

let last_modified = item.last_modified_date_time;
let date_utc_last_modified = last_modified.parse::<Timestamp>()?;
meta.set_last_modified(date_utc_last_modified);

Ok(meta)
}
StatusCode::CREATED | StatusCode::OK => Self::parse_uploaded_item(response),
_ => Err(parse_error(response)),
}
}
Expand Down Expand Up @@ -125,30 +119,18 @@ impl OneDriveWriter {
match response.status() {
// Typical response code: 202 Accepted
// Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response
StatusCode::ACCEPTED | StatusCode::OK => {} // skip, in the middle of upload
StatusCode::CREATED => {
// last trunk
let item: OneDriveItem = serde_json::from_reader(response.into_body().reader())
.map_err(new_json_deserialize_error)?;

let mut meta = Metadata::new(EntryMode::FILE)
.with_etag(item.e_tag)
.with_content_length(item.size.max(0) as u64);

let last_modified = item.last_modified_date_time;
let date_utc_last_modified = last_modified.parse::<Timestamp>()?;
meta.set_last_modified(date_utc_last_modified);
return Ok(meta);
}
StatusCode::ACCEPTED => {} // still uploading
StatusCode::CREATED | StatusCode::OK => return Self::parse_uploaded_item(response),
_ => return Err(parse_error(response)),
}

offset += OneDriveWriter::CHUNK_SIZE_FACTOR;
}

debug_assert!(false, "should have returned");

Ok(Metadata::default()) // should not happen, but start with handling this gracefully - do nothing, but return the default metadata
Err(Error::new(
ErrorKind::Unexpected,
"OneDrive upload session finished without a terminal response",
))
}

async fn create_upload_session(&self) -> Result<OneDriveUploadSessionCreationResponseBody> {
Expand All @@ -166,4 +148,19 @@ impl OneDriveWriter {
_ => Err(parse_error(response)),
}
}

fn parse_uploaded_item(response: http::Response<Buffer>) -> Result<Metadata> {
let item: OneDriveItem = serde_json::from_reader(response.into_body().reader())
.map_err(new_json_deserialize_error)?;

let mut meta = Metadata::new(EntryMode::FILE)
.with_etag(item.e_tag)
.with_content_length(item.size.max(0) as u64);

let last_modified = item.last_modified_date_time;
let date_utc_last_modified = last_modified.parse::<Timestamp>()?;
meta.set_last_modified(date_utc_last_modified);

Ok(meta)
}
}
Loading
Loading