Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable short circuit IO for Dora by default #16892

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
600c84e
Remove the restriction of UFS for SDK
JiamingMai Nov 21, 2022
1e8670c
Add property key to enable dora client read location policy
beinan Dec 1, 2022
97b08d2
Implement the skeleton of dora worker
beinan Dec 5, 2022
2795075
[SIKPCI]Add method to get file status
JiamingMai Dec 7, 2022
ad2dab6
Fix and improve dora test
beinan Dec 8, 2022
c736475
Before solving conflicts
LuQQiu Dec 9, 2022
35072d7
solve conflicts
LuQQiu Dec 9, 2022
83ac569
Merge branch 'master' of github.com:Alluxio/alluxio into dora
LuQQiu Dec 9, 2022
a5ea9c8
Support S3 range read with buffer cache
LuQQiu Dec 22, 2022
932c7e4
Remove unneeded isFile call in UfsBaseFileSystem.getStatus
LuQQiu Dec 28, 2022
732ba42
Remove redundant get file status call in fuse.open()
LuQQiu Dec 28, 2022
d063972
Remove some overhead in big file read
LuQQiu Dec 28, 2022
459d766
Enable default metadata cache for FUSE SDK
LuQQiu Dec 30, 2022
a00a787
Implement Dora worker
dbw9580 Jan 5, 2023
9690209
Fix NPE in PermissionCheckTest
dbw9580 Jan 9, 2023
9cd870e
Set default fuse version to 3 and support setting via command line
LuQQiu Jan 9, 2023
ebcd8fd
Add missing proto lock content
qian0817 Jan 3, 2023
bc44b65
Implement Dora client side affinity
beinan Jan 10, 2023
4e47a0d
Enable data transmission with Netty
JiamingMai Jan 11, 2023
74500b7
Implement getChannel for PagedBlockReader
dbw9580 Jan 11, 2023
8bd3ee4
Implement Worker metadata caching
beinan Jan 13, 2023
3051ba2
Fix hanging page store test
LuQQiu Jan 19, 2023
c7a272f
Fix Fuse SDK with local cache
LuQQiu Jan 20, 2023
a7fc9e8
Support FUSE SDK with dora
LuQQiu Jan 24, 2023
a24e5f0
Use alluxio fuse sdk script only
LuQQiu Jan 24, 2023
f0a9d8b
Implement local cache invalidate method
beinan Feb 2, 2023
f67b183
Add RocksDB metadata store for Dora
huanghua78 Feb 3, 2023
368a55a
Fix data not being cached by dora worker
dbw9580 Feb 3, 2023
71a59d1
Fix bug that paged block store can neither write nor cache after reading
JiamingMai Feb 3, 2023
7df6228
Add load to Dora
jja725 Feb 10, 2023
5fcd53b
Disable short circuit IO for Dora
dbw9580 Feb 15, 2023
6465198
Fix unit test
dbw9580 Feb 16, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions core/client/fs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-underfs-local</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public static BlockInStream create(FileSystemContext context, BlockInfo info,
alluxioConf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_PREFERRED);
boolean sourceSupportsDomainSocket = NettyUtils.isDomainSocketSupported(dataSource);
boolean sourceIsLocal = dataSourceType == BlockInStreamSource.NODE_LOCAL;
boolean nettyTransEnabled =
alluxioConf.getBoolean(PropertyKey.USER_NETTY_DATA_TRANSMISSION_ENABLED);

// Short circuit is enabled when
// 1. data source is local node
Expand All @@ -136,6 +138,18 @@ public static BlockInStream create(FileSystemContext context, BlockInfo info,
}
}

// use Netty to transfer data
if (nettyTransEnabled) {
// TODO(JiamingMai): implement this logic
LOG.debug("Creating Netty input stream for block {} @ {} from client {} reading through {} ("
+ "data locates in the local worker {}, shortCircuitEnabled {}, "
+ "shortCircuitPreferred {}, sourceSupportDomainSocket {})",
blockId, dataSource, NetworkAddressUtils.getClientHostName(alluxioConf), dataSource,
sourceIsLocal, shortCircuit, shortCircuitPreferred, sourceSupportsDomainSocket);
return createNettyBlockInStream(context, dataSource, dataSourceType, blockId,
blockSize, options);
}

// gRPC
LOG.debug("Creating gRPC input stream for block {} @ {} from client {} reading through {} ("
+ "data locates in the local worker {}, shortCircuitEnabled {}, "
Expand Down Expand Up @@ -190,6 +204,32 @@ private static BlockInStream createLocalBlockInStream(FileSystemContext context,
address, BlockInStreamSource.NODE_LOCAL, blockId, length);
}

/**
 * Creates a {@link BlockInStream} that reads a block through a Netty data server.
 *
 * @param context the file system context, used to look up the cluster configuration
 * @param address the address of the worker to read from
 * @param blockSource the source location of the block
 * @param blockId the block id
 * @param blockSize the block size
 * @param options the in stream options supplying the read type and the UFS block options
 * @return the {@link BlockInStream} created
 */
