diff --git a/core/core/src/types/read/reader.rs b/core/core/src/types/read/reader.rs index 576a0390b4e4..a2af933907d6 100644 --- a/core/core/src/types/read/reader.rs +++ b/core/core/src/types/read/reader.rs @@ -194,7 +194,7 @@ impl Reader { let gap = self.ctx.options().gap().unwrap_or(1024 * 1024) as u64; // We don't care about the order of range with same start, they // will be merged in the next step. - ranges.sort_unstable_by(|a, b| a.start.cmp(&b.start)); + ranges.sort_unstable_by_key(|a| a.start); // We know that this vector will have at most element let mut merged = Vec::with_capacity(ranges.len()); diff --git a/core/services/onedrive/src/backend.rs b/core/services/onedrive/src/backend.rs index e6ecb815000d..6fbcffa401cf 100644 --- a/core/services/onedrive/src/backend.rs +++ b/core/services/onedrive/src/backend.rs @@ -50,11 +50,9 @@ impl Access for OnedriveBackend { return Ok(RpCreateDir::default()); } - let response = self.core.onedrive_create_dir(path).await?; - match response.status() { - StatusCode::CREATED | StatusCode::OK => Ok(RpCreateDir::default()), - _ => Err(parse_error(response)), - } + self.core.ensure_directory(path).await?; + + Ok(RpCreateDir::default()) } async fn stat(&self, path: &str, args: OpStat) -> Result { @@ -96,6 +94,13 @@ impl Access for OnedriveBackend { } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { + if _args.if_not_exists() { + return Err(Error::new( + ErrorKind::Unsupported, + "copy with if_not_exists is not supported by OneDrive", + )); + } + let monitor_url = self.core.initialize_copy(from, to).await?; self.core.wait_until_complete(monitor_url).await?; Ok(RpCopy::default()) diff --git a/core/services/onedrive/src/builder.rs b/core/services/onedrive/src/builder.rs index 8d2371a711d1..96ec400a327a 100644 --- a/core/services/onedrive/src/builder.rs +++ b/core/services/onedrive/src/builder.rs @@ -132,7 +132,9 @@ impl Builder for OnedriveBuilder { stat: true, stat_with_if_none_match: true, - stat_with_version: self.config.enable_versioning, + // OneDrive's version API doesn't preserve complete metadata for historical versions, + // so we only advertise list-based version discovery for now. + stat_with_version: false, delete: true, create_dir: true, diff --git a/core/services/onedrive/src/core.rs b/core/services/onedrive/src/core.rs index aa85145f6a03..17df75a195b2 100644 --- a/core/services/onedrive/src/core.rs +++ b/core/services/onedrive/src/core.rs @@ -108,28 +108,55 @@ impl OneDriveCore { /// /// * `path` - a relative folder path pub(crate) async fn ensure_directory(&self, path: &str) -> Result { - let response = self.onedrive_get_stat_plain(path).await?; - let item: OneDriveItem = match response.status() { - StatusCode::OK => { - let bytes = response.into_body(); - serde_json::from_reader(bytes.reader()).map_err(new_json_deserialize_error)? - } - StatusCode::NOT_FOUND => { - // We must create directory for the destination - let response = self.onedrive_create_dir(path).await?; - match response.status() { - StatusCode::CREATED | StatusCode::OK => { - let bytes = response.into_body(); - serde_json::from_reader(bytes.reader()) - .map_err(new_json_deserialize_error)? + let mut pending: Vec = Vec::new(); + let mut current = path; + + loop { + let response = self.onedrive_get_stat_plain(current).await?; + match response.status() { + StatusCode::OK => { + let bytes = response.into_body(); + let item: OneDriveItem = serde_json::from_reader(bytes.reader()) + .map_err(new_json_deserialize_error)?; + + if !matches!(item.item_type, ItemType::Folder { .. }) { + return Err(Error::new( + ErrorKind::NotADirectory, + "path exists but is not a directory", + ) + .with_context("path", current)); } - _ => return Err(parse_error(response)), + + let mut ensured = item; + for missing in pending.into_iter().rev() { + let response = self.onedrive_create_dir(&missing).await?; + ensured = match response.status() { + StatusCode::CREATED | StatusCode::OK => { + let bytes = response.into_body(); + serde_json::from_reader(bytes.reader()) + .map_err(new_json_deserialize_error)? + } + _ => return Err(parse_error(response)), + }; + } + + return Ok(ensured); } + StatusCode::NOT_FOUND => { + pending.push(current.to_string()); + let parent = get_parent(current); + if parent == current { + return Err(Error::new( + ErrorKind::Unexpected, + "failed to resolve parent while creating directory", + ) + .with_context("path", path)); + } + current = parent; + } + _ => return Err(parse_error(response)), } - _ => return Err(parse_error(response)), - }; - - Ok(item) + } } pub(crate) async fn sign(&self, request: &mut Request) -> Result<()> { @@ -158,13 +185,15 @@ impl OneDriveCore { /// /// See also [`onedrive_get_stat_plain()`]. pub(crate) async fn onedrive_stat(&self, path: &str, args: OpStat) -> Result { - let mut url: String = self.onedrive_item_url(path, true); if args.version().is_some() { - url += "?$expand=versions("; - url += VERSION_SELECT_PARAM; - url += ")"; + return Err(Error::new( + ErrorKind::Unsupported, + "stat with version is not supported by OneDrive", + )); } + let url: String = self.onedrive_item_url(path, true); + let mut request = Request::get(&url); if let Some(etag) = args.if_none_match() { request = request.header(header::IF_NONE_MATCH, etag); @@ -195,22 +224,6 @@ impl OneDriveCore { .with_etag(decoded_response.e_tag) .with_content_length(decoded_response.size.max(0) as u64); - if let Some(version) = args.version() { - for item_version in decoded_response.versions.as_deref().unwrap_or_default() { - if item_version.id == version { - meta.set_version(version); - break; // early exit - } - } - - if meta.version().is_none() { - return Err(Error::new( - ErrorKind::NotFound, - "cannot find this version of the item", - )); - } - } - let last_modified = decoded_response.last_modified_date_time; let date_utc_last_modified = last_modified.parse::()?; meta.set_last_modified(date_utc_last_modified); @@ -242,6 +255,10 @@ impl OneDriveCore { self.sign(&mut request).await?; let response = self.info.http_client().send(request).await?; + if !response.status().is_success() { + return Err(parse_error(response)); + } + let decoded_response: GraphApiOneDriveVersionsResponse = serde_json::from_reader(response.into_body().reader()) .map_err(new_json_deserialize_error)?; @@ -273,6 +290,13 @@ impl OneDriveCore { path: &str, args: &OpRead, ) -> Result> { + if args.version().is_some() { + return Err(Error::new( + ErrorKind::Unsupported, + "read with version is not supported by OneDrive", + )); + } + // We can't "select" the OneDrive API response fields when reading because "select" shadows not found error let url: String = format!("{}:/content", self.onedrive_item_url(path, true)); @@ -528,20 +552,29 @@ impl OneDriveCore { pub(crate) async fn wait_until_complete(&self, monitor_url: String) -> Result<()> { for _attempt in 0..MAX_MONITOR_ATTEMPT { - let mut request = Request::get(monitor_url.to_string()) + let request = Request::get(monitor_url.as_str()) .header(header::CONTENT_TYPE, "application/json") .extension(Operation::Copy) .body(Buffer::new()) .map_err(new_request_build_error)?; - self.sign(&mut request).await?; - let response = self.info.http_client().send(request).await?; + if !response.status().is_success() { + return Err(parse_error(response)); + } + let status: OneDriveMonitorStatus = serde_json::from_reader(response.into_body().reader()) .map_err(new_json_deserialize_error)?; - if status.status == "completed" { - return Ok(()); + match status.status.as_str() { + "completed" => return Ok(()), + "notStarted" | "inProgress" => {} + _ => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("OneDrive copy operation returned status {}", status.status), + )); + } } tokio::time::sleep(Duration::from_secs(MONITOR_WAIT_SECOND)).await; diff --git a/core/services/onedrive/src/deleter.rs b/core/services/onedrive/src/deleter.rs index 30e477021ccf..6671115f32a0 100644 --- a/core/services/onedrive/src/deleter.rs +++ b/core/services/onedrive/src/deleter.rs @@ -38,7 +38,14 @@ impl OneDriveDeleter { } impl oio::OneShotDelete for OneDriveDeleter { - async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + async fn delete_once(&self, path: String, args: OpDelete) -> Result<()> { + if args.version().is_some() { + return Err(Error::new( + ErrorKind::Unsupported, + "delete with version is not supported by OneDrive", + )); + } + let response = self.core.onedrive_delete(&path).await?; match response.status() { StatusCode::NO_CONTENT | StatusCode::NOT_FOUND => Ok(()), diff --git a/core/services/onedrive/src/docs.md b/core/services/onedrive/src/docs.md index c7a2786d6da3..b432b35f94ea 100644 --- a/core/services/onedrive/src/docs.md +++ b/core/services/onedrive/src/docs.md @@ -47,7 +47,10 @@ You should consider [`RetryLayer`] and monitor your operations carefully. - `client_id`: Set the client ID for a Microsoft Graph API application (available though Azure's registration portal) - `client_secret`: Set the client secret for a Microsoft Graph API application - `root`: Set the work directory for OneDrive backend -- `enable_versioning`: Enable versioning support for OneDrive items +- `enable_versioning`: Enable version listing support for OneDrive items + +Historical OneDrive versions don't preserve the full item metadata surface, so OpenDAL only +advertises `list_with_versions` when versioning is enabled. The configuration for tokens is one of the following: diff --git a/core/services/onedrive/src/lister.rs b/core/services/onedrive/src/lister.rs index 98d3abd53175..001e0e0e3569 100644 --- a/core/services/onedrive/src/lister.rs +++ b/core/services/onedrive/src/lister.rs @@ -134,15 +134,27 @@ impl oio::PageList for OneDriveLister { // Thus, `list_with_versions` induces N+1 requests. This N+1 is intentional. // N+1 is horrendous but we can't do any better without OneDrive's API support. // When OneDrive supports listing with versions API, remove this. - if list_with_versions { + if list_with_versions && entry_mode == EntryMode::FILE { + meta.set_is_current(true); + let current_entry = oio::Entry::new(&normalized_path, meta); + ctx.entries.push_back(current_entry); + let versions = self.core.onedrive_list_versions(&path).await?; - if let Some(version) = versions.first() { - meta.set_version(&version.id); + for version in versions { + let mut version_meta = + Metadata::new(entry_mode).with_content_length(version.size.max(0) as u64); + version_meta + .set_last_modified(version.last_modified_date_time.parse::()?); + version_meta.set_version(&version.id); + version_meta.set_is_current(false); + + let entry = oio::Entry::new(&normalized_path, version_meta); + ctx.entries.push_back(entry); } + } else { + let entry = oio::Entry::new(&normalized_path, meta); + ctx.entries.push_back(entry) } - - let entry = oio::Entry::new(&normalized_path, meta); - ctx.entries.push_back(entry) } Ok(()) diff --git a/core/services/onedrive/src/writer.rs b/core/services/onedrive/src/writer.rs index adf2364b12ad..5e6c3fb7adcd 100644 --- a/core/services/onedrive/src/writer.rs +++ b/core/services/onedrive/src/writer.rs @@ -52,6 +52,13 @@ impl OneDriveWriter { // use `OneShotWrite` instead of `MultipartWrite`. impl oio::OneShotWrite for OneDriveWriter { async fn write_once(&self, bs: Buffer) -> Result { + if self.op.if_none_match().is_some() || self.op.if_not_exists() { + return Err(Error::new( + ErrorKind::Unsupported, + "write with if_none_match/if_not_exists is not supported by OneDrive", + )); + } + let size = bs.len(); let meta = if size <= Self::MAX_SIMPLE_SIZE { @@ -72,20 +79,7 @@ impl OneDriveWriter { .await?; match response.status() { - StatusCode::CREATED | StatusCode::OK => { - let item: OneDriveItem = serde_json::from_reader(response.into_body().reader()) - .map_err(new_json_deserialize_error)?; - - let mut meta = Metadata::new(EntryMode::FILE) - .with_etag(item.e_tag) - .with_content_length(item.size.max(0) as u64); - - let last_modified = item.last_modified_date_time; - let date_utc_last_modified = last_modified.parse::()?; - meta.set_last_modified(date_utc_last_modified); - - Ok(meta) - } + StatusCode::CREATED | StatusCode::OK => Self::parse_uploaded_item(response), _ => Err(parse_error(response)), } } @@ -125,30 +119,18 @@ impl OneDriveWriter { match response.status() { // Typical response code: 202 Accepted // Reference: https://learn.microsoft.com/en-us/onedrive/developer/rest-api/api/driveitem_put_content?view=odsp-graph-online#response - StatusCode::ACCEPTED | StatusCode::OK => {} // skip, in the middle of upload - StatusCode::CREATED => { - // last trunk - let item: OneDriveItem = serde_json::from_reader(response.into_body().reader()) - .map_err(new_json_deserialize_error)?; - - let mut meta = Metadata::new(EntryMode::FILE) - .with_etag(item.e_tag) - .with_content_length(item.size.max(0) as u64); - - let last_modified = item.last_modified_date_time; - let date_utc_last_modified = last_modified.parse::()?; - meta.set_last_modified(date_utc_last_modified); - return Ok(meta); - } + StatusCode::ACCEPTED => {} // still uploading + StatusCode::CREATED | StatusCode::OK => return Self::parse_uploaded_item(response), _ => return Err(parse_error(response)), } offset += OneDriveWriter::CHUNK_SIZE_FACTOR; } - debug_assert!(false, "should have returned"); - - Ok(Metadata::default()) // should not happen, but start with handling this gracefully - do nothing, but return the default metadata + Err(Error::new( + ErrorKind::Unexpected, + "OneDrive upload session finished without a terminal response", + )) } async fn create_upload_session(&self) -> Result { @@ -166,4 +148,19 @@ impl OneDriveWriter { _ => Err(parse_error(response)), } } + + fn parse_uploaded_item(response: http::Response) -> Result { + let item: OneDriveItem = serde_json::from_reader(response.into_body().reader()) + .map_err(new_json_deserialize_error)?; + + let mut meta = Metadata::new(EntryMode::FILE) + .with_etag(item.e_tag) + .with_content_length(item.size.max(0) as u64); + + let last_modified = item.last_modified_date_time; + let date_utc_last_modified = last_modified.parse::()?; + meta.set_last_modified(date_utc_last_modified); + + Ok(meta) + } } diff --git a/core/services/webdav/src/core.rs b/core/services/webdav/src/core.rs index 57a52dcba966..c3d66209ddbd 100644 --- a/core/services/webdav/src/core.rs +++ b/core/services/webdav/src/core.rs @@ -609,27 +609,23 @@ pub fn parse_user_metadata_from_xml(xml: &str, namespace_uri: &str) -> HashMap { - if current_prop_key.is_some() { - // Text content - add directly (no escaping needed) - let text_str = String::from_utf8_lossy(e.as_ref()); - current_prop_value.push_str(&text_str); - } + Ok(Event::Text(ref e)) if current_prop_key.is_some() => { + // Text content - add directly (no escaping needed) + let text_str = String::from_utf8_lossy(e.as_ref()); + current_prop_value.push_str(&text_str); } - Ok(Event::GeneralRef(ref e)) => { - if current_prop_key.is_some() { - // Handle XML entity references (e.g., < > & " ') - let entity_name = String::from_utf8_lossy(e.as_ref()); - let decoded = match entity_name.as_ref() { - "lt" => "<", - "gt" => ">", - "amp" => "&", - "quot" => "\"", - "apos" => "'", - _ => "", // Unknown entity, skip - }; - current_prop_value.push_str(decoded); - } + Ok(Event::GeneralRef(ref e)) if current_prop_key.is_some() => { + // Handle XML entity references (e.g., < > & " ') + let entity_name = String::from_utf8_lossy(e.as_ref()); + let decoded = match entity_name.as_ref() { + "lt" => "<", + "gt" => ">", + "amp" => "&", + "quot" => "\"", + "apos" => "'", + _ => "", // Unknown entity, skip + }; + current_prop_value.push_str(decoded); } Ok(Event::End(ref e)) => { let name = String::from_utf8_lossy(e.name().as_ref()).to_string(); @@ -682,11 +678,9 @@ pub fn check_proppatch_response(xml: &str) -> Result<()> { status_text.clear(); } } - Ok(Event::Text(ref e)) => { - if in_status { - let text_str = String::from_utf8_lossy(e.as_ref()); - status_text.push_str(&text_str); - } + Ok(Event::Text(ref e)) if in_status => { + let text_str = String::from_utf8_lossy(e.as_ref()); + status_text.push_str(&text_str); } Ok(Event::End(ref e)) => { let name = String::from_utf8_lossy(e.name().as_ref()).to_lowercase(); diff --git a/dev/src/generate/python.rs b/dev/src/generate/python.rs index 5eda8b581f5a..ae7a611426ab 100644 --- a/dev/src/generate/python.rs +++ b/dev/src/generate/python.rs @@ -121,12 +121,10 @@ pub fn format_text(text: &str, indent: usize, max_line_length: usize) -> String for (i, &ch) in chars.iter().enumerate() { match ch { - '.' | '!' | '?' => { + '.' | '!' | '?' if i + 1 < chars.len() && chars[i + 1].is_whitespace() => { // Sentence end - if i + 1 < chars.len() && chars[i + 1].is_whitespace() { - segments.push(&text[start..=i]); - start = i + 1; - } + segments.push(&text[start..=i]); + start = i + 1; } '\n' => { if i > start { diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs index a0e6356ed6ff..32f2b2025c9c 100644 --- a/integrations/object_store/src/store.rs +++ b/integrations/object_store/src/store.rs @@ -656,7 +656,7 @@ impl MultipartUpload for OpendalMultipartUpload { let mut writer = writer.lock().await; let result = writer - .write(Buffer::from_iter(data.into_iter())) + .write(Buffer::from_iter(data)) .await .map_err(|err| format_object_store_error(err, location.as_ref()));