diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java index 77211b6db..dd1992597 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java @@ -198,6 +198,23 @@ private class ContentReadChannel { // Size of buffer to allocate for skipping bytes in-place when performing in-place seeks. private static final int SKIP_BUFFER_SIZE = 8192; + + private static final String FORMATTED_EOF_ERROR_MESSAGE = + "Unexpected EndOfStream detected at, (" + + "beginDstPosition: %d, " + + "beginDstLimit: %d, " + + "beginCurrentPosition: %d, " + + "beginContentChannelCurrentPosition: %d, " + + "beginContentChannelEnd: %d, " + + "remainingBeforeRead: %d, " + + "currentPosition: %d, " + + "contentChannelCurrentPosition: %d, " + + "currentDstPosition: %d, " + + "currentDstLimit: %d, " + + "totalBytesRead: %d, " + + "expectedContentChannelEnd: %d, " + + "objectSize: %d, " + + "resourceId: %s)."; private final BlobId blobId; // This is the actual current position in `contentChannel` from where read can happen. @@ -234,11 +251,16 @@ public int readContent(ByteBuffer dst) throws IOException { currentPosition); int totalBytesRead = 0; + final int beginDstPosition = dst.position(); + final int beginDstLimit = dst.limit(); + final long beginCurrentPosition = currentPosition; + final long beginContentChannelCurrentPosition = contentChannelCurrentPosition; + final long beginContentChannelEnd = contentChannelEnd; // We read from a streaming source. We may not get all the bytes we asked for // in the first read. Therefore, loop till we either read the required number of // bytes or we reach end-of-stream. while (dst.hasRemaining()) { - int remainingBeforeRead = dst.remaining(); + final int remainingBeforeRead = dst.remaining(); try { if (byteChannel == null) { byteChannel = openByteChannel(dst.remaining()); @@ -277,13 +299,66 @@ public int readContent(ByteBuffer dst) throws IOException { contentChannelEnd = currentPosition; } - if (currentPosition != contentChannelEnd && currentPosition != objectSize) { + if (currentPosition < contentChannelEnd && currentPosition < objectSize) { + GoogleCloudStorageEventBus.postOnException(); throw new IOException( String.format( "Received end of stream result before all requestedBytes were received;" - + "EndOf stream signal received at offset: %d where as stream was suppose to end at: %d for resource: %s of size: %d", - currentPosition, contentChannelEnd, resourceId, objectSize)); + + FORMATTED_EOF_ERROR_MESSAGE, + beginDstPosition, + beginDstLimit, + beginCurrentPosition, + beginContentChannelCurrentPosition, + beginContentChannelEnd, + remainingBeforeRead, + currentPosition, + contentChannelCurrentPosition, + dst.position(), + dst.limit(), + totalBytesRead, + contentChannelEnd, + objectSize, + resourceId)); + } else if (currentPosition > objectSize) { + throw new IOException( + String.format( + "Received end of stream result beyond the object size;" + + FORMATTED_EOF_ERROR_MESSAGE, + beginDstPosition, + beginDstLimit, + beginCurrentPosition, + beginContentChannelCurrentPosition, + beginContentChannelEnd, + remainingBeforeRead, + currentPosition, + contentChannelCurrentPosition, + dst.position(), + dst.limit(), + totalBytesRead, + contentChannelEnd, + objectSize, + resourceId)); + } else if (currentPosition > contentChannelEnd) { + logger.atWarning().log( + "Received end of stream result after the channel end;" + + FORMATTED_EOF_ERROR_MESSAGE, + beginDstPosition, + beginDstLimit, + beginCurrentPosition, + beginContentChannelCurrentPosition, + beginContentChannelEnd, + remainingBeforeRead, + currentPosition, + contentChannelCurrentPosition, + dst.position(), + dst.limit(), + totalBytesRead, + contentChannelEnd, + objectSize, + resourceId); + closeContentChannel(); + continue; } // If we have reached an end of a contentChannel but not an end of an object. // then close contentChannel and continue reading an object if necessary. @@ -345,6 +420,11 @@ private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException ReadableByteChannel readableByteChannel = getStorageReadChannel(contentChannelCurrentPosition, contentChannelEnd); + // Logging at warning so that we don't need to change log level. + logger.atWarning().log( + "Storage ReadChannel opened at, contentChannelCurrentPosition: %d, contentChannelEnd: %d", + contentChannelCurrentPosition, contentChannelEnd); + if (contentChannelEnd == objectSize && (contentChannelEnd - contentChannelCurrentPosition) <= readOptions.getMinRangeRequestSize()) { diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FakeReadChannel.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FakeReadChannel.java index 553793566..e1336b021 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FakeReadChannel.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FakeReadChannel.java @@ -35,7 +35,9 @@ public static enum REQUEST_TYPE { READ_EXCEPTION, PARTIAL_READ, NEGATIVE_READ, - ZERO_READ + ZERO_READ, + MORE_THAN_CHANNEL_LENGTH, + MORE_THAN_OBJECT_SIZE } private final List orderRequestsList; @@ -119,6 +121,12 @@ public int read(ByteBuffer dst) throws IOException { case NEGATIVE_READ: bytesRead = -1; break; + case MORE_THAN_CHANNEL_LENGTH: + bytesRead = toIntExact(limit + 1); + break; + case MORE_THAN_OBJECT_SIZE: + bytesRead = content.size() + 1; + break; case READ_EXCEPTION: throw new IOException("Exception occurred in read"); case PARTIAL_READ: diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java index baec24438..641527f9c 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java @@ -576,6 +576,38 @@ public void requestRangeOverlapWithCachedFooter() throws IOException { assertThat(limitValue.getAllValues().get(1)).isEqualTo(startPosition + 1); } + @Test + public void readBeyondChannelLength() throws IOException { + int bufferSize = 100; + fakeReadChannel = + spy(new FakeReadChannel(CONTENT, ImmutableList.of(REQUEST_TYPE.MORE_THAN_CHANNEL_LENGTH))); + when(mockedStorage.reader(any(), any())).thenReturn(fakeReadChannel); + readChannel = getJavaStorageChannel(DEFAULT_ITEM_INFO, DEFAULT_READ_OPTION); + + int startPosition = 0; + readChannel.position(startPosition); + + assertThat(readChannel.read(ByteBuffer.allocate(bufferSize))) + .isEqualTo(bufferSize + CHUNK_SIZE + 1); + } + + @Test + public void readBeyondObjectSize() throws IOException { + fakeReadChannel = + spy(new FakeReadChannel(CONTENT, ImmutableList.of(REQUEST_TYPE.MORE_THAN_OBJECT_SIZE))); + when(mockedStorage.reader(any(), any())).thenReturn(fakeReadChannel); + readChannel = getJavaStorageChannel(DEFAULT_ITEM_INFO, DEFAULT_READ_OPTION); + + int startPosition = 0; + readChannel.position(startPosition); + IOException e = + assertThrows(IOException.class, () -> readChannel.read(ByteBuffer.allocate(CHUNK_SIZE))); + + assertThat(e) + .hasMessageThat() + .contains("Received end of stream result beyond the object size;"); + } + private void verifyContent(ByteBuffer buffer, int startPosition, int length) { assertThat(buffer.position()).isEqualTo(length); assertByteArrayEquals(