Skip to content

Commit

Permalink
tmp save
Browse files Browse the repository at this point in the history
  • Loading branch information
hfutatzhanghb committed Mar 4, 2025
1 parent 31d29e5 commit eb4b395
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2225,7 +2225,7 @@ protected static FsPermission getParentPermission(final FsPermission mask) {
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
public HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date) {
return getMountPointStatus(name, childrenNum, date, true);
}
Expand All @@ -2240,7 +2240,7 @@ protected HdfsFileStatus getMountPointStatus(
* @return New HDFS file status representing a mount point.
*/
@VisibleForTesting
protected HdfsFileStatus getMountPointStatus(
public HdfsFileStatus getMountPointStatus(
String name, int childrenNum, long date, boolean setPath) {
long modTime = date;
long accessTime = date;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
Expand All @@ -43,6 +44,7 @@
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException;
import org.apache.hadoop.hdfs.server.federation.router.NoLocationException;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
import org.apache.hadoop.hdfs.server.federation.router.RemoteParam;
import org.apache.hadoop.hdfs.server.federation.router.RemoteResult;
Expand Down Expand Up @@ -83,6 +85,7 @@
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncComplete;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCompleteWith;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncFinally;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncForEach;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncReturn;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncTry;
Expand All @@ -104,6 +107,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol {
private final boolean allowPartialList;
/** Time out when getting the mount statistics. */
private long mountStatusTimeOut;
/** Default nameservice enabled. */
private final boolean defaultNameServiceEnabled;
/** Identifier for the super user. */
private String superUser;
/** Identifier for the super group. */
Expand All @@ -126,6 +131,9 @@ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer)
this.mountStatusTimeOut = getMountStatusTimeOut();
this.superUser = getSuperUser();
this.superGroup = getSuperGroup();
this.defaultNameServiceEnabled = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE,
RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT);
}

@Override
Expand Down Expand Up @@ -348,7 +356,6 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
return rpcClient.invokeAll(locations, method);
}

asyncComplete(false);
if (locations.size() > 1) {
// Check if this directory already exists
asyncTry(() -> {
Expand All @@ -367,14 +374,23 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
src, ex.getMessage());
return false;
}, IOException.class);
}
asyncApply((AsyncApplyFunction<Boolean, Boolean>)ret -> {
if (!ret) {
final RemoteLocation firstLocation = locations.get(0);
asyncTry(() -> {
rpcClient.invokeSingle(firstLocation, method, Boolean.class);
});

final RemoteLocation firstLocation = locations.get(0);
asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> {
if (success) {
asyncComplete(true);
return;
}
asyncCatch((AsyncCatchFunction<Object, IOException>) (o, ioe) -> {
final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
method, src, ioe, firstLocation, locations);
rpcClient.invokeSequential(
newLocations, method, Boolean.class, Boolean.TRUE);
}, IOException.class);
}
});
} else {
final RemoteLocation firstLocation = locations.get(0);
asyncTry(() -> {
rpcClient.invokeSingle(firstLocation, method, Boolean.class);
});
Expand All @@ -385,7 +401,7 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
rpcClient.invokeSequential(
newLocations, method, Boolean.class, Boolean.TRUE);
}, IOException.class);
});
}

return asyncReturn(Boolean.class);
}
Expand Down Expand Up @@ -480,6 +496,7 @@ public DirectoryListing getListing(
return null;
});
});
boolean finalNamenodeListingExists = namenodeListingExists;
asyncApply(o -> {
// Update the remaining count to include left mount points
if (nnListing.size() > 0) {
Expand All @@ -492,10 +509,12 @@ public DirectoryListing getListing(
}
}
}
return null;
return finalNamenodeListingExists;
});
} else {
asyncComplete(namenodeListingExists);
}
asyncComplete(namenodeListingExists);

