diff --git a/requirements.txt b/requirements.txt index 31d87eb7..fb634550 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,5 @@ curies duckdb # Added by Gaurav, Jul 2024 sssom +# Added by Gaurav, Dec 2024, to support command-line applications +click \ No newline at end of file diff --git a/scripts/comparator.py b/scripts/comparator.py new file mode 100644 index 00000000..641784b5 --- /dev/null +++ b/scripts/comparator.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +# +# comparator.py - A script for comparing Babel files from different runs +# +# You can run this script on a single compendium file: +# python comparator.py dir1/compendia/Disease.txt dir2/compendia/Disease.txt +# Or on an entire directory: +# python comparator.py dir1/compendia dir2/compendia +# +# It currently only writes out a JSON document to STDOUT, but in the future we might add a TSV output as well. +# + +import concurrent +import json +import os +import logging +import threading +import time +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor + +import click + +logging.basicConfig(level=logging.INFO) + +class CompendiumFile: + """ + Represents a compendium file at a particular path. The load() method will load the file into a series of in-memory + dictionaries, and the diffs_to() method will generate a diff between this compendium file and another compendium file. + """ + + def __init__(self, path): + """ + Initialize a CompendiumFile object with the specified path. We don't load the file until load() is called. + + :param path: File path to initialize and load metadata from. + """ + self.path = path + + self.file_exists = os.path.exists(self.path) + self.row_count = 0 + + # TODO: replace with DuckDB or something else more memory efficient. + self.preferred_id_to_clique = defaultdict(list) + self.curie_to_preferred_id = dict() + self.curie_to_label = dict() + self.curie_to_description = defaultdict(set) + self.curie_to_taxa = defaultdict(set) + self.preferred_id_to_type = dict() + self.preferred_id_to_preferred_name = defaultdict() + self.preferred_id_to_ic = dict() + + + def load(self): + """ + Loads compendium data from the specified file path into various mappings. + + This method reads data from a JSON lines file located at the path specified + by the instance attribute `path`. Each line in the file should represent a + clique object in JSON format. The method populates multiple mappings + based on the contents of the file, including mappings between CURIEs and + their preferred identifiers, labels, descriptions, taxa, types, and + information content (IC). + + The method tracks and logs the progress of the file loading process. It will + log a warning if the specified file path does not exist, and progress + information is logged for every million lines processed. At the end, the + method logs the total number of lines read. + """ + + time_started = time.time_ns() + + if not os.path.exists(self.path): + logging.warning(f"Compendium file {self.path} does not exist.") + return + + with open(self.path, "r") as f: + for row in f: + self.row_count += 1 + if self.row_count % 1000000 == 0: + logging.info(f"Now loading line {self.row_count:,} from {self.path}") + + clique = json.loads(row) + + preferred_curie = clique['identifiers'][0]['i'] + self.preferred_id_to_type[preferred_curie] = clique['type'] + self.preferred_id_to_preferred_name[preferred_curie] = clique['preferred_name'] + self.preferred_id_to_ic[preferred_curie] = clique['ic'] + self.preferred_id_to_clique[preferred_curie] = list(map(lambda x: x['i'], clique['identifiers'])) + + for identifier in clique['identifiers']: + curie = identifier['i'] + self.curie_to_preferred_id[curie] = preferred_curie + self.curie_to_label[curie] = identifier.get('l', '') + self.curie_to_description[curie].update(identifier.get('d', [])) + self.curie_to_taxa[curie].update(identifier.get('t', [])) + + time_ended = time.time_ns() + logging.info(f"Loaded {self.row_count:,} lines from {self.path} in {(time_ended - time_started) / 1_000_000_000:.2f} seconds.") + + def add_labels(self, ids: list[str]): + """ + Return a list of labels for the IDs in ids. + + :param ids: A list of identifiers. + :return: A list of labels. + """ + return list(map(lambda x: self.curie_to_label.get(x, ''), ids)) + + def diffs_to(self, older_compendium_file: 'CompendiumFile'): + """ + Generate diff counts between this compendium file and the older compendium file. + + :param older_compendium_file: A CompendiumFile object representing the older compendium file. + :return: A dictionary. + """ + + # Step 1. Figure out which identifiers have changed cliques between these two compendia. + identifiers_added = set() + identifiers_not_changed = set() + identifiers_changed = set() + identifiers_deleted = set() + + for curie, preferred_curie in self.curie_to_preferred_id.items(): + if curie not in older_compendium_file.curie_to_preferred_id: + identifiers_added.add((curie, self.curie_to_label[curie], None, '', preferred_curie, self.preferred_id_to_preferred_name[preferred_curie])) + else: + old_preferred_curie = older_compendium_file.curie_to_preferred_id.get(curie) + if preferred_curie == old_preferred_curie: + identifiers_not_changed.add((curie, self.curie_to_label[curie], old_preferred_curie, older_compendium_file.preferred_id_to_preferred_name[old_preferred_curie], preferred_curie, self.preferred_id_to_preferred_name[preferred_curie])) + else: + identifiers_changed.add((curie, self.curie_to_label[curie], old_preferred_curie, older_compendium_file.preferred_id_to_preferred_name[old_preferred_curie], preferred_curie, self.preferred_id_to_preferred_name[preferred_curie])) + + for old_curie, old_preferred_curie in older_compendium_file.curie_to_preferred_id.items(): + if old_curie not in self.curie_to_preferred_id: + identifiers_deleted.add((old_curie, older_compendium_file.curie_to_label[old_curie], old_preferred_curie, older_compendium_file.preferred_id_to_preferred_name[old_preferred_curie], None, '')) + + # Step 2. Figure out the clique changes. + clique_count = len(self.preferred_id_to_type.keys()) + old_clique_count = len(older_compendium_file.preferred_id_to_type.keys()) + + cliques_additions = {} + cliques_deletions = {} + cliques_unchanged = {} + clique_changes = {} + for preferred_curie, typ in self.preferred_id_to_type.items(): + if preferred_curie not in older_compendium_file.preferred_id_to_type: + # Addition. + cliques_additions[preferred_curie] = { + 'type': typ, + 'preferred_curie': preferred_curie, + 'preferred_name': self.preferred_id_to_preferred_name[preferred_curie], + 'identifiers': self.preferred_id_to_clique[preferred_curie], + } + else: + # The clique is present in both self and older_compendium_file, so we need to determine if it's + # changed or not. + clique_change = { + 'type': typ, + 'preferred_curie': preferred_curie, + 'preferred_name': self.preferred_id_to_preferred_name[preferred_curie], + 'identifiers': self.preferred_id_to_clique[preferred_curie], + } + + # But did anything actually change? + flag_actually_changed = False + + old_typ = older_compendium_file.preferred_id_to_type[preferred_curie] + if old_typ != typ: + flag_actually_changed = True + clique_change['type_changed'] = { + 'old': old_typ, + 'new': typ, + } + + clique_label = self.preferred_id_to_preferred_name[preferred_curie] + old_clique_label = older_compendium_file.preferred_id_to_preferred_name[preferred_curie] + if clique_label != old_clique_label: + flag_actually_changed = True + clique_change['preferred_name_changed'] = { + 'old': old_clique_label, + 'new': clique_label, + } + + ids = self.preferred_id_to_clique[preferred_curie] + old_ids = older_compendium_file.preferred_id_to_clique[preferred_curie] + if ids != old_ids: + flag_actually_changed = True + clique_change['identifiers_changed'] = { + 'old': old_ids, + 'old_with_labels': list(map(lambda x: f"{x[0]} '{x[1]}'", zip(old_ids, older_compendium_file.add_labels(old_ids)))), + 'new': ids, + 'new_with_labels': list(map(lambda x: f"{x[0]} '{x[1]}'", zip(ids, self.add_labels(ids)))), + 'added': sorted(set(ids) - set(old_ids)), + 'deleted': sorted(set(old_ids) - set(ids)), + } + + # If something actually changed, add it to the clique changes list. + if flag_actually_changed: + clique_changes[preferred_curie] = clique_change + else: + cliques_unchanged[preferred_curie] = clique_change + + for old_preferred_curie, typ in older_compendium_file.preferred_id_to_type.items(): + if old_preferred_curie not in self.preferred_id_to_type: + # Deletion. + cliques_deletions[old_preferred_curie] = { + 'type': typ, + 'preferred_curie': old_preferred_curie, + 'preferred_name': older_compendium_file.preferred_id_to_preferred_name[old_preferred_curie], + 'identifiers': older_compendium_file.preferred_id_to_clique[old_preferred_curie], + } + + # Step 3. Report on all the identifiers and cliques. + return { + 'net_identifier_change': len(identifiers_added) - len(identifiers_deleted), + 'net_clique_change': (clique_count - old_clique_count), + 'identifiers': { + 'additions': sorted(map(lambda x: f"{x[0]} '{x[1]}' (to clique {x[4]} '{x[5]}')", identifiers_added)), + 'deletions': sorted(map(lambda x: f"{x[0]} '{x[1]}' (from clique {x[2]} '{x[3]}')", identifiers_deleted)), + 'changes': sorted(map(lambda x: f"{x[0]} '{x[1]}' moved from {x[2]} '{x[3]}' to {x[4]} '{x[5]}'", identifiers_changed)), + }, + 'cliques': { + 'additions': cliques_additions, + 'deletions': cliques_deletions, + 'changes': clique_changes, + }, + } + + +def compare_compendium_files(path_old, path_new): + """ Compare two compendium files. + + :param path_old: The older folder to compare + :param path_new: The newer folder to compare. + :return A comparison between the two compendium files as a dictionary. + """ + + time_started = time.time_ns() + + compendium_old = CompendiumFile(path_old) + compendium_new = CompendiumFile(path_new) + + # Load the two files in parallel. + thread_compendium1 = threading.Thread(target=compendium_old.load) + thread_compendium2 = threading.Thread(target=compendium_new.load) + thread_compendium1.start() + thread_compendium2.start() + thread_compendium1.join() + thread_compendium2.join() + + # Craft results and return. + result = { + 'compendium_old': { + 'path': path_old, + 'file_exists': compendium_old.file_exists, + 'row_count': compendium_old.row_count, + 'curie_count': len(compendium_old.curie_to_preferred_id), + 'clique_count': len(compendium_old.preferred_id_to_type), + 'types': list(sorted(set(compendium_old.preferred_id_to_type.values()))), + }, + 'compendium_new': { + 'path': path_new, + 'file_exists': compendium_new.file_exists, + 'row_count': compendium_new.row_count, + 'curie_count': len(compendium_new.curie_to_preferred_id), + 'clique_count': len(compendium_new.preferred_id_to_type), + 'types': list(set(sorted(compendium_new.preferred_id_to_type.values()))), + }, + 'diffs': compendium_new.diffs_to(compendium_old), + } + + time_ended = time.time_ns() + logging.info(f"Comparison of {path_old} to {path_new} took {(time_ended - time_started) / 1_000_000_000:.2f} seconds.") + + return result + + +@click.command() +@click.option('--input-type', type=click.Choice(['compendium', 'synonyms']), default='compendium') +@click.argument('input_old', type=click.Path(exists=True, file_okay=True, dir_okay=True), required=True) +@click.argument('input_new', type=click.Path(exists=True, file_okay=True, dir_okay=True), required=True) +@click.option('--max-workers', '-j', type=int, default=None, help='Maximum number of workers to use for parallel processing.') +def comparator(input_type, input_old, input_new, max_workers): + """ + Compares either two compendium files or two directories containing compendium files. + \f + + :param input_type: Specifies the type of the files to compare. + Options are 'compendium' or 'synonyms' (not yet supported). + Defaults to 'compendium'. + :param input_old: Older path (file or directory) to compare. + :param input_new: Newer path (file or directory) to compare. + :param max_workers: Maximum number of workers to use for parallel processing. + """ + + # Some features haven't been implemented yet. + if input_type != 'compendium': + raise NotImplementedError(f"Input type '{input_type}' is not yet supported.") + + # Do the comparison. + if os.path.isfile(input_old) and os.path.isfile(input_new): + results = compare_compendium_files(input_old, input_new) + elif os.path.isdir(input_old) and os.path.isdir(input_new): + results = { + 'directory1': {'path': input_old}, + 'directory2': {'path': input_new}, + 'comparisons': [], + } + + # Make a list of all the files in the directories input1 and input2. + files1 = os.listdir(input_old) + files2 = os.listdir(input_new) + all_filenames = set(files1 + files2) + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + for filename in sorted(all_filenames): + if filename.startswith('.'): + continue + path1 = os.path.join(input_old, filename) + path2 = os.path.join(input_new, filename) + + if os.path.isdir(path1): + logging.warning(f"Skipping directory {path1} in comparison.") + continue + + if os.path.isdir(path2): + logging.warning(f"Skipping directory {path2} in comparison.") + continue + + futures.append(executor.submit(compare_compendium_files, path1, path2)) + + for future in concurrent.futures.as_completed(futures): + try: + results['comparisons'].append(future.result()) + except Exception as exc: + logging.error(f"Error comparing files: {exc}") + raise exc + + else: + raise RuntimeError(f"Cannot compare a file to a directory or vice versa: {input_old} and {input_new}.") + + print(json.dumps(results, indent=2)) + + +if __name__ == "__main__": + comparator() \ No newline at end of file