Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions compio-driver/src/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,38 @@ impl<S, F, D> IntoInner for AsyncifyFd<S, F, D> {
}
}

pin_project! {
/// Spawn a blocking function with two file descriptors in the thread pool.
pub struct AsyncifyFd2<S1, S2, F, D> {
pub(crate) fd1: SharedFd<S1>,
pub(crate) fd2: SharedFd<S2>,
pub(crate) f: Option<F>,
pub(crate) data: Option<D>,
_p: PhantomPinned,
}
}

impl<S1, S2, F, D> AsyncifyFd2<S1, S2, F, D> {
/// Create [`AsyncifyFd2`].
pub fn new(fd1: SharedFd<S1>, fd2: SharedFd<S2>, f: F) -> Self {
Self {
fd1,
fd2,
f: Some(f),
data: None,
_p: PhantomPinned,
}
}
}

impl<S1, S2, F, D> IntoInner for AsyncifyFd2<S1, S2, F, D> {
type Inner = D;

fn into_inner(mut self) -> Self::Inner {
self.data.take().expect("the data should not be None")
}
}

/// Close the file fd.
pub struct CloseFile {
pub(crate) fd: ManuallyDrop<OwnedFd>,
Expand Down
24 changes: 24 additions & 0 deletions compio-driver/src/sys/iocp/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,30 @@ unsafe impl<
}
}

unsafe impl<
S1,
S2,
D: std::marker::Send + 'static,
F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
> OpCode for AsyncifyFd2<S1, S2, F, D>
{
fn op_type(&self) -> OpType {
OpType::Blocking
}

unsafe fn operate(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
// SAFETY: self won't be moved
let this = self.project();
let f = this
.f
.take()
.expect("the operate method could only be called once");
let BufResult(res, data) = f(this.fd1, this.fd2);
*this.data = Some(data);
Poll::Ready(res)
}
}

unsafe impl OpCode for CloseFile {
fn op_type(&self) -> OpType {
OpType::Blocking
Expand Down
23 changes: 23 additions & 0 deletions compio-driver/src/sys/iour/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,29 @@ unsafe impl<
}
}

unsafe impl<
S1,
S2,
D: std::marker::Send + 'static,
F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
> OpCode for AsyncifyFd2<S1, S2, F, D>
{
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
OpEntry::Blocking
}

fn call_blocking(self: Pin<&mut Self>) -> std::io::Result<usize> {
let this = self.project();
let f = this
.f
.take()
.expect("the operate method could only be called once");
let BufResult(res, data) = f(this.fd1, this.fd2);
*this.data = Some(data);
res
}
}

unsafe impl<S: AsFd> OpCode for OpenFile<S> {
fn create_entry(self: Pin<&mut Self>) -> OpEntry {
opcode::OpenAt::new(Fd(self.dirfd.as_fd().as_raw_fd()), self.path.as_ptr())
Expand Down
23 changes: 23 additions & 0 deletions compio-driver/src/sys/poll/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,29 @@ unsafe impl<
}
}

unsafe impl<
S1,
S2,
D: std::marker::Send + 'static,
F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
> OpCode for AsyncifyFd2<S1, S2, F, D>
{
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)
}

fn operate(self: Pin<&mut Self>) -> Poll<io::Result<usize>> {
let this = self.project();
let f = this
.f
.take()
.expect("the operate method could only be called once");
let BufResult(res, data) = f(this.fd1, this.fd2);
*this.data = Some(data);
Poll::Ready(res)
}
}

