HDFS-17755. Enhance refreshVolumes usability and code structure.
huangzhaobo99 committed Mar 9, 2025
1 parent 126c3d4 commit b484a40
Showing 4 changed files with 177 additions and 78 deletions.
DFSConfigKeys.java
@@ -676,6 +676,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;

// Used to dynamically refresh the conf of data dirs
public static final String DFS_DATANODE_DATA_DIR_TO_ADD_KEY = "dfs.datanode.data.dir.to.add";
public static final String DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY =
"dfs.datanode.data.dir.to.remove";

//Following keys have no defaults
public static final String DFS_DATANODE_DATA_DIR_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_DATANODE_DATA_DIR_KEY;
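The two new keys let an operator express a volume change as a delta instead of restating the entire dfs.datanode.data.dir list. A minimal sketch of the difference (paths are hypothetical, not from this diff); the reconfiguration itself is still triggered with hdfs dfsadmin -reconfig datanode <host:ipc_port> start:

import org.apache.hadoop.conf.Configuration;

public class DeltaReconfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // Before this change: restate every directory just to add one disk.
    conf.set("dfs.datanode.data.dir", "/data/1,/data/2,/data/3,/data/4");
    // After this change: supply only the delta; the DataNode merges it
    // into the current volume list before refreshing.
    conf.set("dfs.datanode.data.dir.to.add", "/data/4");
    conf.set("dfs.datanode.data.dir.to.remove", "/data/2");
  }
}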
DataNode.java
@@ -36,6 +36,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ALLOW_SAME_DISK_TIERING_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_TO_ADD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_DEFAULT;
@@ -138,7 +140,6 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -351,6 +352,8 @@ public class DataNode extends ReconfigurableBase
Collections.unmodifiableList(
Arrays.asList(
DFS_DATANODE_DATA_DIR_KEY,
DFS_DATANODE_DATA_DIR_TO_ADD_KEY,
DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY,
DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
@@ -639,34 +642,10 @@ protected Configuration getNewConf() {
public String reconfigurePropertyImpl(String property, String newVal)
throws ReconfigurationException {
switch (property) {
case DFS_DATANODE_DATA_DIR_KEY: {
IOException rootException = null;
try {
LOG.info("Reconfiguring {} to {}", property, newVal);
this.refreshVolumes(newVal);
return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
} catch (IOException e) {
rootException = e;
} finally {
// Send a full block report to let NN acknowledge the volume changes.
try {
triggerBlockReport(
new BlockReportOptions.Factory().setIncremental(false).build());
} catch (IOException e) {
LOG.warn("Exception while sending the block report after refreshing"
+ " volumes {} to {}", property, newVal, e);
if (rootException == null) {
rootException = e;
}
} finally {
if (rootException != null) {
throw new ReconfigurationException(property, newVal,
getConf().get(property), rootException);
}
}
}
break;
}
case DFS_DATANODE_DATA_DIR_KEY:
case DFS_DATANODE_DATA_DIR_TO_ADD_KEY:
case DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY:
return reconfDataDirsParameters(property, newVal);
case DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY: {
ReconfigurationException rootException = null;
try {
@@ -1079,6 +1058,43 @@ private String reconfSlowIoWarningThresholdParameters(String property, String newVal)
}
}

private String reconfDataDirsParameters(String property, String newVal)
throws ReconfigurationException {
LOG.info("Reconfiguring {} to {}", property, newVal);
Set<String> allDataDirs = new HashSet<>();
List<StorageLocation> storageLocations = getStorageLocations(getConf());
for (StorageLocation location : storageLocations) {
allDataDirs.add(location.getNormalizedUri().getPath());
}
if (property.equals(DFS_DATANODE_DATA_DIR_TO_ADD_KEY)) {
Set<String> toAdd = new HashSet<>(Arrays.asList(newVal.split(",")));
allDataDirs.addAll(toAdd);
} else if (property.equals(DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY)) {
Set<String> toRemove = new HashSet<>(Arrays.asList(newVal.split(",")));
allDataDirs.removeAll(toRemove);
} else {
// DFS_DATANODE_DATA_DIR_KEY: newVal is already the complete directory list.
allDataDirs = new HashSet<>(Arrays.asList(newVal.split(",")));
}
String newDirs = String.join(",", allDataDirs);
IOException rootException = null;
try {
this.refreshVolumes(newDirs);
return getConf().get(DFS_DATANODE_DATA_DIR_KEY);
} catch (IOException e) {
rootException = e;
} finally {
// Send a full block report to let NN acknowledge the volume changes.
try {
triggerBlockReport(new BlockReportOptions.Factory().setIncremental(false).build());
} catch (IOException e) {
LOG.warn("Exception while sending the block report after refreshing volumes {} to {}",
property, newVal, e);
if (rootException == null) {
rootException = e;
}
}
}
throw new ReconfigurationException(property, newVal, getConf().get(property), rootException);
}
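A worked example of the merge performed by reconfDataDirsParameters() above, under the assumption that the current volumes are /data/1 and /data/2 (hypothetical paths):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class DataDirMergeExample {
  public static void main(String[] args) {
    // Current volumes, as derived from getStorageLocations(getConf()).
    Set<String> allDataDirs = new HashSet<>(Arrays.asList("/data/1", "/data/2"));
    // dfs.datanode.data.dir.to.add = "/data/3"
    allDataDirs.addAll(Arrays.asList("/data/3".split(",")));
    // The joined result is what refreshVolumes() receives; HashSet order
    // is unspecified, which is harmless for a directory list.
    System.out.println(String.join(",", allDataDirs));
  }
}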

/**
* Get a list of the keys of the re-configurable properties in configuration.
*/
@@ -1286,10 +1302,9 @@ private void refreshVolumes(String newVolumes) throws IOException {
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
nsInfos.add(bpos.getNamespaceInfo());
}
synchronized(this) {
synchronized (this) {
Configuration conf = getConf();
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
ExecutorService service = null;
int numOldDataDirs = dataDirs.size();
ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
StringBuilder errorMessageBuilder = new StringBuilder();
@@ -1305,49 +1320,9 @@ private void refreshVolumes(String newVolumes) throws IOException {
throw new IOException("Attempt to remove all volumes.");
}
if (!changedVolumes.newLocations.isEmpty()) {
LOG.info("Adding new volumes: {}",
Joiner.on(",").join(changedVolumes.newLocations));

service = Executors
.newFixedThreadPool(changedVolumes.newLocations.size());
List<Future<IOException>> exceptions = Lists.newArrayList();

checkStorageState("refreshVolumes");
for (final StorageLocation location : changedVolumes.newLocations) {
exceptions.add(service.submit(new Callable<IOException>() {
@Override
public IOException call() {
try {
data.addVolume(location, nsInfos);
} catch (IOException e) {
return e;
}
return null;
}
}));
}

for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
StorageLocation volume = changedVolumes.newLocations.get(i);
Future<IOException> ioExceptionFuture = exceptions.get(i);
try {
IOException ioe = ioExceptionFuture.get();
if (ioe != null) {
errorMessageBuilder.append(
String.format("FAILED TO ADD: %s: %s%n",
volume, ioe.getMessage()));
LOG.error("Failed to add volume: {}", volume, ioe);
} else {
effectiveVolumes.add(volume.toString());
LOG.info("Successfully added volume: {}", volume);
}
} catch (Exception e) {
errorMessageBuilder.append(
String.format("FAILED to ADD: %s: %s%n", volume,
e.toString()));
LOG.error("Failed to add volume: {}", volume, e);
}
}
List<String> addedVolumes =
addVolumes(changedVolumes.newLocations, nsInfos, errorMessageBuilder);
effectiveVolumes.addAll(addedVolumes);
}

try {
@@ -1361,16 +1336,65 @@ public IOException call() {
throw new IOException(errorMessageBuilder.toString());
}
} finally {
if (service != null) {
service.shutdown();
}
conf.set(DFS_DATANODE_DATA_DIR_KEY,
Joiner.on(",").join(effectiveVolumes));
dataDirs = getStorageLocations(conf);
}
}
}

/**
* Add volumes to the DataNode.
*
* @param locations the StorageLocations of the volumes to be added.
* @param nsInfos the namespace information for the active block pools.
* @param errorMessageBuilder collects per-volume failure messages.
* @return the string forms of the volumes that were successfully added.
* @throws IOException if the storage is not yet initialized.
*/
private List<String> addVolumes(final List<StorageLocation> locations,
List<NamespaceInfo> nsInfos, StringBuilder errorMessageBuilder) throws IOException {
LOG.info("Adding new volumes: {}", Joiner.on(",").join(locations));
List<String> effectiveVolumes = new ArrayList<>();
ExecutorService service = null;
List<Future<IOException>> exceptions = Lists.newArrayList();
checkStorageState("refreshVolumes");
try {
service = Executors.newFixedThreadPool(locations.size());
for (final StorageLocation location : locations) {
exceptions.add(service.submit(() -> {
try {
data.addVolume(location, nsInfos);
} catch (IOException e) {
return e;
}
return null;
}));
}

for (int i = 0; i < locations.size(); i++) {
StorageLocation volume = locations.get(i);
Future<IOException> ioExceptionFuture = exceptions.get(i);
try {
IOException ioe = ioExceptionFuture.get();
if (ioe != null) {
errorMessageBuilder.append(
String.format("FAILED TO ADD: %s: %s%n", volume, ioe.getMessage()));
LOG.error("Failed to add volume: {}", volume, ioe);
} else {
effectiveVolumes.add(volume.toString());
LOG.info("Successfully added volume: {}", volume);
}
} catch (Exception e) {
errorMessageBuilder.append(String.format("FAILED TO ADD: %s: %s%n", volume, e));
LOG.error("Failed to add volume: {}", volume, e);
}
}
} finally {
if (service != null) {
service.shutdown();
}
}
return effectiveVolumes;
}
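The extracted addVolumes() helper keeps the original concurrency pattern: each volume is attached on its own thread, and failures are returned as values instead of thrown, so one bad disk cannot abort the rest. A generic sketch of that pattern (attachVolume is a hypothetical stand-in for data.addVolume):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelAddSketch {
  public static void main(String[] args) throws Exception {
    List<String> volumes = Arrays.asList("/data/1", "/data/2");  // hypothetical
    ExecutorService service = Executors.newFixedThreadPool(volumes.size());
    try {
      List<Future<IOException>> results = new ArrayList<>();
      for (String volume : volumes) {
        // Return the failure rather than throwing, so every volume is
        // attempted and the caller can aggregate per-volume errors.
        results.add(service.submit(() -> {
          try {
            attachVolume(volume);
            return null;
          } catch (IOException e) {
            return e;
          }
        }));
      }
      for (Future<IOException> result : results) {
        IOException ioe = result.get();
        System.out.println(ioe == null ? "added" : "failed: " + ioe.getMessage());
      }
    } finally {
      service.shutdown();
    }
  }

  private static void attachVolume(String path) throws IOException {
    // Hypothetical stand-in for data.addVolume(location, nsInfos).
  }
}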

/**
* Remove volumes from DataNode.
* See {@link #removeVolumes(Collection, boolean)} for details.
@@ -3308,6 +3332,19 @@ private static boolean checkFileSystemWithConfigured(
public static List<StorageLocation> getStorageLocations(Configuration conf) {
Collection<String> rawLocations =
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
// Merge the delta keys so that reconfigured volumes survive service restarts
Collection<String> addedList =
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_TO_ADD_KEY);
for (String location : addedList) {
if (!rawLocations.contains(location)) {
rawLocations.add(location);
}
}
Collection<String> removedList =
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY);
for (String location : removedList) {
rawLocations.remove(location);
}
List<StorageLocation> locations =
new ArrayList<StorageLocation>(rawLocations.size());

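The getStorageLocations() change above is what makes the delta keys safe across restarts: if the operator leaves them in the local configuration, startup re-derives the same effective volume list that the last reconfiguration produced. A sketch of that behavior (keys are the ones added in this diff; paths hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

public class RestartMergeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.set("dfs.datanode.data.dir", "/data/1,/data/2");
    conf.set("dfs.datanode.data.dir.to.add", "/data/3");
    conf.set("dfs.datanode.data.dir.to.remove", "/data/2");
    // With this change, the effective locations resolve to /data/1 and /data/3.
    System.out.println(DataNode.getStorageLocations(conf));
  }
}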
TestDataNodeReconfiguration.java
@@ -31,6 +31,9 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_TO_ADD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_READ_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_TRANSFER_BANDWIDTHPERSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_WRITE_BANDWIDTHPERSEC_KEY;
@@ -61,9 +64,12 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.base.Joiner;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -78,9 +84,14 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.BlockPoolSlice;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.test.LambdaTestUtils;

import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -948,4 +959,50 @@ public void testSlowIoWarningThresholdReconfiguration() throws Exception {
}
}

@Test
public void testRefreshVolumes() throws Exception {
final String dataDir = cluster.getDataDirectory();
List<String> testDirs = new ArrayList<>();
for (int i = 0; i < 3; i++) {
File newVolume = new File(dataDir, "test_vol" + i);
testDirs.add(newVolume.toString());
if (i == 0) {
// Create a regular file at this path so that addVolume() fails for it.
Files.createFile(newVolume.toPath());
}
}

// Add new volumes; test_vol0 is expected to fail.
DataNode dn = cluster.getDataNodes().get(0);
String newValue = Joiner.on(",").join(testDirs);
LambdaTestUtils.intercept(ReconfigurationException.class,
"test_vol0",
() -> dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_TO_ADD_KEY, newValue));
String[] effectiveVolumes = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(",");
Assertions.assertThat(effectiveVolumes.length).isEqualTo(4);
for (String volume : effectiveVolumes) {
Assertions.assertThat(volume).isNotIn(testDirs.get(0));
}

// Remove the previously added volumes.
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_TO_REMOVE_KEY, newValue);
effectiveVolumes = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(",");
Assertions.assertThat(effectiveVolumes.length).isEqualTo(2);
for (String volume : effectiveVolumes) {
Assertions.assertThat(volume).isNotIn(testDirs.get(0), testDirs.get(1), testDirs.get(2));
}

// Make sure that test dir metadata are not left in memory.
FsDatasetSpi<?> dataset = dn.getFSDataset();
for (FsVolumeSpi volume : dataset.getVolumeList()) {
Assertions.assertThat(volume.getBaseURI().getPath())
.isNotIn(testDirs.get(0), testDirs.get(1), testDirs.get(2));
}
DataStorage storage = dn.getStorage();
for (int i = 0; i < storage.getNumStorageDirs(); i++) {
Storage.StorageDirectory storageDir = storage.getStorageDir(i);
Assertions.assertThat(storageDir.getRoot().toString())
.isNotIn(testDirs.get(0), testDirs.get(1), testDirs.get(2));
}
}
}
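Outside of tests, the same path is exercised by setting dfs.datanode.data.dir.to.add or dfs.datanode.data.dir.to.remove and running hdfs dfsadmin -reconfig datanode <host:ipc_port> start; hdfs dfsadmin -reconfig datanode <host:ipc_port> properties lists the reconfigurable keys, which is why the TestDFSAdmin count below moves from 26 to 28 once the two new keys are registered.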
TestDFSAdmin.java
@@ -350,7 +350,7 @@ public void testDataNodeGetReconfigurableProperties() throws IOException, InterruptedException
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("datanode", address, outs, errs);
assertEquals(26, outs.size());
assertEquals(28, outs.size());
assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
}

