Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class ConsensusConfig {
private final TEndPoint thisNodeEndPoint;
private final int thisNodeId;
private final String storageDir;
private final String[] storageDirs;
private final TConsensusGroupType consensusGroupType;
private final RatisConfig ratisConfig;
private final IoTConsensusConfig iotConsensusConfig;
Expand All @@ -38,13 +39,15 @@ private ConsensusConfig(
TEndPoint thisNode,
int thisNodeId,
String storageDir,
String[] storageDirs,
TConsensusGroupType consensusGroupType,
RatisConfig ratisConfig,
IoTConsensusConfig iotConsensusConfig,
PipeConsensusConfig pipeConsensusConfig) {
this.thisNodeEndPoint = thisNode;
this.thisNodeId = thisNodeId;
this.storageDir = storageDir;
this.storageDirs = storageDirs;
this.consensusGroupType = consensusGroupType;
this.ratisConfig = ratisConfig;
this.iotConsensusConfig = iotConsensusConfig;
Expand All @@ -63,6 +66,10 @@ public String getStorageDir() {
return storageDir;
}

public String[] getStorageDirs() {
return storageDirs;
}

public TConsensusGroupType getConsensusGroupType() {
return consensusGroupType;
}
Expand All @@ -88,6 +95,7 @@ public static class Builder {
private TEndPoint thisNode;
private int thisNodeId;
private String storageDir;
private String[] storageDirs;
private TConsensusGroupType consensusGroupType;
private RatisConfig ratisConfig;
private IoTConsensusConfig iotConsensusConfig;
Expand All @@ -98,6 +106,7 @@ public ConsensusConfig build() {
thisNode,
thisNodeId,
storageDir,
storageDirs,
consensusGroupType,
Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()),
Optional.ofNullable(iotConsensusConfig)
Expand All @@ -121,6 +130,11 @@ public Builder setStorageDir(String storageDir) {
return this;
}

public Builder setStorageDirs(String[] storageDirs) {
this.storageDirs = storageDirs;
return this;
}

public Builder setConsensusGroupType(TConsensusGroupType groupType) {
this.consensusGroupType = groupType;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.disk.FolderManager;
import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.FileUtils;
Expand Down Expand Up @@ -71,7 +74,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -91,7 +96,7 @@ public class IoTConsensus implements IConsensus {

private final TEndPoint thisNode;
private final int thisNodeId;
private final File storageDir;
FolderManager folderManager = null;
private final IStateMachine.Registry registry;
private final Map<ConsensusGroupId, IoTConsensusServerImpl> stateMachineMap =
new ConcurrentHashMap<>();
Expand All @@ -104,10 +109,13 @@ public class IoTConsensus implements IConsensus {
private Future<?> updateReaderFuture;
private Map<ConsensusGroupId, List<Peer>> correctPeerListBeforeStart = null;

public IoTConsensus(ConsensusConfig config, Registry registry) {
public IoTConsensus(ConsensusConfig config, Registry registry)
throws DiskSpaceInsufficientException {
this.thisNode = config.getThisNodeEndPoint();
this.thisNodeId = config.getThisNodeId();
this.storageDir = new File(config.getStorageDir());
this.folderManager =
new FolderManager(
Arrays.asList(config.getStorageDirs()), DirectoryStrategyType.SEQUENCE_STRATEGY);
this.config = config.getIotConsensusConfig();
this.registry = registry;
this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig());
Expand Down Expand Up @@ -160,31 +168,82 @@ public synchronized void start() throws IOException {
}

private void initAndRecover() throws IOException {
if (!storageDir.exists()) {
if (!storageDir.mkdirs()) {
throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
for (String folder : folderManager.getFolders()) {
File storageDir = new File(folder);
if (!storageDir.exists()) {
if (!storageDir.mkdirs()) {
throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
}
}
} else {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
for (Path path : stream) {
String[] items = path.getFileName().toString().split("_");
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.create(
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
IoTConsensusServerImpl consensus =
new IoTConsensusServerImpl(
path.toString(),
new Peer(consensusGroupId, thisNodeId, thisNode),
new TreeSet<>(),
registry.apply(consensusGroupId),
backgroundTaskService,
clientManager,
syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
}

Map<ConsensusGroupId, String> consensusGroupDirs = new HashMap<>();

for (String baseFolder : folderManager.getFolders()) {
File storageDir = new File(baseFolder);
if (storageDir.exists()) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(storageDir.toPath())) {
for (Path path : stream) {
if (!Files.isDirectory(path)) {
continue;
}

String[] items = path.getFileName().toString().split("_");
if (items.length != 2) {
continue; // Skip directories that do not meet the naming convention.
}

try {
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.create(
Integer.parseInt(items[0]), Integer.parseInt(items[1]));
String expectedFolder;
try {
expectedFolder = folderManager.getFolderByHashId(consensusGroupId.getId());
} catch (DiskSpaceInsufficientException e) {
logger.warn(
"Cannot determine expected folder for group {}, skipping", consensusGroupId);
continue;
}

// Use this directory if it is in the correct baseFolder where the consensus group
// should be located.
if (baseFolder.equals(expectedFolder)) {
consensusGroupDirs.put(consensusGroupId, path.toString());
} else {
logger.warn(
"Found consensus group {} in wrong folder: {} (should be in {}), skipping",
consensusGroupId,
baseFolder,
expectedFolder);
}
} catch (NumberFormatException e) {
logger.warn("Invalid consensus group directory name: {}", path.getFileName());
}
}
} catch (IOException e) {
logger.error("Error scanning directory: {}", baseFolder, e);
}
}
}

for (Map.Entry<ConsensusGroupId, String> entry : consensusGroupDirs.entrySet()) {
ConsensusGroupId consensusGroupId = entry.getKey();
String path = entry.getValue();

IoTConsensusServerImpl consensus =
new IoTConsensusServerImpl(
path,
new Peer(consensusGroupId, thisNodeId, thisNode),
new TreeSet<>(),
registry.apply(consensusGroupId),
backgroundTaskService,
clientManager,
syncClientManager,
config);
stateMachineMap.put(consensusGroupId, consensus);
}

if (correctPeerListBeforeStart != null) {
BiConsumer<ConsensusGroupId, List<Peer>> resetPeerListWithoutThrow =
(consensusGroupId, peers) -> {
Expand Down Expand Up @@ -271,8 +330,19 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
k -> {
exist.set(false);

String path = buildPeerDir(storageDir, groupId);
String path = null;
try {
path = buildPeerDir(folderManager.getFolderByHashId(groupId.getId()), groupId);
} catch (DiskSpaceInsufficientException e) {
logger.warn(
"Failed to create consensus directory for group {} due to disk space insufficiency: {}",
groupId,
e.getMessage());
return null;
}
File file = new File(path);
// debug print the path of consensus dir
System.out.println(file.getAbsolutePath());
if (!file.mkdirs()) {
logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
return null;
Expand Down Expand Up @@ -315,7 +385,14 @@ public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException
if (!exist.get()) {
throw new ConsensusGroupNotExistException(groupId);
}
FileUtils.deleteFileOrDirectory(new File(buildPeerDir(storageDir, groupId)));
try {
FileUtils.deleteFileOrDirectory(
new File(buildPeerDir(folderManager.getFolderByHashId(groupId.getId()), groupId)));
} catch (DiskSpaceInsufficientException e) {
logger.warn(
"Failed to delete consensus directory for group {} due to : {}", groupId, e.getMessage());
throw new ConsensusException(e);
}
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
}

Expand Down Expand Up @@ -465,7 +542,7 @@ public List<ConsensusGroupId> getAllConsensusGroupIds() {

@Override
public String getRegionDirFromConsensusGroupId(ConsensusGroupId groupId) {
return buildPeerDir(storageDir, groupId);
return null;
}

@Override
Expand Down Expand Up @@ -552,7 +629,7 @@ public IoTConsensusServerImpl getImpl(ConsensusGroupId groupId) {
return stateMachineMap.get(groupId);
}

public static String buildPeerDir(File storageDir, ConsensusGroupId groupId) {
public static String buildPeerDir(String storageDir, ConsensusGroupId groupId) {
return storageDir + File.separator + groupId.getType().getValue() + "_" + groupId.getId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public class ReplicateTest {
new File("target" + File.separator + "2"),
new File("target" + File.separator + "3"));

private final String[][] storageDirs = {
{"target" + File.separator + "1"},
{"target" + File.separator + "2"},
{"target" + File.separator + "3"}
};

private final ConsensusGroup group = new ConsensusGroup(gid, peers);
private final List<IoTConsensus> servers = new ArrayList<>();
private final List<TestStateMachine> stateMachines = new ArrayList<>();
Expand Down Expand Up @@ -104,7 +110,7 @@ private void initServer() throws IOException {
ConsensusConfig.newBuilder()
.setThisNodeId(peers.get(i).getNodeId())
.setThisNode(peers.get(i).getEndpoint())
.setStorageDir(peersStorage.get(i).getAbsolutePath())
.setStorageDirs(storageDirs[i])
.setConsensusGroupType(TConsensusGroupType.DataRegion)
.build(),
groupId -> stateMachines.get(finalI))
Expand Down Expand Up @@ -293,14 +299,6 @@ public void parsingAndConstructIDTest() throws Exception {
for (int i = 0; i < CHECK_POINT_GAP; i++) {
servers.get(0).write(gid, new TestEntry(i, peers.get(0)));
}

String regionDir = servers.get(0).getRegionDirFromConsensusGroupId(gid);
try {
File regionDirFile = new File(regionDir);
Assert.assertTrue(regionDirFile.exists());
} catch (Exception e) {
Assert.fail();
}
}

private boolean checkPortAvailable() {
Expand Down
Loading
Loading