Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,23 @@ public void format() throws IOException {
// TODO
}

static List<SingleFileSnapshotInfo> getSingleFileSnapshotInfos(Path dir) throws IOException {
static List<SingleFileSnapshotInfo> getSingleFileSnapshotInfos(Path dir, boolean requireMd5) throws IOException {
final List<SingleFileSnapshotInfo> infos = new ArrayList<>();
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
for (Path path : stream) {
final Path filename = path.getFileName();
if (filename != null) {
final Matcher matcher = SNAPSHOT_REGEX.matcher(filename.toString());
if (matcher.matches()) {
final boolean hasMd5 = MD5FileUtil.getDigestFileForFile(path.toFile()).exists();
if (requireMd5 && !hasMd5) {
continue;
}

final long term = Long.parseLong(matcher.group(1));
final long index = Long.parseLong(matcher.group(2));
final FileInfo fileInfo = new FileInfo(path, null); //No FileDigest here.
infos.add(new SingleFileSnapshotInfo(fileInfo, term, index));
infos.add(new SingleFileSnapshotInfo(fileInfo, term, index, hasMd5));
}
}
}
Expand All @@ -114,32 +119,49 @@ public void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy)
return;
}

final List<SingleFileSnapshotInfo> allSnapshotFiles = getSingleFileSnapshotInfos(stateMachineDir.toPath());
// Fetch all the snapshot files irrespective of whether they have an MD5 file or not
final List<SingleFileSnapshotInfo> allSnapshotFiles = getSingleFileSnapshotInfos(stateMachineDir.toPath(), false);
allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed());
int numSnapshotsWithMd5 = 0;
int deleteIdx = -1;

for (int i = 0; i < allSnapshotFiles.size(); i++) {
final SingleFileSnapshotInfo snapshot = allSnapshotFiles.get(i);
if (snapshot.hasMd5()) {
if (++numSnapshotsWithMd5 == numSnapshotsRetained) {
// We have found the last snapshot with an MD5 file that needs to be retained
deleteIdx = i + 1;
break;
}
} else {
LOG.warn("Snapshot file {} has missing MD5 file.", snapshot);
}
}

if (allSnapshotFiles.size() > numSnapshotsRetained) {
allSnapshotFiles.sort(Comparator.comparing(SingleFileSnapshotInfo::getIndex).reversed());
allSnapshotFiles.subList(numSnapshotsRetained, allSnapshotFiles.size())
if (deleteIdx > 0) {
allSnapshotFiles.subList(deleteIdx, allSnapshotFiles.size())
.stream()
.map(SingleFileSnapshotInfo::getFile)
.map(FileInfo::getPath)
.forEach(snapshotPath -> {
LOG.info("Deleting old snapshot at {}", snapshotPath.toAbsolutePath());
FileUtils.deletePathQuietly(snapshotPath);
});
// clean up the md5 files if the corresponding snapshot file does not exist
try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateMachineDir.toPath(),
SNAPSHOT_MD5_FILTER)) {
for (Path md5path : stream) {
Path md5FileNamePath = md5path.getFileName();
if (md5FileNamePath == null) {
continue;
}
final String md5FileName = md5FileNamePath.toString();
final File snapshotFile = new File(stateMachineDir,
md5FileName.substring(0, md5FileName.length() - MD5_SUFFIX.length()));
if (!snapshotFile.exists()) {
FileUtils.deletePathQuietly(md5path);
}
}

// clean up the md5 files if the corresponding snapshot file does not exist
try (DirectoryStream<Path> stream = Files.newDirectoryStream(stateMachineDir.toPath(),
SNAPSHOT_MD5_FILTER)) {
for (Path md5path : stream) {
Path md5FileNamePath = md5path.getFileName();
if (md5FileNamePath == null) {
continue;
}
final String md5FileName = md5FileNamePath.toString();
final File snapshotFile = new File(stateMachineDir,
md5FileName.substring(0, md5FileName.length() - MD5_SUFFIX.length()));
if (!snapshotFile.exists()) {
FileUtils.deletePathQuietly(md5path);
}
}
}
Expand Down Expand Up @@ -182,7 +204,7 @@ protected File getCorruptSnapshotFile(long term, long endIndex) {
}

static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException {
final Iterator<SingleFileSnapshotInfo> i = getSingleFileSnapshotInfos(dir).iterator();
final Iterator<SingleFileSnapshotInfo> i = getSingleFileSnapshotInfos(dir, true).iterator();
if (!i.hasNext()) {
return null;
}
Expand All @@ -199,7 +221,7 @@ static SingleFileSnapshotInfo findLatestSnapshot(Path dir) throws IOException {
final Path path = latest.getFile().getPath();
final MD5Hash md5 = MD5FileUtil.readStoredMd5ForFile(path.toFile());
final FileInfo info = new FileInfo(path, md5);
return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex());
return new SingleFileSnapshotInfo(info, latest.getTerm(), latest.getIndex(), true);
}

public SingleFileSnapshotInfo updateLatestSnapshot(SingleFileSnapshotInfo info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,24 @@
* The objects of this class are immutable.
*/
public class SingleFileSnapshotInfo extends FileListSnapshotInfo {
private final Boolean hasMd5; // Whether the snapshot file has a corresponding MD5 file

public SingleFileSnapshotInfo(FileInfo fileInfo, TermIndex termIndex) {
this(fileInfo, termIndex, null);
}

public SingleFileSnapshotInfo(FileInfo fileInfo, TermIndex termIndex, Boolean hasMd5) {
super(Collections.singletonList(fileInfo), termIndex);
this.hasMd5 = hasMd5;
}

public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex, boolean hasMd5) {
this(fileInfo, TermIndex.valueOf(term, endIndex), hasMd5);
}

public SingleFileSnapshotInfo(FileInfo fileInfo, long term, long endIndex) {
this(fileInfo, TermIndex.valueOf(term, endIndex));
/** @return the md5 file exists for the snapshot file */
public boolean hasMd5() {
return hasMd5 != null && hasMd5;
}

/** @return the file associated with the snapshot. */
Expand Down
Loading
Loading