From e5e78c55bc3ebc3fd38d525889679ddb1e6adf1a Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Thu, 6 Jul 2023 12:47:50 -0700
Subject: [PATCH 1/6] update to object_store 0.6.1

---
 hdfs/Cargo.toml               |  2 +-
 hdfs/src/object_store/hdfs.rs | 10 ++++++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/hdfs/Cargo.toml b/hdfs/Cargo.toml
index 2ee206d..3d3acbe 100644
--- a/hdfs/Cargo.toml
+++ b/hdfs/Cargo.toml
@@ -46,5 +46,5 @@ chrono = { version = "0.4" }
 fs-hdfs = { version = "^0.1.11", optional = true }
 fs-hdfs3 = { version = "^0.1.11", optional = true }
 futures = "0.3"
-object_store = "0.5.4"
+object_store = "0.6"
 tokio = { version = "1.18", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }

diff --git a/hdfs/src/object_store/hdfs.rs b/hdfs/src/object_store/hdfs.rs
index 214cc8e..0ad65f8 100644
--- a/hdfs/src/object_store/hdfs.rs
+++ b/hdfs/src/object_store/hdfs.rs
@@ -29,10 +29,7 @@ use chrono::{DateTime, NaiveDateTime, Utc};
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
 use hdfs::walkdir::HdfsWalkDir;
-use object_store::{
-    path::{self, Path},
-    Error, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
-};
+use object_store::{path::{self, Path}, Error, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, GetOptions};
 use tokio::io::AsyncWrite;
 
 /// scheme for HDFS File System
@@ -176,6 +173,10 @@ impl ObjectStore for HadoopFileSystem {
         ))
     }
 
+    async fn get_opts(&self, _location: &Path, _options: GetOptions) -> Result<GetResult> {
+        todo!()
+    }
+
     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
         let hdfs = self.hdfs.clone();
         let location = HadoopFileSystem::path_to_filesystem(location);
@@ -410,6 +411,7 @@ pub fn convert_metadata(file: FileStatus, prefix: &str) -> ObjectMeta {
             Utc,
         ),
         size: file.len(),
+        e_tag: None,
     }
 }
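Note: patch 1 tracks two additions in the object_store 0.6 API surface: `ObjectMeta` gained an `e_tag` field (set to `None` here) and `ObjectStore` gained a `get_opts` method (stubbed with `todo!()`). As a rough illustration only — not part of the patch — the stub could later be filled in along these lines, assuming the relevant `GetOptions` fields are the four conditional-request ones:

    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        // HDFS has no conditional-read primitive, so refuse preconditions
        // explicitly rather than silently ignoring them.
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
        {
            return Err(Error::NotImplemented);
        }
        self.get(location).await
    }
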
From 8cd416e687ac6ea1454bb2d0b9d49d48423252e5 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Tue, 25 Jul 2023 10:42:33 -0700
Subject: [PATCH 2/6] Multipart upload

---
 Cargo.toml                    |  5 ++-
 hdfs/Cargo.toml               |  2 +-
 hdfs/src/object_store/hdfs.rs | 67 ++++++++++++++++++++++++++++++++---
 3 files changed, 67 insertions(+), 7 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 4e8a0d4..a8d88b9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,4 +20,7 @@ members = [
     "hdfs",
     "hdfs-examples",
     "hdfs-testing",
-]
\ No newline at end of file
+]
+
+[patch.crates-io]
+object_store = { git = "https://github.com/yjshen/arrow-rs.git", branch = "pub_multipart_upload" }

diff --git a/hdfs/Cargo.toml b/hdfs/Cargo.toml
index 3d3acbe..bc079ca 100644
--- a/hdfs/Cargo.toml
+++ b/hdfs/Cargo.toml
@@ -46,5 +46,5 @@ chrono = { version = "0.4" }
 fs-hdfs = { version = "^0.1.11", optional = true }
 fs-hdfs3 = { version = "^0.1.11", optional = true }
 futures = "0.3"
-object_store = "0.6"
+object_store = { version = "0.6", features = ["cloud"] }
 tokio = { version = "1.18", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }

diff --git a/hdfs/src/object_store/hdfs.rs b/hdfs/src/object_store/hdfs.rs
index 0ad65f8..3d6f098 100644
--- a/hdfs/src/object_store/hdfs.rs
+++ b/hdfs/src/object_store/hdfs.rs
@@ -17,11 +17,11 @@
 //! Object store that represents the HDFS File System.
 
-use std::collections::{BTreeSet, VecDeque};
+use std::collections::{BTreeSet, HashMap, VecDeque};
 use std::fmt::{Display, Formatter};
 use std::ops::Range;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 
 use async_trait::async_trait;
 use bytes::Bytes;
 
@@ -30,6 +30,7 @@ use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
 use hdfs::walkdir::HdfsWalkDir;
 use object_store::{path::{self, Path}, Error, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, GetOptions};
+use object_store::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl};
 use tokio::io::AsyncWrite;
 
 /// scheme for HDFS File System
@@ -108,6 +109,55 @@ impl Display for HadoopFileSystem {
     }
 }
 
+struct HdfsMultiPartUpload {
+    location: Path,
+    hdfs: Arc<HdfsFs>,
+    content: Arc<Mutex<HashMap<usize, Vec<u8>>>>,
+}
+
+#[async_trait]
+impl CloudMultiPartUploadImpl for HdfsMultiPartUpload {
+    async fn put_multipart_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<object_store::multipart::UploadPart, std::io::Error> {
+        let mut content = self.content.lock().unwrap();
+        content.insert(part_idx, buf);
+
+        Ok(object_store::multipart::UploadPart {
+            content_id: part_idx.to_string(),
+        })
+    }
+
+    async fn complete(&self, completed_parts: Vec<object_store::multipart::UploadPart>) -> Result<(), std::io::Error> {
+        let hdfs = self.hdfs.clone();
+        let location = HadoopFileSystem::path_to_filesystem(&self.location);
+
+        maybe_spawn_blocking(move || {
+            let file = match hdfs.create_with_overwrite(&location, true) {
+                Ok(f) => f,
+                Err(e) => {
+                    return Err(to_error(e));
+                }
+            };
+
+            let mut content = self.content.lock().unwrap();
+            // sort by hash key and put into file
+            let mut keys: Vec<usize> = content.keys().cloned().collect();
+            keys.sort();
+
+            assert_eq!(keys[0], 0, "Missing part 0 for multipart upload");
+            assert_eq!(keys[keys.len() - 1], keys.len() - 1, "Missing last part for multipart upload");
+
+            for key in keys {
+                let buf = content.get(&key).unwrap();
+                file.write(buf.as_slice()).map_err(to_error)?;
+            }
+
+            file.close().map_err(to_error)?;
+
+            Ok(())
+        })
+    }
+}
+
 #[async_trait]
 impl ObjectStore for HadoopFileSystem {
     // Current implementation is very simple due to missing configs,
@@ -135,13 +185,20 @@
 
     async fn put_multipart(
         &self,
-        _location: &Path,
+        location: &Path,
     ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        todo!()
+        let upload = HdfsMultiPartUpload {
+            location: location.clone(),
+            hdfs: self.hdfs.clone(),
+            content: Arc::new(Mutex::new(HashMap::new())),
+        };
+
+        Ok((MultipartId::default(), Box::new(CloudMultiPartUpload::new(upload, 8))))
    }
 
     async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
-        todo!()
+        // Currently, the implementation doesn't put anything to HDFS until complete is called.
+        Ok(())
     }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
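Note: `CloudMultiPartUpload` is object_store's adapter that exposes an `AsyncWrite` handle on top of a part-based uploader; the `8` passed to `CloudMultiPartUpload::new` appears to be its concurrency limit (an assumption based on the call, not stated in the patch). The trait implemented above has roughly this shape in object_store 0.6 — paraphrased here for orientation, with supertraits and exact bounds elided:

    #[async_trait]
    pub trait CloudMultiPartUploadImpl {
        /// Upload one buffered part; parts may be requested out of order.
        async fn put_multipart_part(
            &self,
            buf: Vec<u8>,
            part_idx: usize,
        ) -> Result<UploadPart, std::io::Error>;

        /// Called once after all parts have succeeded.
        async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), std::io::Error>;
    }

Since HDFS files are effectively write-once/append-only, this first version simply buffers every part in memory and writes them out in index order when `complete` fires.
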
From 8b872e9b1799a57f06aafe6b142068f2559a8628 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Tue, 25 Jul 2023 14:56:53 -0700
Subject: [PATCH 3/6] fix

---
 hdfs/src/object_store/hdfs.rs | 30 +++++++++++++++++++++++++++---
 1 file changed, 27 insertions(+), 3 deletions(-)

diff --git a/hdfs/src/object_store/hdfs.rs b/hdfs/src/object_store/hdfs.rs
index 3d6f098..0fb1e22 100644
--- a/hdfs/src/object_store/hdfs.rs
+++ b/hdfs/src/object_store/hdfs.rs
@@ -126,9 +126,10 @@ impl CloudMultiPartUploadImpl for HdfsMultiPartUpload {
         })
     }
 
-    async fn complete(&self, completed_parts: Vec<object_store::multipart::UploadPart>) -> Result<(), std::io::Error> {
+    async fn complete(&self, _completed_parts: Vec<object_store::multipart::UploadPart>) -> Result<(), std::io::Error> {
         let hdfs = self.hdfs.clone();
-        let location = HadoopFileSystem::path_to_filesystem(&self.location);
+        let location = HadoopFileSystem::path_to_filesystem(&self.location.clone());
+        let content = self.content.clone();
 
         maybe_spawn_blocking(move || {
             let file = match hdfs.create_with_overwrite(&location, true) {
@@ -138,7 +139,7 @@ impl CloudMultiPartUploadImpl for HdfsMultiPartUpload {
                 }
             };
 
-            let mut content = self.content.lock().unwrap();
+            let content = content.lock().unwrap();
             // sort by hash key and put into file
             let mut keys: Vec<usize> = content.keys().cloned().collect();
             keys.sort();
@@ -155,6 +156,8 @@
 
             Ok(())
         })
+        .await
+        .map_err(to_io_error)
     }
 }
 
@@ -607,6 +610,27 @@ fn to_error(err: HdfsErr) -> Error {
     }
 }
 
+fn to_io_error(err: Error) -> std::io::Error {
+    match err {
+        Error::Generic { store, source } => {
+            std::io::Error::new(std::io::ErrorKind::Other, format!("{}: {}", store, source))
+        }
+        Error::NotFound { path, source } => {
+            std::io::Error::new(std::io::ErrorKind::NotFound, format!("{}: {}", path, source))
+        }
+        Error::AlreadyExists { path, source } => {
+            std::io::Error::new(std::io::ErrorKind::AlreadyExists, format!("{}: {}", path, source))
+        }
+        Error::InvalidPath { source } => {
+            std::io::Error::new(std::io::ErrorKind::InvalidInput, source)
+        }
+
+        _ => {
+            std::io::Error::new(std::io::ErrorKind::Other, format!("HadoopFileSystem: {}", err))
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;

From 1b82a3c190cc455eca4441ffd602e30554bff16b Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Wed, 26 Jul 2023 09:26:53 -0700
Subject: [PATCH 4/6] object_store depend on un-released arrow-rs commit

---
 Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index a8d88b9..4eaa6dd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,4 +23,4 @@ members = [
 ]
 
 [patch.crates-io]
-object_store = { git = "https://github.com/yjshen/arrow-rs.git", branch = "pub_multipart_upload" }
+object_store = { git = "https://github.com/apache/arrow-rs.git", rev = "bff6155d38e19bfe62a776731b78b435560f2c8e" }
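Note: patch 3 repairs two problems in patch 2's `complete`: the closure captured `self` instead of a cloned `Arc` (hence the `let content = self.content.clone()`), and the future returned by `maybe_spawn_blocking` was never awaited, which `.await` plus the new `to_io_error` conversion fixes. `maybe_spawn_blocking` itself is defined further down in hdfs.rs, outside these hunks; a sketch of its likely shape, assuming it mirrors the helper of the same name in object_store's local-filesystem backend:

    async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
    where
        F: FnOnce() -> Result<T> + Send + 'static,
        T: Send + 'static,
    {
        match tokio::runtime::Handle::try_current() {
            // Inside a runtime: run the blocking HDFS call on the blocking pool.
            Ok(runtime) => runtime.spawn_blocking(f).await.map_err(|e| Error::Generic {
                store: "HadoopFileSystem",
                source: Box::new(e),
            })?,
            // No runtime available: just run the closure inline.
            Err(_) => f(),
        }
    }
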
From c242f5e0507675e95920a1140e7dbd62faa35e8a Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Thu, 24 Aug 2023 11:37:29 -0700
Subject: [PATCH 5/6] feat: add support for AsyncWrite

---
 hdfs/src/object_store/hdfs.rs | 171 +++++++++++++++++++++++++---------
 1 file changed, 125 insertions(+), 46 deletions(-)

diff --git a/hdfs/src/object_store/hdfs.rs b/hdfs/src/object_store/hdfs.rs
index 0fb1e22..5ab88ba 100644
--- a/hdfs/src/object_store/hdfs.rs
+++ b/hdfs/src/object_store/hdfs.rs
@@ -17,7 +17,7 @@
 //! Object store that represents the HDFS File System.
 
-use std::collections::{BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeSet, VecDeque};
 use std::fmt::{Display, Formatter};
 use std::ops::Range;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
@@ -29,8 +29,11 @@ use chrono::{DateTime, NaiveDateTime, Utc};
 use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use hdfs::hdfs::{get_hdfs_by_full_path, FileStatus, HdfsErr, HdfsFile, HdfsFs};
 use hdfs::walkdir::HdfsWalkDir;
-use object_store::{path::{self, Path}, Error, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result, GetOptions};
 use object_store::multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl};
+use object_store::{
+    path::{self, Path},
+    Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+};
 use tokio::io::AsyncWrite;
 
 /// scheme for HDFS File System
@@ -112,52 +115,120 @@ impl Display for HadoopFileSystem {
 struct HdfsMultiPartUpload {
     location: Path,
     hdfs: Arc<HdfsFs>,
-    content: Arc<Mutex<HashMap<usize, Vec<u8>>>>,
+    content: Arc<Mutex<Vec<Option<Vec<u8>>>>>,
+    first_unwritten_idx: Arc<Mutex<usize>>,
+    file_created: Arc<Mutex<bool>>,
+}
+
+impl HdfsMultiPartUpload {
+    fn create_file_if_necessary(&self) -> Result<()> {
+        let mut file_created = self.file_created.lock().unwrap();
+        if !*file_created {
+            let location = HadoopFileSystem::path_to_filesystem(&self.location.clone());
+            match self.hdfs.create_with_overwrite(&location, true) {
+                Ok(_) => {
+                    *file_created = true;
+                    Ok(())
+                }
+                Err(e) => Err(to_error(e)),
+            }
+        } else {
+            Ok(())
+        }
+    }
 }
 
 #[async_trait]
 impl CloudMultiPartUploadImpl for HdfsMultiPartUpload {
-    async fn put_multipart_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<object_store::multipart::UploadPart, std::io::Error> {
-        let mut content = self.content.lock().unwrap();
-        content.insert(part_idx, buf);
-
-        Ok(object_store::multipart::UploadPart {
-            content_id: part_idx.to_string(),
-        })
-    }
+    async fn put_multipart_part(
+        &self,
+        buf: Vec<u8>,
+        part_idx: usize,
+    ) -> Result<object_store::multipart::UploadPart, std::io::Error> {
+        {
+            let mut content = self.content.lock().unwrap();
+            while content.len() <= part_idx {
+                content.push(None);
+            }
+            content[part_idx] = Some(buf);
+        }
 
-    async fn complete(&self, _completed_parts: Vec<object_store::multipart::UploadPart>) -> Result<(), std::io::Error> {
-        let hdfs = self.hdfs.clone();
         let location = HadoopFileSystem::path_to_filesystem(&self.location.clone());
-        let content = self.content.clone();
+        let first_unwritten_idx = {
+            let guard = self.first_unwritten_idx.lock().unwrap();
+            *guard
+        };
 
-        maybe_spawn_blocking(move || {
-            let file = match hdfs.create_with_overwrite(&location, true) {
-                Ok(f) => f,
-                Err(e) => {
-                    return Err(to_error(e));
-                }
-            };
+        self.create_file_if_necessary()?;
 
-            let content = content.lock().unwrap();
-            // sort by hash key and put into file
-            let mut keys: Vec<usize> = content.keys().cloned().collect();
-            keys.sort();
+        // Attempt to write all contiguous sequences of parts
+        if first_unwritten_idx <= part_idx {
+            let hdfs = self.hdfs.clone();
+            let content = self.content.clone();
+            let first_unwritten_idx = self.first_unwritten_idx.clone();
 
-            assert_eq!(keys[0], 0, "Missing part 0 for multipart upload");
-            assert_eq!(keys[keys.len() - 1], keys.len() - 1, "Missing last part for multipart upload");
+            maybe_spawn_blocking(move || {
+                let file = hdfs.append(&location).map_err(to_error)?;
+                let mut content = content.lock().unwrap();
 
-            for key in keys {
-                let buf = content.get(&key).unwrap();
-                file.write(buf.as_slice()).map_err(to_error)?;
-            }
+                let mut first_unwritten_idx = first_unwritten_idx.lock().unwrap();
 
-            file.close().map_err(to_error)?;
+                // Write all contiguous parts and free up the memory
+                while let Some(buf) = content.get_mut(*first_unwritten_idx).and_then(Option::take) {
+                    file.write(buf.as_slice()).map_err(to_error)?;
+                    *first_unwritten_idx += 1;
+                }
 
-            Ok(())
+                file.close().map_err(to_error)?;
+                Ok(())
+            })
+            .await
+            .map_err(to_io_error)?;
+        }
+
+        Ok(object_store::multipart::UploadPart {
+            content_id: part_idx.to_string(),
         })
-        .await
-        .map_err(to_io_error)
+    }
+
+    async fn complete(
+        &self,
+        completed_parts: Vec<object_store::multipart::UploadPart>,
+    ) -> Result<(), std::io::Error> {
+        let content = self.content.lock().unwrap();
+        if content.len() != completed_parts.len() {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format!(
+                    "Expected {} parts, but only {} parts were received",
+                    content.len(),
+                    completed_parts.len()
+                ),
+            ));
+        }
+
+        // check first_unwritten_idx
+        let first_unwritten_idx = self.first_unwritten_idx.lock().unwrap();
+        if *first_unwritten_idx != content.len() {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format!(
+                    "Expected to write {} parts, but only {} parts were written",
+                    content.len(),
+                    *first_unwritten_idx
+                ),
+            ));
+        }
+
+        // Last check: make sure all parts were written, since we change it to None after writing
+        if content.iter().any(Option::is_some) {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Not all parts were written",
+            ));
+        }
+
+        Ok(())
     }
 }
 
@@ -193,10 +264,15 @@
         let upload = HdfsMultiPartUpload {
             location: location.clone(),
             hdfs: self.hdfs.clone(),
-            content: Arc::new(Mutex::new(HashMap::new())),
+            content: Arc::new(Mutex::new(Vec::new())),
+            first_unwritten_idx: Arc::new(Mutex::new(0)),
+            file_created: Arc::new(Mutex::new(false)),
         };
 
-        Ok((MultipartId::default(), Box::new(CloudMultiPartUpload::new(upload, 8))))
+        Ok((
+            MultipartId::default(),
+            Box::new(CloudMultiPartUpload::new(upload, 8)),
+        ))
     }
 
     async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
@@ -615,19 +691,22 @@ fn to_io_error(err: Error) -> std::io::Error {
     match err {
         Error::Generic { store, source } => {
             std::io::Error::new(std::io::ErrorKind::Other, format!("{}: {}", store, source))
         }
-        Error::NotFound { path, source } => {
-            std::io::Error::new(std::io::ErrorKind::NotFound, format!("{}: {}", path, source))
-        }
-        Error::AlreadyExists { path, source } => {
-            std::io::Error::new(std::io::ErrorKind::AlreadyExists, format!("{}: {}", path, source))
-        }
+        Error::NotFound { path, source } => std::io::Error::new(
+            std::io::ErrorKind::NotFound,
+            format!("{}: {}", path, source),
+        ),
+        Error::AlreadyExists { path, source } => std::io::Error::new(
+            std::io::ErrorKind::AlreadyExists,
+            format!("{}: {}", path, source),
+        ),
         Error::InvalidPath { source } => {
             std::io::Error::new(std::io::ErrorKind::InvalidInput, source)
        }
 
-        _ => {
-            std::io::Error::new(std::io::ErrorKind::Other, format!("HadoopFileSystem: {}", err))
-        }
+        _ => std::io::Error::new(
+            std::io::ErrorKind::Other,
+            format!("HadoopFileSystem: {}", err),
+        ),
     }
 }
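Note: because up to eight parts are uploaded concurrently but HDFS only supports append, patch 5 buffers out-of-order parts and flushes the contiguous prefix on every call, freeing each buffer as it is written. A standalone model of that bookkeeping — a simplification for illustration, not code from the patch, with a `Vec<u8>` standing in for the append-only HDFS file:

    fn flush_contiguous(
        content: &mut Vec<Option<Vec<u8>>>,
        first_unwritten_idx: &mut usize,
        file: &mut Vec<u8>, // stand-in for the append-only HDFS file
    ) {
        // Same loop as the patch: drain parts while the next index is present.
        while let Some(buf) = content.get_mut(*first_unwritten_idx).and_then(Option::take) {
            file.extend_from_slice(&buf);
            *first_unwritten_idx += 1;
        }
    }

    fn main() {
        let mut content = vec![None, None, Some(b"p2".to_vec())]; // part 2 arrived first
        let (mut idx, mut file) = (0usize, Vec::new());

        flush_contiguous(&mut content, &mut idx, &mut file); // part 0 missing: nothing flushed
        assert_eq!(idx, 0);

        content[0] = Some(b"p0".to_vec());
        flush_contiguous(&mut content, &mut idx, &mut file); // flushes p0 only
        assert_eq!(idx, 1);

        content[1] = Some(b"p1".to_vec());
        flush_contiguous(&mut content, &mut idx, &mut file); // flushes p1, then p2
        assert_eq!(idx, 3);
        assert_eq!(file, b"p0p1p2");
    }

`complete` then only has to verify the bookkeeping: every slot was drained and `first_unwritten_idx` reached `content.len()`. One caveat the patch carries: each flush re-opens the file with `hdfs.append` and then calls `file.close()`, so the open/close cost is paid per flush rather than amortized over the upload.
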
From 5b7631d7ca6065e3008f2b82e435aaa8b5db9363 Mon Sep 17 00:00:00 2001
From: Yijie Shen
Date: Thu, 24 Aug 2023 11:46:31 -0700
Subject: [PATCH 6/6] remove uploaded part while aborting

---
 hdfs/src/object_store/hdfs.rs | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hdfs/src/object_store/hdfs.rs b/hdfs/src/object_store/hdfs.rs
index 5ab88ba..f8ad84b 100644
--- a/hdfs/src/object_store/hdfs.rs
+++ b/hdfs/src/object_store/hdfs.rs
@@ -275,9 +275,9 @@ impl ObjectStore for HadoopFileSystem {
         ))
     }
 
-    async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
-        // Currently, the implementation doesn't put anything to HDFS until complete is called.
-        Ok(())
+    async fn abort_multipart(&self, location: &Path, _multipart_id: &MultipartId) -> Result<()> {
+        // remove the file if it exists
+        self.delete(location).await
    }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
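Taken together, the series makes streaming writes to HDFS possible through the standard `ObjectStore` interface. A usage sketch — hedged: the path is a placeholder and the store is assumed to be constructed elsewhere in the crate; only `put_multipart` and the `AsyncWrite` handle come from these patches:

    use object_store::{path::Path, ObjectStore};
    use tokio::io::AsyncWriteExt;

    async fn stream_to_hdfs(store: &HadoopFileSystem) -> Result<(), Box<dyn std::error::Error>> {
        let location = Path::from("tmp/large-output.bin");

        // Returns an AsyncWrite backed by HdfsMultiPartUpload (patches 2 and 5).
        let (_id, mut writer) = store.put_multipart(&location).await?;

        // Bytes are chunked into parts by the CloudMultiPartUpload adapter;
        // HdfsMultiPartUpload appends each contiguous run to the file.
        writer.write_all(&vec![0u8; 32 * 1024 * 1024]).await?;

        // Drives the upload to completion, invoking complete() on the impl.
        writer.shutdown().await?;
        Ok(())
    }

If the upload fails midway, `abort_multipart` (patch 6) now deletes the partially written file instead of being a no-op.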