Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 40 additions & 172 deletions core/src/services/hdfs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use std::io;
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::Arc;

use log::debug;

use super::HDFS_SCHEME;
use super::core::HdfsCore;
use super::delete::HdfsDeleter;
use super::lister::HdfsLister;
use super::reader::HdfsReader;
Expand Down Expand Up @@ -165,107 +164,69 @@ impl Builder for HdfsBuilder {
}

Ok(HdfsBackend {
info: {
let am = AccessorInfo::default();
am.set_scheme(HDFS_SCHEME)
.set_root(&root)
.set_native_capability(Capability {
stat: true,
core: Arc::new(HdfsCore {
info: {
let am = AccessorInfo::default();
am.set_scheme(HDFS_SCHEME)
.set_root(&root)
.set_native_capability(Capability {
stat: true,

read: true,
read: true,

write: true,
write_can_append: self.config.enable_append,
write: true,
write_can_append: self.config.enable_append,

create_dir: true,
delete: true,
create_dir: true,
delete: true,

list: true,
list: true,

rename: true,
rename: true,

shared: true,
shared: true,

..Default::default()
});
..Default::default()
});

am.into()
},
root,
atomic_write_dir,
client: Arc::new(client),
am.into()
},
root,
atomic_write_dir,
client: Arc::new(client),
}),
})
}
}

/// Backend for hdfs services.
#[derive(Debug, Clone)]
pub struct HdfsBackend {
pub info: Arc<AccessorInfo>,
pub root: String,
atomic_write_dir: Option<String>,
pub client: Arc<hdrs::Client>,
core: Arc<HdfsCore>,
}

/// hdrs::Client is thread-safe.
unsafe impl Send for HdfsBackend {}
unsafe impl Sync for HdfsBackend {}

impl Access for HdfsBackend {
type Reader = HdfsReader<hdrs::AsyncFile>;
type Writer = HdfsWriter<hdrs::AsyncFile>;
type Lister = Option<HdfsLister>;
type Deleter = oio::OneShotDeleter<HdfsDeleter>;

fn info(&self) -> Arc<AccessorInfo> {
self.info.clone()
self.core.info.clone()
}

async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
let p = build_rooted_abs_path(&self.root, path);

self.client.create_dir(&p).map_err(new_std_io_error)?;

self.core.hdfs_create_dir(path)?;
Ok(RpCreateDir::default())
}

async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);

let meta = self.client.metadata(&p).map_err(new_std_io_error)?;

let mode = if meta.is_dir() {
EntryMode::DIR
} else if meta.is_file() {
EntryMode::FILE
} else {
EntryMode::Unknown
};
let mut m = Metadata::new(mode);
m.set_content_length(meta.len());
m.set_last_modified(Timestamp::try_from(meta.modified())?);

let m = self.core.hdfs_stat(path)?;
Ok(RpStat::new(m))
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let p = build_rooted_abs_path(&self.root, path);

let client = self.client.clone();
let mut f = client
.open_file()
.read(true)
.async_open(&p)
.await
.map_err(new_std_io_error)?;

if args.range().offset() != 0 {
use futures::AsyncSeekExt;

f.seek(SeekFrom::Start(args.range().offset()))
.await
.map_err(new_std_io_error)?;
}
let f = self.core.hdfs_read(path, &args).await?;

Ok((
RpRead::new(),
Expand All @@ -274,58 +235,16 @@ impl Access for HdfsBackend {
}

async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let target_path = build_rooted_abs_path(&self.root, path);
let mut initial_size = 0;
let target_exists = match self.client.metadata(&target_path) {
Ok(meta) => {
initial_size = meta.len();
true
}
Err(err) => {
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}
false
}
};

let should_append = op.append() && target_exists;
let tmp_path = self.atomic_write_dir.as_ref().and_then(|atomic_write_dir| {
// If the target file exists, we should append to the end of it directly.
(!should_append).then_some(build_rooted_abs_path(
atomic_write_dir,
&build_tmp_path_of(path),
))
});

if !target_exists {
let parent = get_parent(&target_path);
self.client.create_dir(parent).map_err(new_std_io_error)?;
}
if !should_append {
initial_size = 0;
}

let mut open_options = self.client.open_file();
open_options.create(true);
if should_append {
open_options.append(true);
} else {
open_options.write(true);
}

let f = open_options
.async_open(tmp_path.as_ref().unwrap_or(&target_path))
.await
.map_err(new_std_io_error)?;
let (target_path, tmp_path, f, target_exists, initial_size) =
self.core.hdfs_write(path, &op).await?;

Ok((
RpWrite::new(),
HdfsWriter::new(
target_path,
tmp_path,
f,
Arc::clone(&self.client),
Arc::clone(&self.core.client),
target_exists,
initial_size,
),
Expand All @@ -335,73 +254,22 @@ impl Access for HdfsBackend {
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
oio::OneShotDeleter::new(HdfsDeleter::new(Arc::new(self.clone()))),
oio::OneShotDeleter::new(HdfsDeleter::new(Arc::clone(&self.core))),
))
}

async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
let p = build_rooted_abs_path(&self.root, path);

let f = match self.client.read_dir(&p) {
Ok(f) => f,
Err(e) => {
return if e.kind() == io::ErrorKind::NotFound {
Ok((RpList::default(), None))
} else {
Err(new_std_io_error(e))
};
match self.core.hdfs_list(path)? {
Some(f) => {
let rd = HdfsLister::new(&self.core.root, f, path);
Ok((RpList::default(), Some(rd)))
}
};

let rd = HdfsLister::new(&self.root, f, path);

Ok((RpList::default(), Some(rd)))
None => Ok((RpList::default(), None)),
}
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from_path = build_rooted_abs_path(&self.root, from);
self.client.metadata(&from_path).map_err(new_std_io_error)?;

let to_path = build_rooted_abs_path(&self.root, to);
let result = self.client.metadata(&to_path);
match result {
Err(err) => {
// Early return if other error happened.
if err.kind() != io::ErrorKind::NotFound {
return Err(new_std_io_error(err));
}

let parent = PathBuf::from(&to_path)
.parent()
.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"path should have parent but not, it must be malformed",
)
.with_context("input", &to_path)
})?
.to_path_buf();

self.client
.create_dir(&parent.to_string_lossy())
.map_err(new_std_io_error)?;
}
Ok(metadata) => {
if metadata.is_file() {
self.client
.remove_file(&to_path)
.map_err(new_std_io_error)?;
} else {
return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
.with_context("input", &to_path));
}
}
}

self.client
.rename_file(&from_path, &to_path)
.map_err(new_std_io_error)?;

self.core.hdfs_rename(from, to)?;
Ok(RpRename::new())
}
}
Loading
Loading