Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 73 additions & 23 deletions gio/src/input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ pub trait InputStreamExtManual: IsA<InputStream> + Sized {
&self,
mut buffer: B,
cancellable: Option<&C>,
) -> Result<(usize, Option<glib::Error>), glib::Error> {
) -> Result<(B, usize), (B, usize, glib::Error)> {
let cancellable = cancellable.map(|c| c.as_ref());
let gcancellable = cancellable.to_glib_none();
let buffer = buffer.as_mut();
let buffer_ptr = buffer.as_mut_ptr();
let count = buffer.len();
let (count, buffer_ptr) = {
let buffer = buffer.as_mut();
(buffer.len(), buffer.as_mut_ptr())
};
unsafe {
let mut bytes_read = mem::MaybeUninit::uninit();
let mut error = ptr::null_mut();
Expand All @@ -62,19 +63,17 @@ pub trait InputStreamExtManual: IsA<InputStream> + Sized {

let bytes_read = bytes_read.assume_init();
if error.is_null() {
Ok((bytes_read, None))
} else if bytes_read != 0 {
Ok((bytes_read, Some(from_glib_full(error))))
Ok((buffer, bytes_read))
} else {
Err(from_glib_full(error))
Err((buffer, bytes_read, from_glib_full(error)))
}
}
}

#[doc(alias = "g_input_stream_read_all_async")]
fn read_all_async<
B: AsMut<[u8]> + Send + 'static,
Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
Q: FnOnce(Result<(B, usize), (B, usize, glib::Error)>) + 'static,
C: IsA<Cancellable>,
>(
&self,
Expand Down Expand Up @@ -105,7 +104,7 @@ pub trait InputStreamExtManual: IsA<InputStream> + Sized {
};
unsafe extern "C" fn read_all_async_trampoline<
B: AsMut<[u8]> + Send + 'static,
Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
Q: FnOnce(Result<(B, usize), (B, usize, glib::Error)>) + 'static,
>(
_source_object: *mut glib::gobject_ffi::GObject,
res: *mut ffi::GAsyncResult,
Expand All @@ -128,11 +127,9 @@ pub trait InputStreamExtManual: IsA<InputStream> + Sized {

let bytes_read = bytes_read.assume_init();
let result = if error.is_null() {
Ok((buffer, bytes_read, None))
} else if bytes_read != 0 {
Ok((buffer, bytes_read, Some(from_glib_full(error))))
Ok((buffer, bytes_read))
} else {
Err((buffer, from_glib_full(error)))
Err((buffer, bytes_read, from_glib_full(error)))
};

callback(result);
Expand Down Expand Up @@ -231,9 +228,7 @@ pub trait InputStreamExtManual: IsA<InputStream> + Sized {
io_priority: Priority,
) -> Pin<
Box<
dyn std::future::Future<
Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
> + 'static,
dyn std::future::Future<Output = Result<(B, usize), (B, usize, glib::Error)>> + 'static,
>,
> {
Box::pin(crate::GioFuture::new(
Expand Down Expand Up @@ -569,29 +564,84 @@ mod tests {
);
});

let (buf, count, err) = ret.unwrap();
let (buf, count) = ret.unwrap();
assert_eq!(count, 3);
assert!(err.is_none());
assert_eq!(buf[0], 1);
assert_eq!(buf[1], 2);
assert_eq!(buf[2], 3);
}

#[test]
fn read_all_future() {
let c = glib::MainContext::new();
let b = Bytes::from_owned(vec![1, 2, 3]);
let strm = MemoryInputStream::from_bytes(&b);

let (buf, count) = c
.block_on(strm.read_all_future(vec![0; 10], glib::Priority::default()))
.unwrap();

assert_eq!(count, 3);
assert_eq!(buf[0], 1);
assert_eq!(buf[1], 2);
assert_eq!(buf[2], 3);
}

#[test]
fn read_all_async_cancelled() {
let ret = run_async(|tx, l| {
let b = Bytes::from_owned(vec![1, 2, 3]);
let strm = MemoryInputStream::from_bytes(&b);
let cancellable = crate::Cancellable::new();
cancellable.cancel();

let buf = vec![0; 10];
strm.read_all_async(
buf,
glib::Priority::DEFAULT_IDLE,
Some(&cancellable),
move |ret| {
tx.send(ret).unwrap();
l.quit();
},
);
});

let (buf, count, err) = ret.unwrap_err();
assert_eq!(count, 0);
assert_eq!(buf, vec![0; 10]);
assert!(err.matches::<crate::IOErrorEnum>(crate::IOErrorEnum::Cancelled));
}

#[test]
fn read_all() {
let b = Bytes::from_owned(vec![1, 2, 3]);
let strm = MemoryInputStream::from_bytes(&b);
let mut buf = vec![0; 10];

let ret = strm.read_all(&mut buf, crate::Cancellable::NONE).unwrap();
let (buf, count) = strm
.read_all(vec![0; 10], crate::Cancellable::NONE)
.unwrap();

assert_eq!(ret.0, 3);
assert!(ret.1.is_none());
assert_eq!(count, 3);
assert_eq!(buf[0], 1);
assert_eq!(buf[1], 2);
assert_eq!(buf[2], 3);
}

#[test]
fn read_all_cancelled() {
let b = Bytes::from_owned(vec![1, 2, 3]);
let strm = MemoryInputStream::from_bytes(&b);
let cancellable = crate::Cancellable::new();
cancellable.cancel();

let (buf, count, err) = strm.read_all(vec![0; 10], Some(&cancellable)).unwrap_err();

assert_eq!(count, 0);
assert_eq!(buf, vec![0; 10]);
assert!(err.matches::<crate::IOErrorEnum>(crate::IOErrorEnum::Cancelled));
}

#[test]
fn read() {
let b = Bytes::from_owned(vec![1, 2, 3]);
Expand Down
55 changes: 44 additions & 11 deletions gio/src/output_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
#[doc(alias = "g_output_stream_write_all_async")]
fn write_all_async<
B: AsRef<[u8]> + Send + 'static,
Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
Q: FnOnce(Result<(B, usize), (B, usize, glib::Error)>) + 'static,
C: IsA<Cancellable>,
>(
&self,
Expand Down Expand Up @@ -146,7 +146,7 @@ pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
};
unsafe extern "C" fn write_all_async_trampoline<
B: AsRef<[u8]> + Send + 'static,
Q: FnOnce(Result<(B, usize, Option<glib::Error>), (B, glib::Error)>) + 'static,
Q: FnOnce(Result<(B, usize), (B, usize, glib::Error)>) + 'static,
>(
_source_object: *mut glib::gobject_ffi::GObject,
res: *mut ffi::GAsyncResult,
Expand All @@ -168,11 +168,9 @@ pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
);
let bytes_written = bytes_written.assume_init();
let result = if error.is_null() {
Ok((buffer, bytes_written, None))
} else if bytes_written != 0 {
Ok((buffer, bytes_written, from_glib_full(error)))
Ok((buffer, bytes_written))
} else {
Err((buffer, from_glib_full(error)))
Err((buffer, bytes_written, from_glib_full(error)))
};
callback(result);
}
Expand Down Expand Up @@ -213,9 +211,7 @@ pub trait OutputStreamExtManual: IsA<OutputStream> + Sized {
io_priority: Priority,
) -> Pin<
Box<
dyn std::future::Future<
Output = Result<(B, usize, Option<glib::Error>), (B, glib::Error)>,
> + 'static,
dyn std::future::Future<Output = Result<(B, usize), (B, usize, glib::Error)>> + 'static,
>,
> {
Box::pin(crate::GioFuture::new(
Expand Down Expand Up @@ -666,10 +662,47 @@ mod tests {
);
});

let (buf, size, err) = ret.unwrap();
let (buf, size) = ret.unwrap();
assert_eq!(buf, vec![1, 2, 3]);
assert_eq!(size, 3);
assert!(err.is_none());
}

#[test]
fn write_all_future() {
let c = glib::MainContext::new();
let strm = MemoryOutputStream::new_resizable();

let (buf, size) = c
.block_on(strm.write_all_future(vec![1, 2, 3], glib::Priority::default()))
.unwrap();

assert_eq!(buf, vec![1, 2, 3]);
assert_eq!(size, 3);
}

#[test]
fn write_all_async_cancelled() {
let ret = run_async(|tx, l| {
let strm = MemoryOutputStream::new_resizable();
let cancellable = crate::Cancellable::new();
cancellable.cancel();

let buf = vec![1, 2, 3];
strm.write_all_async(
buf,
glib::Priority::DEFAULT_IDLE,
Some(&cancellable),
move |ret| {
tx.send(ret).unwrap();
l.quit();
},
);
});

let (buf, size, err) = ret.unwrap_err();
assert_eq!(buf, vec![1, 2, 3]);
assert_eq!(size, 0);
assert!(err.matches::<crate::IOErrorEnum>(crate::IOErrorEnum::Cancelled));
}

#[test]
Expand Down
Loading