Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in buf_channel::consume() where exact size doesn't receive eof #858

Merged
merged 1 commit into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ mod filesystem_store_tests {

const HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";
const HASH2: &str = "0123456789abcdef000000000000000000020000000000000123456789abcdef";
const VALUE1: &str = "1234";
const VALUE2: &str = "4321";
const VALUE1: &str = "0123456789";
const VALUE2: &str = "9876543210";

#[tokio::test]
async fn valid_results_after_shutdown_test() -> Result<(), Error> {
Expand Down Expand Up @@ -814,8 +814,9 @@ mod filesystem_store_tests {

#[tokio::test]
async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
const SMALL_VALUE: &str = "01";
const BIG_VALUE: &str = "0123";
let small_digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
let small_digest = DigestInfo::try_new(HASH1, SMALL_VALUE.len())?;
let big_digest = DigestInfo::try_new(HASH1, BIG_VALUE.len())?;

static UNREFED_DIGESTS: Lazy<Mutex<Vec<DigestInfo>>> = Lazy::new(|| Mutex::new(Vec::new()));
Expand Down Expand Up @@ -851,7 +852,7 @@ mod filesystem_store_tests {
// Insert data into store.
store
.as_ref()
.update_oneshot(small_digest, VALUE1.into())
.update_oneshot(small_digest, SMALL_VALUE.into())
.await?;
store
.as_ref()
Expand Down
9 changes: 4 additions & 5 deletions nativelink-util/src/buf_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl DropCloserReadHalf {
if !self.eof_sent.load(Ordering::Acquire) {
return Err(make_err!(
Code::Internal,
"EOF received before sending EOF; sender was probably dropped"
"Sender dropped before sending EOF"
));
}
self.maybe_populate_recent_data(&ZERO_DATA);
Expand Down Expand Up @@ -323,14 +323,13 @@ impl DropCloserReadHalf {
if chunk.len() > size {
let remaining = chunk.split_off(size);
self.queued_data.push_front(Ok(remaining));
}
if chunk.len() == size {
// No need to read EOF if we are a partial chunk.
return Ok(chunk);
}
// If we are a partial chunk and our next chunk is EOF, we are done.
// Try to read our EOF to ensure our sender did not error out.
match self.peek().await {
Ok(peeked_chunk) => {
if peeked_chunk.is_empty() {
if peeked_chunk.is_empty() || chunk.len() == size {
return Ok(chunk);
}
}
Expand Down
30 changes: 25 additions & 5 deletions nativelink-util/tests/buf_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::task::Poll;

use bytes::{Bytes, BytesMut};
use futures::poll;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::make_buf_channel_pair;
use tokio::try_join;
Expand Down Expand Up @@ -67,7 +70,7 @@ mod buf_channel_tests {
}

#[tokio::test]
async fn consume_test() -> Result<(), Error> {
async fn consume_all_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let tx_fut = async move {
tx.send(DATA1.into()).await?;
Expand All @@ -91,7 +94,7 @@ mod buf_channel_tests {
/// Test to ensure data is optimized so that the exact same pointer is received
/// when calling `collect_all_with_size_hint` when a copy is not needed.
#[tokio::test]
async fn consume_is_optimized_test() -> Result<(), Error> {
async fn consume_all_is_optimized_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let sent_data = Bytes::from(DATA1);
let send_data_ptr = sent_data.as_ptr();
Expand All @@ -112,7 +115,7 @@ mod buf_channel_tests {
}

#[tokio::test]
async fn take_test() -> Result<(), Error> {
async fn consume_some_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let tx_fut = async move {
tx.send(DATA1.into()).await?;
Expand All @@ -139,7 +142,7 @@ mod buf_channel_tests {
/// we don't need to concat the data together and instead return a view to
/// the original data instead of making a copy.
#[tokio::test]
async fn take_optimized_test() -> Result<(), Error> {
async fn consume_some_optimized_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let first_chunk = Bytes::from(DATA1);
let first_chunk_ptr = first_chunk.as_ptr();
Expand All @@ -159,6 +162,23 @@ mod buf_channel_tests {
Ok(())
}

#[tokio::test]
async fn consume_some_reads_eof() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
tx.send(DATA1.into()).await?;

let consume_fut = rx.consume(Some(DATA1.len()));
tokio::pin!(consume_fut);
assert_eq!(
poll!(&mut consume_fut),
Poll::Pending,
"Consume should not have completed yet"
);
tx.send_eof()?;
assert_eq!(consume_fut.await?, Bytes::from(DATA1));
Ok(())
}

#[tokio::test]
async fn simple_stream_test() -> Result<(), Error> {
use futures::StreamExt;
Expand Down Expand Up @@ -245,7 +265,7 @@ mod buf_channel_tests {
rx.recv().await,
Err(make_err!(
Code::Internal,
"EOF received before sending EOF; sender was probably dropped"
"Sender dropped before sending EOF"
))
);
Result::<(), Error>::Ok(())
Expand Down