Skip to content

Commit

Permalink
HADOOP-19303. VectorIO API: support pass-down of a release() operator (
Browse files Browse the repository at this point in the history
…#7418)

The PositionedReadable vector IO API has a new readVectored()
method which takes a release operator as its third argument.

readVectored(List<? extends FileRange> ranges,
      IntFunction<ByteBuffer> allocate,
      Consumer<ByteBuffer> release)

This is return buffers to pools even in failures.

The default implementation hands back to readVectored/2,
so that existing custom implementations of that will get
invoked.

Contributed by Steve Loughran
  • Loading branch information
steveloughran authored Mar 3, 2025
1 parent 32dad20 commit 1c2a92a
Show file tree
Hide file tree
Showing 11 changed files with 427 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.StringJoiner;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -181,4 +182,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate, release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.zip.CRC32;

Expand Down Expand Up @@ -438,6 +439,13 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, (b) -> { });
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
Expand All @@ -462,8 +470,8 @@ public void readVectored(List<? extends FileRange> ranges,
}
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
sums.readVectored(checksumRanges, allocate, release);
datas.readVectored(dataRanges, allocate, release);
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -306,4 +307,11 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
((PositionedReadable) in).readVectored(ranges, allocate, release);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.io.Sizes.S_16K;
import static org.apache.hadoop.io.Sizes.S_1M;

Expand Down Expand Up @@ -136,4 +138,31 @@ default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
VectoredReadUtils.readVectored(this, ranges, allocate);
}

/**
* Extension of {@link #readVectored(List, IntFunction)} where a {@code release(buffer)}
* operation may be invoked if problems surface during reads.
* <p>
* The {@code release} operation is invoked after an IOException
* to return the actively buffer to a pool before reporting a failure
* in the future.
* <p>
* The default implementation calls {@link #readVectored(List, IntFunction)}.p
* <p>
* Implementations SHOULD override this method if they can release buffers as
* part of their error handling.
* @param ranges the byte ranges to read
* @param allocate function to allocate ByteBuffer
* @param release callable to release a ByteBuffer.
* @throws IOException any IOE.
* @throws IllegalArgumentException if any of ranges are invalid, or they overlap.
* @throws NullPointerException null arguments.
*/
default void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release) throws IOException {
requireNonNull(release);
readVectored(ranges, allocate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,25 +49,29 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.StoreImplementationUtils;
import org.apache.hadoop.fs.impl.VectorIOBufferPool;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
Expand Down Expand Up @@ -319,74 +323,131 @@ AsynchronousFileChannel getAsyncChannel() throws IOException {
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
}

@Override
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: sortedRanges) {
// Set up all of the futures, so that the caller can await on
// their completion.
for (FileRange range: sortedRanges) {
validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
AsynchronousFileChannel channel = getAsyncChannel();
ByteBuffer[] buffers = new ByteBuffer[sortedRanges.size()];
AsyncHandler asyncHandler = new AsyncHandler(channel, sortedRanges, buffers);
for(int i = 0; i < sortedRanges.size(); ++i) {
FileRange range = sortedRanges.get(i);
buffers[i] = allocate.apply(range.getLength());
channel.read(buffers[i], range.getOffset(), i, asyncHandler);
}
} catch (IOException ioe) {
LOG.debug("Exception occurred during vectored read ", ioe);
for(FileRange range: sortedRanges) {
range.getData().completeExceptionally(ioe);
}
}
final ByteBufferPool pool = new VectorIOBufferPool(allocate, release);
// Initiate the asynchronous reads.
new AsyncHandler(getAsyncChannel(),
sortedRanges,
pool)
.initiateRead();
}
}

/**
* A CompletionHandler that implements readFully and translates back
* into the form of CompletionHandler that our users expect.
* <p>
* All reads are started in {@link #initiateRead()};
* the handler then receives callbacks on success
* {@link #completed(Integer, Integer)}, and on failure
* by {@link #failed(Throwable, Integer)}.
* These are mapped to the specific range in the read, and its
* outcome updated.
*/
static class AsyncHandler implements CompletionHandler<Integer, Integer> {
private static class AsyncHandler implements CompletionHandler<Integer, Integer> {
/** File channel to read from. */
private final AsynchronousFileChannel channel;

/** Ranges to fetch. */
private final List<? extends FileRange> ranges;

/**
* Pool providing allocate/release operations.
*/
private final ByteBufferPool allocateRelease;

/** Buffers being read. */
private final ByteBuffer[] buffers;

AsyncHandler(AsynchronousFileChannel channel,
List<? extends FileRange> ranges,
ByteBuffer[] buffers) {
/**
* Instantiate.
* @param channel open channel.
* @param ranges ranges to read.
* @param allocateRelease pool for allocating buffers, and releasing on failure
*/
AsyncHandler(
final AsynchronousFileChannel channel,
final List<? extends FileRange> ranges,
final ByteBufferPool allocateRelease) {
this.channel = channel;
this.ranges = ranges;
this.buffers = buffers;
this.buffers = new ByteBuffer[ranges.size()];
this.allocateRelease = allocateRelease;
}

/**
* Initiate the read operation.
* <p>
* Allocate all buffers, queue the read into the channel,
* providing this object as the handler.
*/
private void initiateRead() {
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
buffers[i] = allocateRelease.getBuffer(false, range.getLength());
channel.read(buffers[i], range.getOffset(), i, this);
}
}

/**
* Callback for a completed full/partial read.
* <p>
* For an EOF the number of bytes may be -1.
* That is mapped to a {@link #failed(Throwable, Integer)} outcome.
* @param result The bytes read.
* @param rangeIndex range index within the range list.
*/
@Override
public void completed(Integer result, Integer r) {
FileRange range = ranges.get(r);
ByteBuffer buffer = buffers[r];
public void completed(Integer result, Integer rangeIndex) {
FileRange range = ranges.get(rangeIndex);
ByteBuffer buffer = buffers[rangeIndex];
if (result == -1) {
failed(new EOFException("Read past End of File"), r);
// no data was read back.
failed(new EOFException("Read past End of File"), rangeIndex);
} else {
if (buffer.remaining() > 0) {
// issue a read for the rest of the buffer
// QQ: What if this fails? It has the same handler.
channel.read(buffer, range.getOffset() + buffer.position(), r, this);
channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
} else {
// QQ: Why is this required? I think because we don't want the
// user to read data beyond limit.
// Flip the buffer and declare success.
buffer.flip();
range.getData().complete(buffer);
}
}
}

/**
* The read of the range failed.
* <p>
* Release the buffer supplied for this range, then
* report to the future as {{completeExceptionally(exc)}}
* @param exc exception.
* @param rangeIndex range index within the range list.
*/
@Override
public void failed(Throwable exc, Integer r) {
LOG.debug("Failed while reading range {} ", r, exc);
ranges.get(r).getData().completeExceptionally(exc);
public void failed(Throwable exc, Integer rangeIndex) {
LOG.debug("Failed while reading range {} ", rangeIndex, exc);
// release the buffer
allocateRelease.putBuffer(buffers[rangeIndex]);
// report the failure.
ranges.get(rangeIndex).getData().completeExceptionally(exc);
}

}

@Override
Expand Down
Loading

0 comments on commit 1c2a92a

Please sign in to comment.