-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__main__.py
More file actions
459 lines (404 loc) · 16.8 KB
/
__main__.py
File metadata and controls
459 lines (404 loc) · 16.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
"""
Command‑line interface for P6 toolkit.
Now determines sheet type by presence of multiple required key columns,
and normalizes numeric HPO IDs and timestamps.
"""
import click
import hpotk
import json
import pandas as pd # Not needed for Pandas_Workaround, i.e. don't call declare or call "_read_sheets" at all, just use `tables = load_sheets_as_tables(excel_file)` which only needs `from .loader import load_sheets_as_tables`
import pathlib
import requests
import sys
import typing
from collections import defaultdict, namedtuple
from datetime import datetime
from google.protobuf.json_format import MessageToJson
from stairval.notepad import create_notepad
from phenopackets.schema.v2.phenopackets_pb2 import Phenopacket
from .loader import load_sheets_as_tables
from .mapper import DefaultMapper
# One audit finding: which preprocessing step, on which sheet, with a severity
# level ("info" / "warn" / "error") and a human-readable message.
AuditEntry = namedtuple("AuditEntry", ["step", "sheet", "message", "level"])
@click.group()
def main():
    """P6: Peter's Parse and Processing of Prenatal Particulars via Pandas."""
@main.command(name="audit-excel")
@click.option(
    "-e",
    "--excel-path",
    "excel_file",
    required=True,
    type=click.Path(exists=True, dir_okay=False),
    help="path to the Excel workbook",
)
@click.option(
    "-r",
    "--report-json",
    "report_json",
    is_flag=True,
    help="output audit report as JSON instead of table",
)
def audit_excel(excel_file: str, report_json: bool):
    """
    Run a preprocessing audit on each sheet in the given workbook:
      - header normalization
      - sheet classification (genotype/phenotype/skip)
      - variant‐column presence checks
    """
    # 1) Read sheets into DataFrames keyed by sheet name
    tables = _read_sheets(excel_file)
    # 2) Produce audit entries. `preprocess` is defined in this very module, so
    #    the previous `from .__main__ import preprocess` self-import was
    #    redundant — and when run via `python -m`, importing `.__main__` can
    #    load a second copy of the module. Call the sibling function directly.
    entries = preprocess(tables)
    # 3) Render report, either machine-readable JSON or a fixed-width table
    if report_json:
        # turn each AuditEntry into a serializable dict
        payload = [
            {"step": e.step, "sheet": e.sheet, "level": e.level, "message": e.message}
            for e in entries
        ]
        click.echo(json.dumps(payload, indent=2))
    else:
        # table header, column widths mirror the row format below
        click.echo(f"{'SHEET':20} {'STEP':25} {'LEVEL':8} MESSAGE")
        for e in entries:
            click.echo(f"{e.sheet:20} {e.step:25} {e.level:8} {e.message}")
@main.command(name="download")
@click.option(
    "-d",
    "--data-path",
    "data_dir",
    default="tests/data",
    type=click.Path(exists=True),
    help="where to save HPO JSON (default: tests/data)",
)
@click.option(
    "-v",
    "--hpo-version",
    default=None,
    type=str,
    help="exact HPO release tag (e.g. 2025-03-03 or v2025-03-03)",
)
def download(data_dir: str, hpo_version: typing.Optional[str]):
    """
    Download a specific or the latest HPO JSON release into the tests/data/ folder.
    """
    datadir = pathlib.Path(data_dir)
    datadir.mkdir(parents=True, exist_ok=True)
    # Figure out which tag to download: normalize a bare version to a "v" tag,
    # otherwise ask GitHub for the latest release.
    if hpo_version:
        tag = hpo_version if hpo_version.startswith("v") else f"v{hpo_version}"
    else:
        # GitHub latest‐release API. `requests` has no default timeout, so an
        # unreachable host would hang forever without an explicit one.
        resp = requests.get(
            "https://api.github.com/repos/obophenotype/human-phenotype-ontology/releases/latest",
            timeout=30,
        )
        resp.raise_for_status()
        tag = resp.json()["tag_name"]
    url = (
        f"https://github.com/obophenotype/human-phenotype-ontology/"
        f"releases/download/{tag}/hp.json"
    )
    click.echo(f"Downloading HPO release {tag} …")
    # Generous timeout: the hp.json asset is tens of MB.
    resp = requests.get(url, timeout=300)
    resp.raise_for_status()
    out = datadir / "hp.json"
    # pathlib handles open/close; also removed the dead trailing `pass`.
    out.write_bytes(resp.content)
    click.echo(f"Saved HPO JSON to {out}")
