Skip to content
Original file line number Diff line number Diff line change
@@ -1,51 +1,61 @@
package com.google.cloud.hadoop.gcsio;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add finer logging across the implementation for major decisions. This will help in debugging. Refer: GoogleCloudStorageClientReadChannel for major logging spots.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


import static com.google.common.base.Strings.nullToEmpty;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobReadSession;
import com.google.cloud.storage.RangeSpec;
import com.google.cloud.storage.ReadProjectionConfigs;
import com.google.cloud.storage.Storage;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.cloud.storage.*;
import com.google.cloud.storage.ZeroCopySupport.DisposableByteString;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SeekableByteChannel;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntFunction;
import javax.annotation.Nullable;

public class GoogleCloudStorageBidiReadChannel implements ReadVectoredSeekableByteChannel {

private final Storage storage;
private final StorageResourceId resourceId;
private final BlobId blobId;
private final GoogleCloudStorageReadOptions readOptions;
private final BlobReadSession blobReadSession;
private ExecutorService boundedThreadPool;
private final ExecutorService boundedThreadPool;
private static final String GZIP_ENCODING = "gzip";
private long objectSize;
private boolean isOpen = true;
private boolean gzipEncoded = false;
@VisibleForTesting private SeekableByteChannel contentReadChannel;
@VisibleForTesting private long currentPosition = 0;

private boolean isProjectionObtained= false;

public GoogleCloudStorageBidiReadChannel(
Storage storage,
GoogleCloudStorageItemInfo itemInfo,
GoogleCloudStorageReadOptions readOptions,
ExecutorService boundedThreadPool)
throws IOException {
this.storage = storage;
this.resourceId =
StorageResourceId resourceId =
new StorageResourceId(
itemInfo.getBucketName(), itemInfo.getObjectName(), itemInfo.getContentGeneration());
this.blobId =
BlobId blobId =
BlobId.of(
resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId());
this.readOptions = readOptions;
this.blobReadSession =
initializeBlobReadSession(storage, blobId, readOptions.getBidiClientTimeout());
this.boundedThreadPool = boundedThreadPool;
initMetadata(itemInfo.getContentEncoding());
}

private void initializeReadSession() {
ReadAsSeekableChannel seekableChannelConfig = ReadProjectionConfigs.asSeekableChannel();
this.contentReadChannel = blobReadSession.readAs(seekableChannelConfig);
this.currentPosition = 0;
}

private static BlobReadSession initializeBlobReadSession(
Expand All @@ -59,8 +69,26 @@ private static BlobReadSession initializeBlobReadSession(

@Override
public int read(ByteBuffer dst) throws IOException {
// TODO(dhritichopra) Add read flow
return 0;
if(!isProjectionObtained){
isProjectionObtained = true;
initializeReadSession();
}
throwIfNotOpen();
if (!dst.hasRemaining()) {
return 0;
}

if (currentPosition >= objectSize) {
return -1;
}
int bytesRead = contentReadChannel.read(dst);

if (bytesRead == -1) {
this.currentPosition = objectSize;
return -1;
}

return bytesRead;
}

@Override
Expand All @@ -70,32 +98,48 @@ public int write(ByteBuffer src) throws IOException {

@Override
public long position() throws IOException {
// TODO(dhritichopra) Add read flow
return 0;
throwIfNotOpen();
return currentPosition;
}

@Override
public SeekableByteChannel position(long newPosition) throws IOException {
// TODO(dhritichopra) Add read flow
return null;
throwIfNotOpen();
if (newPosition < 0) {
throw new IOException(String.format("Invalid seek position: %d", newPosition));
}
if (newPosition > objectSize) {
throw new java.io.EOFException(
String.format("Seek position %d is beyond file size %d", newPosition, objectSize));
}
if (gzipEncoded) {
throw new IOException("Gzip is not supported");
}
if (newPosition == this.currentPosition) {
return this;
}
contentReadChannel.position(newPosition);
this.currentPosition = newPosition;
return this;
}

@Override
public long size() throws IOException {
// TODO(dhritichopra) Add read flow
return 0;
throwIfNotOpen();
if (objectSize == -1) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this being set to -1?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will not be set to -1 in the BidiReadChannel. Item info (From where we are populating objectSize) itself defaults to -1 if the object does not exist. I had added this for debugging. Although this case is probably unlikely that the object would not exist and size() function would be called on this channel without having failed at the initialization point. So we can consider removing this.

throw new IOException("Size of file is not available");
}
return objectSize;
}

@Override
public SeekableByteChannel truncate(long size) throws IOException {
// TODO(dhritichopra) Add read flow
return null;
throw new UnsupportedOperationException("Cannot truncate a read-only channel");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GoogleCloudStorageEventBus.postOnException() is missing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added in the following commits. Please ignore the debugging code that got pushed in the commit. I fixed it in a following commit.

}

@Override
public boolean isOpen() {
// TODO(dhritichopra) Add read flow
return false;
return isOpen;
}

@Override
Expand All @@ -106,6 +150,7 @@ public void close() throws IOException {
@Override
public void readVectored(List<VectoredIORange> ranges, IntFunction<ByteBuffer> allocate)
throws IOException {
isProjectionObtained = true;
ranges.forEach(
range -> {
ApiFuture<DisposableByteString> futureBytes =
Expand Down Expand Up @@ -151,4 +196,18 @@ private void processBytesAndCompleteRange(
range.getData().complete(buf);
}
}

private void throwIfNotOpen() throws IOException {
if (!isOpen()) {
GoogleCloudStorageEventBus.postOnException();
throw new ClosedChannelException();
}
}

protected void initMetadata(@Nullable String encoding) throws UnsupportedOperationException {
gzipEncoded = nullToEmpty(encoding).contains(GZIP_ENCODING);
if (gzipEncoded) {
throw new UnsupportedOperationException("Gzip Encoded Files are not supported");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,15 @@ public void write_unsupportedOperationException() throws IOException {
() -> bidiReadChannel.write(ByteBuffer.allocateDirect(0)));
}

// TODO(dhritichopra) Dummy test to complete coverage, remove with actual test once we have these
// implemented.
@Test
public void readPath_dummyImplementation() throws IOException {
public void truncate_unsupportedOperationException() throws IOException {
GoogleCloudStorageBidiReadChannel bidiReadChannel = getMockedBidiReadChannel();
assertEquals(bidiReadChannel.read(ByteBuffer.allocateDirect(0)), 0);
assertEquals(bidiReadChannel.position(), 0);
assertEquals(bidiReadChannel.position(12), null);
assertEquals(bidiReadChannel.size(), 0);
assertEquals(bidiReadChannel.truncate(12), null);
assertEquals(bidiReadChannel.isOpen(), false);
assertThrows(
UnsupportedOperationException.class,
() -> bidiReadChannel.truncate(10));
}


private String getReadVectoredData(VectoredIORange range)
throws ExecutionException, InterruptedException, TimeoutException {
Charset charset = StandardCharsets.UTF_8;
Expand Down