HADOOP-1593. [ABFS] Add vectored read support in ABFS driver #8400
base: trunk
Changes from 28 commits
Changed file: `ConfigurationKeys` (new configuration keys):

```diff
@@ -627,5 +627,31 @@ public static String containerProperty(String property, String fsName, String ac
    */
   public static final String FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = "fs.azure.tail.latency.max.retry.count";

+  /**
+   * Configuration key to control the vectored read strategy used by ABFS: {@value}
+   */
+  public static final String FS_AZURE_VECTORED_READ_STRATEGY = "fs.azure.vectored.read.strategy";
+
+  /**
+   * Configuration key that defines the minimum gap between adjacent read ranges
+   * for merging ranges during vectored reads in ABFS: {@value}.
+   */
+  public static final String FS_AZURE_MIN_SEEK_FOR_VECTORED_READS =
+      "fs.azure.min.seek.for.vectored.reads";
+
+  /**
+   * Configuration key that defines the maximum gap between adjacent read ranges
+   * for merging ranges during vectored reads in ABFS: {@value}.
+   */
+  public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS =
+      "fs.azure.max.seek.for.vectored.reads";
+
+  /**
+   * Configuration key that defines the maximum gap between adjacent read ranges
+   * for merging ranges during vectored reads in ABFS throughput-optimized mode: {@value}.
+   */
+  public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT =
+      "fs.azure.max.seek.for.vectored.reads.throughput";

   private ConfigurationKeys() {}
 }
```

Contributor (on `FS_AZURE_MAX_SEEK_FOR_VECTORED_READS`): It would be good if you used consistent parameter names.
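The new keys form a pair of merge-gap limits plus a strategy selector, with a separate max-seek key for the throughput-optimized mode. As a hedged sketch of how a client might choose between the two max-seek keys (the helper class and its selection logic are illustrative assumptions, not code from the patch):

```java
// Hedged sketch: which max-seek key *might* apply per strategy.
// The key strings mirror the diff above; the chooser itself is hypothetical.
class VectoredReadConfigSketch {

  static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS =
      "fs.azure.max.seek.for.vectored.reads";
  static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT =
      "fs.azure.max.seek.for.vectored.reads.throughput";

  /** Picks the config key that would govern the max merge gap for a strategy. */
  static String maxSeekKeyFor(String strategy) {
    if ("THROUGHPUT".equalsIgnoreCase(strategy)
        || "THROUGHPUT_OPTIMIZED".equalsIgnoreCase(strategy)) {
      return FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT;
    }
    return FS_AZURE_MAX_SEEK_FOR_VECTORED_READS;
  }
}
```

In the real driver these values are read through `AbfsConfiguration`, as the later `AbfsInputStream` hunks show.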
Changed file: `ReadType` (new enum constants):

```diff
@@ -48,6 +48,16 @@ public enum ReadType {
    * Only triggered when small file read optimization kicks in.
    */
   SMALLFILE_READ("SR"),
+  /**
+   * Read multiple disjoint ranges from the storage service using vectored reads.
+   * Used to coalesce and execute non-contiguous reads efficiently.
+   */
+  VECTORED_READ("VR"),
+  /**
+   * Performs a vectored direct read by fetching multiple non-contiguous
+   * ranges in a single operation.
+   */
+  VECTORED_DIRECT_READ("VDR"),
   /**
    * Reads from Random Input Stream with read ahead up to readAheadRange
    */
```

Contributor: Azure does multi-range reads? Nice.

Contributor (author): This direct read is used when we are unable to queue a vectored read in the read-ahead queue because read-ahead buffers are not available; in that case we fall back to a direct readRemote call.
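The author's reply above describes `VECTORED_DIRECT_READ` as a fallback taken when no read-ahead buffer is free. A minimal sketch of that decision (the chooser function and the `freeBuffers` parameter are hypothetical, not code from the PR; only the enum constants and their codes come from the diff):

```java
// Hedged sketch of the VR-vs-VDR fallback described in the review thread.
// The decision helper is hypothetical; the enum codes mirror the patch.
class ReadTypeSketch {

  enum ReadType {
    VECTORED_READ("VR"),
    VECTORED_DIRECT_READ("VDR");

    public final String code;
    ReadType(String code) { this.code = code; }
  }

  /** Queue a vectored read if a read-ahead buffer is free, else read directly. */
  static ReadType chooseReadType(int freeBuffers) {
    return freeBuffers > 0 ? ReadType.VECTORED_READ
                           : ReadType.VECTORED_DIRECT_READ;
  }
}
```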
New file: `org.apache.hadoop.fs.azurebfs.enums.BufferType` (+36 lines):

```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.enums;

import org.apache.hadoop.fs.azurebfs.services.ReadBuffer;

/**
 * Enum for buffer types.
 * Used in {@link ReadBuffer} to separate normal vs vectored read.
 */
public enum BufferType {
  /**
   * Normal read buffer.
   */
  NORMAL,
  /**
   * Vectored read buffer.
   */
  VECTORED
}
```
New file: `org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy` (+77 lines):

```java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.enums;

/**
 * Defines the strategy used for vectored reads in ABFS.
 *
 * <p>
 * The strategy controls how read ranges are planned and executed, trading off
 * between request parallelism and per-request payload size.
 * </p>
 */
public enum VectoredReadStrategy {

  /**
   * Optimizes for transactions per second (TPS).
   */
  TPS_OPTIMIZED("TPS"),

  /**
   * Optimizes for overall data throughput.
   */
  THROUGHPUT_OPTIMIZED("THROUGHPUT");

  /** Short name used for configuration and logging. */
  private final String name;

  /**
   * Constructs a vectored read strategy with a short, user-friendly name.
   *
   * @param name short identifier for the strategy
   */
  VectoredReadStrategy(String name) {
    this.name = name;
  }

  /**
   * Returns the short name of the vectored read strategy.
   *
   * @return short strategy name
   */
  public String getName() {
    return name;
  }

  /**
   * Parses a configuration value into a {@link VectoredReadStrategy}.
   * @param value configuration value
   * @return matching vectored read strategy
   * @throws IllegalArgumentException if the value is invalid
   */
  public static VectoredReadStrategy fromString(String value) {
    for (VectoredReadStrategy strategy : values()) {
      if (strategy.name().equalsIgnoreCase(value)
          || strategy.getName().equalsIgnoreCase(value)) {
        return strategy;
      }
    }
    throw new IllegalArgumentException("Invalid vectored read strategy: " + value);
  }
}
```
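As a usage illustration, `fromString` accepts either the enum constant name or the short name, case-insensitively. The copy below is trimmed to stand alone (package declaration and Javadoc dropped); the matching behavior is the same as in the new file:

```java
// Trimmed, self-contained copy of the enum from the diff above,
// kept package-private so the snippet compiles regardless of file name.
enum VectoredReadStrategy {
  TPS_OPTIMIZED("TPS"),
  THROUGHPUT_OPTIMIZED("THROUGHPUT");

  private final String name;

  VectoredReadStrategy(String name) { this.name = name; }

  String getName() { return name; }

  /** Accepts the constant name or the short name, ignoring case. */
  static VectoredReadStrategy fromString(String value) {
    for (VectoredReadStrategy strategy : values()) {
      if (strategy.name().equalsIgnoreCase(value)
          || strategy.getName().equalsIgnoreCase(value)) {
        return strategy;
      }
    }
    throw new IllegalArgumentException("Invalid vectored read strategy: " + value);
  }
}
```

With this, `fromString("tps")`, `fromString("TPS")`, and `fromString("tps_optimized")` all resolve to `TPS_OPTIMIZED`; anything else throws `IllegalArgumentException`.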
Changed file: `AbfsInputStream`:

```diff
@@ -22,10 +22,14 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.UUID;
+import java.util.function.IntFunction;

 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.azurebfs.constants.ReadType;
 import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.util.Preconditions;
@@ -54,8 +58,6 @@
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
 import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
-import static org.apache.hadoop.io.Sizes.S_128K;
-import static org.apache.hadoop.io.Sizes.S_2M;
 import static org.apache.hadoop.util.StringUtils.toLowerCase;

 /**
@@ -125,6 +127,7 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff
   private final AbfsInputStreamContext context;
   private IOStatistics ioStatistics;
   private String filePathIdentifier;
+
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -199,7 +202,7 @@ public AbfsInputStream(
           readAheadBlockSize, client.getAbfsConfiguration());
       readBufferManager = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     } else {
-      ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
+      ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize, client.getAbfsConfiguration());
       readBufferManager = ReadBufferManagerV1.getBufferManager();
     }
@@ -220,6 +223,14 @@ private String createInputStreamId() {
     return StringUtils.right(UUID.randomUUID().toString(), STREAM_ID_LEN);
   }

+  /**
+   * Retrieves the handler responsible for processing vectored read requests.
+   * @return the {@link VectoredReadHandler} instance associated with the buffer manager.
+   */
+  VectoredReadHandler getVectoredReadHandler() {
+    return getReadBufferManager().getVectoredReadHandler();
+  }
+
   @Override
   public int read(long position, byte[] buffer, int offset, int length)
       throws IOException {
```
```diff
@@ -331,6 +342,19 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
     return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
   }

+  /**
+   * {@inheritDoc}
+   * Vectored read implementation for AbfsInputStream.
+   *
+   * @param ranges the byte ranges to read.
+   * @param allocate the function to allocate ByteBuffer.
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate) throws EOFException {
+    getVectoredReadHandler().readVectored(this, ranges, allocate);
+  }
+
   private boolean shouldReadFully() {
     return this.firstRead && this.context.readSmallFilesCompletely()
         && this.contentLength <= this.bufferSize;
```
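The override above delegates everything to `VectoredReadHandler`, which is not shown in this PR excerpt. To illustrate the contract the method satisfies (each requested range gets a buffer from the caller-supplied `allocate` function and is filled asynchronously), here is a hedged, in-memory stand-in that reads from a byte array instead of the ABFS service; the `SimpleRange` class replaces Hadoop's `FileRange` so the snippet is self-contained:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

// Hedged, in-memory stand-in for the vectored-read contract. A "range" here
// is (offset, length) over a byte array; the real driver reads a remote file.
class InMemoryVectoredRead {

  static class SimpleRange {
    final int offset;
    final int length;
    CompletableFuture<ByteBuffer> data;
    SimpleRange(int offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  /** Completes each range's future with bytes copied from source. */
  static void readVectored(byte[] source, List<SimpleRange> ranges,
      IntFunction<ByteBuffer> allocate) {
    for (SimpleRange r : ranges) {
      r.data = CompletableFuture.supplyAsync(() -> {
        ByteBuffer buf = allocate.apply(r.length);
        buf.put(Arrays.copyOfRange(source, r.offset, r.offset + r.length));
        buf.flip();
        return buf;
      });
    }
  }
}
```

The `allocate` parameter mirrors the `IntFunction<ByteBuffer>` in the real signature: callers can pass `ByteBuffer::allocate` or `ByteBuffer::allocateDirect` to choose heap or direct buffers.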
```diff
@@ -795,7 +819,10 @@ public synchronized void unbuffer() {

   @Override
   public boolean hasCapability(String capability) {
-    return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability));
+    return switch (toLowerCase(capability)) {
+      case StreamCapabilities.UNBUFFER, StreamCapabilities.VECTOREDIO -> true;
+      default -> false;
+    };
   }
```
```diff
@@ -1069,7 +1096,7 @@ ReadBufferManager getReadBufferManager() {
    */
   @Override
   public int minSeekForVectorReads() {
-    return S_128K;
+    return client.getAbfsConfiguration().getMinSeekForVectoredReads();
   }
```

Contributor: I'm actually going to recommend not coalescing vectors unless you have a good strategy to deal with partial failures, memory releases, retries, etc. I tried to do that in s3a and gave up, even after extending the API to allow a "release" operation to be passed down (which Parquet passes, FWIW). We've never seen failures with S3 in production, and coalesced ranges such as Parquet/ORC row groups are too far apart. So it's better to focus on retry and recovery there than on range coalescing.

Contributor (author): In the Azure case we do 4 MB buffer reads, so if we don't coalesce we waste a lot of read data per range. We found that merging ranges which fall within one buffer, i.e. 4 MB, gives better results.
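The author's reply above motivates gap-based merging: with 4 MB buffer reads, ranges that land in the same buffer are cheaper to fetch together. A hedged, self-contained sketch of that kind of merge follows; the `Range` class, method names, and thresholds are illustrative only (Hadoop's vectored-IO utilities, which the patch presumably builds on, implement this with `FileRange`):

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of gap-based range coalescing, per the min/max seek keys
// introduced in this PR. Not code from the patch.
class RangeCoalescer {

  /** A simple [offset, offset + length) byte range. */
  static class Range {
    final long offset;
    final long length;
    Range(long offset, long length) {
      this.offset = offset;
      this.length = length;
    }
    long end() { return offset + length; }
  }

  /**
   * Merges ranges (assumed sorted by offset, non-overlapping) whose gap to
   * the previous range is below minSeek, as long as the merged range stays
   * within maxSize.
   */
  static List<Range> coalesce(List<Range> sorted, long minSeek, long maxSize) {
    List<Range> out = new ArrayList<>();
    for (Range r : sorted) {
      if (!out.isEmpty()) {
        Range last = out.get(out.size() - 1);
        long gap = r.offset - last.end();
        long mergedLen = r.end() - last.offset;
        if (gap >= 0 && gap < minSeek && mergedLen <= maxSize) {
          // Small gap: fetch both ranges in one request, discard the gap bytes.
          out.set(out.size() - 1, new Range(last.offset, mergedLen));
          continue;
        }
      }
      out.add(r);
    }
    return out;
  }
}
```

With a 128 KB `minSeek`, two 1 KB ranges 64 KB apart merge into one request; ranges separated by more than the gap limit stay separate, matching the reviewer's point that distant row groups gain nothing from coalescing.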
```diff
@@ -1078,7 +1105,7 @@ public int minSeekForVectorReads() {
    */
   @Override
   public int maxReadSizeForVectorReads() {
-    return S_2M;
+    return client.getAbfsConfiguration().getMaxSeekForVectoredReads();
   }
```

Contributor: The method name mentions the read size, but we're returning the maximum gap size.

Contributor (author): That is the name of the method in the superclass.

Contributor: But why are we returning the seek size here? Why not the actual read size?

Contributor: It should be in line with maxReadSizeForVectorReads.
Contributor: Why can't they run in parallel?

Contributor (author): We take a dependency on buffer states; running in parallel leads to buffers moving out of the queues, so this was changed to sequential runs.