private static BlockInStream createNettyBlockInStream(FileSystemContext context,
    WorkerNetAddress address, BlockInStreamSource blockSource,
    long blockId, long blockSize, InStreamOptions options) {
  final long readChunkBytes = context.getClusterConf()
      .getBytes(PropertyKey.USER_STREAMING_READER_CHUNK_SIZE_BYTES);

  // Assemble the partial read request that the data reader factory will send.
  Protocol.ReadRequest.Builder readRequest = Protocol.ReadRequest.newBuilder();
  readRequest.setBlockId(blockId);
  readRequest.setPromote(ReadType.fromProto(options.getOptions().getReadType()).isPromote());
  // Add UFS fallback options
  readRequest.setOpenUfsBlockOptions(options.getOpenUfsBlockOptions(blockId));
  readRequest.setChunkSize(readChunkBytes);

  return new BlockInStream(new NettyDataReader.Factory(context, address, readRequest),
      address, blockSource, blockId, blockSize);
}

/**
* Creates a {@link BlockInStream} to read from a gRPC data server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import alluxio.grpc.ClearMetricsResponse;
import alluxio.grpc.CreateLocalBlockRequest;
import alluxio.grpc.CreateLocalBlockResponse;
import alluxio.grpc.GetStatusPRequest;
import alluxio.grpc.GetStatusPResponse;
import alluxio.grpc.GrpcServerAddress;
import alluxio.grpc.LoadRequest;
import alluxio.grpc.LoadResponse;
Expand All @@ -43,6 +45,7 @@
* gRPC client for worker communication.
*/
public interface BlockWorkerClient extends Closeable {

/**
* Factory for block worker client.
*/
Expand Down Expand Up @@ -161,4 +164,12 @@ StreamObserver<OpenLocalBlockRequest> openLocalBlock(
* @throws StatusRuntimeException if any error occurs
*/
ListenableFuture<LoadResponse> load(LoadRequest request);

/**
 * Gets the status of a file.
 *
 * @param request the get status request
 * @return the get status response
 */
GetStatusPResponse getStatus(GetStatusPRequest request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public static DataWriter create(FileSystemContext context, long blockId, long bl
boolean ufsFallbackEnabled = options.getWriteType() == WriteType.ASYNC_THROUGH
&& alluxioConf.getBoolean(PropertyKey.USER_FILE_UFS_TIER_ENABLED);
boolean workerIsLocal = CommonUtils.isLocalHost(address, alluxioConf);
boolean nettyTransEnabled =
alluxioConf.getBoolean(PropertyKey.USER_NETTY_DATA_TRANSMISSION_ENABLED);

if (workerIsLocal && context.hasProcessLocalWorker() && !ufsFallbackEnabled) {
LOG.debug("Creating worker process local output stream for block {} @ {}",
Expand All @@ -84,16 +86,28 @@ public static DataWriter create(FileSystemContext context, long blockId, long bl
}
LOG.debug("Creating short circuit output stream for block {} @ {}", blockId, address);
return LocalFileDataWriter.create(context, address, blockId, blockSize, options);
} else {
LOG.debug("Creating gRPC output stream for block {} @ {} from client {} "
+ "(data locates in local worker: {}, shortCircuitEnabled: {}, "
+ "shortCircuitPreferred: {}, domainSocketSupported: {})",
}

if (nettyTransEnabled) {
LOG.debug("Creating netty output stream for block {} @ {} from client {} "
+ "(data locates in local worker: {}, shortCircuitEnabled: {}, "
+ "shortCircuitPreferred: {}, domainSocketSupported: {})",
blockId, address, NetworkAddressUtils.getClientHostName(alluxioConf),
workerIsLocal, shortCircuit, shortCircuitPreferred, domainSocketSupported);
return GrpcDataWriter
// TODO(JiamingMai): implement the netty writer here
return NettyDataWriter
.create(context, address, blockId, blockSize, RequestType.ALLUXIO_BLOCK,
options);
}

LOG.debug("Creating gRPC output stream for block {} @ {} from client {} "
+ "(data locates in local worker: {}, shortCircuitEnabled: {}, "
+ "shortCircuitPreferred: {}, domainSocketSupported: {})",
blockId, address, NetworkAddressUtils.getClientHostName(alluxioConf),
workerIsLocal, shortCircuit, shortCircuitPreferred, domainSocketSupported);
return GrpcDataWriter
.create(context, address, blockId, blockSize, RequestType.ALLUXIO_BLOCK,
options);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import alluxio.grpc.DataMessageMarshaller;
import alluxio.grpc.DataMessageMarshallerProvider;
import alluxio.grpc.FreeWorkerRequest;
import alluxio.grpc.GetStatusPRequest;
import alluxio.grpc.GetStatusPResponse;
import alluxio.grpc.GrpcChannel;
import alluxio.grpc.GrpcChannelBuilder;
import alluxio.grpc.GrpcNetworkGroup;
Expand Down Expand Up @@ -248,4 +250,10 @@ public void freeWorker() {
public ListenableFuture<LoadResponse> load(LoadRequest request) {
return mRpcFutureStub.load(request);
}

@Override
public GetStatusPResponse getStatus(GetStatusPRequest request) {
// Issue the call on the blocking stub, with a deadline of mRpcTimeoutMs milliseconds
// applied to the stub before the request is sent.
return mRpcBlockingStub.withDeadlineAfter(mRpcTimeoutMs, TimeUnit.MILLISECONDS)
.getStatus(request);
}
}
Loading