Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
Expand All @@ -59,6 +60,8 @@ public class PersistentDiskCache extends AbstractPersistentCache {
public static final long DEFAULT_TEMP_FILES_CLEANUP_WAIT_TIME_MS = 60000;
private static final String TEMP_FILE_SUFFIX = ".part";

private static boolean DELETE_SEGMENT_ON_REFETCH = Boolean.getBoolean("oak.segment.cache.delete_segment_on_refetch");

private final File directory;
private final long maxCacheSizeBytes;
private final DiskCacheIOMonitor diskCacheIOMonitor;
Expand Down Expand Up @@ -121,7 +124,7 @@ protected Buffer readSegmentInternal(long msb, long lsb) {

return buffer;
} catch (FileNotFoundException e) {
logger.info("Segment {} deleted from file system!", segmentId);
logger.info("Segment {} no longer available, refetching", segmentId);
} catch (IOException e) {
logger.error("Error loading segment {} from cache:", segmentId, e);
}
Expand Down Expand Up @@ -160,14 +163,20 @@ public void writeSegment(long msb, long lsb, Buffer buffer) {
}
long cacheSizeAfter = cacheSize.addAndGet(fileSize);
diskCacheIOMonitor.updateCacheSize(cacheSizeAfter, fileSize);
} catch (Exception e) {
logger.error("Error writing segment {} to cache", segmentId, e);
try {
Files.deleteIfExists(segmentFile.toPath());
Files.deleteIfExists(tempSegmentFile.toPath());
} catch (IOException i) {
logger.error("Error while deleting corrupted segment file {}", segmentId, i);
} catch (FileAlreadyExistsException faee) {
if (DELETE_SEGMENT_ON_REFETCH) {
deleteSegmentAndTempSegment(segmentId, segmentFile, tempSegmentFile, faee);
} else {
// just delete the temp file, as the target segment file is already there and valid
try {
logger.debug("Skipping already existing file {}", segmentId);
Files.deleteIfExists(tempSegmentFile.toPath());
} catch (IOException e) {
logger.debug("Cannot delete temporary file {}", tempSegmentFile.toPath(),e);
}
}
} catch (Exception e) {
deleteSegmentAndTempSegment(segmentId, segmentFile, tempSegmentFile, e);
} finally {
writesPending.remove(segmentId);
}
Expand All @@ -178,6 +187,16 @@ public void writeSegment(long msb, long lsb, Buffer buffer) {
executor.execute(task);
}

private void deleteSegmentAndTempSegment(String segmentId, File segmentFile, File tempSegmentFile, Exception e) {
logger.error("Error writing segment {} to cache", segmentId, e);
try {
Files.deleteIfExists(segmentFile.toPath());
Files.deleteIfExists(tempSegmentFile.toPath());
} catch (IOException i) {
logger.error("Error while deleting corrupted segment file {}", segmentId, i);
}
}

private boolean isCacheFull() {
return cacheSize.get() >= maxCacheSizeBytes;
}
Expand Down
Loading