Skip to content

Commit

Permalink
Fix bug in buf_channel::consume() where exact size doesn't receive eof (
Browse files Browse the repository at this point in the history
#858)

In the event the receiver is receiving the exact size of data, but
the writer sends all the data, but not an eof then the writer
decides not to send the eof the receiver will consider it "good".
This issue has been fixed by this PR.
  • Loading branch information
allada authored Apr 18, 2024
1 parent b2b83df commit 5583a5d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
9 changes: 5 additions & 4 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ mod filesystem_store_tests {

const HASH1: &str = "0123456789abcdef000000000000000000010000000000000123456789abcdef";
const HASH2: &str = "0123456789abcdef000000000000000000020000000000000123456789abcdef";
const VALUE1: &str = "1234";
const VALUE2: &str = "4321";
const VALUE1: &str = "0123456789";
const VALUE2: &str = "9876543210";

#[tokio::test]
async fn valid_results_after_shutdown_test() -> Result<(), Error> {
Expand Down Expand Up @@ -814,8 +814,9 @@ mod filesystem_store_tests {

#[tokio::test]
async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
const SMALL_VALUE: &str = "01";
const BIG_VALUE: &str = "0123";
let small_digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
let small_digest = DigestInfo::try_new(HASH1, SMALL_VALUE.len())?;
let big_digest = DigestInfo::try_new(HASH1, BIG_VALUE.len())?;

static UNREFED_DIGESTS: Lazy<Mutex<Vec<DigestInfo>>> = Lazy::new(|| Mutex::new(Vec::new()));
Expand Down Expand Up @@ -851,7 +852,7 @@ mod filesystem_store_tests {
// Insert data into store.
store
.as_ref()
.update_oneshot(small_digest, VALUE1.into())
.update_oneshot(small_digest, SMALL_VALUE.into())
.await?;
store
.as_ref()
Expand Down
9 changes: 4 additions & 5 deletions nativelink-util/src/buf_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl DropCloserReadHalf {
if !self.eof_sent.load(Ordering::Acquire) {
return Err(make_err!(
Code::Internal,
"EOF received before sending EOF; sender was probably dropped"
"Sender dropped before sending EOF"
));
}
self.maybe_populate_recent_data(&ZERO_DATA);
Expand Down Expand Up @@ -323,14 +323,13 @@ impl DropCloserReadHalf {
if chunk.len() > size {
let remaining = chunk.split_off(size);
self.queued_data.push_front(Ok(remaining));
}
if chunk.len() == size {
// No need to read EOF if we are a partial chunk.
return Ok(chunk);
}
// If we are a partial chunk and our next chunk is EOF, we are done.
// Try to read our EOF to ensure our sender did not error out.
match self.peek().await {
Ok(peeked_chunk) => {
if peeked_chunk.is_empty() {
if peeked_chunk.is_empty() || chunk.len() == size {
return Ok(chunk);
}
}
Expand Down
30 changes: 25 additions & 5 deletions nativelink-util/tests/buf_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::task::Poll;

use bytes::{Bytes, BytesMut};
use futures::poll;
use nativelink_error::{make_err, Code, Error, ResultExt};
use nativelink_util::buf_channel::make_buf_channel_pair;
use tokio::try_join;
Expand Down Expand Up @@ -67,7 +70,7 @@ mod buf_channel_tests {
}

#[tokio::test]
async fn consume_test() -> Result<(), Error> {
async fn consume_all_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let tx_fut = async move {
tx.send(DATA1.into()).await?;
Expand All @@ -91,7 +94,7 @@ mod buf_channel_tests {
/// Test to ensure data is optimized so that the exact same pointer is received
/// when calling `collect_all_with_size_hint` when a copy is not needed.
#[tokio::test]
async fn consume_is_optimized_test() -> Result<(), Error> {
async fn consume_all_is_optimized_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let sent_data = Bytes::from(DATA1);
let send_data_ptr = sent_data.as_ptr();
Expand All @@ -112,7 +115,7 @@ mod buf_channel_tests {
}

#[tokio::test]
async fn take_test() -> Result<(), Error> {
async fn consume_some_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let tx_fut = async move {
tx.send(DATA1.into()).await?;
Expand All @@ -139,7 +142,7 @@ mod buf_channel_tests {
/// we don't need to concat the data together and instead return a view to
/// the original data instead of making a copy.
#[tokio::test]
async fn take_optimized_test() -> Result<(), Error> {
async fn consume_some_optimized_test() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
let first_chunk = Bytes::from(DATA1);
let first_chunk_ptr = first_chunk.as_ptr();
Expand All @@ -159,6 +162,23 @@ mod buf_channel_tests {
Ok(())
}

#[tokio::test]
async fn consume_some_reads_eof() -> Result<(), Error> {
let (mut tx, mut rx) = make_buf_channel_pair();
tx.send(DATA1.into()).await?;

let consume_fut = rx.consume(Some(DATA1.len()));
tokio::pin!(consume_fut);
assert_eq!(
poll!(&mut consume_fut),
Poll::Pending,
"Consume should not have completed yet"
);
tx.send_eof()?;
assert_eq!(consume_fut.await?, Bytes::from(DATA1));
Ok(())
}

#[tokio::test]
async fn simple_stream_test() -> Result<(), Error> {
use futures::StreamExt;
Expand Down Expand Up @@ -245,7 +265,7 @@ mod buf_channel_tests {
rx.recv().await,
Err(make_err!(
Code::Internal,
"EOF received before sending EOF; sender was probably dropped"
"Sender dropped before sending EOF"
))
);
Result::<(), Error>::Ok(())
Expand Down

0 comments on commit 5583a5d

Please sign in to comment.