Skip to content

Commit b143491

Browse files
HDFS-17905. Clean up pendingDNMessages on block removal (#8415)
1 parent 7bb3afe commit b143491

File tree

2 files changed

+54
-0
lines changed

2 files changed

+54
-0
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4982,6 +4982,8 @@ public void removeBlock(BlockInfo block) {
49824982
DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
49834983
.toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
49844984
}
4985+
// Remove all pending messages for this deleted block from the queue
4986+
pendingDNMessages.takeBlockQueue(block);
49854987
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
49864988
postponedMisreplicatedBlocks.remove(block);
49874989
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,4 +139,56 @@ private static boolean wipeAndRestartDn(MiniDFSCluster cluster, int dnIndex)
139139
return cluster.restartDataNode(dnProps, true);
140140
}
141141

142+
@Test
143+
@Timeout(value = 60)
144+
public void testRemoveBlockCleansUpPendingDNMessages() throws Exception {
145+
HdfsConfiguration conf = new HdfsConfiguration();
146+
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
147+
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
148+
.numDataNodes(1)
149+
.nnTopology(MiniDFSNNTopology.simpleHATopology())
150+
.build();
151+
152+
try {
153+
cluster.transitionToActive(0);
154+
155+
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
156+
OutputStream out = fs.create(filePath);
157+
out.write("foo bar baz".getBytes());
158+
out.close();
159+
160+
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0), cluster.getNameNode(1));
161+
162+
// Send genstamp to the future
163+
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
164+
cluster.changeGenStampOfBlock(0, block, 4200);
165+
166+
// Run directory scanner to update Datanode's volumeMap
167+
DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
168+
// Stop the DN so the replica with the changed gen stamp will be reported
169+
// when this DN starts up.
170+
DataNodeProperties dnProps = cluster.stopDataNode(0);
171+
172+
// Restart the namenode so that when the DN comes up it will see an initial
173+
// block report.
174+
cluster.restartNameNode(1, false);
175+
assertTrue(cluster.restartDataNode(dnProps, true));
176+
177+
// Wait until the standby NN queues up the future block in the pending DN
178+
// message queue.
179+
GenericTestUtils.waitFor((Supplier<Boolean>) () ->
180+
cluster.getNamesystem(1).getBlockManager().getPendingDataNodeMessageCount() == 1, 1000,
181+
30000);
182+
183+
// Delete the file on active, make standby tail the edit
184+
fs.delete(filePath, false);
185+
cluster.getNameNode(0).getRpcServer().rollEditLog();
186+
cluster.getNameNode(1).getNamesystem().getEditLogTailer().doTailEdits();
187+
188+
// SB pendingDNMessage should be empty now, else there's a leak
189+
assertEquals(0, cluster.getNamesystem(1).getBlockManager().getPendingDataNodeMessageCount());
190+
} finally {
191+
cluster.shutdown();
192+
}
193+
}
142194
}

0 commit comments

Comments
 (0)