Skip to content

Commit 741bdd6

Browse files
authored
HDFS-17721. RBF: Allow routers to declare IP for admin addr (#7342) Contributed by Felix Nguyen.
Reviewed-by: Haiyang Hu <[email protected]> Signed-off-by: Shilun Fan <[email protected]>
1 parent 44a5cba commit 741bdd6

File tree

6 files changed

+69
-15
lines changed

6 files changed

+69
-15
lines changed

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/MountTableRefresherService.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class MountTableRefresherService extends AbstractService {
7575

7676
/**
7777
* All router admin clients cached. So no need to create the client again and
78-
* again. Router admin address(host:port) is used as key to cache RouterClient
78+
* again. Router admin address(host:port or ip:port) is used as key to cache RouterClient
7979
* objects.
8080
*/
8181
private LoadingCache<String, RouterClient> routerClientsCache;
@@ -102,8 +102,13 @@ protected void serviceInit(Configuration conf) throws Exception {
102102
this.mountTableStore = getMountTableStore();
103103
// Attach this service to mount table store.
104104
this.mountTableStore.setRefreshService(this);
105-
this.localAdminAddress =
106-
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
105+
if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
106+
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
107+
this.localAdminAddress = StateStoreUtils.getIpPortString(router.getAdminServerAddress());
108+
} else {
109+
this.localAdminAddress = StateStoreUtils.getHostPortString(router.getAdminServerAddress());
110+
}
111+
LOG.info("Initialized MountTableRefresherService with addr: {}", this.localAdminAddress);
107112
this.cacheUpdateTimeout = conf.getTimeDuration(
108113
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT,
109114
RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT,
@@ -220,7 +225,7 @@ public void refresh() throws StateStoreUnavailableException {
220225
List<MountTableRefresherThread> refreshThreads = new ArrayList<>();
221226
for (RouterState routerState : cachedRecords) {
222227
String adminAddress = routerState.getAdminAddress();
223-
if (adminAddress == null || adminAddress.length() == 0) {
228+
if (adminAddress == null || adminAddress.isEmpty()) {
224229
// this router has not enabled router admin.
225230
continue;
226231
}
@@ -237,11 +242,13 @@ public void refresh() throws StateStoreUnavailableException {
237242
* RouterClient
238243
*/
239244
refreshThreads.add(getLocalRefresher(adminAddress));
245+
LOG.debug("Added local refresher for {}", adminAddress);
240246
} else {
241247
try {
242248
RouterClient client = routerClientsCache.get(adminAddress);
243249
refreshThreads.add(new MountTableRefresherThread(
244250
client.getMountTableManager(), adminAddress));
251+
LOG.debug("Added remote refresher for {}", adminAddress);
245252
} catch (ExecutionException execExcep) {
246253
// Can not connect, seems router is stopped now.
247254
LOG.warn(ROUTER_CONNECT_ERROR_MSG, adminAddress, execExcep);
@@ -296,6 +303,7 @@ private void logResult(List<MountTableRefresherThread> refreshThreads) {
296303
if (mountTableRefreshThread.isSuccess()) {
297304
successCount++;
298305
} else {
306+
LOG.debug("Failed to refresh {}", mountTableRefreshThread.getAdminAddress());
299307
failureCount++;
300308
// remove RouterClient from cache so that new client is created
301309
removeFromCache(mountTableRefreshThread.getAdminAddress());

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java

+3
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
288288
FEDERATION_ROUTER_PREFIX + "safemode.checkperiod";
289289
public static final long DFS_ROUTER_SAFEMODE_CHECKPERIOD_MS_DEFAULT =
290290
TimeUnit.SECONDS.toMillis(5);
291+
public static final String DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE =
292+
FEDERATION_ROUTER_PREFIX + "heartbeat.with.ip.enable";
293+
public static final boolean DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT = false;
291294

292295
// HDFS Router-based federation mount table entries
293296
/** Maximum number of cache entries to have. */

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterHeartbeatService.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,12 @@ synchronized void updateStateStore() {
8888
getStateStoreVersion(MountTableStore.class));
8989
record.setStateStoreVersion(stateStoreVersion);
9090
// if admin server not started then hostPort will be empty
91-
String hostPort =
92-
StateStoreUtils.getHostPortString(router.getAdminServerAddress());
93-
record.setAdminAddress(hostPort);
91+
if (router.getConfig().getBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE,
92+
RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE_DEFAULT)) {
93+
record.setAdminAddress(StateStoreUtils.getIpPortString(router.getAdminServerAddress()));
94+
} else {
95+
record.setAdminAddress(StateStoreUtils.getHostPortString(router.getAdminServerAddress()));
96+
}
9497
RouterHeartbeatRequest request =
9598
RouterHeartbeatRequest.newInstance(record);
9699
RouterHeartbeatResponse response = routerStore.routerHeartbeat(request);

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/StateStoreUtils.java

+16
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
2727
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
28+
import org.apache.hadoop.net.NetUtils;
2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031

@@ -136,4 +137,19 @@ public static String getHostPortString(InetSocketAddress address) {
136137
return hostName + ":" + address.getPort();
137138
}
138139

140+
/**
141+
* Returns address in form of ip:port, empty string if address is null.
142+
*
143+
* @param address address
144+
* @return host:port
145+
*/
146+
public static String getIpPortString(InetSocketAddress address) {
147+
if (null == address) {
148+
return "";
149+
}
150+
address = NetUtils.getConnectAddress(address);
151+
InetAddress inet = address.getAddress();
152+
return inet.getHostAddress() + ":" + address.getPort();
153+
}
154+
139155
}

hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml

+8
Original file line numberDiff line numberDiff line change
@@ -974,4 +974,12 @@
974974
</description>
975975
</property>
976976

977+
<property>
978+
<name>dfs.federation.router.heartbeat.with.ip.enable</name>
979+
<description>
980+
Make router use IP instead of host when communicating with router state state store.
981+
</description>
982+
<value>false</value>
983+
</property>
984+
977985
</configuration>

hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMountTableCacheRefresh.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.io.IOException;
2626
import java.net.InetSocketAddress;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
29+
import java.util.Collection;
2830
import java.util.Collections;
2931
import java.util.List;
3032
import java.util.concurrent.TimeUnit;
@@ -53,23 +55,32 @@
5355
import org.apache.hadoop.test.GenericTestUtils;
5456
import org.apache.hadoop.util.Time;
5557
import org.junit.After;
56-
import org.junit.AfterClass;
57-
import org.junit.BeforeClass;
5858
import org.junit.Test;
59+
import org.junit.runner.RunWith;
60+
import org.junit.runners.Parameterized;
5961

6062
/**
6163
* This test class verifies that mount table cache is updated on all the routers
6264
* when MountTableRefreshService is enabled and there is a change in mount table
6365
* entries.
6466
*/
67+
@RunWith(Parameterized.class)
6568
public class TestRouterMountTableCacheRefresh {
6669
private static TestingServer curatorTestingServer;
6770
private static MiniRouterDFSCluster cluster;
6871
private static RouterContext routerContext;
6972
private static MountTableManager mountTableManager;
7073

71-
@BeforeClass
72-
public static void setUp() throws Exception {
74+
@Parameterized.Parameters
75+
public static Collection<Object> data() {
76+
return Arrays.asList(new Object[] {true, false});
77+
}
78+
79+
public TestRouterMountTableCacheRefresh(boolean useIpForHeartbeats) throws Exception {
80+
// Initialize only once per parameter
81+
if (curatorTestingServer != null) {
82+
return;
83+
}
7384
curatorTestingServer = new TestingServer();
7485
curatorTestingServer.start();
7586
final String connectString = curatorTestingServer.getConnectString();
@@ -82,6 +93,7 @@ public static void setUp() throws Exception {
8293
FileSubclusterResolver.class);
8394
conf.set(RBFConfigKeys.FEDERATION_STORE_ZK_ADDRESS, connectString);
8495
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_STORE_ENABLE, true);
96+
conf.setBoolean(RBFConfigKeys.DFS_ROUTER_HEARTBEAT_WITH_IP_ENABLE, useIpForHeartbeats);
8597
cluster.addRouterOverrides(conf);
8698
cluster.startCluster();
8799
cluster.startRouters();
@@ -95,11 +107,15 @@ public static void setUp() throws Exception {
95107
numNameservices, 60000);
96108
}
97109

98-
@AfterClass
99-
public static void destory() {
110+
@Parameterized.AfterParam
111+
public static void destroy() {
100112
try {
101-
curatorTestingServer.close();
102-
cluster.shutdown();
113+
if (curatorTestingServer != null) {
114+
curatorTestingServer.close();
115+
}
116+
if (cluster != null) {
117+
cluster.shutdown();
118+
}
103119
} catch (IOException e) {
104120
// do nothing
105121
}

0 commit comments

Comments
 (0)