@main.command(name="parse-excel")
@click.option(
    "-e",
    "--excel-path",
    "excel_file",
    required=True,
    type=click.Path(exists=True, dir_okay=False),
    help="path to the Excel workbook",
)
@click.option(
    "-hpo",
    "--custom-hpo",
    "hpo_path",
    type=click.Path(exists=True, dir_okay=False),
    help="path to a custom HPO JSON file (defaults to tests/data/hp.json)",
)
@click.option(
    "--strict-variants/--no-strict-variants",
    default=False,
    help=("Treat raw↔HGVS mismatches as errors (default: warn)."),
)
@click.option(
    "--verbose", is_flag=True, help="Show preprocessing and classification steps"
)
def parse_excel(
    excel_file: str,
    hpo_path: typing.Optional[str] = None,
    verbose: bool = False,
    strict_variants: bool = False,
):
    """
    Read each sheet, check column order, then:
      - Identify as a Genotype sheet if ALL GENOTYPE_KEY_COLUMNS are present.
      - Identify as a Phenotype sheet if ALL PHENOTYPE_KEY_COLUMNS are present.
      - Otherwise skip.
    Instantiate objects accordingly, normalizing HPO IDs & timestamps.
    """
    # 1) Locate the HPO JSON file (custom path or tests/data default)
    hpo_file = _locate_hpo_file(hpo_path)
    # 2) Build ontology and mapper
    ontology = _load_ontology(str(hpo_file))
    mapper = DefaultMapper(ontology, strict_variants=strict_variants)
    # 3) Read all sheets into DataFrames
    tables = _read_sheets(excel_file)
    # Optionally show the preprocessing audit before mapping output
    if verbose:
        _echo_audit_entries(preprocess(tables))
    # 4) Apply mapping to get phenopackets; issues accumulate on the notepad
    notepad = create_notepad("phenopackets")
    phenopackets = mapper.apply_mapping(tables, notepad)
    # 5) Serialize one JSON file per phenopacket (1.json, 2.json, ...)
    output_dir = _prepare_output_dir()
    count = 0  # stays 0 if the mapper produced no phenopackets
    for count, pkt in enumerate(phenopackets, start=1):
        with open(output_dir / f"{count}.json", "w", encoding="utf-8") as out_f:
            out_f.write(MessageToJson(pkt))
    # 6) Summary — prefer the mapper's own patient count when it exposes one
    click.echo(
        f"Wrote {mapper.stats.get('patients', count)} phenopacket files to {output_dir}"
    )
    # 7) Report any errors or warnings (informational; does not abort)
    _report_issues(notepad)
    # 8) Object-count summary (kept verbatim; tests match these exact lines)
    counts = getattr(mapper, "stats", {})
    click.echo(f"Created {counts.get('genotypes', 0)} Genotype objects")
    click.echo(f"Created {counts.get('phenotypes', 0)} Phenotype objects")
    # TODO: mirror the same pattern for other counts when they are printed:
    # counts.get('diseases', 0), counts.get('measurements', 0), counts.get('biosamples', 0)


def _echo_audit_entries(entries) -> None:
    # Render audit entries indented and color-coded by severity.
    indent = " "
    for entry in entries:
        line = f"{entry.step:20} {entry.sheet:15} {entry.message}"
        click.echo("")  # blank line before each entry
        if entry.level == "error":
            colored = click.style(line, fg="red")
        elif entry.level in ("warn", "warning"):
            colored = click.style(line, fg="yellow")
        else:
            colored = click.style(line, fg="cyan")
        click.echo(indent + colored)
    click.echo("")  # a blank line before mapping output
def _locate_hpo_file(hpo_path: typing.Optional[str]) -> pathlib.Path:
# pick HPO JSON: either custom or default
if hpo_path:
hpo_file = pathlib.Path(hpo_path)
else:
hpo_file = pathlib.Path("tests/data") / "hp.json"
# Explicit file check avoids try/except (Ruff BLE001) while providing clear error flow.
# More efficient than catching an IOError later because we fail fast and early.
if not hpo_file.is_file():
click.echo(f"Error: HPO file not found at {hpo_file}", err=True)
sys.exit(1)
return hpo_file
def _load_ontology(hpo_file: str) -> hpotk.MinimalOntology:
    """Parse the given HPO JSON file into a minimal hpo-toolkit ontology."""
    ontology = hpotk.load_minimal_ontology(hpo_file)
    return ontology
# NOTE: the "Pandas_Workaround" alternative is to drop this wrapper and call
# `load_sheets_as_tables(excel_file)` directly at every call site — it only
# depends on `from .loader import load_sheets_as_tables`.
def _read_sheets(excel_file: str) -> dict[str, pd.DataFrame]:
    """Load every worksheet of the workbook as a DataFrame, keyed by sheet name."""
    return load_sheets_as_tables(excel_file)
