Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2227,20 +2227,7 @@ LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
dir, pc, srcArg, offset, length, true);
inode = res.getIIp().getLastINode();
if (isInSafeMode()) {
for (LocatedBlock b : res.blocks.getLocatedBlocks()) {
// if safemode & no block locations yet then throw safemodeException
if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
SafeModeException se = newSafemodeException(
"Zero blocklocations for " + srcArg);
if (haEnabled && haContext != null &&
(haContext.getState().getServiceState() == ACTIVE ||
haContext.getState().getServiceState() == OBSERVER)) {
throw new RetriableException(se);
} else {
throw se;
}
}
}
checkBlockLocationsInSafeMode(res.blocks, srcArg);
} else if (isObserver()) {
checkBlockLocationsWhenObserver(res.blocks, srcArg);
}
Expand Down Expand Up @@ -4326,11 +4313,15 @@ BatchedDirectoryListing getBatchedListing(String[] srcs, byte[] startAfter,
if (dirListing == null) {
throw new FileNotFoundException("Path " + src + " does not exist");
}
if (needLocation && isObserver()) {
if (needLocation) {
for (HdfsFileStatus fs : dirListing.getPartialListing()) {
if (fs instanceof HdfsLocatedFileStatus) {
LocatedBlocks lbs = ((HdfsLocatedFileStatus) fs).getLocatedBlocks();
checkBlockLocationsWhenObserver(lbs, fs.toString());
if (isInSafeMode()) {
checkBlockLocationsInSafeMode(lbs, fs.toString());
} else if (isObserver()) {
checkBlockLocationsWhenObserver(lbs, fs.toString());
}
}
}
}
Expand Down Expand Up @@ -9206,16 +9197,75 @@ private boolean isObserver() {
return haEnabled && haContext != null && haContext.getState().getServiceState() == OBSERVER;
}

private boolean hasSufficientReplicas(LocatedBlock block,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasSufficientBlockLocations

ErasureCodingPolicy ecPolicy) {
DatanodeInfo[] locations = block.getLocations();
if (locations == null) {
return false;
}

if (block.isStriped()) {
LocatedStripedBlock stripedBlock = (LocatedStripedBlock) block;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 LocatedStripedBlock stripedBlock = (LocatedStripedBlock) block;
      // For erasure coded files, require enough data units to reconstruct
      // the block, bounded by the number of cells in the block.
      long numCells = (block.getBlockSize() - 1) / ecPolicy.getCellSize() + 1;
      int minRequiredIndices = (int) Math.min(ecPolicy.getNumDataUnits(), numCells);
      
      // Units can be over-replicated, so need to account for unique indices
      byte[] blockIndices = stripedBlock.getBlockIndices();
      boolean[] seenIndices = new boolean[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];

      int uniqueIndexCount = 0;
      for (byte idx : blockIndices) {
        int index = idx & 0xFF;
        if (!seenIndices[index]) {
          seenIndices[index] = true;
          if (++uniqueIndexCount >= minRequiredIndices) {
            return true;
          }
        }
      }
      return false;

// For erasure coded files, require enough data units to reconstruct
// the block, bounded by the number of cells in the block.
long numCells =
(block.getBlockSize() - 1) / (long) ecPolicy.getCellSize() + 1;
int minBlocks = (int) Math.min((long) ecPolicy.getNumDataUnits(), numCells);

// Units can be over-replicated, so need to account for unique indices
byte[] indices = stripedBlock.getBlockIndices();
boolean[] seen = new boolean[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()];

int count = 0;
for (byte idx : indices) {
int i = idx & 0xFF;
if (!seen[i]) {
seen[i] = true;
if (++count >= minBlocks) {
return true;
}
}
}
return false;
} else {
return locations.length > 0;
}
}

private void checkBlockLocationsInSafeMode(LocatedBlocks blocks, String src)
throws SafeModeException, RetriableException {
if (blocks == null) {
return;
}
List<LocatedBlock> locatedBlockList = blocks.getLocatedBlocks();
if (locatedBlockList != null) {
ErasureCodingPolicy ecPolicy = blocks.getErasureCodingPolicy();
for (LocatedBlock block : locatedBlockList) {
if (!hasSufficientReplicas(block, ecPolicy)) {
SafeModeException se = newSafemodeException(
"Not enough blocklocations for " + src);
if (haEnabled && haContext != null &&
(haContext.getState().getServiceState() == ACTIVE ||
haContext.getState().getServiceState() == OBSERVER)) {
throw new RetriableException(se);
}
throw se;
}
}
}
}

private void checkBlockLocationsWhenObserver(LocatedBlocks blocks, String src)
throws ObserverRetryOnActiveException {
if (blocks == null) {
return;
}
List<LocatedBlock> locatedBlockList = blocks.getLocatedBlocks();
if (locatedBlockList != null) {
for (LocatedBlock b : locatedBlockList) {
if (b.getLocations() == null || b.getLocations().length == 0) {
throw new ObserverRetryOnActiveException("Zero blocklocations for " + src);
ErasureCodingPolicy ecPolicy = blocks.getErasureCodingPolicy();
for (LocatedBlock block : locatedBlockList) {
if (!hasSufficientReplicas(block, ecPolicy)) {
throw new ObserverRetryOnActiveException("Not enough blocklocations for " + src);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -57,10 +58,13 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
Expand All @@ -69,6 +73,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
import org.apache.hadoop.hdfs.tools.GetGroups;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -419,6 +424,11 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
// Set observer to safe mode.
dfsCluster.getFileSystem(2).setSafeMode(SafeModeAction.ENTER);

DatanodeInfo fakeDatanodeInfo = new DatanodeInfo.DatanodeInfoBuilder()
// Striped blocks need a UUID to be hashed
.setNodeID(new DatanodeID(UUID.randomUUID().toString(), DatanodeID.EMPTY_DATANODE_ID))
.build();

// Mock block manager for observer to generate some fake blocks which
// will trigger the (retriable) safe mode exception.
BlockManager bmSpy =
Expand All @@ -437,6 +447,43 @@ public void testObserverNodeSafeModeWithBlockLocations() throws Exception {
dfs.open(testPath).close();
assertSentTo(0);

// Test erasure coded files
ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);

// Fake a small file that only needs 1 block
doAnswer((invocation) -> {
List<LocatedBlock> fakeBlocks = new ArrayList<>();
// Return a single location, which is enough for the small file but not for the large file
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
null, null, new byte[] {0}, 0, false, null);
fakeBlocks.add(fakeBlock);
return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
}).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
Mockito.any(), Mockito.any());

// Small file should succeed with just the one block
dfs.open(testPath).close();
assertSentTo(2);

// Fake a larger file that needs all 3 data shards
doAnswer((invocation) -> {
List<LocatedBlock> fakeBlocks = new ArrayList<>();
// Return a single location, which is enough for the small file but not for the large file
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
null, null, new byte[] {0}, 0, false, null);
fakeBlocks.add(fakeBlock);
return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
}).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
Mockito.any(), Mockito.any());

// Large file should failover to the active
dfs.open(testPath).close();
assertSentTo(0);

Mockito.reset(bmSpy);

// Remove safe mode on observer, request should still go to it.
Expand All @@ -454,6 +501,11 @@ public void testObserverNodeBlockMissingRetry() throws Exception {

dfsCluster.rollEditLogAndTail(0);

DatanodeInfo fakeDatanodeInfo = new DatanodeInfo.DatanodeInfoBuilder()
// Striped blocks need a UUID to be hashed
.setNodeID(new DatanodeID(UUID.randomUUID().toString(), DatanodeID.EMPTY_DATANODE_ID))
.build();

// Mock block manager for observer to generate some fake blocks which
// will trigger the block missing exception.

Expand All @@ -471,7 +523,62 @@ public void testObserverNodeBlockMissingRetry() throws Exception {
anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
Mockito.any(), Mockito.any());

dfs.open(testPath);
dfs.open(testPath).close();
assertSentTo(0);

dfs.getClient().listPaths("/", new byte[0], true);
assertSentTo(0);

dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
assertSentTo(0);

dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
assertSentTo(0);

// Test erasure coded files
ErasureCodingPolicy ecPolicy = new ErasureCodingPolicy(new ECSchema("rs", 3, 2), 1024);

// Fake a small file that only needs 1 block
doAnswer((invocation) -> {
List<LocatedBlock> fakeBlocks = new ArrayList<>();
// Return a single location, which is enough for the small file but not for the large file
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1, 0));
LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
null, null, new byte[] {0}, 0, false, null);
fakeBlocks.add(fakeBlock);
return new LocatedBlocks(1, false, fakeBlocks, null, true, null, ecPolicy);
}).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
Mockito.any(), Mockito.any());

// The small file should succeed on the observer, while the large file should not

dfs.open(testPath).close();
assertSentTo(2);

dfs.getClient().listPaths("/", new byte[0], true);
assertSentTo(2);

dfs.getClient().getLocatedFileInfo(testPath.toString(), false);
assertSentTo(2);

dfs.getClient().batchedListPaths(new String[]{"/"}, new byte[0], true);
assertSentTo(2);

// Fake a larger file that needs all 3 data shards
doAnswer((invocation) -> {
List<LocatedBlock> fakeBlocks = new ArrayList<>();
// Return a single location, which is enough for the small file but not for the large file
ExtendedBlock b = new ExtendedBlock("fake-pool", new Block(12345L, 1024 * 3, 0));
LocatedStripedBlock fakeBlock = new LocatedStripedBlock(b, new DatanodeInfo[] {fakeDatanodeInfo},
null, null, new byte[] {0}, 0, false, null);
fakeBlocks.add(fakeBlock);
return new LocatedBlocks(1024 * 3, false, fakeBlocks, null, true, null, ecPolicy);
}).when(bmSpy).createLocatedBlocks(Mockito.any(), anyLong(),
anyBoolean(), anyLong(), anyLong(), anyBoolean(), anyBoolean(),
Mockito.any(), Mockito.any());

dfs.open(testPath).close();
assertSentTo(0);

dfs.getClient().listPaths("/", new byte[0], true);
Expand Down
Loading