From a20cfa9aee58bc11cf5ad3fc94f2ec1ae9a0eed9 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Sat, 1 Mar 2025 18:06:11 +0800 Subject: [PATCH] HDFS RBF: Support asynchronous router RPC in WebHDFS, mount table resolution and observer reads --- ...amenodeProtocolServerSideTranslatorPB.java | 2 + .../router/RouterClientProtocol.java | 4 +- .../federation/router/RouterRpcClient.java | 2 +- .../router/RouterStateIdContext.java | 2 +- .../router/RouterWebHdfsMethods.java | 12 +- .../async/RouterAsyncClientProtocol.java | 189 ++- .../router/TestRouterMountTable.java | 26 +- .../TestRouterMountTableWithoutDefaultNS.java | 18 +- .../async/TestObserverWithRouterAsync.java | 1055 +++++++++++++++++ .../async/TestRouterAsyncMountTable.java | 173 +++ ...RouterAsyncMountTableWithoutDefaultNS.java | 167 +++ .../async/TestRouterAsyncRpcSingleNS.java | 206 ++++ .../async/TestRouterAsyncWebHdfsMethods.java | 155 +++ .../fsdataset/impl/FsDatasetImpl.java | 4 +- 14 files changed, 1932 insertions(+), 83 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestObserverWithRouterAsync.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTableWithoutDefaultNS.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcSingleNS.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java index e70240681d27e..74c4a5c0d84c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterClientNamenodeProtocolServerSideTranslatorPB.java @@ -276,6 +276,7 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.LOG; import static org.apache.hadoop.hdfs.protocolPB.AsyncRpcProtocolPBUtil.asyncRouterServer; public class RouterClientNamenodeProtocolServerSideTranslatorPB @@ -562,6 +563,7 @@ public MkdirsResponseProto mkdirs( PBHelperClient.convert(req.getMasked()); boolean result = server.mkdirs(req.getSrc(), masked, req.getCreateParent()); + LOG.debug("mkdirs for {} returned {}", req.getSrc(), result); return result; }, result -> MkdirsResponseProto.newBuilder().setResult(result).build()); return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index f86905cacfb9a..d94aa75f426a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -2225,7
+2225,7 @@ protected static FsPermission getParentPermission(final FsPermission mask) { * @return New HDFS file status representing a mount point. */ @VisibleForTesting - protected HdfsFileStatus getMountPointStatus( + public HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date) { return getMountPointStatus(name, childrenNum, date, true); } @@ -2240,7 +2240,7 @@ protected HdfsFileStatus getMountPointStatus( * @return New HDFS file status representing a mount point. */ @VisibleForTesting - protected HdfsFileStatus getMountPointStatus( + public HdfsFileStatus getMountPointStatus( String name, int childrenNum, long date, boolean setPath) { long modTime = date; long accessTime = date; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c7c3699f33ec7..fe9162a79e428 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1994,7 +1994,7 @@ private static boolean isReadCall(Method method) { * @param nsId namespaceID */ @VisibleForTesting - boolean isNamespaceStateIdFresh(String nsId) { + public boolean isNamespaceStateIdFresh(String nsId) { if (activeNNStateIdRefreshPeriodMs < 0) { return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index 21e3f16f20613..6a360bb4cb6ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -64,7 +64,7 @@ public class RouterStateIdContext implements AlignmentContext { /** Nameservice specific overrides of the default setting for enabling observer reads. */ private HashSet observerReadEnabledOverrides = new HashSet<>(); - RouterStateIdContext(Configuration conf) { + public RouterStateIdContext(Configuration conf) { this.coordinatedMethods = new HashSet<>(); // For now, only ClientProtocol methods can be coordinated, so only checking // against ClientProtocol. 
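Note on the pattern used by the router-side changes below: the async router code chains work through the AsyncUtil helpers — asyncTry() schedules an RPC, asyncCatch() maps failures, asyncApply()/asyncComplete() transform or seed the in-flight value, asyncReturn() hands back a typed placeholder, and synchronous callers (such as the RouterWebHdfsMethods change that follows) block on the pending value with syncReturn(). A minimal sketch of that shape, mirroring the calls as they appear in this patch; exampleExists, its locations argument and the surrounding class context are illustrative, not part of the change:

    // Illustrative sketch of the AsyncUtil call-chain style used in this patch,
    // assuming the helpers imported by RouterAsyncClientProtocol below.
    public boolean exampleExists(List<RemoteLocation> locations) throws IOException {
      RemoteMethod method = new RemoteMethod("getFileInfo",
          new Class<?>[] {String.class}, new RemoteParam());
      asyncTry(() -> {
        // Schedules the RPC; its HdfsFileStatus result lands on the async chain.
        rpcClient.invokeSequential(locations, method, HdfsFileStatus.class, null);
      });
      asyncCatch((CatchFunction<HdfsFileStatus, IOException>) (status, ioe) -> {
        // Map failures to "not found" instead of propagating them.
        return null;
      }, IOException.class);
      asyncApply((ApplyFunction<HdfsFileStatus, Boolean>) status -> status != null);
      // Returns immediately; the Boolean is completed later on the chain.
      return asyncReturn(Boolean.class);
    }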
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java index 4689b2d036b04..1cf65bfda19a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection; import org.apache.hadoop.fs.InvalidPathException; @@ -28,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; @@ -478,8 +480,14 @@ private DatanodeInfo chooseDatanode(final Router router, if (op == PutOpParam.Op.CREATE) { try { - resolvedNs = rpcServer.getCreateLocation(path).getNameserviceId(); - } catch (IOException e) { + if (rpcServer.isAsync()) { + rpcServer.getCreateLocation(path); + RemoteLocation remoteLocation = syncReturn(RemoteLocation.class); + resolvedNs = remoteLocation.getNameserviceId(); + } else { + resolvedNs = rpcServer.getCreateLocation(path).getNameserviceId(); + } + } catch (Exception e) { LOG.error("Cannot get the name service " + "to create file for path {} ", path, e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java index f00ae63b902be..0fdff3a8427c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncClientProtocol.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; @@ -43,6 +44,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; import org.apache.hadoop.hdfs.server.federation.router.NoLocationException; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; import org.apache.hadoop.hdfs.server.federation.router.RemoteResult; @@ -104,6 +106,8 @@ public class RouterAsyncClientProtocol extends RouterClientProtocol { private final 
boolean allowPartialList; /** Time out when getting the mount statistics. */ private long mountStatusTimeOut; + /** Default nameservice enabled. */ + private final boolean defaultNameServiceEnabled; /** Identifier for the super user. */ private String superUser; /** Identifier for the super group. */ @@ -126,6 +130,9 @@ public RouterAsyncClientProtocol(Configuration conf, RouterRpcServer rpcServer) this.mountStatusTimeOut = getMountStatusTimeOut(); this.superUser = getSuperUser(); this.superGroup = getSuperGroup(); + this.defaultNameServiceEnabled = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE, + RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE_DEFAULT); } @Override @@ -348,7 +355,6 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) return rpcClient.invokeAll(locations, method); } - asyncComplete(false); if (locations.size() > 1) { // Check if this directory already exists asyncTry(() -> { @@ -361,20 +367,32 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) return false; }); }); - asyncCatch((ret, ex) -> { + asyncCatch((ret, ioe) -> { // Can't query if this file exists or not. LOG.error("Error getting file info for {} while proxying mkdirs: {}", - src, ex.getMessage()); + src, ioe.getMessage()); return false; }, IOException.class); - } - final RemoteLocation firstLocation = locations.get(0); - asyncApply((AsyncApplyFunction<Boolean, Boolean>) success -> { - if (success) { - asyncComplete(true); - return; - } + asyncApply((AsyncApplyFunction<Boolean, Boolean>) ret -> { + if (!ret) { + final RemoteLocation firstLocation = locations.get(0); + asyncTry(() -> { + rpcClient.invokeSingle(firstLocation, method, Boolean.class); + }); + + asyncCatch((CatchFunction) (o, ioe) -> { + final List<RemoteLocation> newLocations = checkFaultTolerantRetry( + method, src, ioe, firstLocation, locations); + return rpcClient.invokeSequential( + newLocations, method, Boolean.class, Boolean.TRUE); + }, IOException.class); + } else { + asyncComplete(ret); + } + }); + } else { + final RemoteLocation firstLocation = locations.get(0); asyncTry(() -> { rpcClient.invokeSingle(firstLocation, method, Boolean.class); }); @@ -385,7 +403,7 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent) rpcClient.invokeSequential( newLocations, method, Boolean.class, Boolean.TRUE); }, IOException.class); - }); + } return asyncReturn(Boolean.class); } @@ -480,6 +498,7 @@ public DirectoryListing getListing( return null; }); }); + boolean finalNamenodeListingExists = namenodeListingExists; asyncApply(o -> { // Update the remaining count to include left mount points if (nnListing.size() > 0) { @@ -492,10 +511,12 @@ } } } - return null; + return finalNamenodeListingExists; }); + } else { + asyncComplete(namenodeListingExists); } - asyncComplete(namenodeListingExists); + asyncApply((ApplyFunction<Boolean, DirectoryListing>) exists -> { if (!exists && nnListing.size() == 0 && children == null) { // NN returns a null object if the directory cannot be found and has no @@ -519,29 +540,26 @@ @Override protected List<RemoteResult<RemoteLocation, DirectoryListing>> getListingInt( String src, byte[] startAfter, boolean needLocation) throws IOException { - List<RemoteLocation> locations = - rpcServer.getLocationsForPath(src, false, false); - // Locate the dir and fetch the listing.
- if (locations.isEmpty()) { - asyncComplete(new ArrayList<>()); - return asyncReturn(List.class); - } - asyncTry(() -> { + try { + List<RemoteLocation> locations = + rpcServer.getLocationsForPath(src, false, false); + // Locate the dir and fetch the listing. + if (locations.isEmpty()) { + asyncComplete(new ArrayList<>()); + return asyncReturn(List.class); + } RemoteMethod method = new RemoteMethod("getListing", new Class<?>[] {String.class, startAfter.getClass(), boolean.class}, new RemoteParam(), startAfter, needLocation); rpcClient.invokeConcurrent(locations, method, false, -1, DirectoryListing.class); - }); - asyncCatch((CatchFunction) (o, e) -> { + } catch (NoLocationException | RouterResolveException e) { LOG.debug("Cannot get locations for {}, {}.", src, e.getMessage()); - LOG.info("Cannot get locations for {}, {}.", src, e.getMessage()); - return new ArrayList<>(); - }, RouterResolveException.class); + asyncComplete(new ArrayList<>()); + } return asyncReturn(List.class); } - @Override public HdfsFileStatus getFileInfo(String src) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); @@ -564,7 +582,7 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { || e instanceof RouterResolveException) { noLocationException[0] = e; } - throw e; + return null; }, IOException.class); asyncApply((AsyncApplyFunction) ret -> { @@ -582,7 +600,11 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { // The src is a mount point, but there are no files or directories getMountPointStatus(src, 0, 0, false); } else { + if (noLocationException[0] != null) { + throw noLocationException[0]; + } asyncComplete(null); + return; } asyncApply((ApplyFunction) result -> { // Can't find mount point for path and the path didn't contain any sub mount points, @@ -590,7 +612,6 @@ public HdfsFileStatus getFileInfo(String src) throws IOException { if (result == null && noLocationException[0] != null) { throw noLocationException[0]; } - return result; }); } else { @@ -639,7 +660,14 @@ public HdfsFileStatus getMountPointStatus( final int[] childrenNums = new int[]{childrenNum}; final EnumSet[] flags = new EnumSet[]{EnumSet.noneOf(HdfsFileStatus.Flags.class)}; - asyncComplete(null); + long inodeId = 0; + HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder(); + if (setPath) { + Path path = new Path(name); + String nameStr = path.getName(); + builder.path(DFSUtil.string2Bytes(nameStr)); + } + if (getSubclusterResolver() instanceof MountTableResolver) { asyncTry(() -> { String mName = name.startsWith("/") ?
name : "/" + name; @@ -664,13 +692,45 @@ public HdfsFileStatus getMountPointStatus( .getFlags(fInfo.isEncrypted(), fInfo.isErasureCoded(), fInfo.isSnapshotEnabled(), fInfo.hasAcl()); } - return fInfo; + return builder.isdir(true) + .mtime(modTime) + .atime(accessTime) + .perm(permission[0]) + .owner(owner[0]) + .group(group[0]) + .symlink(new byte[0]) + .fileId(inodeId) + .children(childrenNums[0]) + .flags(flags[0]) + .build(); }); + } else { + asyncComplete(builder.isdir(true) + .mtime(modTime) + .atime(accessTime) + .perm(permission[0]) + .owner(owner[0]) + .group(group[0]) + .symlink(new byte[0]) + .fileId(inodeId) + .children(childrenNums[0]) + .flags(flags[0]) + .build()); } }); asyncCatch((CatchFunction) (status, e) -> { LOG.error("Cannot get mount point: {}", e.getMessage()); - return status; + return builder.isdir(true) + .mtime(modTime) + .atime(accessTime) + .perm(permission[0]) + .owner(owner[0]) + .group(group[0]) + .symlink(new byte[0]) + .fileId(inodeId) + .children(childrenNums[0]) + .flags(flags[0]) + .build(); }, IOException.class); } else { try { @@ -684,37 +744,27 @@ public HdfsFileStatus getMountPointStatus( } else { LOG.debug(msg); } + } finally { + asyncComplete(builder.isdir(true) + .mtime(modTime) + .atime(accessTime) + .perm(permission[0]) + .owner(owner[0]) + .group(group[0]) + .symlink(new byte[0]) + .fileId(inodeId) + .children(childrenNums[0]) + .flags(flags[0]) + .build()); } } - long inodeId = 0; - HdfsFileStatus.Builder builder = new HdfsFileStatus.Builder(); - asyncApply((ApplyFunction) status -> { - if (setPath) { - Path path = new Path(name); - String nameStr = path.getName(); - builder.path(DFSUtil.string2Bytes(nameStr)); - } - return builder.isdir(true) - .mtime(modTime) - .atime(accessTime) - .perm(permission[0]) - .owner(owner[0]) - .group(group[0]) - .symlink(new byte[0]) - .fileId(inodeId) - .children(childrenNums[0]) - .flags(flags[0]) - .build(); - }); return asyncReturn(HdfsFileStatus.class); } @Override protected HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations, final RemoteMethod method, long timeOutMs) throws IOException { - - asyncComplete(null); // Get the file info from everybody rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs, HdfsFileStatus.class); @@ -1086,4 +1136,35 @@ public boolean isMultiDestDirectory(String src) { return asyncReturn(boolean.class); } + + @Override + public Path getEnclosingRoot(String src) throws IOException { + final Path[] mountPath = new Path[1]; + if (defaultNameServiceEnabled) { + mountPath[0] = new Path("/"); + } + + if (getSubclusterResolver() instanceof MountTableResolver) { + MountTableResolver mountTable = (MountTableResolver) getSubclusterResolver(); + if (mountTable.getMountPoint(src) != null) { + mountPath[0] = new Path(mountTable.getMountPoint(src).getSourcePath()); + } + } + + if (mountPath[0] == null) { + throw new IOException(String.format("No mount point for %s", src)); + } + + getEZForPath(src); + asyncApply((ApplyFunction<EncryptionZone, Path>) zone -> { + if (zone == null) { + return mountPath[0]; + } else { + Path zonePath = new Path(zone.getPath()); + return zonePath.depth() > mountPath[0].depth() ? zonePath : mountPath[0]; + } + }); + return asyncReturn(Path.class); + } + }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java index fc957bab0f8c1..77cee101523c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTable.java @@ -70,16 +70,16 @@ */ public class TestRouterMountTable { - private static StateStoreDFSCluster cluster; - private static NamenodeContext nnContext0; - private static NamenodeContext nnContext1; - private static RouterContext routerContext; - private static MountTableResolver mountTable; - private static ClientProtocol routerProtocol; - private static long startTime; - private static FileSystem nnFs0; - private static FileSystem nnFs1; - private static FileSystem routerFs; + protected static StateStoreDFSCluster cluster; + protected static NamenodeContext nnContext0; + protected static NamenodeContext nnContext1; + protected static RouterContext routerContext; + protected static MountTableResolver mountTable; + protected static ClientProtocol routerProtocol; + protected static long startTime; + protected static FileSystem nnFs0; + protected static FileSystem nnFs1; + protected static FileSystem routerFs; @BeforeClass public static void globalSetUp() throws Exception { @@ -179,7 +179,7 @@ public void testReadOnly() throws Exception { * @return If it was successfully added. * @throws IOException Problems adding entries. */ - private boolean addMountTable(final MountTable entry) throws IOException { + protected boolean addMountTable(final MountTable entry) throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTableManager = client.getMountTableManager(); AddMountTableEntryRequest addRequest = @@ -378,10 +378,11 @@ public void testGetMountPointStatus() throws IOException { clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0, false); assertTrue(dirStatus3.isEmptyLocalName()); } + /** * GetListing of testPath through router. */ - private void getListing(String testPath) + protected void getListing(String testPath) throws IOException, URISyntaxException { ClientProtocol clientProtocol1 = routerContext.getClient().getNamenode(); @@ -789,7 +790,6 @@ public void testListStatusMountPoint() throws Exception { @Test public void testGetEnclosingRoot() throws Exception { - // Add a read only entry MountTable readOnlyEntry = MountTable.newInstance( "/readonly", Collections.singletonMap("ns0", "/testdir")); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java index 57d4c69db698f..953b974c16aab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableWithoutDefaultNS.java @@ -57,12 +57,13 @@ * Test a router end-to-end including the MountTable without default nameservice.
*/ public class TestRouterMountTableWithoutDefaultNS { - private static StateStoreDFSCluster cluster; - private static RouterContext routerContext; - private static MountTableResolver mountTable; - private static ClientProtocol routerProtocol; - private static FileSystem nnFs0; - private static FileSystem nnFs1; + protected static StateStoreDFSCluster cluster; + protected static RouterContext routerContext; + protected static MountTableResolver mountTable; + protected static ClientProtocol routerProtocol; + protected static FileSystem routerFs; + protected static FileSystem nnFs0; + protected static FileSystem nnFs1; @BeforeClass public static void globalSetUp() throws Exception { @@ -84,6 +85,7 @@ public static void globalSetUp() throws Exception { nnFs0 = cluster.getNamenode("ns0", null).getFileSystem(); nnFs1 = cluster.getNamenode("ns1", null).getFileSystem(); routerContext = cluster.getRandomRouter(); + routerFs = routerContext.getFileSystem(); Router router = routerContext.getRouter(); routerProtocol = routerContext.getClient().getNamenode(); mountTable = (MountTableResolver) router.getSubclusterResolver(); @@ -117,7 +119,7 @@ public void clearMountTable() throws IOException { * @return If it was successfully added. * @throws IOException Problems adding entries. */ - private boolean addMountTable(final MountTable entry) throws IOException { + protected boolean addMountTable(final MountTable entry) throws IOException { RouterClient client = routerContext.getAdminClient(); MountTableManager mountTableManager = client.getMountTableManager(); AddMountTableEntryRequest addRequest = AddMountTableEntryRequest.newInstance(entry); @@ -258,7 +260,7 @@ public void testGetContentSummary() throws Exception { } } - void writeData(FileSystem fs, Path path, int fileLength) throws IOException { + protected void writeData(FileSystem fs, Path path, int fileLength) throws IOException { try (FSDataOutputStream outputStream = fs.create(path)) { for (int writeSize = 0; writeSize < fileLength; writeSize++) { outputStream.write(writeSize); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestObserverWithRouterAsync.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestObserverWithRouterAsync.java new file mode 100644 index 0000000000000..8d485a8e76923 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestObserverWithRouterAsync.java @@ -0,0 +1,1055 @@ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ClientGSIContext; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MockResolver; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import
org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterStateIdContext; +import org.apache.hadoop.hdfs.server.federation.router.TestObserverWithRouter; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadConfiguredFailoverProxyProvider; +import org.apache.hadoop.hdfs.server.namenode.ha.RouterObserverReadProxyProvider; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class TestObserverWithRouterAsync extends TestObserverWithRouter { + private static final int NUM_NAMESERVICES = 2; + private static final String SKIP_BEFORE_EACH_CLUSTER_STARTUP = "SkipBeforeEachClusterStartup"; + private MiniRouterDFSCluster cluster; + private MiniRouterDFSCluster.RouterContext routerContext; + private FileSystem fileSystem; + + private static final String ROUTER_NS_ID = "router-service"; + private static final String AUTO_MSYNC_PERIOD_KEY_PREFIX = + "dfs.client.failover.observer.auto-msync-period"; + + @BeforeEach + void init(TestInfo info) throws Exception { + if (info.getTags().contains(SKIP_BEFORE_EACH_CLUSTER_STARTUP)) { + return; + } + startUpCluster(2, null); + } + + @AfterEach + public void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + + routerContext = null; + + if (fileSystem != null) { + fileSystem.close(); + fileSystem = null; + } + } + + public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception { + int numberOfNamenode = 2 + numberOfObserver; + Configuration conf = new Configuration(false); + setConfDefaults(conf); + if (confOverrides != null) { + confOverrides + .iterator() + .forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue())); + } + cluster = new MiniRouterDFSCluster(true, NUM_NAMESERVICES, numberOfNamenode); + cluster.addNamenodeOverrides(conf); + // Start NNs and DNs and wait until ready + 
cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + for (int i = 2; i < numberOfNamenode; i++) { + cluster.switchToObserver(ns, NAMENODES[i]); + } + } + } + + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + cluster.addRouterOverrides(conf); + cluster.addRouterOverrides(routerConf); + + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + // Setup the mount table + cluster.installMockLocations(); + + cluster.waitActiveNamespaces(); + routerContext = cluster.getRandomRouter(); + } + + private void setConfDefaults(Configuration conf) { + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true); + } + + public enum ConfigSetting { + USE_NAMENODE_PROXY_FLAG, + USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER, + USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER + } + + private Configuration getConfToEnableObserverReads(TestObserverWithRouter.ConfigSetting configSetting) { + Configuration conf = new Configuration(); + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + conf.setBoolean(HdfsClientConfigKeys.DFS_RBF_OBSERVER_READ_ENABLE, true); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + + "." + + routerContext.getRouter() + .getRpcServerAddress() + .getHostName(), RouterObserverReadProxyProvider.class.getName()); + break; + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: + // HA configs + conf.set(DFS_NAMESERVICES, ROUTER_NS_ID); + conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + ROUTER_NS_ID, "router1"); + conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY+ "." + ROUTER_NS_ID + ".router1", + routerContext.getFileSystemURI().toString()); + DistributedFileSystem.setDefaultUri(conf, "hdfs://" + ROUTER_NS_ID); + conf.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + ROUTER_NS_ID, + RouterObserverReadConfiguredFailoverProxyProvider.class.getName()); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + return conf; + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testObserverRead(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + internalTestObserverRead(); + } + + /** + * Tests that without adding config to use ObserverProxyProvider, the client shouldn't + * have reads served by Observers. + * Fixes regression in HDFS-13522. 
+ */ + @Test + public void testReadWithoutObserverClientConfigurations() throws Exception { + fileSystem = routerContext.getFileSystem(); + assertThrows(AssertionError.class, this::internalTestObserverRead); + } + + public void internalTestObserverRead() + throws Exception { + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/testFile"); + // Send create call + fileSystem.create(path).close(); + + // Send read request + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + // getBlockLocations should be sent to observer + assertEquals("One call should be sent to observer", 1, rpcCountForObserver); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testObserverReadWithoutFederatedStatePropagation(TestObserverWithRouter.ConfigSetting configSetting) + throws Exception { + Configuration confOverrides = new Configuration(false); + confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); + startUpCluster(2, confOverrides); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/testFile"); + // Send Create call to active + fileSystem.create(path).close(); + + // Send read request to observer. The router will msync to the active namenode. 
+ fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create, complete and getBlockLocations calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertEquals("No call should be sent to observer", 0, rpcCountForObserver); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testDisablingObserverReadUsingNameserviceOverride(TestObserverWithRouter.ConfigSetting configSetting) + throws Exception { + // Disable observer reads using per-nameservice override + Configuration confOverrides = new Configuration(false); + confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); + startUpCluster(2, confOverrides); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + + Path path = new Path("/testFile"); + fileSystem.create(path).close(); + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create, complete and read calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertEquals("Zero calls should be sent to observer", 0, rpcCountForObserver); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testReadWhenObserverIsDown(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + Path path = new Path("/testFile1"); + // Send Create call to active + fileSystem.create(path).close(); + + // Stop observer NN + int nnIndex = stopObserver(1); + assertNotEquals("No observer found", 3, nnIndex); + nnIndex = stopObserver(1); + assertNotEquals("No observer found", 4, nnIndex); + + // Send read request + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create, complete and getBlockLocation calls should be sent to active + assertEquals("Three calls should be sent to active", 3, + rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertEquals("No call should be sent to observer", 0, + rpcCountForObserver); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testMultipleObserver(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + Path path = new Path("/testFile1"); + // Send Create call to active + fileSystem.create(path).close(); + + // Stop one observer NN + stopObserver(1); + + // Send read request + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + long expectedActiveRpc = 2; + long expectedObserverRpc = 1; + + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", + expectedActiveRpc, rpcCountForActive); + + long
rpcCountForObserver = routerContext.getRouter() + .getRpcServer().getRPCMetrics().getObserverProxyOps(); + // getBlockLocation call should be sent to observer + assertEquals("Read should succeed with another observer", + expectedObserverRpc, rpcCountForObserver); + + // Stop one observer NN + stopObserver(1); + + // Send read request + fileSystem.open(path).close(); + + rpcCountForActive = routerContext.getRouter() + .getRpcServer().getRPCMetrics().getActiveProxyOps(); + + // getBlockLocation call should be sent to active + expectedActiveRpc += 1; + assertEquals("One call should be sent to active", expectedActiveRpc, + rpcCountForActive); + expectedObserverRpc += 0; + rpcCountForObserver = routerContext.getRouter() + .getRpcServer().getRPCMetrics().getObserverProxyOps(); + assertEquals("No call should be sent to observer", + expectedObserverRpc, rpcCountForObserver); + } + + private int stopObserver(int num) { + int nnIndex; + for (nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) { + NameNode nameNode = cluster.getCluster().getNameNode(nnIndex); + if (nameNode != null && nameNode.isObserverState()) { + cluster.getCluster().shutdownNameNode(nnIndex); + num--; + if (num == 0) { + break; + } + } + } + return nnIndex; + } + + // Test router observer with multiple observers to know which observer NN + // received the requests. + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testMultipleObserverRouter() throws Exception { + StateStoreDFSCluster innerCluster; + MembershipNamenodeResolver resolver; + + String ns0; + String ns1; + // Create 4 NNs: one active, one standby and two observers. + innerCluster = new StateStoreDFSCluster(true, 4, 4, TimeUnit.SECONDS.toMillis(5), + TimeUnit.SECONDS.toMillis(5)); + Configuration routerConf = + new RouterConfigBuilder().stateStore().admin().rpc() + .enableLocalHeartbeat(true).heartbeat().build(); + + StringBuilder sb = new StringBuilder(); + ns0 = innerCluster.getNameservices().get(0); + MiniRouterDFSCluster.NamenodeContext context = + innerCluster.getNamenodes(ns0).get(1); + routerConf.set(DFS_NAMESERVICE_ID, ns0); + routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId()); + + // Specify namenodes (ns1.nn0,ns1.nn1) to monitor + ns1 = innerCluster.getNameservices().get(1); + for (MiniRouterDFSCluster.NamenodeContext ctx : innerCluster.getNamenodes(ns1)) { + String suffix = ctx.getConfSuffix(); + if (sb.length() != 0) { + sb.append(","); + } + sb.append(suffix); + } + routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, sb.toString()); + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + + innerCluster.addNamenodeOverrides(routerConf); + innerCluster.addRouterOverrides(routerConf); + innerCluster.startCluster(); + + if (innerCluster.isHighAvailability()) { + for (String ns : innerCluster.getNameservices()) { + innerCluster.switchToActive(ns, NAMENODES[0]); + innerCluster.switchToStandby(ns, NAMENODES[1]); + for (int i = 2; i < 4; i++) { + innerCluster.switchToObserver(ns, NAMENODES[i]); + } + } + } + innerCluster.startRouters(); + innerCluster.waitClusterUp(); + + routerContext = innerCluster.getRandomRouter(); + resolver = (MembershipNamenodeResolver) routerContext.getRouter() + .getNamenodeResolver(); + + resolver.loadCache(true); + List<? extends FederationNamenodeContext> namespaceInfo0 = + resolver.getNamenodesForNameserviceId(ns0, true); + List<? extends FederationNamenodeContext> namespaceInfo1 =
resolver.getNamenodesForNameserviceId(ns1, true); + assertEquals(namespaceInfo0.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + assertEquals(namespaceInfo0.get(1).getState(), + FederationNamenodeServiceState.OBSERVER); + assertNotEquals(namespaceInfo0.get(0).getNamenodeId(), + namespaceInfo0.get(1).getNamenodeId()); + assertEquals(namespaceInfo1.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + + innerCluster.shutdown(); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testUnavailableObserverNN(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + stopObserver(2); + + Path path = new Path("/testFile"); + // Send Create call to active + fileSystem.create(path).close(); + + // Send read request. + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + // Create, complete and getBlockLocations + // calls should be sent to active. + assertEquals("Three calls should be sent to active", + 3, rpcCountForActive); + + + boolean hasUnavailable = false; + for(String ns : cluster.getNameservices()) { + List<? extends FederationNamenodeContext> nns = routerContext.getRouter() + .getNamenodeResolver().getNamenodesForNameserviceId(ns, false); + for(FederationNamenodeContext nn : nns) { + if(FederationNamenodeServiceState.UNAVAILABLE == nn.getState()) { + hasUnavailable = true; + } + } + } + // After attempting to communicate with unavailable observer namenode, + // its state is updated to unavailable. + assertTrue("There must be unavailable namenodes", hasUnavailable); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testRouterMsync(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + Path path = new Path("/testFile"); + + // Send Create call to active + fileSystem.create(path).close(); + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", 2, + rpcCountForActive); + + // Send msync + fileSystem.msync(); + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // 2 msync calls should be sent. One to each active namenode in the two namespaces. + assertEquals("Four calls should be sent to active", 4, + rpcCountForActive); + }
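+ // The read-path accounting used by the tests below: create/complete and msync
+ // RPCs increment getActiveProxyOps(), reads served by an observer increment
+ // getObserverProxyOps(), and a read that cannot safely be served by an observer
+ // falls through to the active namenode and is counted there instead.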
+ assertEquals("Only one call should be sent to active", 1, rpcCountForActive); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + // getList call should be sent to observer + assertEquals("No calls should be sent to observer", 0, rpcCountForObserver); + } + + @Test + public void testSingleReadUsingObserverReadProxyProvider() throws Exception { + fileSystem = routerContext.getFileSystemWithObserverReadProxyProvider(); + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + // Send read request + fileSystem.listFiles(path, false); + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Two msync calls to the active namenodes. + assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + // getList call should be sent to observer + assertEquals("One call should be sent to observer", 1, rpcCountForObserver); + } + + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testClientReceiveResponseState() { + ClientGSIContext clientGSIContext = new ClientGSIContext(); + + Map mockMapping = new HashMap<>(); + mockMapping.put("ns0", 10L); + HdfsProtos.RouterFederatedStateProto.Builder builder = HdfsProtos.RouterFederatedStateProto.newBuilder(); + mockMapping.forEach(builder::putNamespaceStateIds); + RpcHeaderProtos.RpcResponseHeaderProto header = RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS) + .setRouterFederatedState(builder.build().toByteString()) + .build(); + clientGSIContext.receiveResponseState(header); + + Map mockLowerMapping = new HashMap<>(); + mockLowerMapping.put("ns0", 8L); + builder = HdfsProtos.RouterFederatedStateProto.newBuilder(); + mockLowerMapping.forEach(builder::putNamespaceStateIds); + header = RpcHeaderProtos.RpcResponseHeaderProto.newBuilder() + .setRouterFederatedState(builder.build().toByteString()) + .setCallId(2) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS) + .build(); + clientGSIContext.receiveResponseState(header); + + Map latestFederateState = ClientGSIContext.getRouterFederatedStateMap( + clientGSIContext.getRouterFederatedState()); + Assertions.assertEquals(1, latestFederateState.size()); + Assertions.assertEquals(10L, latestFederateState.get("ns0")); + } + + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testRouterResponseHeaderState() { + // This conf makes ns1 that is not eligible for observer reads. 
+ Configuration conf = new Configuration(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + conf.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns1"); + + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); + + ConcurrentHashMap namespaceIdMap = + routerStateIdContext.getNamespaceIdMap(); + namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10)); + namespaceIdMap.put("ns1", new LongAccumulator(Math::max, 100)); + namespaceIdMap.put("ns2", new LongAccumulator(Math::max, Long.MIN_VALUE)); + + RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + + Map latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + // Only ns0 will be in latestFederateState + Assertions.assertEquals(1, latestFederateState.size()); + Assertions.assertEquals(10L, latestFederateState.get("ns0")); + } + + @Test + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testRouterResponseHeaderStateMaxSizeLimit() { + Configuration conf = new Configuration(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); + conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 1); + + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); + + ConcurrentHashMap namespaceIdMap = + routerStateIdContext.getNamespaceIdMap(); + namespaceIdMap.put("ns0", new LongAccumulator(Math::max, 10)); + namespaceIdMap.put("ns1", new LongAccumulator(Math::max, Long.MIN_VALUE)); + + RpcHeaderProtos.RpcResponseHeaderProto.Builder responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + + Map latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + // Validate that ns0 is still part of the header + Assertions.assertEquals(1, latestFederateState.size()); + + namespaceIdMap.put("ns2", new LongAccumulator(Math::max, 20)); + // Rebuild header + responseHeaderBuilder = + RpcHeaderProtos.RpcResponseHeaderProto + .newBuilder() + .setCallId(1) + .setStatus(RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.SUCCESS); + routerStateIdContext.updateResponseState(responseHeaderBuilder); + latestFederateState = RouterStateIdContext.getRouterFederatedStateMap( + responseHeaderBuilder.build().getRouterFederatedState()); + // Validate that ns0 is still part of the header + Assertions.assertEquals(0, latestFederateState.size()); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testStateIdProgressionInRouter(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Path rootPath = new Path("/"); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + RouterStateIdContext routerStateIdContext = routerContext + .getRouterRpcServer() + .getRouterStateIdContext(); + for (int i = 0; i < 10; i++) { + fileSystem.create(new Path(rootPath, "file" + i)).close(); + } + + // Get object storing state of the namespace in the shared RouterStateIdContext + LongAccumulator namespaceStateId 
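+ // Namespace state ids only move forward: RouterStateIdContext tracks each
+ // namespace with a LongAccumulator(Math::max, ...), as seeded above, so the
+ // create/complete round trips in the next test can only raise ns0's shared value.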
+ + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testStateIdProgressionInRouter(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Path rootPath = new Path("/"); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + RouterStateIdContext routerStateIdContext = routerContext + .getRouterRpcServer() + .getRouterStateIdContext(); + for (int i = 0; i < 10; i++) { + fileSystem.create(new Path(rootPath, "file" + i)).close(); + } + + // Get object storing state of the namespace in the shared RouterStateIdContext + LongAccumulator namespaceStateId
= routerStateIdContext.getNamespaceStateId("ns0"); + assertEquals("Router's shared state should have progressed.", 21, namespaceStateId.get()); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testSharedStateInRouterStateIdContext(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Path rootPath = new Path("/"); + long cleanupPeriodMs = 1000; + + Configuration conf = new Configuration(false); + conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_CLEAN, cleanupPeriodMs); + conf.setLong(RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS, cleanupPeriodMs / 10); + startUpCluster(1, conf); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer() + .getRouterStateIdContext(); + + // First read goes to active and creates connection pool for this user to active + fileSystem.listStatus(rootPath); + // Second read goes to observer and creates connection pool for this user to observer + fileSystem.listStatus(rootPath); + // Get object storing state of the namespace in the shared RouterStateIdContext + LongAccumulator namespaceStateId1 = routerStateIdContext.getNamespaceStateId("ns0"); + + // Wait for connection pools to expire and be cleaned up. + Thread.sleep(cleanupPeriodMs * 2); + + // Third read goes to observer. + // New connection pool to observer is created since existing one expired. + fileSystem.listStatus(rootPath); + fileSystem.close(); + // Get object storing state of the namespace in the shared RouterStateIdContext + LongAccumulator namespaceStateId2 = routerStateIdContext.getNamespaceStateId("ns0"); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + // First list status goes to active + assertEquals("One call should be sent to active", 1, rpcCountForActive); + // Last two listStatuses go to observer.
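+ // The shared accumulator, not the connection pool, owns the namespace state:
+ // both observer reads above must see the same LongAccumulator instance even
+ // though the first connection pool expired in between (asserted below).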
+ assertEquals("Two calls should be sent to observer", 2, rpcCountForObserver); + + Assertions.assertSame(namespaceStateId1, namespaceStateId2, + "The same object should be used in the shared RouterStateIdContext"); + } + + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testRouterStateIdContextCleanup(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Path rootPath = new Path("/"); + long recordExpiry = TimeUnit.SECONDS.toMillis(1); + + Configuration confOverride = new Configuration(false); + confOverride.setLong(RBFConfigKeys.FEDERATION_STORE_MEMBERSHIP_EXPIRATION_MS, recordExpiry); + + startUpCluster(1, confOverride); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + RouterStateIdContext routerStateIdContext = routerContext.getRouterRpcServer() + .getRouterStateIdContext(); + + fileSystem.listStatus(rootPath); + List namespace1 = routerStateIdContext.getNamespaces(); + fileSystem.close(); + + MockResolver mockResolver = (MockResolver) routerContext.getRouter().getNamenodeResolver(); + mockResolver.cleanRegistrations(); + mockResolver.setDisableRegistration(true); + Thread.sleep(recordExpiry * 2); + + List namespace2 = routerStateIdContext.getNamespaces(); + assertEquals(1, namespace1.size()); + assertEquals("ns0", namespace1.get(0)); + assertTrue(namespace2.isEmpty()); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testPeriodicStateRefreshUsingActiveNamenode(TestObserverWithRouter.ConfigSetting configSetting) + throws Exception { + Path rootPath = new Path("/"); + + Configuration confOverride = new Configuration(false); + confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, "500ms"); + confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s"); + startUpCluster(1, confOverride); + + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + fileSystem.listStatus(rootPath); + int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length; + + DFSClient activeClient = cluster.getNamenodes("ns0") + .stream() + .filter(nnContext -> nnContext.getNamenode().isActiveState()) + .findFirst().orElseThrow(() -> new IllegalStateException("No active namenode.")) + .getClient(); + + for (int i = 0; i < 10; i++) { + activeClient.mkdirs("/dir" + i, null, false); + } + activeClient.close(); + + // Wait long enough for state in router to be considered stale. + GenericTestUtils.waitFor( + () -> !routerContext + .getRouterRpcClient() + .isNamespaceStateIdFresh("ns0"), + 100, + 10000, + "Timeout: Namespace state was never considered stale."); + FileStatus[] rootFolderAfterMkdir = fileSystem.listStatus(rootPath); + assertEquals("List-status should show newly created directories.", + initialLengthOfRootListing + 10, rootFolderAfterMkdir.length); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testAutoMsyncEqualsZero(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); + String configKeySuffix = + configSetting == TestObserverWithRouter.ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? 
+ ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 0); + fileSystem = routerContext.getFileSystem(clientConfiguration); + + List<? extends FederationNamenodeContext> namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + // Send read requests + int numListings = 15; + for (int i = 0; i < numListings; i++) { + fileSystem.listFiles(path, false); + } + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + // First read goes to active. + assertEquals("Calls sent to the active", 1, rpcCountForActive); + // The rest of the reads are sent to the observer. + assertEquals("Reads sent to observer", numListings - 1, rpcCountForObserver); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: + // An msync is sent to each active namenode for each read. + // Total msyncs will be (numListings * num_of_nameservices). + assertEquals("Msyncs sent to the active namenodes", + NUM_NAMESERVICES * numListings, rpcCountForActive); + // All reads should be sent to the observer. + assertEquals("Reads sent to observer", numListings, rpcCountForObserver); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testAutoMsyncNonZero(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); + String configKeySuffix = + configSetting == TestObserverWithRouter.ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000); + fileSystem = routerContext.getFileSystem(clientConfiguration); + + List<? extends FederationNamenodeContext> namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + fileSystem.listFiles(path, false); + fileSystem.listFiles(path, false); + Thread.sleep(5000); + fileSystem.listFiles(path, false); + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + // First read goes to active. + assertEquals("Calls sent to the active", 1, rpcCountForActive); + // The rest of the reads are sent to the observer.
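+ // The client-side auto-msync period only applies to the observer-read proxy providers, so the sleep adds no extra active calls in this case.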
+ assertEquals("Reads sent to observer", 2, rpcCountForObserver); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: + // 4 msyncs expected. 2 for the first read, and 2 for the third read + // after the auto-msync period has elapsed during the sleep. + assertEquals("Msyncs sent to the active namenodes", + 4, rpcCountForActive); + // All three reads should be sent of the observer. + assertEquals("Reads sent to observer", 3, rpcCountForObserver); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testThatWriteDoesntBypassNeedForMsync(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Configuration clientConfiguration = getConfToEnableObserverReads(configSetting); + String configKeySuffix = + configSetting == TestObserverWithRouter.ConfigSetting.USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER ? + ROUTER_NS_ID : routerContext.getRouter().getRpcServerAddress().getHostName(); + clientConfiguration.setLong(AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + configKeySuffix, 3000); + fileSystem = routerContext.getFileSystem(clientConfiguration); + + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + Path path = new Path("/"); + + long rpcCountForActive; + long rpcCountForObserver; + + fileSystem.listFiles(path, false); + Thread.sleep(5000); + fileSystem.mkdirs(new Path(path, "mkdirLocation")); + fileSystem.listFiles(path, false); + fileSystem.close(); + + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + switch (configSetting) { + case USE_NAMENODE_PROXY_FLAG: + // First listing and mkdir go to the active. + assertEquals("Calls sent to the active namenodes", 2, rpcCountForActive); + // Second listing goes to the observer. + assertEquals("Read sent to observer", 1, rpcCountForObserver); + break; + case USE_ROUTER_OBSERVER_READ_PROXY_PROVIDER: + case USE_ROUTER_OBSERVER_READ_CONFIGURED_FAILOVER_PROXY_PROVIDER: + // 5 calls to the active namenodes expected. 4 msync and a mkdir. + // Each of the 2 reads results in an msync to 2 nameservices. + // The mkdir also goes to the active. + assertEquals("Calls sent to the active namenodes", + 5, rpcCountForActive); + // Both reads should be sent of the observer. 
+ assertEquals("Reads sent to observer", 2, rpcCountForObserver); + break; + default: + Assertions.fail("Unknown config setting: " + configSetting); + } + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testMsyncOnlyToNamespaceWithObserver(TestObserverWithRouter.ConfigSetting configSetting) throws Exception { + Configuration confOverride = new Configuration(false); + String namespaceWithObserverReadsDisabled = "ns0"; + // Disable observer reads for ns0 + confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, + namespaceWithObserverReadsDisabled); + startUpCluster(1, confOverride); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + + // Send msync request + fileSystem.msync(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // There should only be one call to the namespace that has an observer. + assertEquals("Only one call to the namespace with an observer", 1, rpcCountForActive); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP) + public void testMsyncWithNoNamespacesEligibleForCRS(TestObserverWithRouter.ConfigSetting configSetting) + throws Exception { + Configuration confOverride = new Configuration(false); + // Disable observer reads for all namespaces. + confOverride.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, false); + startUpCluster(1, confOverride); + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + + // Send msync request. + fileSystem.msync(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // There should no calls to any namespace. + assertEquals("No calls to any namespace", 0, rpcCountForActive); + } + + @EnumSource(TestObserverWithRouter.ConfigSetting.class) + @ParameterizedTest + public void testRestartingNamenodeWithStateIDContextDisabled(TestObserverWithRouter.ConfigSetting configSetting) + throws Exception { + fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads(configSetting)); + Path path = new Path("/testFile1"); + // Send Create call to active + fileSystem.create(path).close(); + + // Send read request + fileSystem.open(path).close(); + + long observerCount1 = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + + // Restart active namenodes and disable sending state id. 
+ restartActiveWithStateIDContextDisabled(); + + Configuration conf = getConfToEnableObserverReads(configSetting); + conf.setBoolean("fs.hdfs.impl.disable.cache", true); + FileSystem fileSystem2 = routerContext.getFileSystem(conf); + fileSystem2.msync(); + fileSystem2.open(path).close(); + + long observerCount2 = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertEquals("There should be no extra calls to the observer", observerCount1, observerCount2); + + fileSystem.open(path).close(); + long observerCount3 = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertTrue("Old filesystem will send calls to observer", observerCount3 > observerCount2); + } + + void restartActiveWithStateIDContextDisabled() throws Exception { + for (int nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) { + NameNode nameNode = cluster.getCluster().getNameNode(nnIndex); + if (nameNode != null && nameNode.isActiveState()) { + Configuration conf = new Configuration(); + setConfDefaults(conf); + cluster.getCluster().getConfiguration(nnIndex) + .setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, false); + cluster.getCluster().restartNameNode(nnIndex, true); + cluster.getCluster().getNameNode(nnIndex).isActiveState(); + } + } + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java new file mode 100644 index 0000000000000..3e3aba7d3821e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTable.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterMountTable; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.LambdaTestUtils; +import org.apache.hadoop.util.Time; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Collections; + +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test a router end-to-end including the MountTable using async rpc. + */ +public class TestRouterAsyncMountTable extends TestRouterMountTable { + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterAsyncMountTable.class.getName()); + + @BeforeClass + public static void globalSetUp() throws Exception { + startTime = Time.now(); + + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20); + conf.setBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + // Get the end points + nnContext0 = cluster.getNamenode("ns0", null); + nnContext1 = cluster.getNamenode("ns1", null); + nnFs0 = nnContext0.getFileSystem(); + nnFs1 = nnContext1.getFileSystem(); + routerContext = cluster.getRandomRouter(); + routerFs = routerContext.getFileSystem(); + Router router = routerContext.getRouter(); + routerProtocol = routerContext.getClient().getNamenode(); + mountTable = (MountTableResolver) router.getSubclusterResolver(); + } + + /** + * Verify the getMountPointStatus result of passing in different parameters. 
+ */ + @Override + @Test + public void testGetMountPointStatus() throws IOException { + MountTable addEntry = MountTable.newInstance("/testA/testB/testC/testD", + Collections.singletonMap("ns0", "/testA/testB/testC/testD")); + assertTrue(addMountTable(addEntry)); + RouterClientProtocol clientProtocol = new RouterAsyncClientProtocol( + nnFs0.getConf(), routerContext.getRouter().getRpcServer()); + String src = "/"; + String child = "testA"; + Path childPath = new Path(src, child); + HdfsFileStatus dirStatus; + clientProtocol.getMountPointStatus(childPath.toString(), 0, 0); + try { + dirStatus = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(child, dirStatus.getLocalName()); + String src1 = "/testA"; + String child1 = "testB"; + Path childPath1 = new Path(src1, child1); + HdfsFileStatus dirStatus1; + clientProtocol.getMountPointStatus(childPath1.toString(), 0, 0); + try { + dirStatus1 = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(child1, dirStatus1.getLocalName()); + + String src2 = "/testA/testB"; + String child2 = "testC"; + Path childPath2 = new Path(src2, child2); + HdfsFileStatus dirStatus2; + clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0); + try { + dirStatus2 = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertEquals(child2, dirStatus2.getLocalName()); + + HdfsFileStatus dirStatus3; + clientProtocol.getMountPointStatus(childPath2.toString(), 0, 0, false); + try { + dirStatus3 = syncReturn(HdfsFileStatus.class); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertTrue(dirStatus3.isEmptyLocalName()); + } + + /** + * Validate that the mount point name gets resolved. On successful + * resolution the details returned are the ones actually set on the + * mount point. + */ + @Test + public void testMountPointResolved() throws IOException { + MountTable addEntry = MountTable.newInstance("/testdir", + Collections.singletonMap("ns0", "/tmp/testdir")); + addEntry.setGroupName("group1"); + addEntry.setOwnerName("owner1"); + assertTrue(addMountTable(addEntry)); + HdfsFileStatus finfo = routerProtocol.getFileInfo("/testdir"); + FileStatus[] finfo1 = routerFs.listStatus(new Path("/")); + assertEquals("owner1", finfo.getOwner()); + assertEquals("owner1", finfo1[0].getOwner()); + assertEquals("group1", finfo.getGroup()); + assertEquals("group1", finfo1[0].getGroup()); + } + + @Override + @Test + public void testListNonExistPath() throws Exception { + mountTable.setDefaultNSEnable(false); + LambdaTestUtils.intercept(FileNotFoundException.class, + "File /base does not exist.", + "Expect FileNotFoundException.", + () -> routerFs.listStatus(new Path("/base"))); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTableWithoutDefaultNS.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTableWithoutDefaultNS.java new file mode 100644 index 0000000000000..8185f093bb871 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncMountTableWithoutDefaultNS.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException; +import org.apache.hadoop.hdfs.server.federation.router.NoLocationException; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterMountTableWithoutDefaultNS; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestRouterAsyncMountTableWithoutDefaultNS extends TestRouterMountTableWithoutDefaultNS { + @BeforeClass + public static void globalSetUp() throws Exception { + // Build and start a federated cluster + cluster = new StateStoreDFSCluster(false, 2); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + conf.setInt(RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY, 20); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE_ENABLE, false); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + cluster.addRouterOverrides(conf); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + // Get the end points + nnFs0 = cluster.getNamenode("ns0", null).getFileSystem(); + nnFs1 = cluster.getNamenode("ns1", null).getFileSystem(); + routerContext = cluster.getRandomRouter(); + routerFs = routerContext.getFileSystem(); + Router router = routerContext.getRouter(); + routerProtocol = routerContext.getClient().getNamenode(); + mountTable = 
(MountTableResolver) router.getSubclusterResolver(); + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + @After + public void clearMountTable() throws IOException { + RouterClient client = routerContext.getAdminClient(); + MountTableManager mountTableManager = client.getMountTableManager(); + GetMountTableEntriesRequest req1 = GetMountTableEntriesRequest.newInstance("/"); + GetMountTableEntriesResponse response = mountTableManager.getMountTableEntries(req1); + for (MountTable entry : response.getEntries()) { + RemoveMountTableEntryRequest req2 = + RemoveMountTableEntryRequest.newInstance(entry.getSourcePath()); + mountTableManager.removeMountTableEntry(req2); + } + } + + /** + * Verify that RBF does not support getting file information for a path + * that has no location and no sub mount points. + */ + @Test + public void testGetFileInfoWithoutSubMountPoint() throws Exception { + MountTable addEntry = MountTable.newInstance("/testdir/1", + Collections.singletonMap("ns0", "/testdir/1")); + assertTrue(addMountTable(addEntry)); + LambdaTestUtils.intercept(RemoteException.class, + "org.apache.hadoop.hdfs.server.federation.resolver.RouterResolveException", + () -> routerFs.getFileStatus(new Path("/testdir2"))); + } + + @Test + public void testGetContentSummary() throws Exception { + try { + // Add mount table entry. + MountTable addEntry = MountTable.newInstance("/testA", + Collections.singletonMap("ns0", "/testA")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB", + Collections.singletonMap("ns0", "/testA/testB")); + assertTrue(addMountTable(addEntry)); + addEntry = MountTable.newInstance("/testA/testB/testC", + Collections.singletonMap("ns1", "/testA/testB/testC")); + assertTrue(addMountTable(addEntry)); + + writeData(nnFs0, new Path("/testA/testB/file1"), 1024 * 1024); + writeData(nnFs1, new Path("/testA/testB/testC/file2"), 1024 * 1024); + writeData(nnFs1, new Path("/testA/testB/testC/file3"), 1024 * 1024); + + ContentSummary summary = routerFs.getContentSummary(new Path("/testA")); + assertEquals(3, summary.getFileCount()); + assertEquals(1024 * 1024 * 3, summary.getLength()); + + LambdaTestUtils.intercept(RemoteException.class, + "org.apache.hadoop.hdfs.server.federation.router.NoLocationException", + () -> routerFs.getContentSummary(new Path("/testB"))); + } finally { + nnFs0.delete(new Path("/testA"), true); + nnFs1.delete(new Path("/testA"), true); + } + } + + /** + * Verify that RBF with the default nameservice disabled still supports + * getting information about ancestor mount points.
+ */ + @Test + public void testGetContentSummaryWithSubMountPoint() throws IOException { + MountTable addEntry = MountTable.newInstance("/testdir/1/2", + Collections.singletonMap("ns0", "/testdir/1/2")); + assertTrue(addMountTable(addEntry)); + + try { + writeData(nnFs0, new Path("/testdir/1/2/3"), 10 * 1024 * 1024); + ContentSummary summaryFromRBF = routerFs.getContentSummary(new Path("/testdir")); + assertNotNull(summaryFromRBF); + assertEquals(1, summaryFromRBF.getFileCount()); + assertEquals(10 * 1024 * 1024, summaryFromRBF.getLength()); + } finally { + nnFs0.delete(new Path("/testdir"), true); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcSingleNS.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcSingleNS.java new file mode 100644 index 0000000000000..c17bcd602aee1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncRpcSingleNS.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.SafeModeAction; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterRpcSingleNS; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestRouterAsyncRpcSingleNS extends TestRouterRpcSingleNS { + /** + * Federated HDFS cluster. + */ + private static MiniRouterDFSCluster cluster; + + /** + * Random Router for this federated cluster. + */ + private MiniRouterDFSCluster.RouterContext router; + + /** + * Random nameservice in the federated cluster. + */ + private String ns; + /** + * First namenode in the nameservice. + */ + private MiniRouterDFSCluster.NamenodeContext namenode; + + /** + * Client interface to the Router. + */ + private ClientProtocol routerProtocol; + /** + * Client interface to the Namenode. + */ + private ClientProtocol nnProtocol; + + /** + * NameNodeProtocol interface to the Router. + */ + private NamenodeProtocol routerNamenodeProtocol; + /** + * NameNodeProtocol interface to the Namenode. + */ + private NamenodeProtocol nnNamenodeProtocol; + + /** + * Filesystem interface to the Router. + */ + private FileSystem routerFS; + /** + * Filesystem interface to the Namenode. + */ + private FileSystem nnFS; + + /** + * File in the Router. + */ + private String routerFile; + /** + * File in the Namenode. 
+ */ + private String nnFile; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new MiniRouterDFSCluster(false, 1); + cluster.setNumDatanodesPerNameservice(2); + + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Start routers with only an RPC service + Configuration routerConf = new RouterConfigBuilder().metrics().rpc() + .build(); + // We decrease the DN cache times to make the test faster + routerConf.setTimeDuration(RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, + TimeUnit.SECONDS); + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + cluster.addRouterOverrides(routerConf); + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + } + + @AfterClass + public static void tearDown() { + cluster.shutdown(); + } + + @Before + public void testSetup() throws Exception { + + // Create mock locations + cluster.installMockLocations(); + + // Delete all files via the NNs and verify + cluster.deleteAllFiles(); + + // Create test fixtures on NN + cluster.createTestDirectoriesNamenode(); + + // Wait to ensure NN has fully created its test directories + Thread.sleep(100); + + // Random router for this test + MiniRouterDFSCluster.RouterContext rndRouter = cluster.getRandomRouter(); + this.setRouter(rndRouter); + + // Pick a namenode for this test + String ns0 = cluster.getNameservices().get(0); + this.setNs(ns0); + this.setNamenode(cluster.getNamenode(ns0, null)); + + // Create a test file on the NN + Random rnd = new Random(); + String randomFile = "testfile-" + rnd.nextInt(); + this.nnFile = cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile; + this.routerFile = cluster.getFederatedTestDirectoryForNS(ns) + "/" + + randomFile; + + createFile(nnFS, nnFile, 32); + verifyFileExists(nnFS, nnFile); + } + + protected void setRouter(MiniRouterDFSCluster.RouterContext r) + throws IOException, URISyntaxException { + this.router = r; + this.routerProtocol = r.getClient().getNamenode(); + this.routerFS = r.getFileSystem(); + this.routerNamenodeProtocol = NameNodeProxies.createProxy(router.getConf(), + router.getFileSystem().getUri(), NamenodeProtocol.class).getProxy(); + } + + protected void setNs(String nameservice) { + this.ns = nameservice; + } + + protected void setNamenode(MiniRouterDFSCluster.NamenodeContext nn) + throws IOException, URISyntaxException { + this.namenode = nn; + this.nnProtocol = nn.getClient().getNamenode(); + this.nnFS = nn.getFileSystem(); + + // Namenode from the default namespace + String ns0 = cluster.getNameservices().get(0); + MiniRouterDFSCluster.NamenodeContext nn0 = cluster.getNamenode(ns0, null); + this.nnNamenodeProtocol = NameNodeProxies.createProxy(nn0.getConf(), + nn0.getFileSystem().getUri(), NamenodeProtocol.class).getProxy(); + } + + @Test + public void testGetCurrentTXIDandRollEdits() throws IOException { + Long rollEdits = routerProtocol.rollEdits(); + Long currentTXID = routerProtocol.getCurrentEditLogTxid(); + + assertEquals(rollEdits, currentTXID); + } + + @Test + public void testSaveNamespace() throws IOException { + cluster.getCluster().getFileSystem() + .setSafeMode(SafeModeAction.ENTER); + Boolean saveNamespace = routerProtocol.saveNamespace(0, 0); + + assertTrue(saveNamespace); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java new file mode 100644 index 0000000000000..b5469ffd3029c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.RouterWebHdfsMethods; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterWebHdfsMethods; +import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestRouterAsyncWebHdfsMethods extends TestRouterWebHdfsMethods { + static final Logger LOG = + LoggerFactory.getLogger(TestRouterAsyncWebHdfsMethods.class); + + private static StateStoreDFSCluster cluster; + private static MiniRouterDFSCluster.RouterContext router; + private static String httpUri; + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new StateStoreDFSCluster(false, 2); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .rpc() + .http() + .admin() + .build(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + cluster.addRouterOverrides(conf); + cluster.setIndependentDNs(); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + router = cluster.getRandomRouter(); + httpUri = "http://" + router.getHttpAddress(); + } + + @AfterClass + public static void tearDown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testWebHdfsCreate() throws Exception { + // the file is created at default ns (ns0) + String path = "/tmp/file"; + URL url = new URL(getUri(path)); + LOG.info("URL: {}", url); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("PUT"); + assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode()); + verifyFile("ns0", path, true); + verifyFile("ns1", path, false); + conn.disconnect(); + } + + @Test + public void testWebHdfsCreateWithMounts() throws Exception { + // the file is created at mounted ns (ns1) + String mountPoint = "/tmp-ns1"; + String path = "/tmp-ns1/file"; + createMountTableEntry( + router.getRouter(), mountPoint, + DestinationOrder.RANDOM, Collections.singletonList("ns1")); + URL url = new URL(getUri(path)); + LOG.info("URL: {}", url); + HttpURLConnection conn = (HttpURLConnection) url.openConnection();
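+ // The PUT goes to the router's WebHDFS endpoint, which resolves /tmp-ns1 to ns1 through the mount table entry created above.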
+ conn.setRequestMethod("PUT"); + assertEquals(HttpURLConnection.HTTP_CREATED, conn.getResponseCode()); + verifyFile("ns1", path, true); + verifyFile("ns0", path, false); + conn.disconnect(); + } + + private String getUri(String path) { + final String user = System.getProperty("user.name"); + final StringBuilder uri = new StringBuilder(httpUri); + uri.append("/webhdfs/v1"). + append(path). + append("?op=CREATE"). + append("&user.name=" + user); + return uri.toString(); + } + + private void verifyFile(String ns, String path, boolean shouldExist) + throws Exception { + FileSystem fs = cluster.getNamenode(ns, null).getFileSystem(); + try { + fs.getFileStatus(new Path(path)); + if (!shouldExist) { + fail(path + " should not exist in ns " + ns); + } + } catch (FileNotFoundException e) { + if (shouldExist) { + fail(path + " should exist in ns " + ns); + } + } + } + + @Test + public void testGetNsFromDataNodeNetworkLocation() { + assertEquals("ns0", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("/ns0/rack-info1")); + assertEquals("ns0", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("/ns0/row1/rack-info1")); + assertEquals("", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("/row0")); + assertEquals("", RouterWebHdfsMethods + .getNsFromDataNodeNetworkLocation("whatever-rack-info1")); + } + + @Test + public void testWebHdfsCreateWithInvalidPath() throws Exception { + // A path name that includes duplicated slashes. + String path = "//tmp//file"; + assertResponse(path); + } + + private void assertResponse(String path) throws IOException { + URL url = new URL(getUri(path)); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("PUT"); + // Assert response code. + assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode()); + // Assert exception. + Map<?, ?> response = WebHdfsFileSystem.jsonParse(conn, true); + assertEquals("InvalidPathException", + ((LinkedHashMap) response.get("RemoteException")).get("exception")); + conn.disconnect(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 934c5faee2f92..1ff0b9c920875 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -439,7 +439,7 @@ private synchronized void activateVolume( List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames(); for (String dir : allSubDirNameForDataSetLock) { lockManager.addLock(LockLevel.DIR, bp, ref.getVolume().getStorageID(), dir); - LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", + LOG.debug("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", bp, ref.getVolume().getStorageID(), dir); } } @@ -3297,7 +3297,7 @@ public void addBlockPool(String bpid, Configuration conf) List<String> allSubDirNameForDataSetLock = datasetSubLockStrategy.getAllSubLockNames(); for (String dir : allSubDirNameForDataSetLock) { lockManager.addLock(LockLevel.DIR, bpid, v, dir); - LOG.info("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", + LOG.debug("Added DIR lock for bpid:{}, volume storageid:{}, dir:{}", bpid, v, dir); } }
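Reviewer note: the async tests above rely on the calling convention from AsyncUtil, where a RouterAsyncClientProtocol method returns immediately and the actual result is fetched afterwards with syncReturn, as testGetMountPointStatus does. A minimal sketch of that pattern, modeled on that test (the helper name and the IOException wrapping are illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
    import org.apache.hadoop.hdfs.server.federation.router.RouterClientProtocol;
    import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn;

    // Fire the async call; the protocol method returns before the RPC
    // completes. syncReturn(Class) then blocks on the pending result for
    // this thread and casts it to the requested type.
    static HdfsFileStatus mountPointStatus(RouterClientProtocol clientProtocol,
        String path) throws IOException {
      clientProtocol.getMountPointStatus(path, 0, 0);
      try {
        return syncReturn(HdfsFileStatus.class);
      } catch (Exception e) {
        throw new IOException(e);
      }
    }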