Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.IOUtils;
Expand All @@ -81,6 +82,7 @@
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
Expand Down Expand Up @@ -158,7 +160,8 @@ class LocalFSFileInputStream extends FSInputStream implements
STREAM_READ_EXCEPTIONS,
STREAM_READ_SEEK_OPERATIONS,
STREAM_READ_SKIP_OPERATIONS,
STREAM_READ_SKIP_BYTES)
STREAM_READ_SKIP_BYTES,
STREAM_READ_VECTORED_OPERATIONS)
.build();

/** Reference to the bytes read counter for slightly faster counting. */
Expand Down Expand Up @@ -225,8 +228,7 @@ public int read() throws IOException {
int value = fis.read();
if (value >= 0) {
this.position++;
statistics.incrementBytesRead(1);
bytesRead.addAndGet(1);
recordBytesRead(1);
}
return value;
} catch (IOException e) { // unexpected exception
Expand All @@ -243,16 +245,26 @@ public int read(byte[] b, int off, int len) throws IOException {
int value = fis.read(b, off, len);
if (value > 0) {
this.position += value;
statistics.incrementBytesRead(value);
bytesRead.addAndGet(value);
recordBytesRead(value);
}
return value;
} catch (IOException e) { // unexpected exception
ioStatistics.incrementCounter(STREAM_READ_EXCEPTIONS);
throw new FSError(e); // assume native fs error
}
}


/**
* Count the number of bytes read in fs and io statistics.
* @param count
*/
private void recordBytesRead(final int count) {
if (count > 0) {
statistics.incrementBytesRead(count);
bytesRead.addAndGet(count);
}
}

