Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 169 additions & 21 deletions src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.UUID;


Expand All @@ -38,7 +39,7 @@ public class ADLFileInputStream extends InputStream {
private static final int defaultQueueDepth = 0; // default queue depth is zero - disables read-ahead

private int blocksize = 4 * 1024 * 1024; // 4MB default buffer size
private byte[] buffer = null; // will be initialized on first use
private Buffer buffer = null; // will be initialized on first use
private int readAheadQueueDepth; // initialized in constructor

private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
Expand All @@ -47,6 +48,111 @@ public class ADLFileInputStream extends InputStream {
// of valid bytes in buffer)
private boolean streamClosed = false;

/**
* Represents the internal buffer used by ADLFileInputStream.
*/
private interface Buffer {
/**
* Retrieves bytes from this buffer to dst.
*/
void getBytes(int srcOffset, byte[] dst, int dstOffset, int bytesToRead);

/**
* Allocates the internal buffer.
*/
void allocate(int blockSize);

/**
* Clears the internal to its initial state.
*/
void clear();

/**
* Sets the position of the internal buffer
*/
void setPosition(int position);

/**
* Retrieves the byte array representing this internal buffer or null if not valid.
*/
byte[] getAsByteArray();

/**
* Retrieves the ByteBuffer representing this internal buffer or null if not valid.
*/
ByteBuffer getAsByteBuffer();
}

/**
* A byte-array based internal buffer.
*/
private static final class ByteArrayBuffer implements Buffer {
byte[] bytes;

@Override
public void getBytes(int srcOffset, byte[] dst, int dstOffset, int bytesToRead) {
System.arraycopy(bytes, srcOffset, dst, dstOffset, bytesToRead);
}

@Override
public void allocate(int blockSize) {
bytes = new byte[blockSize];
}

@Override
public void clear() { }

@Override
public void setPosition(int position) { }

@Override
public byte[] getAsByteArray() {
return bytes;
}

@Override
public ByteBuffer getAsByteBuffer() {
return null;
}
}

/**
* An internal buffer allocated using off-heap memory.
*/
private static final class OffHeapBuffer implements Buffer {
ByteBuffer buffer;

@Override
public void getBytes(int srcOffset, byte[] dst, int dstOffset, int bytesToRead) {
buffer.position(srcOffset);
buffer.get(dst, dstOffset, bytesToRead);
}

@Override
public void allocate(int blockSize) {
buffer = ByteBuffer.allocateDirect(blockSize);
}

@Override
public void clear() {
buffer.clear();
}

@Override
public void setPosition(int position) {
buffer.position(position);
}

@Override
public byte[] getAsByteArray() {
return null;
}

@Override
public ByteBuffer getAsByteBuffer() {
return buffer;
}
}

// no public constructor - use Factory Method in AzureDataLakeStoreClient
ADLFileInputStream(String filename, DirectoryEntry de, ADLStoreClient client) {
Expand Down Expand Up @@ -105,7 +211,9 @@ public int read(byte[] b, int off, int len) throws IOException {
//(bytes returned may be less than requested)
int bytesRemaining = limit - bCursor;
int bytesToRead = Math.min(len, bytesRemaining);
System.arraycopy(buffer, bCursor, b, off, bytesToRead);

buffer.getBytes(bCursor, b, off, bytesToRead);

bCursor += bytesToRead;
return bytesToRead;
}
Expand All @@ -127,9 +235,14 @@ protected long readFromService() throws IOException {
//reset buffer to initial state - i.e., throw away existing data
bCursor = 0;
limit = 0;
if (buffer == null) buffer = new byte[blocksize];
if (!isBufferAllocated()) {
allocateBuffer(blocksize);
} else {
buffer.clear();
}

int bytesRead = readInternal(fCursor, buffer, 0, blocksize, false);
int bytesRead = readInternal(fCursor, buffer.getAsByteArray(), buffer.getAsByteBuffer(),
0, blocksize, client.isUsingOffHeapMemory());
limit += bytesRead;
fCursor += bytesRead;
return bytesRead;
Expand All @@ -147,20 +260,23 @@ protected long slurpFullFile() throws IOException {
log.trace("ADLFileInputStream.slurpFullFile() - using client {} from file {}. At offset {}", client.getClientId(), filename, getPos());
}

if (buffer == null) {
if (!isBufferAllocated()) {
blocksize = (int) directoryEntry.length;
buffer = new byte[blocksize];
allocateBuffer(blocksize);
}

//reset buffer to initial state - i.e., throw away existing data
bCursor = (int) getPos(); // preserve current file offset (may not be 0 if app did a seek before first read)
limit = 0;
fCursor = 0; // read from beginning
buffer.clear();

int loopCount = 0;

// if one OPEN request doesnt get full file, then read again at fCursor
while (fCursor < directoryEntry.length) {
int bytesRead = readInternal(fCursor, buffer, limit, blocksize - limit, true);
int bytesRead = readInternal(fCursor, buffer.getAsByteArray(), buffer.getAsByteBuffer(),
limit, blocksize - limit, true);
limit += bytesRead;
fCursor += bytesRead;

Expand Down Expand Up @@ -190,10 +306,24 @@ public int read(long position, byte[] b, int offset, int length)
if (log.isTraceEnabled()) {
log.trace("ADLFileInputStream positioned read() - at offset {} using client {} from file {}", position, client.getClientId(), filename);
}
return readInternal(position, b, offset, length, true);
return readInternal(position, b, null, offset, length, true);
}

private boolean isBufferAllocated() {
return buffer != null;
}

private void allocateBuffer(int blocksize) {
if (client.isUsingOffHeapMemory()) {
buffer = new OffHeapBuffer();
} else {
buffer = new ByteArrayBuffer();
}

buffer.allocate(blocksize);
}

private int readInternal(long position, byte[] b, int offset, int length, boolean bypassReadAhead) throws IOException {
private int readInternal(long position, byte[] b, ByteBuffer offHeapMemory, int offset, int length, boolean bypassReadAhead) throws IOException {
boolean readAheadEnabled = true;
if (readAheadEnabled && !bypassReadAhead && !client.disableReadAheads) {
// try reading from read-ahead
Expand All @@ -218,20 +348,20 @@ private int readInternal(long position, byte[] b, int offset, int length, boolea
if (receivedBytes > 0) return receivedBytes;

// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length, false);
receivedBytes = readRemote(position, b, offHeapMemory, offset, length, false);
return receivedBytes;
} else {
return readRemote(position, b, offset, length, false);
return readRemote(position, b, offHeapMemory, offset, length, false);
}
}

int readRemote(long position, byte[] b, int offset, int length, boolean speculative) throws IOException {
int readRemote(long position, byte[] b, ByteBuffer offHeapMemory, int offset, int length, boolean speculative) throws IOException {
if (position < 0) throw new IllegalArgumentException("attempting to read from negative offset");
if (position >= directoryEntry.length) return -1; // Hadoop prefers -1 to EOFException
if (b == null) throw new IllegalArgumentException("null byte array passed in to read() method");
if (offset >= b.length) throw new IllegalArgumentException("offset greater than length of array");
if (b == null && offHeapMemory == null) throw new IllegalArgumentException("null byte array passed in to read() method");
if (b != null && offset >= b.length) throw new IllegalArgumentException("offset greater than length of array");
if (length < 0) throw new IllegalArgumentException("requested read length is less than zero");
if (length > (b.length - offset))
if (b != null && length > (b.length - offset))
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");

int totalBytesRead = 0;
Expand All @@ -254,12 +384,27 @@ int readRemote(long position, byte[] b, int offset, int length, boolean speculat
int bytesRead;
long start = System.nanoTime();
try {
do {
bytesRead = inStream.read(b, offset + totalBytesRead, length - totalBytesRead);
if (bytesRead > 0) { // if not EOF of the Core.open's stream
totalBytesRead += bytesRead;
}
} while (bytesRead >= 0 && totalBytesRead < length);
if (offHeapMemory != null && b == null) {
// We need to write to the ByteBuffer.
// Read blocks into junkbuffer then write it to the byte buffer.
offHeapMemory.clear();
offHeapMemory.position(offset);
do {
bytesRead = inStream.read(junkbuffer, 0, Math.min(length - totalBytesRead, junkbuffer.length));
if (bytesRead > 0) {
offHeapMemory.put(junkbuffer, 0, bytesRead);
totalBytesRead += bytesRead;
}
} while (bytesRead >= 0 && totalBytesRead < length);
offHeapMemory.flip();
} else {
do {
bytesRead = inStream.read(b, offset + totalBytesRead, length - totalBytesRead);
if (bytesRead > 0) { // if not EOF of the Core.open's stream
totalBytesRead += bytesRead;
}
} while (bytesRead >= 0 && totalBytesRead < length);
}
if (bytesRead >= 0) { // read to EOF on the stream, so connection can be reused
while (inStream.read(junkbuffer, 0, junkbuffer.length)>=0); // read and consume rest of stream, if any remains
}
Expand Down Expand Up @@ -313,6 +458,7 @@ public void seek(long n) throws IOException, EOFException {

if (n>=fCursor-limit && n<=fCursor) { // within buffer
bCursor = (int) (n-(fCursor-limit));
if (isBufferAllocated()) buffer.setPosition(bCursor);
return;
}

Expand All @@ -322,6 +468,7 @@ public void seek(long n) throws IOException, EOFException {
//invalidate buffer
limit = 0;
bCursor = 0;
if (isBufferAllocated()) buffer.clear();
}

@Override
Expand Down Expand Up @@ -422,6 +569,7 @@ public void unbuffer() throws IOException {
fCursor = getPos();
limit = 0;
bCursor = 0;
if (isBufferAllocated()) buffer.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ADLStoreClient {
private String tiHeader = null;
private String proto = "https";
private boolean enableRemoteExceptions = false;
private boolean useOffHeapMemory = Boolean.getBoolean(ADLStoreOptions.USE_OFF_HEAP_MEMORY_KEY);
private String pathPrefix = null;
private int readAheadQueueDepth = -1; // no preference set by caller, use default in ADLFileInputStream
volatile boolean disableReadAheads = false;
Expand Down Expand Up @@ -1002,6 +1003,7 @@ public synchronized void setOptions(ADLStoreOptions o) throws IOException {
if (o.getReadAheadQueueDepth() >= 0 ) this.readAheadQueueDepth = o.getReadAheadQueueDepth();
if (o.getDefaultTimeout() > 0) this.timeout = o.getDefaultTimeout();
this.alterCipherSuits = o.shouldAlterCipherSuits();
this.useOffHeapMemory = o.isUsingOffHeapMemory();
}


Expand Down Expand Up @@ -1217,5 +1219,12 @@ private static IOException getRemoteException(String className, String message)
}
}

boolean isUsingOffHeapMemory() {
return useOffHeapMemory;
}

public void setUseOffHeapMemory(boolean useOffHeapMemory) {
this.useOffHeapMemory = useOffHeapMemory;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
*/
public class ADLStoreOptions {

public static final String USE_OFF_HEAP_MEMORY_KEY = "com.microsoft.azure.datalake.store.use_off_heap_memory";

private String userAgentSuffix = null;
private boolean insecureTransport = false;
private boolean enableRemoteExceptions = false;
private String pathPrefix = null;
private int readAheadQueueDepth = -1; // no preference set by caller, use default in ADLFileInputStream
private int defaultTimeout = -1;
private boolean alterCipherSuits = true;
private boolean useOffHeapMemory = Boolean.getBoolean(USE_OFF_HEAP_MEMORY_KEY);

public ADLStoreOptions() {
}
Expand Down Expand Up @@ -169,4 +172,12 @@ public void alterCipherSuits(boolean alterCipherSuits) {
boolean shouldAlterCipherSuits() {
return this.alterCipherSuits;
}

boolean isUsingOffHeapMemory() {
return this.useOffHeapMemory;
}

public void setUseOffHeapMemory(boolean useOffHeapMemory) {
this.useOffHeapMemory = useOffHeapMemory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void run() {
if (buffer != null) {
try {
// do the actual read, from the file.
int bytesRead = buffer.file.readRemote(buffer.offset, buffer.buffer, 0, buffer.requestedLength, true);
int bytesRead = buffer.file.readRemote(buffer.offset, buffer.buffer, null, 0, buffer.requestedLength, true);
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
} catch (Exception ex) {
bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
Expand Down