-
Notifications
You must be signed in to change notification settings - Fork 260
Adding support for Bidi read #1429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
3077024
e0de392
66359a6
18959a4
2590bb9
3748df7
7c0f668
195cf34
1bca7ee
8ca8264
ad46edc
fbb4d0a
e41b4f4
e824f04
7273b34
16be4dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,51 +1,60 @@ | ||
| package com.google.cloud.hadoop.gcsio; | ||
|
|
||
| import static com.google.common.base.Strings.nullToEmpty; | ||
|
|
||
| import com.google.api.core.ApiFuture; | ||
| import com.google.api.core.ApiFutureCallback; | ||
| import com.google.api.core.ApiFutures; | ||
| import com.google.cloud.storage.BlobId; | ||
| import com.google.cloud.storage.BlobReadSession; | ||
| import com.google.cloud.storage.RangeSpec; | ||
| import com.google.cloud.storage.ReadProjectionConfigs; | ||
| import com.google.cloud.storage.Storage; | ||
| import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus; | ||
| import com.google.cloud.storage.*; | ||
| import com.google.cloud.storage.ZeroCopySupport.DisposableByteString; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.protobuf.ByteString; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.channels.ClosedChannelException; | ||
| import java.nio.channels.SeekableByteChannel; | ||
| import java.util.List; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.function.IntFunction; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| public class GoogleCloudStorageBidiReadChannel implements ReadVectoredSeekableByteChannel { | ||
|
|
||
| private final Storage storage; | ||
| private final StorageResourceId resourceId; | ||
| private final BlobId blobId; | ||
| private final GoogleCloudStorageReadOptions readOptions; | ||
| private final BlobReadSession blobReadSession; | ||
| private ExecutorService boundedThreadPool; | ||
| private final ExecutorService boundedThreadPool; | ||
| private static final String GZIP_ENCODING = "gzip"; | ||
| private long objectSize; | ||
| private boolean isOpen = true; | ||
| private boolean gzipEncoded = false; | ||
| @VisibleForTesting public SeekableByteChannel contentReadChannel; | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be package private? |
||
| @VisibleForTesting public long currentPosition = 0; | ||
|
|
||
| public GoogleCloudStorageBidiReadChannel( | ||
| Storage storage, | ||
| GoogleCloudStorageItemInfo itemInfo, | ||
| GoogleCloudStorageReadOptions readOptions, | ||
| ExecutorService boundedThreadPool) | ||
| throws IOException { | ||
| this.storage = storage; | ||
| this.resourceId = | ||
| StorageResourceId resourceId = | ||
| new StorageResourceId( | ||
| itemInfo.getBucketName(), itemInfo.getObjectName(), itemInfo.getContentGeneration()); | ||
| this.blobId = | ||
| BlobId blobId = | ||
| BlobId.of( | ||
| resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId()); | ||
| this.readOptions = readOptions; | ||
| this.blobReadSession = | ||
| initializeBlobReadSession(storage, blobId, readOptions.getBidiClientTimeout()); | ||
| this.boundedThreadPool = boundedThreadPool; | ||
| initMetadata(itemInfo.getContentEncoding(), itemInfo.getSize()); | ||
| initializeReadSession(); | ||
| } | ||
|
|
||
| private void initializeReadSession() { | ||
| ReadAsSeekableChannel seekableChannelConfig = ReadProjectionConfigs.asSeekableChannel(); | ||
| this.contentReadChannel = blobReadSession.readAs(seekableChannelConfig); | ||
| this.currentPosition = 0; | ||
| } | ||
|
|
||
| private static BlobReadSession initializeBlobReadSession( | ||
|
|
@@ -59,8 +68,23 @@ private static BlobReadSession initializeBlobReadSession( | |
|
|
||
| @Override | ||
| public int read(ByteBuffer dst) throws IOException { | ||
| // TODO(dhritichopra) Add read flow | ||
| return 0; | ||
| throwIfNotOpen(); | ||
| if (!dst.hasRemaining()) { | ||
| return 0; | ||
| } | ||
|
|
||
| if (currentPosition >= objectSize) { | ||
| return -1; | ||
| } | ||
| int bytesRead = contentReadChannel.read(dst); | ||
|
|
||
| if (bytesRead == -1) { | ||
| this.currentPosition = objectSize; | ||
| return -1; | ||
| } | ||
| currentPosition = currentPosition + bytesRead; | ||
|
|
||
| return bytesRead; | ||
Dhriti07 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -70,37 +94,61 @@ public int write(ByteBuffer src) throws IOException { | |
|
|
||
| @Override | ||
| public long position() throws IOException { | ||
| // TODO(dhritichopra) Add read flow | ||
| return 0; | ||
| throwIfNotOpen(); | ||
| return currentPosition; | ||
| } | ||
|
|
||
| @Override | ||
| public SeekableByteChannel position(long newPosition) throws IOException { | ||
| // TODO(dhritichopra) Add read flow | ||
| return null; | ||
| throwIfNotOpen(); | ||
| if (newPosition < 0) { | ||
| throw new IOException(String.format("Invalid seek position: %d", newPosition)); | ||
| } | ||
| if (newPosition > objectSize) { | ||
| throw new java.io.EOFException( | ||
| String.format("Seek position %d is beyond file size %d", newPosition, objectSize)); | ||
| } | ||
| if (gzipEncoded) { | ||
| throw new IOException("Gzip is not supported"); | ||
| } | ||
| if (newPosition == this.currentPosition) { | ||
| return this; | ||
| } | ||
| contentReadChannel.position(newPosition); | ||
| this.currentPosition = newPosition; | ||
| return this; | ||
| } | ||
|
|
||
| @Override | ||
| public long size() throws IOException { | ||
| // TODO(dhritichopra) Add read flow | ||
| return 0; | ||
| throwIfNotOpen(); | ||
| if (objectSize == -1) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is this being set to -1?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will not be set to -1 in the BidiReadChannel. Item info (From where we are populating objectSize) itself defaults to -1 if the object does not exist. I had added this for debugging. Although this case is probably unlikely that the object would not exist and size() function would be called on this channel without having failed at the initialization point. So we can consider removing this. |
||
| throw new IOException("Size of file is not available"); | ||
| } | ||
| return objectSize; | ||
| } | ||
|
|
||
| @Override | ||
| public SeekableByteChannel truncate(long size) throws IOException { | ||
| // TODO(dhritichopra) Add read flow | ||
| return null; | ||
| throw new UnsupportedOperationException("Cannot truncate a read-only channel"); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. GoogleCloudStorageEventBus.postOnException() is missing?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Added in the following commits. Please ignore the debugging code that got pushed in the commit. I fixed it in a following commit. |
||
| } | ||
|
|
||
| @Override | ||
| public boolean isOpen() { | ||
| // TODO(dhritichopra) Add read flow | ||
| return false; | ||
| return isOpen; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws IOException { | ||
| blobReadSession.close(); | ||
| if (isOpen) { | ||
| isOpen = false; | ||
| if (contentReadChannel != null) { | ||
| contentReadChannel.close(); | ||
| } | ||
| if (blobReadSession != null) { | ||
| blobReadSession.close(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -151,4 +199,20 @@ private void processBytesAndCompleteRange( | |
| range.getData().complete(buf); | ||
| } | ||
| } | ||
|
|
||
| private void throwIfNotOpen() throws IOException { | ||
| if (!isOpen()) { | ||
| GoogleCloudStorageEventBus.postOnException(); | ||
| throw new ClosedChannelException(); | ||
| } | ||
| } | ||
|
|
||
| protected void initMetadata(@Nullable String encoding, long sizeFromMetadata) | ||
| throws UnsupportedOperationException { | ||
| gzipEncoded = nullToEmpty(encoding).contains(GZIP_ENCODING); | ||
| if (gzipEncoded) { | ||
| throw new UnsupportedOperationException("Gzip Encoded Files are not supported"); | ||
| } | ||
| objectSize = sizeFromMetadata; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add finer logging across the implementation for major decisions. This will help in debugging. Refer: GoogleCloudStorageClientReadChannel for major logging spots.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done