@Override
public int read(long position, byte[] b, int off, int len)
throws IOException {
Expand All @@ -266,8 +278,7 @@ public int read(long position, byte[] b, int off, int len)
try {
int value = fis.getChannel().read(bb, position);
if (value > 0) {
statistics.incrementBytesRead(value);
ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
recordBytesRead(value);
}
return value;
} catch (IOException e) {
Expand Down Expand Up @@ -328,6 +339,7 @@ public void readVectored(List<? extends FileRange> ranges,
public void readVectored(final List<? extends FileRange> ranges,
final IntFunction<ByteBuffer> allocate,
final Consumer<ByteBuffer> release) throws IOException {
ioStatistics.incrementCounter(STREAM_READ_VECTORED_OPERATIONS);

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
Expand All @@ -341,7 +353,8 @@ public void readVectored(final List<? extends FileRange> ranges,
// Initiate the asynchronous reads.
new AsyncHandler(getAsyncChannel(),
sortedRanges,
pool)
pool,
this::recordBytesRead)
.initiateRead();
}
}
Expand Down Expand Up @@ -372,20 +385,25 @@ private static class AsyncHandler implements CompletionHandler<Integer, Integer>
/** Buffers being read. */
private final ByteBuffer[] buffers;

/* Callback to update statistics. */
private final Consumer<Integer> statisticsUpdater;

/**
* Instantiate.
* @param channel open channel.
* @param ranges ranges to read.
* @param allocateRelease pool for allocating buffers, and releasing on failure
* @param statisticsUpdater callback to update statistics.
*/
AsyncHandler(
final AsynchronousFileChannel channel,
final List<? extends FileRange> ranges,
final ByteBufferPool allocateRelease) {
final ByteBufferPool allocateRelease, final Consumer<Integer> statisticsUpdater) {
this.channel = channel;
this.ranges = ranges;
this.buffers = new ByteBuffer[ranges.size()];
this.allocateRelease = allocateRelease;
this.statisticsUpdater = statisticsUpdater;
}

/**
Expand Down Expand Up @@ -426,6 +444,8 @@ public void completed(Integer result, Integer rangeIndex) {
// issue a read for the rest of the buffer
channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
} else {
// read finished
statisticsUpdater.accept(range.getLength());
// Flip the buffer and declare success.
buffer.flip();
range.getData().complete(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,21 @@ public void testVectoredReadMultipleRanges() throws Exception {
combinedFuture.get();

validateVectoredReadResult(fileRanges, DATASET, 0);
assertionsWithinTestVectoredReadMultipleRanges(in, fileRanges);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}

/**
* Place to add some custom assertions within {@link #testVectoredReadMultipleRanges()}.
* @param in active input stream.
* @param fileRanges ranges of files read.
*/
protected void assertionsWithinTestVectoredReadMultipleRanges(final FSDataInputStream in,
final List<FileRange> fileRanges) {

}

@Test
public void testVectoredReadAndReadFully() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,39 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;

@ParameterizedClass(name="buffer-{0}")
@ParameterizedClass(name = "buffer-{0}")
@MethodSource("params")
public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest {

private long initialBytesRead;

public TestLocalFSContractVectoredRead(final String bufferType) {
super(bufferType);
}
Expand Down Expand Up @@ -87,28 +95,28 @@ public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception
* @throws Exception any exception other than ChecksumException
*/
private void validateCheckReadException(Path testPath,
int length,
List<FileRange> ranges) throws Exception {
int length,
List<FileRange> ranges) throws Exception {
LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
try (FSDataOutputStream out = localFs.create(testPath, true)){
try (FSDataOutputStream out = localFs.create(testPath, true)) {
out.write(datasetCorrect);
}
Path checksumPath = localFs.getChecksumFile(testPath);
Assertions.assertThat(localFs.exists(checksumPath))
.describedAs("Checksum file should be present")
.isTrue();
.describedAs("Checksum file should be present")
.isTrue();
CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
try (FSDataInputStream in = fis.get()){
try (FSDataInputStream in = fis.get()) {
in.readVectored(ranges, getAllocate());
validateVectoredReadResult(ranges, datasetCorrect, 0);
}
final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64);
try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)) {
out.write(datasetCorrupted);
}
CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
try (FSDataInputStream in = fisN.get()){
try (FSDataInputStream in = fisN.get()) {
in.readVectored(ranges, getAllocate());
// Expect checksum exception when data is updated directly through
// raw local fs instance.
Expand All @@ -123,20 +131,68 @@ public void tesChecksumVectoredReadBoundaries() throws Exception {
final int length = 1071;
LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
try (FSDataOutputStream out = localFs.create(testPath, true)){
try (FSDataOutputStream out = localFs.create(testPath, true)) {
out.write(datasetCorrect);
}
Path checksumPath = localFs.getChecksumFile(testPath);
Assertions.assertThat(localFs.exists(checksumPath))
.describedAs("Checksum file should be present at {} ", checksumPath)
.isTrue();
.describedAs("Checksum file should be present at {} ", checksumPath)
.isTrue();
CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
List<FileRange> smallRange = new ArrayList<>();
smallRange.add(FileRange.createFileRange(1000, 71));
try (FSDataInputStream in = fis.get()){
try (FSDataInputStream in = fis.get()) {
in.readVectored(smallRange, getAllocate());
validateVectoredReadResult(smallRange, datasetCorrect, 0);
}
}

/**
* subclass so that the bytes read count can be cached before the test run.
*/
@Test
@Override
public void testVectoredReadMultipleRanges() throws Exception {
initialBytesRead = getBytesRead();
super.testVectoredReadMultipleRanges();
}

/**
* Validate statistics.
* Sometimes the tests failed with more than expected read, so the assertions are on
* {@code isGreaterThanOrEqualTo()} rather than exact values.
*/
@Override
protected void assertionsWithinTestVectoredReadMultipleRanges(
final FSDataInputStream in,
final List<FileRange> fileRanges) {

// check the iostats
final long totalVectorReadLength = fileRanges.stream().mapToLong(FileRange::getLength).sum();
final IOStatistics stats = in.getIOStatistics();
assertThatStatisticCounter(stats, STREAM_READ_VECTORED_OPERATIONS)
.describedAs(STREAM_READ_VECTORED_OPERATIONS + " stream %s", stats)
.isEqualTo(1);
assertThatStatisticCounter(stats, STREAM_READ_BYTES)
.describedAs(STREAM_READ_BYTES + " in bytes read in stream %s", stats)
.isGreaterThanOrEqualTo(totalVectorReadLength);

// validate filesystem stats, went up by at least that amount.
// expect counting of other things, crc files in particular
long currentBytesRead = getBytesRead();
assertThat(currentBytesRead)
.describedAs("bytes read in stream %s", in)
.isGreaterThanOrEqualTo(initialBytesRead + totalVectorReadLength);
}

/**
* API is deprecated, but Spark uses it, and it's how the regression was found.
* this is how the production code looks at our stats.
* @return counter of bytes read across all stores. Never reset.
*/
private static long getBytesRead() {
AtomicLong bytes = new AtomicLong();
FileSystem.getAllStatistics().forEach(st -> bytes.addAndGet(st.getBytesRead()));
return bytes.get();
}
}
Loading