asyncApply((ApplyFunction<Boolean, Object>) exists -> {
if (!exists && nnListing.size() == 0 && children == null) {
// NN returns a null object if the directory cannot be found and has no
Expand All @@ -519,29 +538,26 @@ public DirectoryListing getListing(
@Override
protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt(
String src, byte[] startAfter, boolean needLocation) throws IOException {
List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false, false);
// Locate the dir and fetch the listing.
if (locations.isEmpty()) {
asyncComplete(new ArrayList<>());
return asyncReturn(List.class);
}
asyncTry(() -> {
try {
List<RemoteLocation> locations =
rpcServer.getLocationsForPath(src, false, false);
// Locate the dir and fetch the listing.
if (locations.isEmpty()) {
asyncComplete(new ArrayList<>());
return asyncReturn(List.class);
}
RemoteMethod method = new RemoteMethod("getListing",
new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
new RemoteParam(), startAfter, needLocation);
rpcClient.invokeConcurrent(locations, method, false, -1,
DirectoryListing.class);
});
asyncCatch((CatchFunction<List, RouterResolveException>) (o, e) -> {
} catch (NoLocationException | RouterResolveException e) {
LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage());
LOG.info("Cannot get locations for {}, {}.", src, e.getMessage());
return new ArrayList<>();
}, RouterResolveException.class);
asyncComplete(new ArrayList<>());
}
return asyncReturn(List.class);
}


@Override
public HdfsFileStatus getFileInfo(String src) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
Expand All @@ -553,8 +569,10 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
new Class<?>[] {String.class}, new RemoteParam());
// If it's a directory, we check in all locations
if (rpcServer.isPathAll(src)) {
LOG.info("BZL#Test, rpcServer.isPathAll branch.");
getFileInfoAll(locations, method);
} else {
LOG.info("BZL#Test, rpcClient.invokeSequential branch.");
// Check for file information sequentially
rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null);
}
Expand All @@ -564,21 +582,24 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
|| e instanceof RouterResolveException) {
noLocationException[0] = e;
}
throw e;
LOG.info("BZL#Test, async catch branch.");
return null;
}, IOException.class);

asyncApply((AsyncApplyFunction<HdfsFileStatus, Object>) ret -> {
// If there is no real path, check mount points
if (ret == null) {
List<String> children = subclusterResolver.getMountPoints(src);
if (children != null && !children.isEmpty()) {
LOG.info("BZL#Test, if 1st branch.");
Map<String, Long> dates = getMountPointDates(src);
long date = 0;
if (dates != null && dates.containsKey(src)) {
date = dates.get(src);
}
getMountPointStatus(src, children.size(), date, false);
} else if (children != null) {
LOG.info("BZL#Test, children != null branch");
// The src is a mount point, but there are no files or directories
getMountPointStatus(src, 0, 0, false);
} else {
Expand All @@ -590,7 +611,6 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
if (result == null && noLocationException[0] != null) {
throw noLocationException[0];
}

return result;
});
} else {
Expand Down Expand Up @@ -639,7 +659,14 @@ public HdfsFileStatus getMountPointStatus(
final int[] childrenNums = new int[]{childrenNum};
final EnumSet<HdfsFileStatus.Flags>[] flags =
new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)};
asyncComplete(null);
long inodeId = 0;
HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
if (setPath) {
Path path = new Path(name);
String nameStr = path.getName();
builder.path(DFSUtil.string2Bytes(nameStr));
}

if (getSubclusterResolver() instanceof MountTableResolver) {
asyncTry(() -> {
String mName = name.startsWith("/") ? name : "/" + name;
Expand All @@ -664,13 +691,35 @@ public HdfsFileStatus getMountPointStatus(
.getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(),
fInfo.isSnapshotEnabled(), fInfo.hasAcl());
}
return fInfo;
return builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build();
});
} else {
asyncComplete(builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build());
}
});
asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, e) -> {
LOG.error("Cannot get mount point: {}", e.getMessage());
return status;
return null;
}, IOException.class);
} else {
try {
Expand All @@ -684,37 +733,27 @@ public HdfsFileStatus getMountPointStatus(
} else {
LOG.debug(msg);
}
} finally {
asyncComplete(builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build());
}
}
long inodeId = 0;
HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder();
asyncApply((ApplyFunction<HdfsFileStatus, HdfsFileStatus>) status -> {
if (setPath) {
Path path = new Path(name);
String nameStr = path.getName();
builder.path(DFSUtil.string2Bytes(nameStr));
}

return builder.isdir(true)
.mtime(modTime)
.atime(accessTime)
.perm(permission[0])
.owner(owner[0])
.group(group[0])
.symlink(new byte[0])
.fileId(inodeId)
.children(childrenNums[0])
.flags(flags[0])
.build();
});
return asyncReturn(HdfsFileStatus.class);
}

@Override
protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method, long timeOutMs) throws IOException {

asyncComplete(null);
// Get the file info from everybody
rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
HdfsFileStatus.class);
Expand Down Expand Up @@ -1086,4 +1125,35 @@ public boolean isMultiDestDirectory(String src) {

return asyncReturn(boolean.class);
}

@Override
public Path getEnclosingRoot(String src) throws IOException {
final Path[] mountPath = new Path[1];
if (defaultNameServiceEnabled) {
mountPath[0] = new Path("/");
}

if (subclusterResolver instanceof MountTableResolver) {
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
if (mountTable.getMountPoint(src) != null) {
mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath());
}
}

if (mountPath[0] == null) {
throw new IOException(String.format("No mount point for %s", src));
}

getEZForPath(src);
asyncApply((ApplyFunction<EncryptionZone, Path>)zone -> {
if (zone == null) {
return mountPath[0];
} else {
Path zonePath = new Path(zone.getPath());
return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0];
}
});
return asyncReturn(Path.class);
}

}
Loading

0 comments on commit eb4b395

Please sign in to comment.