Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 53 additions & 168 deletions core/src/services/hdfs_native/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use hdfs_native::HdfsError;
use hdfs_native::WriteOptions;
use log::debug;

use super::HDFS_NATIVE_SCHEME;
use super::core::HdfsNativeCore;
use super::delete::HdfsNativeDeleter;
use super::error::parse_hdfs_error;
use super::lister::HdfsNativeLister;
Expand Down Expand Up @@ -110,9 +109,37 @@ impl Builder for HdfsNativeBuilder {

// need to check if root dir exists, create if not
Ok(HdfsNativeBackend {
root,
client: Arc::new(client),
enable_append: self.config.enable_append,
core: Arc::new(HdfsNativeCore {
info: {
let am = AccessorInfo::default();
am.set_scheme(HDFS_NATIVE_SCHEME)
.set_root(&root)
.set_native_capability(Capability {
stat: true,

read: true,

write: true,
write_can_append: self.config.enable_append,

create_dir: true,
delete: true,

list: true,

rename: true,

shared: true,

..Default::default()
});

am.into()
},
root,
client: Arc::new(client),
enable_append: self.config.enable_append,
}),
})
}
}
Expand All @@ -128,209 +155,67 @@ impl Builder for HdfsNativeBuilder {
/// Backend for hdfs-native services.
#[derive(Debug, Clone)]
pub struct HdfsNativeBackend {
pub root: String,
pub client: Arc<hdfs_native::Client>,
enable_append: bool,
core: Arc<HdfsNativeCore>,
}

/// hdfs_native::Client is thread-safe.
unsafe impl Send for HdfsNativeBackend {}
unsafe impl Sync for HdfsNativeBackend {}

impl Access for HdfsNativeBackend {
type Reader = HdfsNativeReader;
type Writer = HdfsNativeWriter;
type Lister = Option<HdfsNativeLister>;
type Deleter = oio::OneShotDeleter<HdfsNativeDeleter>;

fn info(&self) -> Arc<AccessorInfo> {
let am = AccessorInfo::default();
am.set_scheme(HDFS_NATIVE_SCHEME)
.set_root(&self.root)
.set_native_capability(Capability {
stat: true,

read: true,

write: true,
write_can_append: self.enable_append,

create_dir: true,
delete: true,

list: true,

rename: true,

shared: true,

..Default::default()
});

am.into()
self.core.info.clone()
}

async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
let p = build_rooted_abs_path(&self.root, path);

self.client
.mkdirs(&p, 0o777, true)
.await
.map_err(parse_hdfs_error)?;

self.core.hdfs_create_dir(path).await?;
Ok(RpCreateDir::default())
}

async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
let p = build_rooted_abs_path(&self.root, path);

let status: hdfs_native::client::FileStatus = self
.client
.get_file_info(&p)
.await
.map_err(parse_hdfs_error)?;

let mode = if status.isdir {
EntryMode::DIR
} else {
EntryMode::FILE
};

let mut metadata = Metadata::new(mode);
metadata
.set_last_modified(Timestamp::from_millisecond(
status.modification_time as i64,
)?)
.set_content_length(status.length as u64);

Ok(RpStat::new(metadata))
let m = self.core.hdfs_stat(path).await?;
Ok(RpStat::new(m))
}

async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let p = build_rooted_abs_path(&self.root, path);
let (f, offset, size) = self.core.hdfs_read(path, &args).await?;

let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;

let r = HdfsNativeReader::new(
f,
args.range().offset() as _,
args.range().size().unwrap_or(u64::MAX) as _,
);
let r = HdfsNativeReader::new(f, offset as _, size as _);

Ok((RpRead::new(), r))
}

async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let target_path = build_rooted_abs_path(&self.root, path);
let mut initial_size = 0;

let target_exists = match self.client.get_file_info(&target_path).await {
Ok(status) => {
initial_size = status.length as u64;
true
}
Err(err) => match &err {
HdfsError::FileNotFound(_) => false,
_ => return Err(parse_hdfs_error(err)),
},
};

let f = if target_exists {
if args.append() {
assert!(self.enable_append, "append is not enabled");
self.client
.append(&target_path)
.await
.map_err(parse_hdfs_error)?
} else {
initial_size = 0;
self.client
.create(&target_path, WriteOptions::default().overwrite(true))
.await
.map_err(parse_hdfs_error)?
}
} else {
initial_size = 0;
self.client
.create(&target_path, WriteOptions::default())
.await
.map_err(parse_hdfs_error)?
};
let (f, initial_size) = self.core.hdfs_write(path, &args).await?;

Ok((RpWrite::new(), HdfsNativeWriter::new(f, initial_size)))
}

async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::new(self.clone()))),
oio::OneShotDeleter::new(HdfsNativeDeleter::new(Arc::clone(&self.core))),
))
}

async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
let p: String = build_rooted_abs_path(&self.root, path);

let isdir = match self.client.get_file_info(&p).await {
Ok(status) => status.isdir,
Err(err) => {
return match &err {
HdfsError::FileNotFound(_) => Ok((RpList::default(), None)),
_ => Err(parse_hdfs_error(err)),
};
}
};
let current_path = if isdir {
if !path.ends_with("/") {
Some(path.to_string() + "/")
} else {
Some(path.to_string())
}
} else {
None
};

Ok((
RpList::default(),
Some(HdfsNativeLister::new(
&self.root,
&self.client,
&p,
current_path,
match self.core.hdfs_list(path).await? {
Some((p, current_path)) => Ok((
RpList::default(),
Some(HdfsNativeLister::new(
&self.core.root,
&self.core.client,
&p,
current_path,
)),
)),
))
None => Ok((RpList::default(), None)),
}
}

async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
let from_path = build_rooted_abs_path(&self.root, from);
let to_path = build_rooted_abs_path(&self.root, to);
match self.client.get_file_info(&to_path).await {
Ok(status) => {
if status.isdir {
return Err(Error::new(ErrorKind::IsADirectory, "path should be a file")
.with_context("input", &to_path));
} else {
self.client
.delete(&to_path, true)
.await
.map_err(parse_hdfs_error)?;
}
}
Err(err) => match &err {
HdfsError::FileNotFound(_) => {
self.client
.create(&to_path, WriteOptions::default().create_parent(true))
.await
.map_err(parse_hdfs_error)?;
}
_ => return Err(parse_hdfs_error(err)),
},
};

self.client
.rename(&from_path, &to_path, true)
.await
.map_err(parse_hdfs_error)?;

self.core.hdfs_rename(from, to).await?;
Ok(RpRename::default())
}
}
Loading
Loading