Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support futures cancellation. Migrate to parking_lot #18

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,7 @@ mio = "0.6.13"
aio-bindings = { path = "aio-bindings", version = "0.1.2" }
libc = "0.2"
memmap = "0.7.0"
parking_lot = "0.7.1"
fnv = "1.0.6"

[workspace]
150 changes: 111 additions & 39 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@
//! such a lock once the operation has completed.

extern crate aio_bindings;
extern crate fnv;
extern crate futures;
extern crate futures_cpupool;
extern crate libc;
extern crate memmap;
extern crate mio;
extern crate parking_lot;
extern crate rand;
extern crate tokio;

Expand Down Expand Up @@ -112,6 +114,13 @@ struct IocbInfo {
flags: u32,
}

// Data which is passed to AIO request
#[derive(Debug)]
struct RequestData {
// We have both sides of a oneshot channel here
completed_sender: Option<futures::sync::oneshot::Sender<aio_bindings::__s64>>,
}

// State information that is associated with an I/O request that is currently in flight.
#[derive(Debug)]
struct RequestState {
Expand All @@ -120,16 +129,16 @@ struct RequestState {

// Concurrency primitive to notify completion to the associated future
completed_receiver: futures::sync::oneshot::Receiver<aio_bindings::__s64>,

// We have both sides of a oneshot channel here
completed_sender: Option<futures::sync::oneshot::Sender<aio_bindings::__s64>>,
}

// Common data structures for futures returned by `AioContext`.
struct AioBaseFuture {
// reference to the `AioContext` that controls the submission queue for asynchronous I/O
context: std::sync::Arc<AioContextInner>,

// set of links to in_flight data
in_flight: std::sync::Arc<parking_lot::Mutex<fnv::FnvHashSet<usize>>>,

// request information captured for the kernel request
iocb_info: IocbInfo,

Expand All @@ -155,22 +164,23 @@ impl AioBaseFuture {
Ok(futures::Async::NotReady) => return Ok(futures::Async::NotReady),
Ok(futures::Async::Ready(_)) => {
// retrieve a state container from the set of available ones and move it into the future
let mut guard = self.context.capacity.write();
match guard {
Ok(ref mut guard) => {
self.state = guard.state.pop();
}
Err(_) => panic!("TODO: Figure out how to handle this kind of error"),
}
self.state = self.context.capacity.write().state.pop();
}
}

assert!(self.state.is_some());
let state = self.state.as_mut().unwrap();
let state_addr = state.deref().deref() as *const RequestState;

let (sender, receiver) = futures::sync::oneshot::channel();

let mut data = Box::new(RequestData {
completed_sender: Some(sender),
});
let data_ptr = Box::into_raw(data);
let data_addr = unsafe { mem::transmute::<_, usize>(data_ptr) };

// Fill in the iocb data structure to be submitted to the kernel
state.request.aio_data = unsafe { mem::transmute::<_, usize>(state_addr) } as u64;
state.request.aio_data = data_addr as u64;
state.request.aio_resfd = self.context.completed_fd as u32;
state.request.aio_flags = aio::IOCB_FLAG_RESFD | self.iocb_info.flags;
state.request.aio_fildes = self.iocb_info.fd as u32;
Expand All @@ -180,9 +190,12 @@ impl AioBaseFuture {
state.request.aio_lio_opcode = self.iocb_info.opcode as u16;

// attach synchronization primitives that are used to indicate completion of this request
let (sender, receiver) = futures::sync::oneshot::channel();
state.completed_receiver = receiver;
state.completed_sender = Some(sender);

let in_flight = &mut *self.in_flight.lock();

in_flight.insert(data_addr);


// submit the request
let mut request_ptr_array: [*mut aio::iocb; 1] =
Expand Down Expand Up @@ -216,12 +229,9 @@ impl AioBaseFuture {
};

// Release the kernel queue slot and the state variable that we just processed
match self.context.capacity.write() {
Ok(ref mut guard) => {
guard.state.push(self.state.take().unwrap());
}
Err(_) => panic!("TODO: Figure out how to handle this kind of error"),
}
let lock = &mut *self.context.capacity.write();

lock.state.push(self.state.take().unwrap());

// notify others that we release a state slot
self.context.have_capacity.release();
Expand Down Expand Up @@ -367,6 +377,9 @@ pub struct AioPollFuture {
// the context handle for retrieving AIO completions from the kernel
context: aio::aio_context_t,

// set of links to in_flight data
in_flight: std::sync::Arc<parking_lot::Mutex<fnv::FnvHashSet<usize>>>,

// the eventfd on which the kernel will notify I/O completions
eventfd: eventfd::EventFd,

Expand Down Expand Up @@ -410,19 +423,33 @@ impl futures::Future for AioPollFuture {
};

// dispatch the retrieved events to the associated futures
let in_flight = &mut *self.in_flight.lock();
for ref event in &self.events {
let request_state: &mut RequestState = unsafe { mem::transmute(event.data as usize) } ;
request_state
let addr = event.data as usize;
assert!(in_flight.remove(&addr));
let mut request_state: Box<RequestData> = unsafe { Box::from_raw(mem::transmute(addr)) };

let _ = request_state
.completed_sender
.take()
.unwrap()
.send(event.res)
.unwrap();
.send(event.res);
}
}
}
}

impl Drop for AioPollFuture {
fn drop(&mut self) {
let in_flight = &mut *self.in_flight.lock();
for addr in in_flight.drain() {
// delete all in_flight data which will never arrive from AIO (after termination of AioPollFuture)
let _: Box<RequestData> = unsafe { Box::from_raw(mem::transmute(addr)) };
};
}
}


// Shared state within AioContext that is backing I/O requests as represented by the individual futures.
#[derive(Debug)]
struct Capacity {
Expand All @@ -442,7 +469,6 @@ impl Capacity {
state.push(Box::new(RequestState {
request: unsafe { mem::zeroed() },
completed_receiver: receiver,
completed_sender: None,
}));
}

Expand All @@ -466,7 +492,7 @@ struct AioContextInner {
have_capacity: sync::Semaphore,

// pre-allocated eventfds and a capacity semaphore
capacity: std::sync::RwLock<Capacity>,
capacity: parking_lot::RwLock<Capacity>,

// handle for the spawned background task; dropping it will cancel the task
// we are using an Option value with delayed initialization to keep the generic
Expand All @@ -486,7 +512,7 @@ impl AioContextInner {

Ok(AioContextInner {
context,
capacity: std::sync::RwLock::new(Capacity::new(nr)?),
capacity: parking_lot::RwLock::new(Capacity::new(nr)?),
have_capacity: sync::Semaphore::new(nr),
completed_fd: fd,
poll_task_handle: None,
Expand All @@ -506,6 +532,9 @@ impl Drop for AioContextInner {
#[derive(Clone, Debug)]
pub struct AioContext {
inner: std::sync::Arc<AioContextInner>,

// set of links to in_flight data
in_flight: std::sync::Arc<parking_lot::Mutex<fnv::FnvHashSet<usize>>>,
}

/// Synchronization levels associated with I/O operations
Expand Down Expand Up @@ -535,11 +564,14 @@ impl AioContext {
let eventfd = eventfd::EventFd::create(0, false)?;
let fd = eventfd.evented.get_ref().fd;

let in_flight = std::sync::Arc::new(parking_lot::Mutex::new(fnv::FnvHashSet::<usize>::default()));

let mut inner = AioContextInner::new(fd, nr)?;
let context = inner.context;

let poll_future = AioPollFuture {
context,
in_flight: in_flight.clone(),
eventfd,
events: Vec::with_capacity(nr),
};
Expand All @@ -548,6 +580,7 @@ impl AioContext {

Ok(AioContext {
inner: std::sync::Arc::new(inner),
in_flight,
})
}

Expand Down Expand Up @@ -580,6 +613,7 @@ impl AioContext {
AioReadResultFuture {
base: AioBaseFuture {
context: self.inner.clone(),
in_flight: self.in_flight.clone(),
iocb_info: IocbInfo {
opcode: aio::IOCB_CMD_PREAD,
fd,
Expand Down Expand Up @@ -631,7 +665,7 @@ impl AioContext {
fd: RawFd,
offset: u64,
buffer_obj: ReadOnlyHandle,
sync_level: SyncLevel
sync_level: SyncLevel,
) -> AioWriteResultFuture<ReadOnlyHandle>
where
ReadOnlyHandle: convert::AsRef<[u8]>,
Expand All @@ -647,6 +681,7 @@ impl AioContext {
AioWriteResultFuture {
base: AioBaseFuture {
context: self.inner.clone(),
in_flight: self.in_flight.clone(),
iocb_info: IocbInfo {
opcode: aio::IOCB_CMD_PWRITE,
fd,
Expand Down Expand Up @@ -679,6 +714,7 @@ impl AioContext {
AioSyncResultFuture {
base: AioBaseFuture {
context: self.inner.clone(),
in_flight: self.in_flight.clone(),
iocb_info: IocbInfo {
opcode: aio::IOCB_CMD_FSYNC,
fd,
Expand Down Expand Up @@ -711,6 +747,7 @@ impl AioContext {
AioSyncResultFuture {
base: AioBaseFuture {
context: self.inner.clone(),
in_flight: self.in_flight.clone(),
iocb_info: IocbInfo {
opcode: aio::IOCB_CMD_FDSYNC,
fd,
Expand All @@ -732,8 +769,6 @@ impl AioContext {

#[cfg(test)]
mod tests {
use super::*;

use std::borrow::{Borrow, BorrowMut};
use std::env;
use std::fs;
Expand All @@ -742,14 +777,14 @@ mod tests {
use std::path;
use std::sync;

use futures_cpupool;
use libc::{close, O_DIRECT, O_RDWR, open};
use memmap;
use rand::Rng;

use tokio::executor::current_thread;
use tokio::prelude::*;

use memmap;
use futures_cpupool;

use libc::{close, open, O_DIRECT, O_RDWR};
use super::*;

const FILE_SIZE: u64 = 1024 * 512;

Expand Down Expand Up @@ -787,7 +822,7 @@ mod tests {
}

struct MemoryBlock {
bytes: sync::RwLock<memmap::MmapMut>,
bytes: parking_lot::RwLock<memmap::MmapMut>,
}

impl MemoryBlock {
Expand All @@ -798,7 +833,7 @@ mod tests {
MemoryBlock {
// for real uses, we'll have a buffer pool with locks associated with individual pages
// simplifying the logic here for test case development
bytes: sync::RwLock::new(map),
bytes: parking_lot::RwLock::new(map),
}
}
}
Expand All @@ -825,13 +860,13 @@ mod tests {

impl convert::AsRef<[u8]> for MemoryHandle {
fn as_ref(&self) -> &[u8] {
unsafe { mem::transmute(&(*self.block.bytes.read().unwrap())[..]) }
unsafe { mem::transmute(&(*self.block.bytes.read())[..]) }
}
}

impl convert::AsMut<[u8]> for MemoryHandle {
fn as_mut(&mut self) -> &mut [u8] {
unsafe { mem::transmute(&mut (*self.block.bytes.write().unwrap())[..]) }
unsafe { mem::transmute(&mut (*self.block.bytes.write())[..]) }
}
}

Expand Down Expand Up @@ -1019,6 +1054,43 @@ mod tests {
}
}

#[test]
fn future_cancellation() {
let file_name = temp_file_name();
create_temp_file(&file_name);

{
let owned_fd = OwnedFd::new_from_raw_fd(unsafe {
open(
mem::transmute(file_name.as_os_str().as_bytes().as_ptr()),
O_DIRECT | O_RDWR,
)
});
let fd = owned_fd.fd;

let mut rt = tokio::runtime::Runtime::new().unwrap();
let buffer = MemoryHandle::new();

{
let context = AioContext::new(&rt.executor(), 10).unwrap();
let read_future = context
.read(fd, 0, buffer)
.map(move |result_buffer| {
assert!(validate_block(result_buffer.as_ref()));
})
.map_err(|err| {
panic!("{:?}", err);
});

let result = rt.block_on(read_future.timeout(std::time::Duration::from_secs(0)));

assert!(result.is_err());
}
}

remove_file(&file_name);
}

/*
For some reason, this test does not pass on Travis. Need to research why the out-of-range
file offset does not trip an invalid argument error.
Expand Down
Loading