diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java b/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java index 4d16ff5..cc35bcf 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLFileInputStream.java @@ -14,6 +14,7 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.UUID; @@ -38,7 +39,7 @@ public class ADLFileInputStream extends InputStream { private static final int defaultQueueDepth = 0; // default queue depth is zero - disables read-ahead private int blocksize = 4 * 1024 * 1024; // 4MB default buffer size - private byte[] buffer = null; // will be initialized on first use + private Buffer buffer = null; // will be initialized on first use private int readAheadQueueDepth; // initialized in constructor private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server @@ -47,6 +48,111 @@ public class ADLFileInputStream extends InputStream { // of valid bytes in buffer) private boolean streamClosed = false; + /** + * Represents the internal buffer used by ADLFileInputStream. + */ + private interface Buffer { + /** + * Retrieves bytes from this buffer to dst. + */ + void getBytes(int srcOffset, byte[] dst, int dstOffset, int bytesToRead); + + /** + * Allocates the internal buffer. + */ + void allocate(int blockSize); + + /** + * Clears the internal to its initial state. + */ + void clear(); + + /** + * Sets the position of the internal buffer + */ + void setPosition(int position); + + /** + * Retrieves the byte array representing this internal buffer or null if not valid. + */ + byte[] getAsByteArray(); + + /** + * Retrieves the ByteBuffer representing this internal buffer or null if not valid. + */ + ByteBuffer getAsByteBuffer(); + } + + /** + * A byte-array based internal buffer. + */ + private static final class ByteArrayBuffer implements Buffer { + byte[] bytes; + + @Override + public void getBytes(int srcOffset, byte[] dst, int dstOffset, int bytesToRead) { + System.arraycopy(bytes, srcOffset, dst, dstOffset, bytesToRead); + } + + @Override + public void allocate(int blockSize) { + bytes = new byte[blockSize]; + } + + @Override + public void clear() { } + + @Override + public void setPosition(int position) { } + + @Override + public byte[] getAsByteArray() { + return bytes; + } + + @Override + public ByteBuffer getAsByteBuffer() { + return null; + } + } + + /** + * An internal buffer allocated using off-heap memory. + */ + private static final class OffHeapBuffer implements Buffer { + ByteBuffer buffer; + + @Override + public void getBytes(int srcOffset, byte[] dst, int dstOffset, int bytesToRead) { + buffer.position(srcOffset); + buffer.get(dst, dstOffset, bytesToRead); + } + + @Override + public void allocate(int blockSize) { + buffer = ByteBuffer.allocateDirect(blockSize); + } + + @Override + public void clear() { + buffer.clear(); + } + + @Override + public void setPosition(int position) { + buffer.position(position); + } + + @Override + public byte[] getAsByteArray() { + return null; + } + + @Override + public ByteBuffer getAsByteBuffer() { + return buffer; + } + } // no public constructor - use Factory Method in AzureDataLakeStoreClient ADLFileInputStream(String filename, DirectoryEntry de, ADLStoreClient client) { @@ -105,7 +211,9 @@ public int read(byte[] b, int off, int len) throws IOException { //(bytes returned may be less than requested) int bytesRemaining = limit - bCursor; int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); + + buffer.getBytes(bCursor, b, off, bytesToRead); + bCursor += bytesToRead; return bytesToRead; } @@ -127,9 +235,14 @@ protected long readFromService() throws IOException { //reset buffer to initial state - i.e., throw away existing data bCursor = 0; limit = 0; - if (buffer == null) buffer = new byte[blocksize]; + if (!isBufferAllocated()) { + allocateBuffer(blocksize); + } else { + buffer.clear(); + } - int bytesRead = readInternal(fCursor, buffer, 0, blocksize, false); + int bytesRead = readInternal(fCursor, buffer.getAsByteArray(), buffer.getAsByteBuffer(), + 0, blocksize, client.isUsingOffHeapMemory()); limit += bytesRead; fCursor += bytesRead; return bytesRead; @@ -147,20 +260,23 @@ protected long slurpFullFile() throws IOException { log.trace("ADLFileInputStream.slurpFullFile() - using client {} from file {}. At offset {}", client.getClientId(), filename, getPos()); } - if (buffer == null) { + if (!isBufferAllocated()) { blocksize = (int) directoryEntry.length; - buffer = new byte[blocksize]; + allocateBuffer(blocksize); } //reset buffer to initial state - i.e., throw away existing data bCursor = (int) getPos(); // preserve current file offset (may not be 0 if app did a seek before first read) limit = 0; fCursor = 0; // read from beginning + buffer.clear(); + int loopCount = 0; // if one OPEN request doesnt get full file, then read again at fCursor while (fCursor < directoryEntry.length) { - int bytesRead = readInternal(fCursor, buffer, limit, blocksize - limit, true); + int bytesRead = readInternal(fCursor, buffer.getAsByteArray(), buffer.getAsByteBuffer(), + limit, blocksize - limit, true); limit += bytesRead; fCursor += bytesRead; @@ -190,10 +306,24 @@ public int read(long position, byte[] b, int offset, int length) if (log.isTraceEnabled()) { log.trace("ADLFileInputStream positioned read() - at offset {} using client {} from file {}", position, client.getClientId(), filename); } - return readInternal(position, b, offset, length, true); + return readInternal(position, b, null, offset, length, true); + } + + private boolean isBufferAllocated() { + return buffer != null; + } + + private void allocateBuffer(int blocksize) { + if (client.isUsingOffHeapMemory()) { + buffer = new OffHeapBuffer(); + } else { + buffer = new ByteArrayBuffer(); + } + + buffer.allocate(blocksize); } - private int readInternal(long position, byte[] b, int offset, int length, boolean bypassReadAhead) throws IOException { + private int readInternal(long position, byte[] b, ByteBuffer offHeapMemory, int offset, int length, boolean bypassReadAhead) throws IOException { boolean readAheadEnabled = true; if (readAheadEnabled && !bypassReadAhead && !client.disableReadAheads) { // try reading from read-ahead @@ -218,20 +348,20 @@ private int readInternal(long position, byte[] b, int offset, int length, boolea if (receivedBytes > 0) return receivedBytes; // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length, false); + receivedBytes = readRemote(position, b, offHeapMemory, offset, length, false); return receivedBytes; } else { - return readRemote(position, b, offset, length, false); + return readRemote(position, b, offHeapMemory, offset, length, false); } } - int readRemote(long position, byte[] b, int offset, int length, boolean speculative) throws IOException { + int readRemote(long position, byte[] b, ByteBuffer offHeapMemory, int offset, int length, boolean speculative) throws IOException { if (position < 0) throw new IllegalArgumentException("attempting to read from negative offset"); if (position >= directoryEntry.length) return -1; // Hadoop prefers -1 to EOFException - if (b == null) throw new IllegalArgumentException("null byte array passed in to read() method"); - if (offset >= b.length) throw new IllegalArgumentException("offset greater than length of array"); + if (b == null && offHeapMemory == null) throw new IllegalArgumentException("null byte array passed in to read() method"); + if (b != null && offset >= b.length) throw new IllegalArgumentException("offset greater than length of array"); if (length < 0) throw new IllegalArgumentException("requested read length is less than zero"); - if (length > (b.length - offset)) + if (b != null && length > (b.length - offset)) throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); int totalBytesRead = 0; @@ -254,12 +384,27 @@ int readRemote(long position, byte[] b, int offset, int length, boolean speculat int bytesRead; long start = System.nanoTime(); try { - do { - bytesRead = inStream.read(b, offset + totalBytesRead, length - totalBytesRead); - if (bytesRead > 0) { // if not EOF of the Core.open's stream - totalBytesRead += bytesRead; - } - } while (bytesRead >= 0 && totalBytesRead < length); + if (offHeapMemory != null && b == null) { + // We need to write to the ByteBuffer. + // Read blocks into junkbuffer then write it to the byte buffer. + offHeapMemory.clear(); + offHeapMemory.position(offset); + do { + bytesRead = inStream.read(junkbuffer, 0, Math.min(length - totalBytesRead, junkbuffer.length)); + if (bytesRead > 0) { + offHeapMemory.put(junkbuffer, 0, bytesRead); + totalBytesRead += bytesRead; + } + } while (bytesRead >= 0 && totalBytesRead < length); + offHeapMemory.flip(); + } else { + do { + bytesRead = inStream.read(b, offset + totalBytesRead, length - totalBytesRead); + if (bytesRead > 0) { // if not EOF of the Core.open's stream + totalBytesRead += bytesRead; + } + } while (bytesRead >= 0 && totalBytesRead < length); + } if (bytesRead >= 0) { // read to EOF on the stream, so connection can be reused while (inStream.read(junkbuffer, 0, junkbuffer.length)>=0); // read and consume rest of stream, if any remains } @@ -313,6 +458,7 @@ public void seek(long n) throws IOException, EOFException { if (n>=fCursor-limit && n<=fCursor) { // within buffer bCursor = (int) (n-(fCursor-limit)); + if (isBufferAllocated()) buffer.setPosition(bCursor); return; } @@ -322,6 +468,7 @@ public void seek(long n) throws IOException, EOFException { //invalidate buffer limit = 0; bCursor = 0; + if (isBufferAllocated()) buffer.clear(); } @Override @@ -422,6 +569,7 @@ public void unbuffer() throws IOException { fCursor = getPos(); limit = 0; bCursor = 0; + if (isBufferAllocated()) buffer.clear(); } @Override diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java index a503297..60521fe 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreClient.java @@ -43,6 +43,7 @@ public class ADLStoreClient { private String tiHeader = null; private String proto = "https"; private boolean enableRemoteExceptions = false; + private boolean useOffHeapMemory = Boolean.getBoolean(ADLStoreOptions.USE_OFF_HEAP_MEMORY_KEY); private String pathPrefix = null; private int readAheadQueueDepth = -1; // no preference set by caller, use default in ADLFileInputStream volatile boolean disableReadAheads = false; @@ -1002,6 +1003,7 @@ public synchronized void setOptions(ADLStoreOptions o) throws IOException { if (o.getReadAheadQueueDepth() >= 0 ) this.readAheadQueueDepth = o.getReadAheadQueueDepth(); if (o.getDefaultTimeout() > 0) this.timeout = o.getDefaultTimeout(); this.alterCipherSuits = o.shouldAlterCipherSuits(); + this.useOffHeapMemory = o.isUsingOffHeapMemory(); } @@ -1217,5 +1219,12 @@ private static IOException getRemoteException(String className, String message) } } + boolean isUsingOffHeapMemory() { + return useOffHeapMemory; + } + + public void setUseOffHeapMemory(boolean useOffHeapMemory) { + this.useOffHeapMemory = useOffHeapMemory; + } } diff --git a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java index b93e728..9c51007 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ADLStoreOptions.java @@ -11,6 +11,8 @@ */ public class ADLStoreOptions { + public static final String USE_OFF_HEAP_MEMORY_KEY = "com.microsoft.azure.datalake.store.use_off_heap_memory"; + private String userAgentSuffix = null; private boolean insecureTransport = false; private boolean enableRemoteExceptions = false; @@ -18,6 +20,7 @@ public class ADLStoreOptions { private int readAheadQueueDepth = -1; // no preference set by caller, use default in ADLFileInputStream private int defaultTimeout = -1; private boolean alterCipherSuits = true; + private boolean useOffHeapMemory = Boolean.getBoolean(USE_OFF_HEAP_MEMORY_KEY); public ADLStoreOptions() { } @@ -169,4 +172,12 @@ public void alterCipherSuits(boolean alterCipherSuits) { boolean shouldAlterCipherSuits() { return this.alterCipherSuits; } + + boolean isUsingOffHeapMemory() { + return this.useOffHeapMemory; + } + + public void setUseOffHeapMemory(boolean useOffHeapMemory) { + this.useOffHeapMemory = useOffHeapMemory; + } } diff --git a/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java b/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java index 1f14a63..1477af8 100644 --- a/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java +++ b/src/main/java/com/microsoft/azure/datalake/store/ReadBufferWorker.java @@ -45,7 +45,7 @@ public void run() { if (buffer != null) { try { // do the actual read, from the file. - int bytesRead = buffer.file.readRemote(buffer.offset, buffer.buffer, 0, buffer.requestedLength, true); + int bytesRead = buffer.file.readRemote(buffer.offset, buffer.buffer, null, 0, buffer.requestedLength, true); bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager } catch (Exception ex) { bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);