From 8c7d1e472ee73d6e596e3aa5958dc4e1e1c6c4e5 Mon Sep 17 00:00:00 2001
From: chandrasekhar-188k <154109917+chandrasekhar-188k@users.noreply.github.com>
Date: Tue, 6 May 2025 08:50:37 +0530
Subject: [PATCH] HBASE-29285 Use DataNode port as favored-node port to make bulk load follow locality

---
Review notes (commentary only; the text between the "---" marker above and
the first "diff --git" line below is not part of the change):

  * write() now obtains the favored-node address from the new private helper
    getDNFavoredNode(conf, loc) instead of building it from
    loc.getHostname() and loc.getPort().
  * getDNFavoredNode() keeps loc.getHostname() as the host and takes the port
    from the value configured under DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
    falling back to DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT when unset.
  * getDNFavoredNode() builds a fresh HdfsConfiguration copy of conf on every
    call.
  * NOTE(review): the NetUtils.createSocketAddr(...) call is not guarded.
    Confirm how it reports a malformed address value and whether write()
    should fall back to the default writer in that case, as it already does
    when the resulting address is unresolved.
  * NOTE(review): the "Failed resolve address" trace message still logs
    loc.getHostnamePort(), not the host/port pair that was actually resolved.
  * The line structure and indentation of this patch were reconstructed from
    a whitespace-collapsed copy. Confirm the indentation of the context lines
    against the target file before applying.

 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 51e23abc8ca6..627374ad032b 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -79,6 +79,8 @@
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
@@ -90,6 +92,7 @@
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -306,8 +309,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
               LOG.trace("Failed get of location, use default writer {}", Bytes.toString(rowKey));
             } else {
               LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
-              InetSocketAddress initialIsa =
-                new InetSocketAddress(loc.getHostname(), loc.getPort());
+              InetSocketAddress initialIsa = getDNFavoredNode(conf, loc);
               if (initialIsa.isUnresolved()) {
                 LOG.trace("Failed resolve address {}, use default writer", loc.getHostnamePort());
               } else {
@@ -329,6 +331,14 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
         this.previousRows.put(family, rowKey);
       }
 
+      private InetSocketAddress getDNFavoredNode(Configuration conf, HRegionLocation loc) {
+        HdfsConfiguration.init();
+        Configuration dnConf = new HdfsConfiguration(conf);
+        int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
+          DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
+        return new InetSocketAddress(loc.getHostname(), dnPort);
+      }
+
       private Path getTableRelativePath(byte[] tableNameBytes) {
         String tableName = Bytes.toString(tableNameBytes);
         String[] tableNameParts = tableName.split(":");