diff --git a/README.md b/README.md
index 82e5221..19a6639 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 # P6
 **Peter's Parse and Processing of Prenatal Particulars via Pandas**
-A simple, extensible CLI for downloading the Human Phenotype Ontology, parsing genotype/phenotype Excel workbooks, and producing [GA4GH Phenopackets](https://phenopacket-schema.readthedocs.io/en/latest/schema.html#version-2-0) as specified [here](https://phenopacket-schema.readthedocs.io/_/downloads/en/stable/pdf/).
+A simple, extensible CLI for downloading the Human Phenotype Ontology, parsing genotype/phenotype Excel workbooks, and producing [GA4GH Phenopackets](https://phenopacket-schema.readthedocs.io/en/latest/schema.html#version-2-0) as specified [here](https://phenopacket-schema.readthedocs.io/_/downloads/en/stable/pdf/). This project enables downloading the latest or a specified Human Phenotype Ontology (HPO) JSON release, auto-classifying Excel sheets as genotype or phenotype data, normalizing column names and HPO IDs, and writing one Phenopacket per record. Additional commands provide quick auditing of workbooks for header normalization, sheet classification, and required variant columns. Built for easy integration and reproducibility, P6 supports rapid phenotypic data preparation for research and clinical workflows, and runs locally with simple installation via pip. The end goal of this project is to convert an existing digital record of phenotypic data into phenopackets so that they can be linked to their corresponding VCFs and integrated into a larger federated repository system.
 ## Table of Contents
@@ -10,10 +10,12 @@ A simple, extensible CLI for downloading the Human Phenotype Ontology, parsing g
 3. [Installation](#installation)
 4.
[Quickstart](#quickstart)
    - [Download HPO JSON](#download-hpo-json)
-   - [Parse Excel to Phenopackets](#parse-excel-to-phenopackets)
+   - [Parse Excel to Phenopackets](#parse-excel-to-phenopackets)
+   - [Audit Excel Workbooks](#audit-excel-workbooks)
 5. [CLI Reference](#cli-reference)
    - [`p6 download`](#p6-download)
    - [`p6 parse-excel`](#p6-parse-excel)
+   - [`p6 audit-excel`](#p6-audit-excel)
 6. [Development & Testing](#development--testing)
 7. [Contributing](#contributing)
 8. [License](#license)
@@ -94,6 +96,18 @@ Resulting phenopacket files will be under:
 phenopacket_from_excel/$(date "+%Y-%m-%d_%H-%M-%S")/phenopackets/
 ```
+### Audit Excel Workbooks
+
+Quickly check each sheet in an Excel file for header normalization, sheet classification, and presence of required variant columns.
+```bash
+p6 audit-excel -e tests/data/Sydney_Python_transformation.xlsx
+```
+
+By default the report is printed as a table; pass `-r` to print JSON to the console instead.
+```bash
+p6 audit-excel -e tests/data/Sydney_Python_transformation.xlsx -r
+```
+
 ## CLI Reference

 ### p6 download
@@ -101,11 +115,13 @@ phenopacket_from_excel/$(date "+%Y-%m-%d_%H-%M-%S")/phenopackets/
 Usage:
 ```markdown
 p6 download [OPTIONS]
+```

 Options:
- -d, --data-path PATH     where to save HPO JSON (default: tests/data)
- -v, --hpo-version TEXT   exact HPO release tag (e.g. 2025-03-03 or v2025-03-03)
- --help                   Show this help message and exit.
+```markdown
+ -d, --data-path PATH     where to save HPO JSON (default: tests/data)
+ -v, --hpo-version TEXT   exact HPO release tag (e.g. 2025-03-03 or v2025-03-03)
+ --help                   Show this help message and exit.
 ```

 Examples:
@@ -130,9 +146,9 @@ Usage: `p6 parse-excel [OPTIONS] EXCEL_FILE`

 Options:
 ```markdown
- -e, --excel-path FILE    path to the Excel workbook [required]
- -hpo, --custom-hpo FILE  path to a custom HPO JSON file (defaults to `tests/data/hp.json`)
- --help                   Show this message and exit.
+ -e, --excel-path FILE    path to the Excel workbook [required]
+ -hpo, --custom-hpo FILE  path to a custom HPO JSON file (defaults to `tests/data/hp.json`)
+ --help                   Show this message and exit.
 ```

 Example:
@@ -142,6 +158,19 @@ Explicitly point at a custom HPO file:
 p6 parse-excel -e tests/data/Sydney_Python_transformation.xlsx -hpo src/P6/hp.json
 ```
+### p6 audit-excel
+
+Run a lightweight audit on each sheet in an Excel workbook, reporting header counts, sheet classification, and any missing variant columns.
+
+Usage: `p6 audit-excel [OPTIONS] EXCEL_FILE`
+
+Options:
+```markdown
+ -e, --excel-path FILE  path to the Excel workbook [required]
+ -r, --report-json      output audit report as JSON instead of table
+ --help                 Show this message and exit.
+```
+
 ## Development & Testing

 Install dev requirements:
diff --git a/src/P6/__main__.py b/src/P6/__main__.py
index 8ac8f4b..1eb71e9 100644
--- a/src/P6/__main__.py
+++ b/src/P6/__main__.py
@@ -6,13 +6,14 @@
 import click
 import hpotk
+import json
 import pandas as pd  # Not needed for Pandas_Workaround, i.e.
don't declare or call "_read_sheets" at all; just use `tables = load_sheets_as_tables(excel_file)`, which only needs `from .loader import load_sheets_as_tables`
 import pathlib
 import requests
 import sys
 import typing

-from collections import defaultdict
+from collections import defaultdict, namedtuple
 from datetime import datetime
 from google.protobuf.json_format import MessageToJson
 from stairval.notepad import create_notepad
@@ -22,6 +23,8 @@
 from .loader import load_sheets_as_tables
 from .mapper import DefaultMapper

+AuditEntry = namedtuple("AuditEntry", ["step", "sheet", "message", "level"])
+

 @click.group()
 def main():
@@ -29,6 +32,52 @@ def main():
     pass

+@main.command(name="audit-excel")
+@click.option(
+    "-e",
+    "--excel-path",
+    "excel_file",
+    required=True,
+    type=click.Path(exists=True, dir_okay=False),
+    help="path to the Excel workbook",
+)
+@click.option(
+    "-r",
+    "--report-json",
+    "report_json",
+    is_flag=True,
+    help="output audit report as JSON instead of table",
+)
+def audit_excel(excel_file: str, report_json: bool):
+    """
+    Run a preprocessing audit on each sheet in the given workbook:
+      - header normalization
+      - sheet classification (genotype/phenotype/skip)
+      - variant-column presence checks
+    """
+    # 1) Read sheets
+    tables = _read_sheets(excel_file)
+
+    # 2) Produce audit entries (preprocess is defined later in this same
+    #    module, so no self-import is needed)
+    entries = preprocess(tables)
+
+    # 3) Render report
+    if report_json:
+        # turn each AuditEntry into a serializable dict
+        payload = [
+            {"step": e.step, "sheet": e.sheet, "level": e.level, "message": e.message}
+            for e in entries
+        ]
+        click.echo(json.dumps(payload, indent=2))
+    else:
+        # table header
+        click.echo(f"{'SHEET':20} {'STEP':25} {'LEVEL':8} MESSAGE")
+        for e in entries:
+            click.echo(f"{e.sheet:20} {e.step:25} {e.level:8} {e.message}")
+
+
 @main.command(name="download")
 @click.option(
     "-d",
@@ -46,7 +95,6 @@ def main():
     help="exact HPO release tag (e.g.
2025-03-03 or v2025-03-03)",
 )
 def download(data_dir: str, hpo_version: typing.Optional[str]):
-    # TODO: download an HPO
     """
     Download a specific or the latest HPO JSON release into the tests/data/ folder.
     """
@@ -94,7 +142,20 @@ def download(data_dir: str, hpo_version: typing.Optional[str]):
     type=click.Path(exists=True, dir_okay=False),
     help="path to a custom HPO JSON file (defaults to tests/data/hp.json)",
 )
-def parse_excel(excel_file: str, hpo_path: typing.Optional[str] = None):
+@click.option(
+    "--strict-variants/--no-strict-variants",
+    default=False,
+    help=("Treat raw↔HGVS mismatches as errors (default: warn)."),
+)
+@click.option(
+    "--verbose", is_flag=True, help="Show preprocessing and classification steps"
+)
+def parse_excel(
+    excel_file: str,
+    hpo_path: typing.Optional[str] = None,
+    verbose: bool = False,
+    strict_variants: bool = False,
+):
     """
     Read each sheet, check column order, then:
     - Identify as a Genotype sheet if ALL GENOTYPE_KEY_COLUMNS are present.
@@ -107,41 +168,67 @@ def parse_excel(excel_file: str, hpo_path: typing.Optional[str] = None):
     # 2) Build ontology and mapper
     ontology = _load_ontology(str(hpo_file))
-    mapper = DefaultMapper(ontology)
+    mapper = DefaultMapper(ontology, strict_variants=strict_variants)

     # 3) Read all sheets into DataFrames
     tables = _read_sheets(excel_file)
     # tables = load_sheets_as_tables(excel_file)  # Just use this for Pandas_Workaround. Don't declare or call "_read_sheets" at all.
Just use `tables = load_sheets_as_tables(excel_file)`, which only needs `from .loader import load_sheets_as_tables`
     # TODO: Decide if it is better to implement `Pandas_Workaround` or just use Pandas

+    # optionally audit preprocessing
+    if verbose:
+        for entry in preprocess(tables):
+            # click.echo(f"[{entry.level.upper():7}] {entry.step:20} {entry.sheet:15} {entry.message}")
+            # indent every line for readability
+            indent = " "
+            line = f"{entry.step:20} {entry.sheet:15} {entry.message}"
+            # color by level
+            click.echo("")  # blank line between audit entries
+            if entry.level == "error":
+                colored = click.style(line, fg="red")
+            elif entry.level in ("warn", "warning"):
+                colored = click.style(line, fg="yellow")
+            else:
+                colored = click.style(line, fg="cyan")
+            click.echo(indent + colored)
+        click.echo("")  # a blank line before mapping output
+
     # 4) Apply mapping to get raw records and collect issues
     notepad = create_notepad("phenopackets")
-    genotype_records, phenotype_records = mapper.apply_mapping(tables, notepad)
+    phenopackets = mapper.apply_mapping(tables, notepad)
+    # Refactor: mapper returns list[Phenopacket]; counts are exposed via mapper.stats.
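The refactor flagged in the comment above — `apply_mapping` returning fully assembled packets while simple counts ride along on a `stats` attribute — can be sketched with a stub. The `StubMapper` class and the dict-shaped rows are illustrative stand-ins, not the project's real types:

```python
class StubMapper:
    """Stand-in illustrating the packets-plus-stats contract."""

    def apply_mapping(self, tables):
        # pretend every row of every sheet becomes one packet
        packets = [row for rows in tables.values() for row in rows]
        # counts ride along as an attribute so the return type stays a plain list
        self.stats = {
            "patients": len({p["id"] for p in packets}),
            "genotypes": sum(1 for p in packets if p["kind"] == "genotype"),
        }
        return packets


mapper = StubMapper()
packets = mapper.apply_mapping(
    {"sheet1": [{"id": "P1", "kind": "genotype"}, {"id": "P2", "kind": "phenotype"}]}
)
```

The side-channel `stats` keeps the CLI's summary lines working without widening the return type of `apply_mapping`.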
+
+    # 8) Serialize phenopackets per patient
+    output_dir = _prepare_output_dir()
+    for i, pkt in enumerate(phenopackets, start=1):
+        with open(output_dir / f"{i}.json", "w", encoding="utf-8") as out_f:
+            out_f.write(MessageToJson(pkt))
+    # Use mapper.stats["patients"] when available; fall back to the packet count
+    # 9) Final summary
+    click.echo(
+        f"Wrote {mapper.stats.get('patients', len(phenopackets))} phenopacket files to {output_dir}"
+    )
+    # TODO: Come back and add more top-level fields

     # 5) Report any errors or warnings
     _report_issues(notepad)
-    # pps = mapper.apply_mapping(all_sheets, notepad)
-    # assert not notepad.has_errors_or_warnings(include_subsections=True)
-    # TODO: write phenopackets to a folder
-    # click.echo(f"Created {len(pps)} Phenotype objects")
-    # 6) Group results by patient
-    records_by_patient = _group_records_by_patient(genotype_records, phenotype_records)
-    # 7) Prepare output directory with timestamp
-    # Will contain genotype and phenotype records as JSON
-    generated_phenopacket_output_dir = _prepare_output_dir()
-    # 8) Serialize phenopackets per patient
-    _write_phenopackets(records_by_patient, generated_phenopacket_output_dir)
-    # 9) Final summary
-    click.echo(
-        f"Wrote {len(records_by_patient)} phenopacket files to {generated_phenopacket_output_dir}"
-    )
-    click.echo(f"Created {len(genotype_records)} Genotype objects")
-    click.echo(f"Created {len(phenotype_records)} Phenotype objects")
+    # Maintain exact lines expected by tests:
+    counts = getattr(mapper, "stats", {})
+    click.echo(f"Created {counts.get('genotypes', 0)} Genotype objects")
+    click.echo(f"Created {counts.get('phenotypes', 0)} Phenotype objects")
+    # TODO:
(For printing other counts, I need to come back and mirror the same pattern: + # counts.get('diseases', 0), counts.get('measurements', 0), counts.get('biosamples', 0)) def _locate_hpo_file(hpo_path: typing.Optional[str]) -> pathlib.Path: @@ -181,17 +268,37 @@ def _report_issues(notepad): def _group_records_by_patient( - genotype_records: list, phenotype_records: list + genotype_records: list, + phenotype_records: list, + disease_records: list, + measurement_records: list, + biosample_records: list, ) -> dict[str, dict[str, list]]: # Group genotype & phenotype records by patient ID - records = defaultdict(lambda: {"genotype_records": [], "phenotype_records": []}) + records = defaultdict( + lambda: { + "genotype_records": [], + "phenotype_records": [], + "disease_records": [], + "measurement_records": [], + "biosample_records": [], + } + ) for genotype in genotype_records: records[genotype.genotype_patient_ID]["genotype_records"].append(genotype) for phenotype in phenotype_records: records[phenotype.phenotype_patient_ID]["phenotype_records"].append(phenotype) + for disease in disease_records: + records[disease.patient_ID]["disease_records"].append(disease) + for measurement in measurement_records: + records[measurement.patient_ID]["measurement_records"].append(measurement) + for biosample in biosample_records: + records[biosample.patient_ID]["biosample_records"].append(biosample) return records +# 7) Prepare output directory with timestamp +# Will contain genotype and phenotype records as JSON def _prepare_output_dir() -> pathlib.Path: # use YYYY-MM-DD_HH-MM-SS for human-readable timestamps timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") @@ -240,9 +347,7 @@ def _write_phenopackets( genomic_interpretation_entry.InterpretationStatus.CONTRIBUTORY ) - # now fill in the VariationDescriptor - # TODO: set this up later - # Omit setting gene_context for now. + # TODO: Revise VariationDescriptor and gene_context later, omit setting gene_context for now. 
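The `defaultdict` grouping pattern that `_group_records_by_patient` extends can be exercised in isolation. The minimal `namedtuple` records below are hypothetical stand-ins; only the attribute names (`genotype_patient_ID`, `patient_ID`) mirror the diff:

```python
from collections import defaultdict, namedtuple

# hypothetical minimal records, mirroring the attribute names used above
Genotype = namedtuple("Genotype", ["genotype_patient_ID", "gene_symbol"])
Disease = namedtuple("Disease", ["patient_ID", "disease_term"])


def group_by_patient(genotype_records, disease_records):
    # every patient gets all buckets, even when a bucket stays empty
    records = defaultdict(lambda: {"genotype_records": [], "disease_records": []})
    for g in genotype_records:
        records[g.genotype_patient_ID]["genotype_records"].append(g)
    for d in disease_records:
        records[d.patient_ID]["disease_records"].append(d)
    return records


grouped = group_by_patient(
    [Genotype("P1", "BRCA1")],
    [Disease("P1", "OMIM:266600"), Disease("P2", "OMIM:114480")],
)
```

The `lambda` default means a patient seen only in one sheet still gets every bucket, so downstream code can iterate without key checks.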
# variation_descriptor = genomic_interpretation_entry.variant_interpretation.variation_descriptor
 # we can also set variation_descriptor.gene_context and variation_descriptor.allelic_state here then serialize out as before
 # variation_descriptor.gene_context.gene_symbol = genotype_record.gene_symbol
@@ -253,13 +358,13 @@
             variation_descriptor = variant_interpretation.variation_descriptor

             # 1) Gene symbol & allelic state
-            # 'gene_context' is a message; you must CopyFrom if setting a message,
-            # but for its scalar fields you can still assign directly:
+            # 'gene_context' is a message; we need to CopyFrom if setting a message,
+            # but for its scalar fields we can still assign directly:
             variation_descriptor.gene_context.symbol = genotype_record.gene_symbol
             variation_descriptor.allelic_state.CopyFrom(
                 pps2.OntologyClass(
                     id="GENO:"
-                    + genotype_record.zygosity_code,  # or however you construct this
+                    + genotype_record.zygosity_code,  # or however we decide to construct this later on
                     label=genotype_record.zygosity,
                 )
             )
@@ -291,11 +396,24 @@
                 # some protobufs give trouble when trying to expose location/alleles so just skip
                 pass

-            # TODO: when ready, add an Expression.HGVS here
-            # Record the HGVS genomic notation as an Expression
-            # expr = variation_descriptor.expressions.add()
-            # expr.syntax = Phenopacket.Diagnosis.GenomicInterpretation.VariantInterpretation.VariationDescriptor.Expression.HGVS
-            # expr.value = genotype_record.hgvsg
+        # 3c) Add optional entries (if any). Disease, Measurement, and Biosample
+        # fields are mostly messages in the Phenopacket v2 schema, so we assign
+        # their subfields rather than the message itself:
+        for d in patient_data["disease_records"]:
+            ds = phenopacket.diseases.add()
+            ds.term.id = d.disease_term
+            ds.term.label = d.disease_label
+            # 'onset' is a TimeElement; parse the 'YYYY-MM-DD' string into its timestamp
+            ds.onset.timestamp.FromJsonString(f"{d.disease_onset}T00:00:00Z")
+            # the schema has no 'status' field; 'excluded' is True when the disease was ruled out
+            ds.excluded = not d.disease_status
+        for m in patient_data["measurement_records"]:
+            meas = phenopacket.measurements.add()
+            meas.assay.id = m.measurement_type
+            # numeric results live under Value.quantity
+            meas.value.quantity.value = float(m.measurement_value)
+            meas.value.quantity.unit.id = m.measurement_unit
+            # assuming the timestamp is also a 'YYYY-MM-DD' string
+            meas.time_observed.timestamp.FromJsonString(
+                f"{m.measurement_timestamp}T00:00:00Z"
+            )
+        for b in patient_data["biosample_records"]:
+            bs = phenopacket.biosamples.add()
+            bs.id = b.biosample_id
+            bs.sampled_tissue.id = b.biosample_type
+            bs.time_of_collection.timestamp.FromJsonString(f"{b.collection_date}T00:00:00Z")

         # 3d) Serialize to JSON
         generated_phenopacket_output_path = (
@@ -305,5 +423,69 @@
         out_f.write(MessageToJson(phenopacket))

+def preprocess(tables: dict[str, pd.DataFrame]) -> list[AuditEntry]:
+    """
+    Run lightweight audits on each sheet:
+      - header normalization
+      - sheet classification
+      - variant-column presence (raw vs HGVS)
+    """
+    from .mapper import (
+        RAW_VARIANT_COLUMNS,
+        HGVS_VARIANT_COLUMNS,
+        GENOTYPE_BASE_COLUMNS,
+        PHENOTYPE_KEY_COLUMNS,
+    )
+
+    entries: list[AuditEntry] = []
+
+    # Step 1: header counts
+    for name, df in tables.items():
+        entries.append(
+            AuditEntry(
+                step="normalize-headers",
+                sheet=name,
+                message=f"{len(df.columns)} cols",
+                level="info",
+            )
+        )
+
+    # Step 2: classify
+    for name, df in tables.items():
+        cols = set(df.columns)
+        has_raw = RAW_VARIANT_COLUMNS.issubset(cols)
+        has_hgvs = bool(HGVS_VARIANT_COLUMNS & cols)
+        is_gen = GENOTYPE_BASE_COLUMNS.issubset(cols) and (has_raw or has_hgvs)
+        is_pheno = PHENOTYPE_KEY_COLUMNS.issubset(cols)
+
+        kind = "genotype" if is_gen else "phenotype" if is_pheno else "skip"
+        # only annotate the variant-column flavour when there is one to report
+        detail = (
+            " (raw+hgvs)" if has_raw and has_hgvs
+            else " (raw)" if has_raw
+            else " (hgvs)" if has_hgvs
+            else ""
+        )
+        entries.append(
+            AuditEntry(
+                step="classify-sheet",
+                sheet=name,
+                message=kind + detail,
+                level="info",
+            )
+        )
+
+    # Step 3: variant columns
+    for name, df in tables.items():
+        cols = set(df.columns)
+        if GENOTYPE_BASE_COLUMNS.issubset(cols):
+            if not (RAW_VARIANT_COLUMNS.issubset(cols) or HGVS_VARIANT_COLUMNS & cols):
+                entries.append(
+                    AuditEntry(
+                        step="variant-check",
+                        sheet=name,
+                        message="missing raw & HGVS",
+                        level="error",
+                    )
+                )
+    return entries
+
+
 if __name__ == "__main__":
     main()
diff --git a/src/P6/biosample.py b/src/P6/biosample.py
new file mode 100644
index 0000000..13dbbf6
--- /dev/null
+++ b/src/P6/biosample.py
@@ -0,0
+1,25 @@ +""" +Biosample domain model. + +Defines the BiosampleRecord dataclass for capturing sample metadata. +""" + +from dataclasses import dataclass + + +@dataclass +class BiosampleRecord: + """ + Represents a biosample entry for a patient. + + Attributes: + patient_ID: Unique alphanumeric patient identifier. + biosample_id: Unique identifier for the biosample. + biosample_type: CURIE of the tissue or sample type (e.g. 'UBERON:0002107'). + collection_date: Date string in 'YYYY-MM-DD' format. + """ + + patient_ID: str + biosample_id: str + biosample_type: str + collection_date: str diff --git a/src/P6/disease.py b/src/P6/disease.py new file mode 100644 index 0000000..d044623 --- /dev/null +++ b/src/P6/disease.py @@ -0,0 +1,27 @@ +""" +Disease domain model. + +Defines the DiseaseRecord dataclass for capturing disease annotations. +""" + +from dataclasses import dataclass + + +@dataclass +class DiseaseRecord: + """ + Represents a disease entry for a patient. + + Attributes: + patient_ID: Unique alphanumeric patient identifier. + disease_term: CURIE of the disease term (e.g. 'OMIM:266600'). + disease_label: Human-readable label for the disease. + disease_onset: Date string in 'YYYY-MM-DD' format. + disease_status: True if the disease is present, False if excluded. 
+ """ + + patient_ID: str + disease_term: str + disease_label: str + disease_onset: str + disease_status: bool diff --git a/src/P6/loader.py b/src/P6/loader.py index a353d95..6cc8f55 100644 --- a/src/P6/loader.py +++ b/src/P6/loader.py @@ -13,6 +13,17 @@ "hpo": "hpo_id", "hpo_term": "hpo_id", # also catch "HPO Term" → hpo_term → hpo_id "timestamp": "date_of_observation", + "disease_term": "disease_term", + "disease_label": "disease_label", + "disease_onset": "disease_onset", + "disease_status": "disease_status", + "measurement_type": "measurement_type", + "measurement_value": "measurement_value", + "measurement_unit": "measurement_unit", + "measurement_timestamp": "measurement_timestamp", + "biosample_id": "biosample_id", + "biosample_type": "biosample_type", + "collection_date": "collection_date", } diff --git a/src/P6/mapper.py b/src/P6/mapper.py index 169967d..c17e9b9 100644 --- a/src/P6/mapper.py +++ b/src/P6/mapper.py @@ -12,20 +12,23 @@ import re import typing +from collections import defaultdict +from dataclasses import dataclass from phenopackets.schema.v2.phenopackets_pb2 import Phenopacket from stairval.notepad import Notepad +from typing import List, TypeVar, Tuple +from .biosample import BiosampleRecord +from .disease import DiseaseRecord from .genotype import Genotype +from .measurement import MeasurementRecord from .phenotype import Phenotype +import phenopackets.schema.v2 as pps2 -class TableMapper(metaclass=abc.ABCMeta): - @abc.abstractmethod - def apply_mapping( - self, tables: dict[str, pd.DataFrame], notepad: Notepad - ) -> typing.Sequence[Phenopacket]: - pass - +T = TypeVar("T") +RowParseResult = Tuple[List[T], List[hpotk.TermId]] +# gives us one consistent return shape: (parsed_items, aux_ids_for_batch_validation) # For any renamed field, the two neighbors it must sit between EXPECTED_COLUMN_NEIGHBORS = { @@ -55,6 +58,11 @@ def apply_mapping( PHENOTYPE_KEY_COLUMNS = {"hpo_id", "date_of_observation", "status"} +# Key columns to identify 
additional sheets
+DISEASE_KEY_COLUMNS = {"disease_term", "disease_onset"}
+MEASUREMENT_KEY_COLUMNS = {"measurement_type", "measurement_value", "measurement_unit"}
+BIOSAMPLE_KEY_COLUMNS = {"biosample_id", "biosample_type", "collection_date"}
+
 # Map raw zygosity abbreviations to allowed dataclass zygosity values
 ZYGOSITY_MAP = {
     "het": "heterozygous",
@@ -71,43 +79,112 @@
     "denovo": "de_novo_mutation",
 }

+# Variant column groups used for validation and HGVS↔raw consistency checks
+RAW_VARIANT_COLUMNS = {
+    "chromosome",
+    "start_position",
+    "end_position",
+    "reference",
+    "alternate",
+}
+HGVS_VARIANT_COLUMNS = {"hgvsg", "hgvsc", "hgvsp"}
+# minimal base columns to call something a genotype sheet (we bring the index in later)
+GENOTYPE_BASE_COLUMNS = {"contact_email", "phasing"}
+
+# Friendly aliases → reduce friction while keeping behavior explicit
+KNOWN_SHEET_ALIASES: dict[str, set[str]] = {
+    "genotype": {"genotype", "variants", "variant", "geno"},
+    "phenotype": {"phenotype", "hpo", "pheno"},
+    "diseases": {"disease", "diseases"},
+    "measurements": {"measurement", "measurements", "labs"},
+    "biosamples": {"biosample", "biosamples", "samples"},
+}
+
+
+@dataclass
+class TypedTables:
+    """
+    Explicit, typed access to workbook sheets.
+    Any field can be `None`, meaning that sheet was not provided.
+ """ + + genotype: pd.DataFrame | None + phenotype: pd.DataFrame | None + diseases: pd.DataFrame | None + measurements: pd.DataFrame | None + biosamples: pd.DataFrame | None -class DefaultMapper(TableMapper): - def __init__(self, hpo: hpotk.MinimalOntology): - self._hpo = hpo +class TableMapper(metaclass=abc.ABCMeta): + @abc.abstractmethod def apply_mapping( self, tables: dict[str, pd.DataFrame], notepad: Notepad - ) -> tuple[list[Genotype], list[Phenotype]]: - genotype_records: list[Genotype] = [] - phenotype_records: list[Phenotype] = [] - - for sheet_name, df in tables.items(): - columns = set(df.columns) - """Send each sheet to the right extractor and collect all records.""" - is_genotype_sheet = GENOTYPE_KEY_COLUMNS.issubset(columns) - is_phenotype_sheet = PHENOTYPE_KEY_COLUMNS.issubset(columns) - - if is_genotype_sheet == is_phenotype_sheet: - # ambiguous sheet should give a warning instead of an error - notepad.add_warning( - f"Skipping {sheet_name!r}: cannot unambiguously classify as genotype or phenotype" - ) - continue + ) -> typing.Sequence[Phenopacket]: + # return fully-assembled Phenopacket messages, not intermediate parts. 
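The `TableMapper` base class above uses the standard `abc` pattern: declaring `apply_mapping` abstract makes the base class uninstantiable and forces concrete subclasses to override it. A minimal sketch with simplified types (the real signatures take DataFrames and a notepad):

```python
import abc


class TableMapper(metaclass=abc.ABCMeta):
    @abc.abstractmethod
    def apply_mapping(self, tables: dict) -> list:
        raise NotImplementedError


class SheetNameMapper(TableMapper):
    """Trivial concrete mapper: just returns the sheet names, sorted."""

    def apply_mapping(self, tables: dict) -> list:
        return sorted(tables)


try:
    TableMapper()  # fails: the abstract method was never overridden
except TypeError as err:
    failure = str(err)

result = SheetNameMapper().apply_mapping({"pheno": None, "geno": None})
```

Instantiation of the base raises `TypeError` at object-creation time, which surfaces a missing override much earlier than a `NotImplementedError` at call time would.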
+        raise NotImplementedError

-            # rename the former-index column
-            working = self._prepare_sheet(df, is_genotype_sheet)
-            if is_genotype_sheet:
-                genotype_records.extend(
-                    self._map_genotype(sheet_name, working, notepad)
-                )
-            else:
-                phenotype_records.extend(
-                    self._map_phenotype(sheet_name, working, notepad)
-                )

+class DefaultMapper(TableMapper):
+    def __init__(self, hpo: hpotk.MinimalOntology, strict_variants: bool = False):
+        """
+        strict_variants controls how raw⇄HGVS mismatches are reported:
+          - False: raw⇄HGVS mismatches are logged as WARNINGS
+          - True : raw⇄HGVS mismatches are logged as ERRORS
+        """
+        self._hpo = hpo
+        self.strict_variants = strict_variants

-        return genotype_records, phenotype_records
+    def apply_mapping(
+        self, tables: dict[str, pd.DataFrame], notepad: Notepad
+    ) -> list[Phenopacket]:
+        """
+        Process:
+          1) choose/validate input tables
+          2) map rows to domain records
+          3) group records per patient
+          4) construct a Phenopacket per patient
+          5) return the list of packets
+        """
+        # TODO: implement the placeholder table-level mappers called below.
+        # Map each selected sheet to domain-specific records via the table-level wrappers.
+        # The wrappers handle index→patient id normalization and any sheet-level checks,
+        # then delegate to the row mappers.
+        typed_tables = self._choose_named_tables(tables, notepad)
+        genotype_records = self._map_genotype_table(typed_tables.genotype, notepad)
+        phenotype_records = self._map_phenotype_table(typed_tables.phenotype, notepad)
+        disease_records = self._map_diseases_table(typed_tables.diseases, notepad)
+        measurement_records = self._map_measurements_table(
+            typed_tables.measurements, notepad
+        )
+        biosample_records = self._map_biosamples_table(typed_tables.biosamples, notepad)
+
+        # 6) Group results by patient
+        grouped = self._group_records_by_patient(
+            genotype_records,
+            phenotype_records,
+            disease_records,
+            measurement_records,
+            biosample_records,
+        )
+
+        packets: list[Phenopacket] = [
+            self.construct_phenopacket_for_patient(patient_id, bundle, notepad)
+            for patient_id, bundle in grouped.items()
+        ]
+
+        # Backward compatibility for CLI/tests:
+        # Expose simple counts without changing the return type.
+        # The CLI prints "Created N Genotype objects" / "Created N Phenotype objects"
+        # and the tests assert on those exact lines.
+ self.stats = { + "genotypes": len(genotype_records), + "phenotypes": len(phenotype_records), + "diseases": len(disease_records), + "measurements": len(measurement_records), + "biosamples": len(biosample_records), + "patients": len(grouped), + } + + return packets def _prepare_sheet(self, df: pd.DataFrame, is_genotype: bool) -> pd.DataFrame: """Bring the index into a column and name it appropriately.""" @@ -116,142 +193,237 @@ def _prepare_sheet(self, df: pd.DataFrame, is_genotype: bool) -> pd.DataFrame: original = working.columns[0] return working.rename(columns={original: column_id}) - def _map_genotype( - self, sheet_name: str, df: pd.DataFrame, notepad: Notepad - ) -> list[Genotype]: - records: list[Genotype] = [] - for idx, row in df.iterrows(): - # handle slash‑separated zygosity and inheritance - list_of_zygosity_types = [ - z.strip().lower() for z in str(row["zygosity"]).split("/") - ] - list_of_inheritance_types = [ - i.strip().lower() for i in str(row["inheritance"]).split("/") - ] - for zygosity_type, inheritance_type in zip( - list_of_zygosity_types, list_of_inheritance_types - ): - if zygosity_type not in ZYGOSITY_MAP: - notepad.add_error( - f"Sheet {sheet_name!r}: Unrecognized zygosity code {zygosity_type!r}" - ) - if inheritance_type not in INHERITANCE_MAP: - notepad.add_error( - f"Sheet {sheet_name!r}: Unrecognized inheritance code {inheritance_type!r}" - ) - # allow missing/NaN contact_email → substitute dummy - raw_email = row["contact_email"] - contact_email = ( - "unknown@example.com" - if pd.isna(raw_email) - else str(raw_email).strip() - ) - kwargs = { - "genotype_patient_ID": str(row["genotype_patient_ID"]), - "contact_email": contact_email, - "phasing": bool(row["phasing"]), - "chromosome": str(row["chromosome"]), - "start_position": int(row["start_position"]), - "end_position": int(row["end_position"]), - "reference": str(row["reference"]), - "alternate": str(row["alternate"]), - "gene_symbol": str(row["gene_symbol"]), - "hgvsg": 
str(row["hgvsg"]), - "hgvsc": str(row["hgvsc"]), - "hgvsp": str(row["hgvsp"]), - "zygosity": ZYGOSITY_MAP[zygosity_type], - "inheritance": INHERITANCE_MAP[inheritance_type], - } - try: - records.append(Genotype(**kwargs)) - except (ValueError, TypeError) as e: - notepad.add_error(f"Sheet {sheet_name!r}, row {idx}: {e}") - return records - - def _map_phenotype( - self, sheet_name: str, df: pd.DataFrame, notepad: Notepad - ) -> list[Phenotype]: - records: list[Phenotype] = [] - # Collect every HPO ID in this sheet, so we can validate propagation later: - all_ids: list[hpotk.TermId] = [] - - for idx, row in df.iterrows(): - # normalize phenotype fields into valid strings - hpo_cell = str(row["hpo_id"]).strip() - # Parse optional label and digits - # extract the last token (it should just be the HPO code), case‑insensitive - m = re.match( - r""" - ^\s* - (?P