From 0a1c5672e29b67f303388965c1115c5a44ee5ea9 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Thu, 6 Mar 2025 16:18:10 +0800 Subject: [PATCH] HDFS-17751. [ARR] Add unit tests using asynchronous router rpc for all in org.apache.hadoop.hdfs.server.federation.router. --- .../federation/router/RouterRpcClient.java | 5 +- .../router/RouterStateIdContext.java | 2 +- .../async/TestDisableNameservicesExample.java | 349 ++++++++++++++++++ 3 files changed, 353 insertions(+), 3 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestDisableNameservicesExample.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index c7c3699f33ec7..6530a4784b5d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1991,10 +1991,11 @@ private static boolean isReadCall(Method method) { * Checks and sets last refresh time for a namespace's stateId. * Returns true if refresh time is newer than threshold. * Otherwise, return false and call should be handled by active namenode. - * @param nsId namespaceID + * @param nsId namespaceID. + * @return true if refresh time is newer than threshold. Otherwise, return false. */ @VisibleForTesting - boolean isNamespaceStateIdFresh(String nsId) { + public boolean isNamespaceStateIdFresh(String nsId) { if (activeNNStateIdRefreshPeriodMs < 0) { return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index 21e3f16f20613..6a360bb4cb6ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -64,7 +64,7 @@ public class RouterStateIdContext implements AlignmentContext { /** Nameservice specific overrides of the default setting for enabling observer reads. */ private HashSet observerReadEnabledOverrides = new HashSet<>(); - RouterStateIdContext(Configuration conf) { + public RouterStateIdContext(Configuration conf) { this.coordinatedMethods = new HashSet<>(); // For now, only ClientProtocol methods can be coordinated, so only checking // against ClientProtocol. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestDisableNameservicesExample.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestDisableNameservicesExample.java new file mode 100644 index 0000000000000..9bbff5692de03 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestDisableNameservicesExample.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode; +import static org.apache.hadoop.hdfs.server.federation.router.async.TestDisableNameservicesExample.cluster; +import static org.apache.hadoop.hdfs.server.federation.router.async.TestDisableNameservicesExample.routerContext; +import static org.apache.hadoop.hdfs.server.federation.router.async.TestDisableNameservicesExample.setUp; +import static org.apache.hadoop.util.Time.monotonicNow; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.metrics.RBFMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver; +import org.apache.hadoop.hdfs.server.federation.router.NameserviceManager; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; +import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.DisableNameserviceRequest; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.codehaus.jettison.json.JSONObject; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +/** + * Test the behavior when disabling name services. + */ +@SuppressWarnings("checkstyle:VisibilityModifier") +public class TestDisableNameservicesExample { + + public static final String ASYNC_MODE = "ASYNC"; + public static final String SYNC_MODE = "SYNC"; + + static StateStoreDFSCluster cluster; + static RouterContext routerContext; + static RouterClient routerAdminClient; + static ClientProtocol routerProtocol; + + @Nested + @ExtendWith(RouterServerHelper.class) + class TestWithAsyncRouterRpc { + + @ParameterizedTest + @ValueSource(strings = {ASYNC_MODE}) + public void testMetricsAsync(String rpcMode) throws Exception { + testMetrics(); + } + +/* @ParameterizedTest + @ValueSource(strings = {ASYNC_MODE) + public void testDisablingAsync() throws Exception { + testDisabling(); + } + + @ParameterizedTest + @ValueSource(strings = {ASYNC_MODE) + public void testWithoutDisablingAsync() throws IOException { + testWithoutDisabling(); + }*/ + } + + @Nested + @ExtendWith(RouterServerHelper.class) + class TestWithSyncRouterRpc { + + @ParameterizedTest + @ValueSource(strings = {SYNC_MODE}) + public void testMetricsSync(String rpcMode) throws Exception { + testMetrics(); + } + + @ParameterizedTest + @ValueSource(strings = {SYNC_MODE}) + public void testDisablingSync() throws Exception { + testDisabling(); + } + + @ParameterizedTest + @ValueSource(strings = {SYNC_MODE}) + public void testWithoutDisablingSync() throws IOException { + testWithoutDisabling(); + } + } + + public static void setUp(String rpcMode) throws Exception { + // Build and start a federated cluster. + cluster = new StateStoreDFSCluster(false, 2); + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .metrics() + .admin() + .rpc() + .build(); + // Reduce the number of RPC threads to saturate the Router easy. + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 8); + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4); + + // Use async router rpc. + if (rpcMode.equals("ASYNC")) { + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + } + + // Set the DNs to belong to only one subcluster. + cluster.setIndependentDNs(); + + cluster.addRouterOverrides(routerConf); + // Override some settings for the client. + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + + routerContext = cluster.getRandomRouter(); + routerProtocol = routerContext.getClient().getNamenode(); + routerAdminClient = routerContext.getAdminClient(); + + setupNamespace(); + + // Simulate one of the subclusters to be slow. + MiniDFSCluster dfsCluster = cluster.getCluster(); + NameNode nn0 = dfsCluster.getNameNode(0); + simulateSlowNamenode(nn0, 1); + } + + private static void setupNamespace() throws IOException { + + // Setup a mount table to map to the two namespaces. + MountTableManager mountTable = routerAdminClient.getMountTableManager(); + Map destinations = new TreeMap<>(); + destinations.put("ns0", "/dirns0"); + MountTable newEntry = MountTable.newInstance("/dirns0", destinations); + AddMountTableEntryRequest request = + AddMountTableEntryRequest.newInstance(newEntry); + mountTable.addMountTableEntry(request); + + destinations = new TreeMap<>(); + destinations.put("ns1", "/dirns1"); + newEntry = MountTable.newInstance("/dirns1", destinations); + request = AddMountTableEntryRequest.newInstance(newEntry); + mountTable.addMountTableEntry(request); + + // Refresh the cache in the Router. + Router router = routerContext.getRouter(); + MountTableResolver mountTableResolver = + (MountTableResolver) router.getSubclusterResolver(); + mountTableResolver.loadCache(true); + + // Add a folder to each namespace. + NamenodeContext nn0 = cluster.getNamenode("ns0", null); + nn0.getFileSystem().mkdirs(new Path("/dirns0/0")); + nn0.getFileSystem().mkdirs(new Path("/dir-ns")); + NamenodeContext nn1 = cluster.getNamenode("ns1", null); + nn1.getFileSystem().mkdirs(new Path("/dirns1/1")); + } + + public static void tearDown() { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + } + + public void cleanup() throws IOException { + Router router = routerContext.getRouter(); + StateStoreService stateStore = router.getStateStore(); + DisabledNameserviceStore store = + stateStore.getRegisteredRecordStore(DisabledNameserviceStore.class); + store.loadCache(true); + + Set disabled = store.getDisabledNameservices(); + for (String nsId : disabled) { + store.enableNameservice(nsId); + } + store.loadCache(true); + } + + public void testWithoutDisabling() throws IOException { + // ns0 is slow and renewLease should take a long time. + long t0 = monotonicNow(); + routerProtocol.renewLease("client0", null); + long t = monotonicNow() - t0; + assertTrue("It took too little: " + t + "ms", + t > TimeUnit.SECONDS.toMillis(1)); + // Return the results from all subclusters even if slow. + FileSystem routerFs = routerContext.getFileSystem(); + FileStatus[] filesStatus = routerFs.listStatus(new Path("/")); + assertEquals(3, filesStatus.length); + assertEquals("dir-ns", filesStatus[0].getPath().getName()); + assertEquals("dirns0", filesStatus[1].getPath().getName()); + assertEquals("dirns1", filesStatus[2].getPath().getName()); + } + + public void testDisabling() throws Exception { + disableNameservice("ns0"); + + // RenewLease should be fast as we are skipping ns0. + long t0 = monotonicNow(); + routerProtocol.renewLease("client0", null); + long t = monotonicNow() - t0; + assertTrue("It took too long: " + t + "ms", + t < TimeUnit.SECONDS.toMillis(1)); + // We should not report anything from ns0. + FileSystem routerFs = routerContext.getFileSystem(); + + FileStatus[] filesStatus = routerFs.listStatus(new Path("/")); + assertEquals(2, filesStatus.length); + assertEquals("dirns0", filesStatus[0].getPath().getName()); + assertEquals("dirns1", filesStatus[1].getPath().getName()); + + filesStatus = routerFs.listStatus(new Path("/dirns1")); + assertEquals(1, filesStatus.length); + assertEquals("1", filesStatus[0].getPath().getName()); + } + + public void testMetrics() throws Exception { + disableNameservice("ns0"); + + int numActive = 0; + int numDisabled = 0; + Router router = routerContext.getRouter(); + RBFMetrics metrics = router.getMetrics(); + String jsonString = metrics.getNameservices(); + JSONObject jsonObject = new JSONObject(jsonString); + Iterator keys = jsonObject.keys(); + while (keys.hasNext()) { + String key = (String) keys.next(); + JSONObject json = jsonObject.getJSONObject(key); + String nsId = json.getString("nameserviceId"); + String state = json.getString("state"); + if (nsId.equals("ns0")) { + assertEquals("DISABLED", state); + numDisabled++; + } else { + assertEquals("ACTIVE", state); + numActive++; + } + } + assertEquals(1, numActive); + assertEquals(1, numDisabled); + } + + private static void disableNameservice(final String nsId) + throws IOException { + NameserviceManager nsManager = routerAdminClient.getNameserviceManager(); + DisableNameserviceRequest req = + DisableNameserviceRequest.newInstance(nsId); + nsManager.disableNameservice(req); + + Router router = routerContext.getRouter(); + StateStoreService stateStore = router.getStateStore(); + DisabledNameserviceStore store = + stateStore.getRegisteredRecordStore(DisabledNameserviceStore.class); + store.loadCache(true); + MembershipNamenodeResolver resolver = + (MembershipNamenodeResolver) router.getNamenodeResolver(); + resolver.loadCache(true); + } +} + +class RouterServerHelper implements BeforeEachCallback, AfterEachCallback, AfterAllCallback { + + private static final ThreadLocal TEST_ROUTER_SERVER_TL = + new InheritableThreadLocal(); + + @Override + public void afterEach(ExtensionContext context) throws Exception { + Router router = routerContext.getRouter(); + StateStoreService stateStore = router.getStateStore(); + DisabledNameserviceStore store = + stateStore.getRegisteredRecordStore(DisabledNameserviceStore.class); + store.loadCache(true); + + Set disabled = store.getDisabledNameservices(); + for (String nsId : disabled) { + store.enableNameservice(nsId); + } + store.loadCache(true); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + Method testMethod = context.getRequiredTestMethod(); + ValueSource enumAnnotation = testMethod.getAnnotation(ValueSource.class); + if (enumAnnotation != null) { + String[] strings = enumAnnotation.strings(); + for (String rpcMode : strings) { + if (TEST_ROUTER_SERVER_TL.get() == null) { + setUp(rpcMode); + } + } + } + TEST_ROUTER_SERVER_TL.set(RouterServerHelper.this); + } + + @Override + public void afterAll(ExtensionContext context) throws Exception { + if (cluster != null) { + cluster.stopRouter(routerContext); + cluster.shutdown(); + cluster = null; + } + TEST_ROUTER_SERVER_TL.remove(); + } +}