diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 5938cd466846d..3799505fba510 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2227,20 +2227,7 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg, dir, pc, srcArg, offset, length, true); inode = res.getIIp().getLastINode(); if (isInSafeMode()) { - for (LocatedBlock b : res.blocks.getLocatedBlocks()) { - // if safemode & no block locations yet then throw safemodeException - if ((b.getLocations() == null) || (b.getLocations().length == 0)) { - SafeModeException se = newSafemodeException( - "Zero blocklocations for " + srcArg); - if (haEnabled && haContext != null && - (haContext.getState().getServiceState() == ACTIVE || - haContext.getState().getServiceState() == OBSERVER)) { - throw new RetriableException(se); - } else { - throw se; - } - } - } + checkBlockLocationsInSafeMode(res.blocks, srcArg); } else if (isObserver()) { checkBlockLocationsWhenObserver(res.blocks, srcArg); } @@ -4326,11 +4313,15 @@ BatchedDirectoryListing getBatchedListing(String[] srcs, byte[] startAfter, if (dirListing == null) { throw new FileNotFoundException("Path " + src + " does not exist"); } - if (needLocation && isObserver()) { + if (needLocation) { for (HdfsFileStatus fs : dirListing.getPartialListing()) { if (fs instanceof HdfsLocatedFileStatus) { LocatedBlocks lbs = ((HdfsLocatedFileStatus) fs).getLocatedBlocks(); - checkBlockLocationsWhenObserver(lbs, fs.toString()); + if (isInSafeMode()) { + checkBlockLocationsInSafeMode(lbs, fs.toString()); + } else if (isObserver()) { + checkBlockLocationsWhenObserver(lbs, fs.toString()); + } } } } @@ -9206,6 +9197,64 @@ private boolean isObserver() { return haEnabled && haContext != null && haContext.getState().getServiceState() == OBSERVER; } + private boolean hasSufficientReplicas(LocatedBlock block, + ErasureCodingPolicy ecPolicy) { + DatanodeInfo[] locations = block.getLocations(); + if (locations == null) { + return false; + } + + if (block.isStriped()) { + LocatedStripedBlock stripedBlock = (LocatedStripedBlock) block; + // For erasure coded files, require enough data units to reconstruct + // the block, bounded by the number of cells in the block. 
+      long numCells =
+          (block.getBlockSize() - 1) / (long) ecPolicy.getCellSize() + 1;
+      int minBlocks = (int) Math.min((long) ecPolicy.getNumDataUnits(), numCells);
+
+      // Units can be over-replicated, so need to account for unique indices
+      byte[] indices = stripedBlock.getBlockIndices();
+      boolean[] seen = new boolean[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];
+
+      int count = 0;
+      for (byte idx : indices) {
+        int i = idx & 0xFF;
+        if (!seen[i]) {
+          seen[i] = true;
+          if (++count >= minBlocks) {
+            return true;
+          }
+        }
+      }
+      return false;
+    } else {
+      return locations.length > 0;
+    }
+  }
+
+  private void checkBlockLocationsInSafeMode(LocatedBlocks blocks, String src)
+      throws SafeModeException, RetriableException {
+    if (blocks == null) {
+      return;
+    }
+    List<LocatedBlock> locatedBlockList = blocks.getLocatedBlocks();
+    if (locatedBlockList != null) {
+      ErasureCodingPolicy ecPolicy = blocks.getErasureCodingPolicy();
+      for (LocatedBlock block : locatedBlockList) {
+        if (!hasSufficientReplicas(block, ecPolicy)) {
+          SafeModeException se = newSafemodeException(
+              "Not enough blocklocations for " + src);
+          if (haEnabled && haContext != null &&
+              (haContext.getState().getServiceState() == ACTIVE ||
+               haContext.getState().getServiceState() == OBSERVER)) {
+            throw new RetriableException(se);
+          }
+          throw se;
+        }
+      }
+    }
+  }
+
   private void checkBlockLocationsWhenObserver(LocatedBlocks blocks, String src)
       throws ObserverRetryOnActiveException {
     if (blocks == null) {
@@ -9213,9 +9262,10 @@ private void checkBlockLocationsWhenObserver(LocatedBlocks blocks, String src)
     }
     List<LocatedBlock> locatedBlockList = blocks.getLocatedBlocks();
     if (locatedBlockList != null) {
-      for (LocatedBlock b : locatedBlockList) {
-        if (b.getLocations() == null || b.getLocations().length == 0) {
-          throw new ObserverRetryOnActiveException("Zero blocklocations for " + src);
+      ErasureCodingPolicy ecPolicy = blocks.getErasureCodingPolicy();
+      for (LocatedBlock block : locatedBlockList) {
+        if (!hasSufficientReplicas(block, ecPolicy)) {
+          throw new ObserverRetryOnActiveException("Not enough blocklocations for " + src);
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index 4ed7c8f5370d9..25299c39e5093 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -36,6 +36,7 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -57,10 +58,13 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
@@ -69,6 +73,7 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
 import org.apache.hadoop.hdfs.tools.GetGroups;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -419,6 +424,11 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
     // Set observer to safe mode.
     dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.ENTER);
 
+    DatanodeInfo fakeDatanodeInfo = new DatanodeInfo.DatanodeInfoBuilder()
+        // Striped blocks need a UUID to be hashed
+        .setNodeID(new DatanodeID(UUID.randomUUID().toString(), DatanodeID.EMPTY_DATANODE_ID))
+        .build();
+
     // Mock block manager for observer to generate some fake blocks which
     // will trigger the (retriable) safe mode exception.
     BlockManager bmSpy =
@@ -437,6 +447,43 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
     dfs.open(testPath).close();
     assertSentTo(0);
 
+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
+          null, null, new byte[] {0}, 0, false, null);
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Small file should succeed with just the one block
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is not enough for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
+          null, null, new byte[] {0}, 0, false, null);
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // Large file should failover to the active
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
     Mockito.reset(bmSpy);
 
     // Remove safe mode on observer, request should still go to it.
@@ -454,6 +501,11 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
 
     dfsCluster.rollEditLogAndTail(0);
 
+    DatanodeInfo fakeDatanodeInfo = new DatanodeInfo.DatanodeInfoBuilder()
+        // Striped blocks need a UUID to be hashed
+        .setNodeID(new DatanodeID(UUID.randomUUID().toString(), DatanodeID.EMPTY_DATANODE_ID))
+        .build();
+
     // Mock block manager for observer to generate some fake blocks which
     // will trigger the block missing exception.
@@ -471,7 +523,62 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
         anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
         Mockito.any(), Mockito.any());
 
-    dfs.open(testPath);
+    dfs.open(testPath).close();
+    assertSentTo(0);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(0);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(0);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(0);
+
+    // Test erasure coded files
+    ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);
+
+    // Fake a small file that only needs 1 block
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is enough for the small file but not for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
+      LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
+          null, null, new byte[] {0}, 0, false, null);
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    // The small file should succeed on the observer, while the large file should not
+    dfs.open(testPath).close();
+    assertSentTo(2);
+
+    dfs.getClient().listPaths("/", new byte[0], true);
+    assertSentTo(2);
+
+    dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
+    assertSentTo(2);
+
+    dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
+    assertSentTo(2);
+
+    // Fake a larger file that needs all 3 data shards
+    doAnswer((invocation) -> {
+      List<LocatedBlock> fakeBlocks = new ArrayList<>();
+      // Return a single location, which is not enough for the large file
+      ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
+      LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
+          null, null, new byte[] {0}, 0, false, null);
+      fakeBlocks.add(fakeBlock);
+      return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
+    }).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
+        anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
+        Mockito.any(), Mockito.any());
+
+    dfs.open(testPath).close();
     assertSentTo(0);
 
     dfs.getClient().listPaths("/", new byte[0], true);
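
Reviewer note: the arithmetic behind hasSufficientReplicas() for striped blocks can be checked by hand. Below is a minimal, standalone sketch (the class and method names EcSufficiencyDemo and minBlocksNeeded are hypothetical, with no HDFS dependencies) that mirrors the patch's minimum-block computation and evaluates it against the two fake files the tests construct.

    // Illustrative only; the real logic lives in FSNamesystem#hasSufficientReplicas above.
    public class EcSufficiencyDemo {

      // Distinct block indices needed to read a striped block: the policy's
      // data-unit count, bounded by how many cells the block actually spans
      // (a block smaller than a full stripe spans fewer cells than data units).
      static int minBlocksNeeded(long blockSize, int cellSize, int numDataUnits) {
        long numCells = (blockSize - 1) / (long) cellSize + 1; // ceiling division
        return (int) Math.min((long) numDataUnits, numCells);
      }

      public static void main(String[] args) {
        int cellSize = 1024; // cell size of the tests' RS(3,2) policy
        int dataUnits = 3;   // "rs" schema: 3 data units, 2 parity units

        // Small fake file: 1 byte spans 1 cell, so the single fake location
        // passes the check and the observer serves the read (assertSentTo(2)).
        System.out.println(minBlocksNeeded(1, cellSize, dataUnits));        // prints 1

        // Large fake file: 3 * 1024 bytes spans 3 cells, so 3 distinct indices
        // are required; one location fails the check and the client retries
        // or fails over to the active (assertSentTo(0)).
        System.out.println(minBlocksNeeded(3 * 1024, cellSize, dataUnits)); // prints 3
      }
    }

The patch counts unique indices rather than raw locations because, as its comment notes, storage units of a striped block can be over-replicated, so the same index may appear more than once in getBlockIndices().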