Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,40 @@ public class RekeyDataset {
private static String HPDS_DIRECTORY = "/opt/local/hpds/";

/**
 * Entry point for re-keying a dataset: loads the source encryption keys, the
 * source cube cache and metadata, copies all observations into the target
 * store, then frees source-side memory before persisting the target store.
 *
 * @throws IOException            on any file read/write failure
 * @throws ClassNotFoundException if serialized metadata cannot be deserialized
 * @throws ExecutionException     if the cube cache fails to load an entry
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, ExecutionException {
	logHeapUsage("Start of RekeyDataset");
	Crypto.loadDefaultKey();
	Crypto.loadKey(SOURCE, "/opt/local/source/encryption_key");
	// Initialize the source cube cache exactly once (a duplicate call was
	// present here; the second invocation only rebuilt and discarded a cache).
	sourceStore = initializeCache();
	Object[] metadata = loadMetadata();
	sourceMetaStore = (TreeMap<String, ColumnMeta>) metadata[0];
	store.allObservationsStore = new RandomAccessFile(HPDS_DIRECTORY + "allObservationsStore.javabin", "rw");
	store.allIds = (TreeSet<Integer>) metadata[1];
	logHeapUsage("After loading metadata");

	initialLoad();

	logHeapUsage("After initial load");

	// Release source-side structures before saving so the save phase has
	// as much heap headroom as possible.
	log.info("Clearing source metadata to free memory");
	sourceMetaStore.clear();
	sourceMetaStore = null;
	sourceStore.invalidateAll();
	sourceStore = null;
	logHeapUsage("After clearing source data");

	store.saveStore(HPDS_DIRECTORY);
	logHeapUsage("After saving store");
}

/**
 * Logs current JVM heap statistics (used, max, percent used) tagged with the
 * processing phase, so memory pressure can be tracked across the run.
 *
 * @param phase human-readable label for the point in processing being logged
 */
private static void logHeapUsage(String phase) {
	Runtime rt = Runtime.getRuntime();
	long used = rt.totalMemory() - rt.freeMemory();
	long max = rt.maxMemory();
	long usedMb = used / (1024 * 1024);
	long maxMb = max / (1024 * 1024);
	long pct = (used * 100) / max;
	log.info("Heap usage [{}]: used={} MB, max={} MB, utilization={}%", phase, usedMb, maxMb, pct);
}

private static void initialLoad() throws IOException, ExecutionException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,42 @@ public PhenoCube load(String key) throws Exception {
public TreeSet<Integer> allIds = new TreeSet<Integer>();

/**
 * Persists the store: flushes the cube cache (forcing any pending writes),
 * serializes the column metadata map and patient id set to columnMeta.javabin,
 * writes columnMeta.csv straight from the in-memory map, then releases the
 * in-memory copies and closes the observations file.
 *
 * @param hpdsDirectory directory the javabin and csv outputs are written into
 * @throws IOException if any of the output files cannot be written
 */
public void saveStore(String hpdsDirectory) throws IOException {
	logHeapUsage("Before invalidating store");
	System.out.println("Invalidating store");
	store.invalidateAll();
	store.cleanUp();
	logHeapUsage("After invalidating store");

	System.out.println("Writing metadata");
	// try-with-resources guarantees the stream (and GZIP trailer) is closed
	// even if writeObject throws; the original leaked the stream on failure.
	try (ObjectOutputStream metaOut = new ObjectOutputStream(
			new GZIPOutputStream(new FileOutputStream(hpdsDirectory + "columnMeta.javabin")))) {
		metaOut.writeObject(metadataMap);
		metaOut.writeObject(allIds);
		metaOut.flush();
	}
	logHeapUsage("After writing metadata to file");

	// Write CSV directly from in-memory data (avoids deserializing from file).
	// The old dumpStatsAndColumnMeta(hpdsDirectory) call is intentionally not
	// made here anymore: it would only re-read the javabin just written and
	// rewrite an identical CSV, wasting time and memory.
	writeColumnMetaCsv(hpdsDirectory, metadataMap);
	logHeapUsage("After writing CSV");

	// Clear in-memory data after all operations complete
	metadataMap.clear();
	allIds.clear();
	logHeapUsage("After clearing in-memory data");

	System.out.println("Closing Store");
	allObservationsStore.close();
}

/**
 * Emits a heap-usage snapshot (used MB, max MB, percent utilization) for the
 * named phase of the save pipeline.
 *
 * @param phase label identifying where in processing this snapshot was taken
 */
private void logHeapUsage(String phase) {
	final Runtime runtime = Runtime.getRuntime();
	final long heapUsed = runtime.totalMemory() - runtime.freeMemory();
	final long heapMax = runtime.maxMemory();
	log.info(
			"Heap usage [{}]: used={} MB, max={} MB, utilization={}%",
			phase,
			heapUsed / (1024 * 1024),
			heapMax / (1024 * 1024),
			(heapUsed * 100) / heapMax);
}

public void dumpStats() {
Expand Down Expand Up @@ -159,51 +183,58 @@ public void dumpStats() {
/**
* This method will display counts for the objects stored in the metadata.
* This will also write out a csv file used by the data dictionary importer.
* Reads from the serialized file - use writeColumnMetaCsv() if data is already in memory.
*/
public void dumpStatsAndColumnMeta(String hpdsDirectory) {
try (ObjectInputStream objectInputStream =
new ObjectInputStream(new GZIPInputStream(new FileInputStream(hpdsDirectory + "columnMeta.javabin")))){
TreeMap<String, ColumnMeta> metastore = (TreeMap<String, ColumnMeta>) objectInputStream.readObject();
try(BufferedWriter writer = Files.newBufferedWriter(Paths.get(hpdsDirectory + "columnMeta.csv"), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
CSVPrinter printer = new CSVPrinter(writer, CSVFormat.DEFAULT);
for(String key : metastore.keySet()) {
ColumnMeta columnMeta = metastore.get(key);
Object[] columnMetaOut = new Object[11];

StringBuilder listQuoted = new StringBuilder();
AtomicInteger x = new AtomicInteger(1);

if(columnMeta.getCategoryValues() != null){
if(!columnMeta.getCategoryValues().isEmpty()) {
columnMeta.getCategoryValues().forEach(string -> {
listQuoted.append(string);
if(x.get() != columnMeta.getCategoryValues().size()) listQuoted.append("µ");
x.incrementAndGet();
});
}
}
writeColumnMetaCsv(hpdsDirectory, metastore);
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Could not load metastore", e);
}
}

columnMetaOut[0] = columnMeta.getName();
columnMetaOut[1] = String.valueOf(columnMeta.getWidthInBytes());
columnMetaOut[2] = String.valueOf(columnMeta.getColumnOffset());
columnMetaOut[3] = String.valueOf(columnMeta.isCategorical());
// this should nest the list of values in a list inside the String array.
columnMetaOut[4] = listQuoted;
columnMetaOut[5] = String.valueOf(columnMeta.getMin());
columnMetaOut[6] = String.valueOf(columnMeta.getMax());
columnMetaOut[7] = String.valueOf(columnMeta.getAllObservationsOffset());
columnMetaOut[8] = String.valueOf(columnMeta.getAllObservationsLength());
columnMetaOut[9] = String.valueOf(columnMeta.getObservationCount());
columnMetaOut[10] = String.valueOf(columnMeta.getPatientCount());

printer.printRecord(columnMetaOut);
}
/**
* Writes columnMeta.csv directly from an in-memory map, avoiding deserialization.
*/
private void writeColumnMetaCsv(String hpdsDirectory, Map<String, ColumnMeta> metastore) {
try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(hpdsDirectory + "columnMeta.csv"), StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
CSVPrinter printer = new CSVPrinter(writer, CSVFormat.DEFAULT);
for (String key : metastore.keySet()) {
ColumnMeta columnMeta = metastore.get(key);
Object[] columnMetaOut = new Object[11];

writer.flush();
}
StringBuilder listQuoted = new StringBuilder();
AtomicInteger x = new AtomicInteger(1);

} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Could not load metastore", e);
if (columnMeta.getCategoryValues() != null) {
if (!columnMeta.getCategoryValues().isEmpty()) {
columnMeta.getCategoryValues().forEach(string -> {
listQuoted.append(string);
if (x.get() != columnMeta.getCategoryValues().size()) listQuoted.append("µ");
x.incrementAndGet();
});
}
}

columnMetaOut[0] = columnMeta.getName();
columnMetaOut[1] = String.valueOf(columnMeta.getWidthInBytes());
columnMetaOut[2] = String.valueOf(columnMeta.getColumnOffset());
columnMetaOut[3] = String.valueOf(columnMeta.isCategorical());
columnMetaOut[4] = listQuoted;
columnMetaOut[5] = String.valueOf(columnMeta.getMin());
columnMetaOut[6] = String.valueOf(columnMeta.getMax());
columnMetaOut[7] = String.valueOf(columnMeta.getAllObservationsOffset());
columnMetaOut[8] = String.valueOf(columnMeta.getAllObservationsLength());
columnMetaOut[9] = String.valueOf(columnMeta.getObservationCount());
columnMetaOut[10] = String.valueOf(columnMeta.getPatientCount());

printer.printRecord(columnMetaOut);
}
writer.flush();
} catch (IOException e) {
throw new RuntimeException("Could not write columnMeta.csv", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,11 @@ public PhenoCube load(String key) throws Exception {
public TreeSet<Integer> allIds = new TreeSet<Integer>();

public void saveStore() throws FileNotFoundException, IOException, ClassNotFoundException {
logHeapUsage("Before flushing temp storage");
log.info("flushing temp storage");
loadingCache.invalidateAll();
loadingCache.cleanUp();
logHeapUsage("After flushing temp storage");

RandomAccessFile allObservationsStore = new RandomAccessFile(observationsFilename, "rw");
// we dumped it all in a temp file; now sort all the data and compress it into the real Store
Expand All @@ -122,20 +124,40 @@ public void saveStore() throws FileNotFoundException, IOException, ClassNotFound
write(allObservationsStore, columnMeta, cube);
}
allObservationsStore.close();
logHeapUsage("After writing all observations");

log.info("Writing metadata");
ObjectOutputStream metaOut = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(new File(columnmetaFilename))));
metaOut.writeObject(metadataMap);
metaOut.writeObject(allIds);
metaOut.flush();
metaOut.close();
logHeapUsage("After writing metadata to file");

log.info("Cleaning up temporary file");
// Write CSV directly from in-memory data (avoids deserializing from file)
writeColumnMetaCsv("/opt/local/hpds/", metadataMap);
logHeapUsage("After writing CSV");

// Clear in-memory data after all operations complete
metadataMap.clear();
allIds.clear();
logHeapUsage("After clearing in-memory data");

log.info("Cleaning up temporary file");
allObservationsTemp.close();
File tempFile = new File(obsTempFilename);
tempFile.delete();
dumpStatsAndColumnMeta("/opt/local/hpds/");
}

/**
 * Records heap consumption (used/max in MB plus percent utilization) at the
 * given phase, for diagnosing memory pressure during the save.
 *
 * @param phase description of the current processing step
 */
private void logHeapUsage(String phase) {
	Runtime jvm = Runtime.getRuntime();
	long bytesUsed = jvm.totalMemory() - jvm.freeMemory();
	long bytesMax = jvm.maxMemory();
	long mb = 1024 * 1024;
	log.info("Heap usage [{}]: used={} MB, max={} MB, utilization={}%",
			phase, bytesUsed / mb, bytesMax / mb, (bytesUsed * 100) / bytesMax);
}

private void write(RandomAccessFile allObservationsStore, ColumnMeta columnMeta, PhenoCube cube) throws IOException {
Expand Down Expand Up @@ -213,25 +235,38 @@ private <V extends Comparable<V>> void complete(ColumnMeta columnMeta, PhenoCube

}

/**
* Reads metadata from file and writes CSV. Use writeColumnMetaCsv() if data is already in memory.
*/
public void dumpStatsAndColumnMeta(String hpdsDirectory) {
try (
FileInputStream fIn = new FileInputStream(hpdsDirectory + "columnMeta.javabin"); GZIPInputStream gIn =
new GZIPInputStream(fIn); ObjectInputStream oIn = new ObjectInputStream(gIn); BufferedWriter csvWriter =
Files.newBufferedWriter(Paths.get(hpdsDirectory + "columnMeta.csv"), CREATE, TRUNCATE_EXISTING)
new GZIPInputStream(fIn); ObjectInputStream oIn = new ObjectInputStream(gIn)
) {
TreeMap<String, ColumnMeta> metastore = (TreeMap<String, ColumnMeta>) oIn.readObject();
writeColumnMetaCsv(hpdsDirectory, metastore);
} catch (IOException | ClassNotFoundException e) {
log.error("Error loading store or dumping store meta to CSV: ", e);
}
}

/**
 * Writes columnMeta.csv directly from an in-memory map, avoiding deserialization.
 * One record per column, row contents produced by createRow(). I/O failures
 * are logged rather than propagated, consistent with dumpStatsAndColumnMeta.
 *
 * @param hpdsDirectory directory the csv is written into
 * @param metastore     column name to ColumnMeta map to serialize
 */
private void writeColumnMetaCsv(String hpdsDirectory, Map<String, ColumnMeta> metastore) {
	try (BufferedWriter csvWriter = Files.newBufferedWriter(Paths.get(hpdsDirectory + "columnMeta.csv"), CREATE, TRUNCATE_EXISTING)) {
		CSVPrinter printer = new CSVPrinter(csvWriter, CSVFormat.DEFAULT);
		for (String key : metastore.keySet()) {
			String[] columnMetaOut = createRow(key, metastore);
			printer.printRecord(columnMetaOut);
		}
		csvWriter.flush();
	} catch (IOException e) {
		log.error("Error writing columnMeta.csv: ", e);
	}
}

private static String[] createRow(String key, TreeMap<String, ColumnMeta> metastore) {
private static String[] createRow(String key, Map<String, ColumnMeta> metastore) {
ColumnMeta columnMeta = metastore.get(key);
String[] columnMetaOut = new String[11];

Expand Down
Loading