Skip to content

Commit f6f42c6

Browse files
authored
Merge pull request #21 from blackbeam/sync-opts-2
Fix #17
2 parents 54c0242 + a701da4 commit f6f42c6

File tree

4 files changed

+49
-15
lines changed

4 files changed

+49
-15
lines changed

src/conn/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,12 @@ mod test {
469469
builder
470470
}
471471

472+
#[test]
473+
fn opts_should_satisfy_send_and_sync() {
474+
struct A<T: Sync + Send>(T);
475+
A(get_opts());
476+
}
477+
472478
#[test]
473479
fn should_connect() {
474480
let mut lp = Core::new().unwrap();

src/connection_like/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,10 +388,13 @@ pub trait ConnectionLike {
388388
let fut = parse_local_infile_packet(&*packet.0)
389389
.chain_err(|| Error::from(ErrorKind::UnexpectedPacket))
390390
.and_then(|local_infile| match this.get_local_infile_handler() {
391-
Some(handler) => handler.handle(local_infile.file_name_ref()),
391+
Some(handler) => Ok((local_infile.into_owned(), handler)),
392392
None => Err(ErrorKind::NoLocalInfileHandler.into()),
393393
})
394394
.into_future()
395+
.and_then(|(local_infile, handler)| {
396+
handler.handle(local_infile.file_name_ref())
397+
})
395398
.and_then(|reader| {
396399
let mut buf = Vec::with_capacity(4096);
397400
unsafe {

src/local_infile_handler/builtin.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9+
use BoxFuture;
10+
use lib_futures::{Future, IntoFuture, oneshot};
911
use std::fs;
1012
use std::io::{self, Read};
1113
use mio::{Evented, Poll, PollOpt, Ready, Registration, Token};
@@ -16,7 +18,7 @@ use errors::*;
1618
use std::collections::HashSet;
1719
use std::str::from_utf8;
1820
use super::LocalInfileHandler;
19-
use tokio::reactor::{Handle, PollEvented};
21+
use tokio::reactor::{Handle, Remote, PollEvented};
2022
use tokio_io::AsyncRead;
2123

2224
#[derive(Debug)]
@@ -163,7 +165,7 @@ impl Evented for File {
163165
#[derive(Clone, Debug)]
164166
pub struct WhiteListFsLocalInfileHandler {
165167
white_list: HashSet<PathBuf>,
166-
handle: Handle,
168+
handle: Remote,
167169
}
168170

169171
impl WhiteListFsLocalInfileHandler {
@@ -176,22 +178,45 @@ impl WhiteListFsLocalInfileHandler {
176178
for path in white_list.into_iter() {
177179
white_list_set.insert(Into::<PathBuf>::into(path));
178180
}
179-
WhiteListFsLocalInfileHandler { white_list: white_list_set, handle: handle.clone() }
181+
WhiteListFsLocalInfileHandler {
182+
white_list: white_list_set,
183+
handle: handle.remote().clone(),
184+
}
180185
}
181186
}
182187

183188
impl LocalInfileHandler for WhiteListFsLocalInfileHandler {
184-
fn handle(&self, file_name: &[u8]) -> Result<Box<AsyncRead>> {
189+
fn handle(&self, file_name: &[u8]) -> BoxFuture<Box<AsyncRead>> {
185190
let path: PathBuf = match from_utf8(file_name) {
186191
Ok(path_str) => path_str.into(),
187-
Err(_) => bail!("Invalid file name"),
192+
Err(_) => return Box::new(Err("Invalid file name".into()).into_future()),
188193
};
189194
if self.white_list.contains(&path) {
190-
Ok(Box::new(
191-
PollEvented::new(File::new(path), &self.handle)?,
192-
) as Box<AsyncRead>)
195+
match self.handle.handle() {
196+
Some(handle) => {
197+
let fut = PollEvented::new(File::new(path), &handle)
198+
.map_err(Into::into)
199+
.map(|poll_evented| Box::new(poll_evented) as Box<AsyncRead>)
200+
.into_future();
201+
Box::new(fut) as BoxFuture<Box<AsyncRead>>
202+
},
203+
None => {
204+
let (tx, rx) = oneshot();
205+
self.handle.spawn(|handle| {
206+
let poll_evented_res = PollEvented::new(File::new(path), &handle);
207+
let _ = tx.send(poll_evented_res.map_err(Error::from));
208+
Ok(())
209+
});
210+
let fut = rx
211+
.map_err(|_| Error::from("Future Canceled"))
212+
.and_then(|r| r.into_future())
213+
.map(|poll_evented| Box::new(poll_evented) as Box<AsyncRead>);
214+
Box::new(fut) as BoxFuture<Box<AsyncRead>>
215+
}
216+
}
193217
} else {
194-
bail!(format!("Path `{}' is not in white list", path.display()));
218+
let err_msg = format!("Path `{}' is not in white list", path.display());
219+
return Box::new(Err(err_msg.into()).into_future());
195220
}
196221
}
197222
}

src/local_infile_handler/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// option. All files in the project carrying such notice may not be copied,
77
// modified, or distributed except according to those terms.
88

9-
use errors::*;
9+
use BoxFuture;
1010
use std::fmt;
1111
use std::sync::Arc;
1212
use tokio_io::AsyncRead;
@@ -32,8 +32,8 @@ pub mod builtin;
3232
/// struct ExampleHandler(&'static [u8]);
3333
///
3434
/// impl LocalInfileHandler for ExampleHandler {
35-
/// fn handle(&self, _: &[u8]) -> my::errors::Result<Box<AsyncRead>> {
36-
/// Ok(Box::new(self.0))
35+
/// fn handle(&self, _: &[u8]) -> Box<Future<Item=Box<AsyncRead>, Error=my::errors::Error>> {
36+
/// Box::new(futures::future::ok(Box::new(self.0) as Box<AsyncRead>))
3737
/// }
3838
/// }
3939
///
@@ -68,10 +68,10 @@ pub mod builtin;
6868
/// lp.run(future).unwrap();
6969
/// # }
7070
/// ```
71-
pub trait LocalInfileHandler {
71+
pub trait LocalInfileHandler: Sync + Send {
7272
/// `file_name` is the file name in `LOAD DATA LOCAL INFILE '<file name>' INTO TABLE ...;`
7373
/// query.
74-
fn handle(&self, file_name: &[u8]) -> Result<Box<AsyncRead>>;
74+
fn handle(&self, file_name: &[u8]) -> BoxFuture<Box<AsyncRead>>;
7575
}
7676

7777
/// Object used to wrap `T: LocalInfileHandler` inside of Opts.

0 commit comments

Comments
 (0)