From 5a69949762d05c8861d655bd08895f1497bfc6e6 Mon Sep 17 00:00:00 2001 From: congyi <15605187270@163.com> Date: Thu, 9 Jan 2025 15:23:21 +0800 Subject: [PATCH] bump opendal to v0.51 --- Cargo.lock | 12 +++++------- Cargo.toml | 2 +- .../filesystem/opendal_source/opendal_enumerator.rs | 9 ++------- .../object/opendal_engine/opendal_object_store.rs | 10 +++------- .../src/object/opendal_engine/opendal_s3.rs | 2 +- 5 files changed, 12 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2547112a44e3..6ab0fcb63a669 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8126,7 +8126,6 @@ dependencies = [ "base64 0.22.0", "bytes", "chrono", - "crc32c", "flagset", "futures", "getrandom", @@ -8137,11 +8136,9 @@ dependencies = [ "percent-encoding", "prometheus", "quick-xml 0.36.2", - "reqsign", "reqwest 0.12.4", "serde", "serde_json", - "sha2", "tokio", "uuid", ] @@ -8171,6 +8168,7 @@ dependencies = [ "reqwest 0.12.4", "serde", "serde_json", + "sha2", "tokio", "uuid", ] @@ -10347,7 +10345,7 @@ dependencies = [ "madsim-tonic", "memcomparable", "mysql_async", - "opendal 0.49.2", + "opendal 0.51.0", "parking_lot 0.12.1", "parquet 53.2.0", "prometheus", @@ -10392,7 +10390,7 @@ dependencies = [ "linkme", "madsim-tokio", "mysql_async", - "opendal 0.49.2", + "opendal 0.51.0", "prometheus", "rand", "risingwave_batch", @@ -10899,7 +10897,7 @@ dependencies = [ "mysql_common", "nexmark", "num-bigint", - "opendal 0.49.2", + "opendal 0.51.0", "opensearch", "openssl", "parking_lot 0.12.1", @@ -11626,7 +11624,7 @@ dependencies = [ "madsim", "madsim-aws-sdk-s3", "madsim-tokio", - "opendal 0.49.2", + "opendal 0.51.0", "prometheus", "reqwest 0.12.4", "risingwave_common", diff --git a/Cargo.toml b/Cargo.toml index 771601e38e8a0..22baa598b4afc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,7 +157,7 @@ icelake = { git = "https://github.com/risingwavelabs/icelake.git", rev = "0ec44f iceberg = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496", features = ["storage-s3", "storage-gcs"] } iceberg-catalog-rest = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" } iceberg-catalog-glue = { git = "https://github.com/risingwavelabs/iceberg-rust.git", rev = "683fb89edeaf8d1baae69e1f376d68b92be1d496" } -opendal = "0.49" +opendal = "0.51" # used only by arrow-udf-flight arrow-flight = "53" arrow-udf-js = "0.5" diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs index a770d600282a2..6021c97824f3b 100644 --- a/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs +++ b/src/connector/src/source/filesystem/opendal_source/opendal_enumerator.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use futures::stream::{self, BoxStream}; use futures::StreamExt; -use opendal::{Metakey, Operator}; +use opendal::Operator; use risingwave_common::types::Timestamptz; use super::OpendalSource; @@ -69,12 +69,7 @@ impl OpendalEnumerator { pub async fn list(&self) -> ConnectorResult { let prefix = self.prefix.as_deref().unwrap_or("/"); - let object_lister = self - .op - .lister_with(prefix) - .recursive(true) - .metakey(Metakey::ContentLength | Metakey::LastModified) - .await?; + let object_lister = self.op.lister_with(prefix).recursive(true).await?; let stream = stream::unfold(object_lister, |mut object_lister| async move { match object_lister.next().await { Some(Ok(object)) => { diff --git a/src/object_store/src/object/opendal_engine/opendal_object_store.rs b/src/object_store/src/object/opendal_engine/opendal_object_store.rs index 659bdd59316d8..4d6e306d1c75a 100644 --- a/src/object_store/src/object/opendal_engine/opendal_object_store.rs +++ b/src/object_store/src/object/opendal_engine/opendal_object_store.rs @@ -22,7 +22,7 @@ use futures::{stream, StreamExt}; use opendal::layers::{RetryLayer, TimeoutLayer}; use opendal::raw::BoxedStaticFuture; use opendal::services::Memory; -use opendal::{Execute, Executor, Metakey, Operator, Writer}; +use opendal::{Execute, Executor, Operator, Writer}; use risingwave_common::config::ObjectStoreConfig; use risingwave_common::range::RangeBoundsExt; use thiserror_ext::AsReport; @@ -227,7 +227,7 @@ impl ObjectStore for OpendalObjectStore { /// Deletes the objects with the given paths permanently from the storage. If an object /// specified in the request is not found, it will be considered as successfully deleted. async fn delete_objects(&self, paths: &[String]) -> ObjectResult<()> { - self.op.remove(paths.to_vec()).await?; + self.op.delete_iter(paths.to_vec()).await?; Ok(()) } @@ -237,11 +237,7 @@ impl ObjectStore for OpendalObjectStore { start_after: Option, limit: Option, ) -> ObjectResult { - let mut object_lister = self - .op - .lister_with(prefix) - .recursive(true) - .metakey(Metakey::ContentLength); + let mut object_lister = self.op.lister_with(prefix).recursive(true); if let Some(start_after) = start_after { object_lister = object_lister.start_after(&start_after); } diff --git a/src/object_store/src/object/opendal_engine/opendal_s3.rs b/src/object_store/src/object/opendal_engine/opendal_s3.rs index 24b4783ff761e..f8b675df2b055 100644 --- a/src/object_store/src/object/opendal_engine/opendal_s3.rs +++ b/src/object_store/src/object/opendal_engine/opendal_s3.rs @@ -108,7 +108,7 @@ impl OpendalObjectStore { if let Some(nodelay) = config.s3.nodelay.as_ref() { client_builder = client_builder.tcp_nodelay(*nodelay); } - + #[allow(deprecated)] Ok(HttpClient::build(client_builder)?) }