diff --git a/Cargo.lock b/Cargo.lock index 5abb7c6..707ff25 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -217,7 +217,7 @@ dependencies = [ "js-sys", "num-traits", "wasm-bindgen", - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -318,14 +318,14 @@ dependencies = [ [[package]] name = "dns-lookup" -version = "2.0.4" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +checksum = "6e39034cee21a2f5bbb66ba0e3689819c4bb5d00382a282006e802a7ffa6c41d" dependencies = [ "cfg-if", "libc", - "socket2 0.5.10", - "windows-sys 0.48.0", + "socket2", + "windows-sys 0.60.2", ] [[package]] @@ -535,13 +535,14 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "hdfs-native" -version = "0.12.2" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "411cba6a8e2c07a9476729690d05bd445c1425771eca41d2f5c3e41edebf7900" +checksum = "588a47148201def3d6922371f6de676068c7ea08a7c29e59457660f76bdc0918" dependencies = [ "aes", "base64", "bitflags", + "bumpalo", "bytes", "cbc", "chrono", @@ -565,7 +566,7 @@ dependencies = [ "rand", "regex", "roxmltree", - "socket2 0.6.0", + "socket2", "thiserror", "tokio", "url", @@ -576,7 +577,7 @@ dependencies = [ [[package]] name = "hdfs-native-object-store" -version = "0.15.0" +version = "0.16.0" dependencies = [ "async-trait", "bytes", @@ -837,12 +838,12 @@ checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" [[package]] name = "libloading" -version = "0.8.8" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" +checksum = "754ca22de805bb5744484a5b151a9e1a8e837d5dc232c2d7d8c2e3492edc8b60" dependencies = [ "cfg-if", - "windows-targets 0.53.2", + "windows-link 0.2.1", ] [[package]] @@ -929,9 +930,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.12.3" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efc4f07659e11cd45a341cd24d71e683e3be65d9ff1f8150061678fe60437496" +checksum = "d180d5469872facb82dec7e233ff850d615330ea3044a7813550b22ffa4f05ce" dependencies = [ "async-trait", "bytes", @@ -1163,9 +1164,12 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "roxmltree" -version = "0.20.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97" +checksum = "f1964b10c76125c36f8afe190065a4bf9a87bf324842c05701330bba9f1cacbb" +dependencies = [ + "memchr", +] [[package]] name = "rustc-demangle" @@ -1285,16 +1289,6 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" -[[package]] -name = "socket2" -version = "0.5.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "socket2" version = "0.6.0" @@ -1382,7 +1376,7 @@ dependencies = [ "mio", "pin-project-lite", "slab", - "socket2 0.6.0", + "socket2", "tokio-macros", "windows-sys 0.59.0", ] @@ -1631,7 +1625,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -1642,7 +1636,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement", "windows-interface", - "windows-link", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -1675,13 +1669,19 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-result" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -1690,25 +1690,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link", -] - -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", -] - -[[package]] -name = "windows-sys" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" -dependencies = [ - "windows-targets 0.52.6", + "windows-link 0.1.3", ] [[package]] @@ -1729,21 +1711,6 @@ dependencies = [ "windows-targets 0.53.2", ] -[[package]] -name = "windows-targets" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" -dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", -] - [[package]] name = "windows-targets" version = "0.52.6" @@ -1776,12 +1743,6 @@ dependencies = [ "windows_x86_64_msvc 0.53.0", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -1794,12 +1755,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -1812,12 +1767,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -1842,12 +1791,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -1860,12 +1803,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -1878,12 +1815,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -1896,12 +1827,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index d70c586..cf9f5ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hdfs-native-object-store" -version = "0.15.0" +version = "0.16.0" edition = "2021" authors = ["Adam Binford "] homepage = "https://github.com/datafusion-contrib/hdfs-native-object-store" @@ -15,8 +15,8 @@ async-trait = "0.1" bytes = "1" chrono = "0.4" futures = "0.3" -hdfs-native = "0.12.2" -object_store = "0.12.2" +hdfs-native = "0.13" +object_store = "0.13" thiserror = "2" tokio = { version = "1", features = ["rt", "net", "io-util", "macros", "sync", "time"] } diff --git a/README.md b/README.md index 992d595..589bfb3 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Each release supports a certain minor release of both the `object_store` crate a |0.13.x|>=0.10, <0.12|0.11| |0.14.x|0.12|0.11| |0.15.x|>=0.12.2, <0.13|0.12| +|0.16.x|0.13|0.13| # Usage ```rust diff --git a/src/lib.rs b/src/lib.rs index 7823365..6a5f5e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,7 @@ use object_store::{ path::Path, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, }; +use object_store::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode}; use tokio::{ runtime::Handle, sync::{mpsc, oneshot}, @@ -158,40 +159,6 @@ impl HdfsObjectStore { Ok(Self { client }) } - async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { - let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await { - Ok(_) if overwrite => true, - Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?, - Err(HdfsError::FileNotFound(_)) => false, - Err(e) => Err(e).to_object_store_err()?, - }; - - let write_options = WriteOptions { - overwrite, - ..Default::default() - }; - - let file = self - .client - .read(&make_absolute_file(from)) - .await - .to_object_store_err()?; - let mut stream = file.read_range_stream(0, file.file_length()).boxed(); - - let mut new_file = self - .client - .create(&make_absolute_file(to), write_options) - .await - .to_object_store_err()?; - - while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? { - new_file.write(bytes).await.to_object_store_err()?; - } - new_file.close().await.to_object_store_err()?; - - Ok(()) - } - async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> { let path_buf = PathBuf::from(file_path); @@ -253,7 +220,10 @@ impl ObjectStore for HdfsObjectStore { PutMode::Create => false, PutMode::Overwrite => true, PutMode::Update(_) => { - return Err(object_store::Error::NotImplemented); + return Err(object_store::Error::NotImplemented { + operation: "PutOptions with Update precondition".to_string(), + implementer: "HdfsObjectStore".to_string(), + }); } }; @@ -278,7 +248,11 @@ impl ObjectStore for HdfsObjectStore { .await .to_object_store_err()?; - let e_tag = self.head(location).await?.e_tag; + let e_tag = self + .get_opts(location, GetOptions::default().with_head(true)) + .await? + .meta + .e_tag; Ok(PutResult { e_tag, @@ -308,47 +282,6 @@ impl ObjectStore for HdfsObjectStore { /// Reads data for the specified location. async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { - let meta = self.head(location).await?; - - options.check_preconditions(&meta)?; - - let range = options - .range - .map(|r| r.as_range(meta.size)) - .transpose() - .map_err(|source| generic_error(source.into()))? - .unwrap_or(0..meta.size); - - let reader = self - .client - .read(&make_absolute_file(location)) - .await - .to_object_store_err()?; - let start: usize = range - .start - .try_into() - .expect("unable to convert range.start to usize"); - let end: usize = range - .end - .try_into() - .expect("unable to convert range.end to usize"); - let stream = reader - .read_range_stream(start, end - start) - .map(|b| b.to_object_store_err()) - .boxed(); - - let payload = GetResultPayload::Stream(stream); - - Ok(GetResult { - payload, - meta, - range, - attributes: Default::default(), - }) - } - - /// Return the metadata for the specified location - async fn head(&self, location: &Path) -> Result { let status = self .client .get_file_info(&make_absolute_file(location)) @@ -362,22 +295,72 @@ impl ObjectStore for HdfsObjectStore { }); } - get_object_meta(&status) - } + let meta = get_object_meta(&status)?; - /// Delete the object at the specified location. - async fn delete(&self, location: &Path) -> Result<()> { - let result = self - .client - .delete(&make_absolute_file(location), false) - .await - .to_object_store_err()?; + options.check_preconditions(&meta)?; - if !result { - Err(HdfsError::FileNotFound(location.to_string())).to_object_store_err()? - } + let (range, payload) = if options.head { + ( + (0..0), + GetResultPayload::Stream(futures::stream::empty().boxed()), + ) + } else { + let range = options + .range + .map(|r| r.as_range(meta.size)) + .transpose() + .map_err(|source| generic_error(source.into()))? + .unwrap_or(0..meta.size); + + let reader = self + .client + .read(&make_absolute_file(location)) + .await + .to_object_store_err()?; + let start: usize = range + .start + .try_into() + .expect("unable to convert range.start to usize"); + let end: usize = range + .end + .try_into() + .expect("unable to convert range.end to usize"); + let stream = reader + .read_range_stream(start, end - start) + .map(|b| b.to_object_store_err()) + .boxed(); + + (range, GetResultPayload::Stream(stream)) + }; - Ok(()) + Ok(GetResult { + payload, + meta, + range, + attributes: Default::default(), + }) + } + + /// Delete a stream of objects. + fn delete_stream( + &self, + locations: BoxStream<'static, Result>, + ) -> BoxStream<'static, Result> { + let client = self.client.clone(); + locations + .map(move |location| { + let client = client.clone(); + async move { + let location = location?; + client + .delete(&make_absolute_file(&location), false) + .await + .to_object_store_err()?; + Ok(location) + } + }) + .buffered(10) + .boxed() } /// List all the objects with the given prefix. @@ -454,8 +437,7 @@ impl ObjectStore for HdfsObjectStore { }) } - /// Renames a file. This operation is guaranteed to be atomic. - async fn rename(&self, from: &Path, to: &Path) -> Result<()> { + async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> { // Make sure the parent directory exists let mut parent: Vec<_> = to.parts().collect(); parent.pop(); @@ -468,38 +450,63 @@ impl ObjectStore for HdfsObjectStore { .to_object_store_err()?; } + let overwrite = match options.target_mode { + RenameTargetMode::Overwrite => true, + RenameTargetMode::Create => false, + }; + Ok(self .client - .rename(&make_absolute_file(from), &make_absolute_file(to), true) + .rename( + &make_absolute_file(from), + &make_absolute_file(to), + overwrite, + ) .await .to_object_store_err()?) } - /// Renames a file only if the distination doesn't exist. This operation is guaranteed - /// to be atomic. - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.client - .rename(&make_absolute_file(from), &make_absolute_file(to), false) + /// Copy an object from one path to another with options. + async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> { + let overwrite = match options.mode { + CopyMode::Create => { + // Eagerly check if the target file exists first before wasting time copying + match self.client.get_file_info(&make_absolute_file(to)).await { + Ok(_) => { + return Err(HdfsError::AlreadyExists(make_absolute_file(to))) + .to_object_store_err(); + } + Err(HdfsError::FileNotFound(_)) => false, + Err(e) => return Err(e).to_object_store_err(), + } + } + CopyMode::Overwrite => true, + }; + + let write_options = WriteOptions { + overwrite, + ..Default::default() + }; + + let file = self + .client + .read(&make_absolute_file(from)) .await - .to_object_store_err() - } + .to_object_store_err()?; + let mut stream = file.read_range_stream(0, file.file_length()).boxed(); - /// Copy an object from one path to another in the same object store. - /// - /// If there exists an object at the destination, it will be overwritten. - async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - self.internal_copy(from, to, true).await - } + let mut new_file = self + .client + .create(&make_absolute_file(to), write_options) + .await + .to_object_store_err()?; - /// Copy an object from one path to another, only if destination is empty. - /// - /// Will return an error if the destination already has an object. - /// - /// Performs an atomic operation if the underlying object storage supports it. - /// If atomic operations are not supported by the underlying object storage (like S3) - /// it will return an error. - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { - self.internal_copy(from, to, false).await + while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? { + new_file.write(bytes).await.to_object_store_err()?; + } + new_file.close().await.to_object_store_err()?; + + Ok(()) } }