From d09531a021afdc86906280587304ae50ceab8ad1 Mon Sep 17 00:00:00 2001 From: zhanghaobo Date: Tue, 4 Mar 2025 19:02:58 +0800 Subject: [PATCH] HDFS-17750. [ARR] RouterWebHdfsMethods adapts to async rpc. --- .../router/RouterWebHdfsMethods.java | 12 ++++- .../router/TestRouterWebHdfsMethods.java | 7 +-- .../async/TestRouterAsyncWebHdfsMethods.java | 54 +++++++++++++++++++ 3 files changed, 68 insertions(+), 5 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java index 4689b2d036b04..1cf65bfda19a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterWebHdfsMethods.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.federation.router; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; import static org.apache.hadoop.util.StringUtils.getTrimmedStringCollection; import org.apache.hadoop.fs.InvalidPathException; @@ -28,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.router.security.RouterSecurityManager; import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods; @@ -478,8 +480,14 @@ private DatanodeInfo chooseDatanode(final Router router, if (op == PutOpParam.Op.CREATE) { try { - resolvedNs = rpcServer.getCreateLocation(path).getNameserviceId(); - } catch (IOException e) { + if (rpcServer.isAsync()) { + rpcServer.getCreateLocation(path); + RemoteLocation remoteLocation = syncReturn(RemoteLocation.class); + resolvedNs = remoteLocation.getNameserviceId(); + } else { + resolvedNs = rpcServer.getCreateLocation(path).getNameserviceId(); + } + } catch (Exception e) { LOG.error("Cannot get the name service " + "to create file for path {} ", path, e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterWebHdfsMethods.java index 7c3643f5a511e..39032544eded1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterWebHdfsMethods.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterWebHdfsMethods.java @@ -47,13 +47,14 @@ /** * Test suite for Router Web Hdfs methods. */ +@SuppressWarnings("checkstyle:visibilitymodifier") public class TestRouterWebHdfsMethods { static final Logger LOG = LoggerFactory.getLogger(TestRouterWebHdfsMethods.class); - private static StateStoreDFSCluster cluster; - private static RouterContext router; - private static String httpUri; + protected static StateStoreDFSCluster cluster; + protected static RouterContext router; + protected static String httpUri; @BeforeClass public static void globalSetUp() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java new file mode 100644 index 0000000000000..99001eab0563b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncWebHdfsMethods.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys; +import org.apache.hadoop.hdfs.server.federation.router.TestRouterWebHdfsMethods; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test suite for Router Web Hdfs methods using async router rpc. + */ +public class TestRouterAsyncWebHdfsMethods extends TestRouterWebHdfsMethods { + public static final Logger LOG = + LoggerFactory.getLogger(TestRouterAsyncWebHdfsMethods.class); + + @BeforeClass + public static void globalSetUp() throws Exception { + cluster = new StateStoreDFSCluster(false, 2); + Configuration conf = new RouterConfigBuilder() + .stateStore() + .rpc() + .http() + .admin() + .build(); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY, true); + cluster.addRouterOverrides(conf); + cluster.setIndependentDNs(); + cluster.startCluster(); + cluster.startRouters(); + cluster.waitClusterUp(); + router = cluster.getRandomRouter(); + httpUri = "http://"+router.getHttpAddress(); + } +}