unsafe impl<S: AsFd> OpCode for OpenFile<S> {
fn pre_submit(self: Pin<&mut Self>) -> io::Result<Decision> {
Ok(Decision::Blocking)
Expand Down
9 changes: 9 additions & 0 deletions compio-driver/src/sys/stub/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ impl<
{
}

impl<
S1,
S2,
D: std::marker::Send + 'static,
F: (FnOnce(&S1, &S2) -> BufResult<usize, D>) + std::marker::Send + 'static,
> OpCode for AsyncifyFd2<S1, S2, F, D>
{
}

impl<S: AsFd> OpCode for OpenFile<S> {}

impl OpCode for CloseFile {}
Expand Down
8 changes: 8 additions & 0 deletions compio-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ windows-sys = { workspace = true, features = [
"Win32_System_SystemServices",
] }

cap-primitives = { version = "4.0.0", optional = true }

# Windows specific dev dependencies
[target.'cfg(windows)'.dev-dependencies]
windows-sys = { workspace = true, features = ["Win32_Security_Authorization"] }
Expand Down Expand Up @@ -64,6 +66,12 @@ nix = { workspace = true, features = ["fs"] }
compio-net = { workspace = true }

[features]
dir = ["dep:cap-primitives"]

read_buf = ["compio-buf/read_buf", "compio-io/read_buf"]
windows_by_handle = []
nightly = ["read_buf", "windows_by_handle"]

[[test]]
name = "dir"
required-features = ["dir"]
1 change: 1 addition & 0 deletions compio-fs/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ fn main() {
solarish: { any(target_os = "illumos", target_os = "solaris") },
gnulinux: { all(target_os = "linux", target_env = "gnu") },
linux_all: { any(target_os = "linux", target_os = "android") },
dirfd: { feature = "dir" },
}
}
205 changes: 205 additions & 0 deletions compio-fs/src/dirfd/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use std::{io, path::Path};

use compio_buf::{BufResult, IoBuf, buf_try};
use compio_io::{AsyncReadAtExt, AsyncWriteAtExt};

use crate::{DirBuilder, File, Metadata, OpenOptions};

#[cfg(unix)]
#[path = "unix.rs"]
mod sys;

#[cfg(windows)]
#[path = "windows.rs"]
mod sys;

/// A reference to an open directory on a filesystem.
///
/// ## Platform specific
/// * Windows: the operations are forwarded to `cap-primitives`. They will treat
/// self as the root of the filesystem.
/// * Unix: the operatiosn are forwarded to syscalls directly. They don't limit
/// the path to be under self. If the path is absolute, the directory
/// represented by self will be ignored.
#[derive(Debug, Clone)]
pub struct Dir {
inner: sys::Dir,
}

impl Dir {
/// Opens a directory at the specified path and returns a reference to it.
pub async fn open(path: impl AsRef<Path>) -> io::Result<Self> {
Ok(Dir {
inner: sys::Dir::open(path).await?,
})
}

/// Opens a file at `path` with the options specified by `options`.
pub async fn open_file_with(
&self,
path: impl AsRef<Path>,
options: &OpenOptions,
) -> io::Result<File> {
self.inner.open_file_with(path, options).await
}
Comment thread
Berrysoft marked this conversation as resolved.

/// Attempts to open a file in read-only mode.
pub async fn open_file(&self, path: impl AsRef<Path>) -> io::Result<File> {
self.open_file_with(path, OpenOptions::new().read(true))
.await
}

/// Opens a file in write-only mode.
pub async fn create_file(&self, path: impl AsRef<Path>) -> io::Result<File> {
self.open_file_with(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
Comment thread
Berrysoft marked this conversation as resolved.
Comment thread
Berrysoft marked this conversation as resolved.
}

/// Attempts to open a directory.
pub async fn open_dir(&self, path: impl AsRef<Path>) -> io::Result<Self> {
Ok(Self {
inner: self.inner.open_dir(path).await?,
})
}

/// Creates the specified directory with the options configured in this
/// builder.
pub async fn create_dir_with(
&self,
path: impl AsRef<Path>,
builder: &DirBuilder,
) -> io::Result<()> {
self.inner.create_dir_with(path, builder).await
}

/// Creates a new, empty directory at the provided path.
pub async fn create_dir(&self, path: impl AsRef<Path>) -> io::Result<()> {
self.create_dir_with(path, &DirBuilder::new()).await
}

/// Recursively create a directory and all of its parent components if they
/// are missing.
pub async fn create_dir_all(&self, path: impl AsRef<Path>) -> io::Result<()> {
self.create_dir_with(path, DirBuilder::new().recursive(true))
.await
Comment thread
Berrysoft marked this conversation as resolved.
}

/// Queries metadata about the underlying directory.
pub async fn dir_metadata(&self) -> io::Result<Metadata> {
self.inner.dir_metadata().await
}

/// Given a path, query the file system to get information about a file,
/// directory, etc.
pub async fn metadata(&self, path: impl AsRef<Path>) -> io::Result<Metadata> {
self.inner.metadata(path).await
}

/// Query the metadata about a file without following symlinks.
pub async fn symlink_metadata(&self, path: impl AsRef<Path>) -> io::Result<Metadata> {
self.inner.symlink_metadata(path).await
}

/// Creates a new hard link on a filesystem.
pub async fn hard_link(
&self,
source: impl AsRef<Path>,
target_dir: &Self,
target: impl AsRef<Path>,
) -> io::Result<()> {
self.inner
.hard_link(source, &target_dir.inner, target)
.await
}

/// Creates a new symbolic link on a filesystem.
///
/// The `original` argument provides the target of the symlink. The `link`
/// argument provides the name of the created symlink.
///
/// Despite the argument ordering, `original` is not resolved relative to
/// self here. `link` is resolved relative to self, and `original` is
/// not resolved within this function.
#[cfg(unix)]
pub async fn symlink(
&self,
original: impl AsRef<Path>,
link: impl AsRef<Path>,
) -> io::Result<()> {
self.inner.symlink(original, link).await
}

/// Creates a new file symbolic link on a filesystem.
///
/// The `original` argument provides the target of the symlink. The `link`
/// argument provides the name of the created symlink.
///
/// Despite the argument ordering, `original` is not resolved relative to
/// self here. `link` is resolved relative to self, and `original` is
/// not resolved within this function.
#[cfg(windows)]
pub async fn symlink_file(
&self,
original: impl AsRef<Path>,
link: impl AsRef<Path>,
) -> io::Result<()> {
self.inner.symlink_file(original, link).await
}

/// Creates a new directory symlink on a filesystem.
///
/// The `original` argument provides the target of the symlink. The `link`
/// argument provides the name of the created symlink.
///
/// Despite the argument ordering, `original` is not resolved relative to
/// self here. `link` is resolved relative to self, and `original` is
/// not resolved within this function.
#[cfg(windows)]
pub async fn symlink_dir(
&self,
original: impl AsRef<Path>,
link: impl AsRef<Path>,
) -> io::Result<()> {
self.inner.symlink_dir(original, link).await
}

/// Rename a file or directory to a new name, replacing the original file if
/// it already exists.
pub async fn rename(
&self,
from: impl AsRef<Path>,
to_dir: &Self,
to: impl AsRef<Path>,
) -> io::Result<()> {
self.inner.rename(from, &to_dir.inner, to).await
}

/// Removes a file from a filesystem.
pub async fn remove_file(&self, path: impl AsRef<Path>) -> io::Result<()> {
self.inner.remove_file(path).await
}

/// Removes an empty directory.
pub async fn remove_dir(&self, path: impl AsRef<Path>) -> io::Result<()> {
self.inner.remove_dir(path).await
}

/// Read the entire contents of a file into a bytes vector.
pub async fn read(&self, path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
let file = self.open_file(path).await?;
let BufResult(res, buf) = file.read_to_end_at(Vec::new(), 0).await;
res?;
Ok(buf)
}

/// Write a buffer as the entire contents of a file.
pub async fn write<B: IoBuf>(&self, path: impl AsRef<Path>, buf: B) -> BufResult<(), B> {
let (mut file, buf) = buf_try!(self.create_file(path).await, buf);
file.write_all_at(buf, 0).await
}
}

compio_driver::impl_raw_fd!(Dir, std::fs::File, inner);
Loading