diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java index 77211b6db..8559a4523 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java @@ -25,7 +25,6 @@ import static java.lang.Math.min; import static java.lang.Math.toIntExact; -import com.google.cloud.ReadChannel; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; import com.google.cloud.hadoop.util.ErrorTypeExtractor; import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus; @@ -60,6 +59,7 @@ class GoogleCloudStorageClientReadChannel implements SeekableByteChannel { private final GoogleCloudStorageReadOptions readOptions; private final GoogleCloudStorageOptions storageOptions; private final Storage storage; + private final BlobSourceOption[] storageReadOptions; // The size of this object generation, in bytes. private long objectSize; private final ErrorTypeExtractor errorExtractor; @@ -88,6 +88,7 @@ public GoogleCloudStorageClientReadChannel( this.storageOptions = storageOptions; this.contentReadChannel = new ContentReadChannel(readOptions, resourceId); initMetadata(itemInfo.getContentEncoding(), itemInfo.getSize()); + this.storageReadOptions = generateReadOptions(resourceId); } protected void initMetadata(@Nullable String encoding, long sizeFromMetadata) throws IOException { @@ -531,44 +532,31 @@ private void performPendingSeeks() { } private ReadableByteChannel getStorageReadChannel(long seek, long limit) throws IOException { - ReadChannel readChannel = storage.reader(blobId, generateReadOptions(blobId)); - try { - readChannel.seek(seek); - readChannel.limit(limit); - // bypass the storage-client caching layer hence eliminates the need to maintain a copy of - // chunk - readChannel.setChunkSize(0); - return readChannel; - } catch (Exception e) { - GoogleCloudStorageEventBus.postOnException(); - throw new IOException( - String.format( - "Unable to update the boundaries/Range of contentChannel %s. cause=%s", - resourceId, e), - e); - } + return RangeValidatingReadableByteChannel.of(storage, blobId, seek, limit, + storageReadOptions); } - private BlobSourceOption[] generateReadOptions(BlobId blobId) { - List blobReadOptions = new ArrayList<>(); - // To get decoded content - blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false)); - - if (blobId.getGeneration() != null) { - blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGeneration())); - } - if (storageOptions.getEncryptionKey() != null) { - blobReadOptions.add( - BlobSourceOption.decryptionKey(storageOptions.getEncryptionKey().value())); - } - return blobReadOptions.toArray(new BlobSourceOption[blobReadOptions.size()]); - } private boolean isFooterRead() { return objectSize - currentPosition <= readOptions.getMinRangeRequestSize(); } } + private BlobSourceOption[] generateReadOptions(StorageResourceId blobId) { + List blobReadOptions = new ArrayList<>(); + // enable transparent gzip-decompression + blobReadOptions.add(BlobSourceOption.shouldReturnRawInputStream(false)); + + if (blobId.getGenerationId() > StorageResourceId.UNKNOWN_GENERATION_ID) { + blobReadOptions.add(BlobSourceOption.generationMatch(blobId.getGenerationId())); + } + if (storageOptions.getEncryptionKey() != null) { + blobReadOptions.add( + BlobSourceOption.decryptionKey(storageOptions.getEncryptionKey().value())); + } + return blobReadOptions.toArray(new BlobSourceOption[0]); + } + @VisibleForTesting boolean randomAccessStatus() { return contentReadChannel.fileAccessManager.shouldAdaptToRandomAccess(); diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/RangeValidatingReadableByteChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/RangeValidatingReadableByteChannel.java new file mode 100644 index 000000000..16c598dff --- /dev/null +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/RangeValidatingReadableByteChannel.java @@ -0,0 +1,126 @@ +package com.google.cloud.hadoop.gcsio; + +import com.google.cloud.ReadChannel; +import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobSourceOption; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.flogger.GoogleLogger; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.Locale; + +final class RangeValidatingReadableByteChannel implements ReadableByteChannel { + + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + + private final String resourceId; + private final long beginOffset; + private final long endOffset; + private final ReadableByteChannel delegate; + + private long position; + + /** + * @param endOffset expected to be <= to the length of the object -- this class does not possess + * the capability of clamping the end offset to the object size + */ + private RangeValidatingReadableByteChannel( + String resourceId, long beginOffset, long endOffset, ReadableByteChannel delegate) { + this.resourceId = resourceId; + this.beginOffset = beginOffset; + this.endOffset = endOffset; + this.position = beginOffset; + this.delegate = delegate; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + int expectedMaxRead; + long expectedChannelRemaining = endOffset - position; + if (expectedChannelRemaining < dst.remaining()) { + expectedMaxRead = Math.toIntExact(expectedChannelRemaining); + } else { + expectedMaxRead = dst.remaining(); + } + int read = delegate.read(dst); + if (read > -1) { + position += read; + if (read > expectedMaxRead) { + // over-read + logger.atWarning().log( + "over-read of object %s detected. " + + "Channel opened with {beginOffset: %s, endOffset: %s}, " + + "over-read {position: %s, expectedMaxRead: %s, read: %s} (read %s additional bytes)", + resourceId, + beginOffset, + endOffset, + position, + expectedMaxRead, + read, + read - expectedMaxRead); + } + } else { + if (position < endOffset) { + // in reality, if this were to ever happen it should be handled as a retryable error, where + // the next read would begin at the current offset/position. + // under-read + throw new IOException( + String.format( + Locale.US, + "under-read of object %s detected. " + + "Channel opened with {beginOffset: %s, endOffset: %s}, " + + "EOF detected at position: %s (missing %s bytes)", + resourceId, + beginOffset, + endOffset, + position, + expectedChannelRemaining)); + } + } + + return read; + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @VisibleForTesting + static RangeValidatingReadableByteChannel of( + String resourceId, long beginOffset, long endOffset, ReadableByteChannel delegate) { + return new RangeValidatingReadableByteChannel(resourceId, beginOffset, endOffset, delegate); + } + + static RangeValidatingReadableByteChannel of( + Storage storage, + BlobId id, + long beginOffset, + long endOffset, + BlobSourceOption... storageReadOptions) + throws IOException { + ReadChannel readChannel = storage.reader(id, storageReadOptions); + try { + readChannel.seek(beginOffset); + readChannel.limit(endOffset); + // disable client level chunk buffering. This also makes the channel semi-non-blocking + readChannel.setChunkSize(0); + return of(id.toGsUtilUri(), beginOffset, endOffset, readChannel); + } catch (Exception e) { + GoogleCloudStorageEventBus.postOnException(); + throw new IOException( + String.format( + "Unable to update the boundaries/Range of contentChannel %s. cause=%s", + id.toGsUtilUri(), e), + e); + } + } +} diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/RangeValidatingReadableByteChannelTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/RangeValidatingReadableByteChannelTest.java new file mode 100644 index 000000000..3e5e25bd2 --- /dev/null +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/RangeValidatingReadableByteChannelTest.java @@ -0,0 +1,243 @@ +package com.google.cloud.hadoop.gcsio; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.common.flogger.GoogleLogger; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public final class RangeValidatingReadableByteChannelTest { + + @Rule public final TestName testName = new TestName(); + + @Test + public void copy_happyPath() throws IOException { + int beginOffset = 13; + int endOffsetExclusive = 31; + TestReadableByteChannel delegate = new TestReadableByteChannel(beginOffset, endOffsetExclusive); + RangeValidatingReadableByteChannel r = + RangeValidatingReadableByteChannel.of(objName(), beginOffset, endOffsetExclusive, delegate); + + int expectedCopiedBytes = endOffsetExclusive - beginOffset; + byte[] expected = new byte[expectedCopiedBytes]; + for (int i = beginOffset; i < endOffsetExclusive; i++) { + expected[i - beginOffset] = base64Byte(i); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + WritableByteChannel w = Channels.newChannel(baos); + + long copied = copyUsingBuffer(ByteBuffer.allocate(3), r, w); + + assertThat(copied).isEqualTo(expectedCopiedBytes); + assertThat(delegate.position).isEqualTo(endOffsetExclusive); + assertThat(baos.toByteArray()).isEqualTo(expected); + } + + @Test + public void copy_overRead() throws IOException { + int beginOffset = 13; + int endOffsetExclusive = 31; + TestReadableByteChannel delegate = + new TestReadableByteChannel(beginOffset, endOffsetExclusive + 1); + RangeValidatingReadableByteChannel r = + RangeValidatingReadableByteChannel.of(objName(), beginOffset, endOffsetExclusive, delegate); + + int expectedCopiedBytes = (endOffsetExclusive - beginOffset) + 1; + byte[] expected = new byte[expectedCopiedBytes]; + for (int i = beginOffset; i < endOffsetExclusive + 1; i++) { + expected[i - beginOffset] = base64Byte(i); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + WritableByteChannel w = Channels.newChannel(baos); + + long copied = copyUsingBuffer(ByteBuffer.allocate(3), r, w); + + // we're fully consuming the stream, so our assertion should be to the point of EOF + assertThat(copied).isEqualTo(expectedCopiedBytes); + assertThat(delegate.position).isEqualTo(endOffsetExclusive + 1); + assertThat(baos.toByteArray()).isEqualTo(expected); + } + + @Test + public void copy_underRead() { + int beginOffset = 13; + int endOffsetExclusive = 31; + TestReadableByteChannel delegate = + new TestReadableByteChannel(beginOffset, endOffsetExclusive - 1); + RangeValidatingReadableByteChannel r = + RangeValidatingReadableByteChannel.of(objName(), beginOffset, endOffsetExclusive, delegate); + + int expectedCopiedBytes = (endOffsetExclusive - beginOffset) - 1; + byte[] expected = new byte[expectedCopiedBytes]; + for (int i = beginOffset; i < endOffsetExclusive - 1; i++) { + expected[i - beginOffset] = base64Byte(i); + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + WritableByteChannel w = Channels.newChannel(baos); + + IOException ioException = + assertThrows(IOException.class, () -> copyUsingBuffer(ByteBuffer.allocate(3), r, w)); + assertThat(ioException).hasMessageThat().contains("under-read of object "); + assertThat(ioException) + .hasMessageThat() + .contains( + "detected. Channel opened with {beginOffset: 13, endOffset: 31}, EOF detected at position: 30 (missing 1 bytes)"); + + assertThat(delegate.position).isEqualTo(endOffsetExclusive - 1); + assertThat(baos.toByteArray()).isEqualTo(expected); + } + + @Test + public void byteBuffer_happyPath() throws IOException { + int beginOffset = 13; + int endOffsetExclusive = 31; + TestReadableByteChannel delegate = new TestReadableByteChannel(beginOffset, endOffsetExclusive); + RangeValidatingReadableByteChannel r = + RangeValidatingReadableByteChannel.of(objName(), beginOffset, endOffsetExclusive, delegate); + + int expectedCopiedBytes = (endOffsetExclusive - beginOffset); + + ByteBuffer buf = ByteBuffer.allocate(37); + long copied = r.read(buf); + assertThat(copied).isEqualTo(expectedCopiedBytes); + assertThat(r.read(buf)).isEqualTo(-1); + + assertThat(delegate.position).isEqualTo(endOffsetExclusive); + assertThat(buf.position()).isEqualTo(expectedCopiedBytes); + } + + @Test + public void byteBuffer_overRead() throws IOException { + int beginOffset = 13; + int endOffsetExclusive = 31; + TestReadableByteChannel delegate = + new TestReadableByteChannel(beginOffset, endOffsetExclusive + 1); + RangeValidatingReadableByteChannel r = + RangeValidatingReadableByteChannel.of(objName(), beginOffset, endOffsetExclusive, delegate); + + int expectedCopiedBytes = (endOffsetExclusive - beginOffset) + 1; + + ByteBuffer buf = ByteBuffer.allocate(37); + long copied = r.read(buf); + assertThat(r.read(buf)).isEqualTo(-1); + + assertThat(copied).isEqualTo(expectedCopiedBytes); + assertThat(delegate.position).isEqualTo(endOffsetExclusive + 1); + assertThat(buf.position()).isEqualTo(expectedCopiedBytes); + } + + @Test + public void byteBuffer_underRead() throws IOException { + int beginOffset = 13; + int endOffsetExclusive = 31; + TestReadableByteChannel delegate = + new TestReadableByteChannel(beginOffset, endOffsetExclusive - 1); + RangeValidatingReadableByteChannel r = + RangeValidatingReadableByteChannel.of(objName(), beginOffset, endOffsetExclusive, delegate); + + int expectedCopiedBytes = (endOffsetExclusive - beginOffset) - 1; + + ByteBuffer buf = ByteBuffer.allocate(37); + long copied = r.read(buf); + assertThat(copied).isEqualTo(expectedCopiedBytes); + + // EOF is only observed on a second read + IOException ioException = assertThrows(IOException.class, () -> r.read(buf)); + assertThat(ioException).hasMessageThat().contains("under-read of object "); + assertThat(ioException) + .hasMessageThat() + .contains( + "detected. Channel opened with {beginOffset: 13, endOffset: 31}, EOF detected at position: 30 (missing 1 bytes)"); + + assertThat(delegate.position).isEqualTo(endOffsetExclusive - 1); + assertThat(buf.position()).isEqualTo(expectedCopiedBytes); + } + + private String objName() { + return "gs://test-bucket/obj-" + testName.getMethodName(); + } + + private static byte base64Byte(long i) { + return (byte) (0x41 + i % 64); + } + + private static long copyUsingBuffer( + ByteBuffer buf, ReadableByteChannel from, WritableByteChannel to) throws IOException { + long total = 0; + while (from.read(buf) != -1) { + buf.flip(); + while (buf.hasRemaining()) { + total += to.write(buf); + } + buf.clear(); + } + return total; + } + + private static class TestReadableByteChannel implements ReadableByteChannel { + private static final GoogleLogger LOGGER = GoogleLogger.forEnclosingClass(); + + private long position; + private boolean open; + private final long eofAt; + + public TestReadableByteChannel(long position, long eofAt) { + this.position = position; + this.open = true; + this.eofAt = eofAt; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (!open) { + throw new ClosedChannelException(); + } + final long positionBegin = position; + LOGGER.atInfo().log("positionBegin = %s", positionBegin); + long channelRemaining = eofAt - positionBegin; + LOGGER.atInfo().log("channelRemaining = %s", channelRemaining); + if (channelRemaining == 0) { + return -1; + } + int bufferRemaining = dst.remaining(); + LOGGER.atInfo().log("bufferRemaining = %s", bufferRemaining); + int toWrite = Math.toIntExact(Math.min(channelRemaining, bufferRemaining)); + LOGGER.atInfo().log("toWrite = %s", toWrite); + int written = 0; + for (int i = 0; i < toWrite; i++) { + byte base64Byte = base64Byte(position + i); + dst.put(base64Byte); + written++; + } + long newPosition = positionBegin + written; + LOGGER.atInfo().log("newPosition = %s", newPosition); + position = newPosition; + return written; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() throws IOException { + open = false; + } + } +}