def _report_issues(notepad):
    """Echo any mapping errors and warnings collected on *notepad*.

    Both categories are informational here — warnings never abort, and the
    caller decides what to do about errors.
    """
    sections = (
        (notepad.has_errors, notepad.errors, "Errors found in mapping:"),
        (notepad.has_warnings, notepad.warnings, "Warnings found in mapping:"),
    )
    for has_issues, iter_issues, heading in sections:
        if has_issues(include_subsections=True):
            click.echo(heading)
            for issue in iter_issues():
                click.echo(f"- {issue}")
def _group_records_by_patient(
genotype_records: list,
phenotype_records: list,
disease_records: list,
measurement_records: list,
biosample_records: list,
) -> dict[str, dict[str, list]]:
# Group genotype & phenotype records by patient ID
records = defaultdict(
lambda: {
"genotype_records": [],
"phenotype_records": [],
"disease_records": [],
"measurement_records": [],
"biosample_records": [],
}
)
for genotype in genotype_records:
records[genotype.genotype_patient_ID]["genotype_records"].append(genotype)
for phenotype in phenotype_records:
records[phenotype.phenotype_patient_ID]["phenotype_records"].append(phenotype)
for disease in disease_records:
records[disease.patient_ID]["disease_records"].append(disease)
for measurement in measurement_records:
records[measurement.patient_ID]["measurement_records"].append(measurement)
for biosample in biosample_records:
records[biosample.patient_ID]["biosample_records"].append(biosample)
return records
# 7) Prepare output directory with timestamp
# Will contain genotype and phenotype records as JSON
def _prepare_output_dir() -> pathlib.Path:
# use YYYY-MM-DD_HH-MM-SS for human-readable timestamps
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
phenopacket_from_excel_output_dir = (
pathlib.Path.cwd() / "phenopacket_from_excel" / timestamp / "phenopackets"
)
phenopacket_from_excel_output_dir.mkdir(parents=True, exist_ok=True)
return phenopacket_from_excel_output_dir
def _write_phenopackets(
    records_by_patient: dict[str, dict[str, list]],
    generated_phenopacket_output_dir: pathlib.Path,
):
    """Build one GA4GH Phenopacket per patient and write it as <patient_id>.json.

    Expects the dict shape produced by _group_records_by_patient: each patient
    maps to lists under "phenotype_records", "genotype_records",
    "disease_records", "measurement_records" and "biosample_records".
    """
    # Build and write one Phenopacket per patient
    for patient_id, patient_data in records_by_patient.items():
        phenopacket = Phenopacket()
        phenopacket.id = patient_id
        phenopacket.subject.id = patient_id
        # 3a) Add phenotypic features
        for phenotype in patient_data["phenotype_records"]:
            feature = phenopacket.phenotypic_features.add()
            feature.type.id = phenotype.HPO_ID
            # mark as excluded if status is False
            if not phenotype.status:
                feature.excluded = True
        # 3b) Add genotype interpretations
        # Genotypes → Interpretation → Diagnosis → GenomicInterpretation
        for interpretation_index, genotype_record in enumerate(
            patient_data["genotype_records"]
        ):
            # Create a new Interpretation entry (must set id and progress_status)
            interpretation = phenopacket.interpretations.add()
            interpretation.id = f"{patient_id}-interpretation-{interpretation_index}"
            interpretation.progress_status = interpretation.ProgressStatus.COMPLETED
            # each Interpretation has a `diagnosis` submessage (no `id` field)
            diagnosis = interpretation.diagnosis
            # inside Diagnosis, add GenomicInterpretation
            genomic_interpretation_entry = diagnosis.genomic_interpretations.add()
            genomic_interpretation_entry.subject_or_biosample_id = patient_id
            genomic_interpretation_entry.interpretation_status = (
                genomic_interpretation_entry.InterpretationStatus.CONTRIBUTORY
            )
            # TODO: Revise VariationDescriptor and gene_context
            # Build a complete VariationDescriptor directly from the Genotype
            # Build a complete VariationDescriptor directly from the Genotype
            vd = genotype_record.to_variation_descriptor()
            variant_interpretation = genomic_interpretation_entry.variant_interpretation
            # Prefer CopyFrom when available (protobuf Message API) to avoid Ruff BLE001 (broad exception). Feature-detecting keeps us compatible with protobuf builds where CopyFrom may or may not exist on the generated message class, without catching a blanket Exception
            copy_from = getattr(
                variant_interpretation.variation_descriptor, "CopyFrom", None
            )
            if callable(copy_from):
                copy_from(vd)
            else:
                # Fallback when CopyFrom is absent: MergeFrom retains the previous behavior without catching a blanket Exception
                variant_interpretation.variation_descriptor.MergeFrom(vd)  # type: ignore[attr-defined]
        # 3c) Add optional entries (if any):
        for d in patient_data["disease_records"]:
            ds = phenopacket.diseases.add()
            ds.term.id = d.disease_term
            ds.term.label = d.disease_label
            # NOTE(review): in the Phenopacket v2 schema Disease.onset is a
            # TimeElement submessage — direct assignment to a protobuf message
            # field raises AttributeError unless disease_onset is itself a
            # compatible message. Confirm the type produced by the mapper.
            ds.onset = d.disease_onset
            ds.status = d.disease_status
        for m in patient_data["measurement_records"]:
            meas = phenopacket.measurements.add()
            meas.type.id = m.measurement_type
            # NOTE(review): Measurement.value / time_observed are submessages in
            # the v2 schema; scalar assignment here may raise at runtime —
            # verify against the generated pb2 classes.
            meas.value = m.measurement_value
            meas.unit = m.measurement_unit
            meas.timestamp = m.measurement_timestamp
        for b in patient_data["biosample_records"]:
            bs = phenopacket.biosamples.add()
            bs.id = b.biosample_id
            bs.type.id = b.biosample_type
            # NOTE(review): Biosample collection time is a TimeElement in the
            # v2 schema — same direct-assignment concern as above.
            bs.collection_time = b.collection_date
        # 3d) Serialize to JSON, one file per patient
        generated_phenopacket_output_path = (
            generated_phenopacket_output_dir / f"{patient_id}.json"
        )
        with open(generated_phenopacket_output_path, "w", encoding="utf-8") as out_f:
            out_f.write(MessageToJson(phenopacket))
def preprocess(tables: dict[str, pd.DataFrame]) -> list[AuditEntry]:
    """
    Run lightweight audits on each sheet:
      - header normalization
      - sheet classification
      - variant‐column presence (raw vs HGVS)

    Returns AuditEntry rows grouped by step (all normalize entries first,
    then classification, then variant checks) to keep report output stable.
    """
    # Imported lazily to avoid a circular import at module load time.
    from .mapper import (
        RAW_VARIANT_COLUMNS,
        HGVS_VARIANT_COLUMNS,
        GENOTYPE_BASE_COLUMNS,
        PHENOTYPE_KEY_COLUMNS,
    )

    entries: list[AuditEntry] = []
    # Step 1: header counts per sheet
    for name, df in tables.items():
        entries.append(
            AuditEntry(
                step="normalize-headers",
                sheet=name,
                message=f"{len(df.columns)} cols",
                level="info",
            )
        )
    # Step 2: classify each sheet by which key columns are present
    for name, df in tables.items():
        cols = set(df.columns)
        has_raw = RAW_VARIANT_COLUMNS.issubset(cols)
        has_hgvs = bool(HGVS_VARIANT_COLUMNS & cols)
        is_gen = GENOTYPE_BASE_COLUMNS.issubset(cols) and (has_raw or has_hgvs)
        is_pheno = PHENOTYPE_KEY_COLUMNS.issubset(cols)
        kind = "genotype" if is_gen else "phenotype" if is_pheno else "skip"
        # Bug fix: the variant-source suffix used to be appended unconditionally,
        # so sheets with NO variant columns were still labeled " (hgvs)".
        # Now the suffix only appears when variant columns actually exist.
        if has_raw and has_hgvs:
            detail = " (raw+hgvs)"
        elif has_raw:
            detail = " (raw)"
        elif has_hgvs:
            detail = " (hgvs)"
        else:
            detail = ""
        entries.append(
            AuditEntry(
                step="classify-sheet",
                sheet=name,
                message=kind + detail,
                level="info",
            )
        )
    # Step 3: genotype-shaped sheets must carry raw OR HGVS variant columns
    for name, df in tables.items():
        cols = set(df.columns)
        if GENOTYPE_BASE_COLUMNS.issubset(cols):
            if not (RAW_VARIANT_COLUMNS.issubset(cols) or HGVS_VARIANT_COLUMNS & cols):
                entries.append(
                    AuditEntry(
                        step="variant-check",
                        sheet=name,
                        message="missing raw & HGVS",
                        level="error",
                    )
                )
    return entries
# Entry point when run as a script / `python -m`; click dispatches subcommands.
if __name__ == "__main__":
    main()