Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
f7cc1b1
feat: ObservationRow class
Gabrielstav May 6, 2026
7d464b2
feat: device exposure row, observation builder deferred until other d…
Gabrielstav May 7, 2026
1ad87b9
feat: sequence_id on AdverseEvent domain
Gabrielstav May 8, 2026
6e803cb
wip: refactoring base domains to use foreign keys, updating invariant…
Gabrielstav May 8, 2026
8b58378
feat: natural key in collection domain models
Gabrielstav May 11, 2026
548b5be
feat: natural keys for singleton domains
Gabrielstav May 11, 2026
98eda72
feat: natural keys done
Gabrielstav May 11, 2026
bfa311c
feat: deterministic sorting of collection domains by natural key befo…
Gabrielstav May 11, 2026
364de06
feat: collection domain sorting test
Gabrielstav May 11, 2026
96ebb6b
feat: updated builder callsites to use natural keys in id generation
Gabrielstav May 11, 2026
f334d80
feat: checking natural key collisions in hydration
Gabrielstav May 11, 2026
3debb72
feat: Not Evaluable tumor assessments included to measurement table
Gabrielstav May 12, 2026
ba32870
chore: docstring update
Gabrielstav May 12, 2026
f1bfca8
feat: updated static mapping and structural and static lookups no lon…
Gabrielstav May 12, 2026
9789cab
fix: eot_reason treatment complete mapping updated to observation domain
Gabrielstav May 12, 2026
1c1e2eb
feat: fk linkage from condition occurrence, mapping update, fk linkag…
Gabrielstav May 12, 2026
19ae7ab
observation builder
Gabrielstav May 19, 2026
f7488de
docstrings
Gabrielstav May 20, 2026
5acf958
refactor: clinical benefit from scalar to singleton domain model
Gabrielstav May 20, 2026
69e2396
feat: using ClinicalBenefit model in Observation builder
Gabrielstav May 20, 2026
d5dd150
feat: ConditionOccurrence publishes FK so measurement (cancer modifie…
Gabrielstav May 21, 2026
d6fb13b
feat: added normalization file for target biomarkers
Gabrielstav May 21, 2026
c0a5c38
feat: tumor type harmonization file
Gabrielstav May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def run_pipeline(preprocessing_input: Path, base_root: Path, trial: str = "IMPRE
meta=_meta,
)

print(f"Harmonized: {harmonized_result.patients[0:10]}")
# print(f"Harmonized: {harmonized_result.patients[0:10]}")

# run semantic mapping
semantic_mapper = SemanticService(outdir=base_root, layout=Layout.TRIAL_TIMESTAMP_RUN)
Expand All @@ -89,7 +89,7 @@ def run_pipeline(preprocessing_input: Path, base_root: Path, trial: str = "IMPRE
tables: OmopTables = omop_service.build(harmonized_result.patients)

# todo: remove
print(f"built tables: {tables}")
# print(f"built tables: {tables}")

# export concept lookup tracking (missed lookups, coverage stats)
concept_service.export(formats="csv")
Expand Down
2 changes: 1 addition & 1 deletion src/omop_etl/concept_mapping/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


def _norm(v: str | None) -> str:
"""Lowercase + strip a CSV value, defaulting None to empty string."""
"""Lowercase and strip a CSV value, defaulting None to empty string."""
return (v or "").lower().strip()


Expand Down
4 changes: 2 additions & 2 deletions src/omop_etl/concept_mapping/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ def lookup_static(
validity=c.validity,
)
if not _concept_matches_filter(concept, domains, vocabs, validity):
self._result.record_miss("static", value_set, local_value)
# concept mapped but rejected by filter
return None

self._result.record_match("static", value_set, local_value, concept)
Expand Down Expand Up @@ -198,7 +198,7 @@ def lookup_structural(
validity=c.validity,
)
if not _concept_matches_filter(concept, domains, vocabs, validity):
self._result.record_miss("structural", value_set, "")
# concept mapped but rejected by filter
return None

self._result.record_match("structural", value_set, "", concept)
Expand Down
47 changes: 47 additions & 0 deletions src/omop_etl/harmonization/harmonizers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class CollectionSpec(SpecBase):
order_by: tuple[str, ...] = ()
require_order_by: bool = False
items_col: str = "items"
on_natural_key_conflict: Literal["error", "warn"] = "warn"


# union type for all specs
Expand All @@ -84,6 +85,37 @@ def _derived_name(fn: Callable[..., Any]) -> str:
return name.removeprefix("_process_")


def _check_natural_key_conflicts(
objs: list[DomainBase],
*,
patient_id: str,
item_type: type[DomainBase],
policy: Literal["error", "warn"],
) -> None:
"""
Detect natural-key collisions where the rows have differing data.

Identical duplicates (same NK, same data) are assumed to be deduplicated
upstream by the collection processor, so this only flags conflicts.
Keeps the first occurrence.
"""
seen: dict[tuple, DomainBase] = {}
fields = item_type.data_fields()
for obj in objs:
nk = obj.natural_key()
prior = seen.get(nk)
if prior is None:
seen[nk] = obj
continue
if all(getattr(prior, f) == getattr(obj, f) for f in fields):
continue
diffs = {f: (getattr(prior, f), getattr(obj, f)) for f in fields if getattr(prior, f) != getattr(obj, f)}
msg = f"{item_type.__name__} natural-key conflict for patient {patient_id}: NK={nk} has conflicting values: {diffs}"
if policy == "error":
raise ValueError(msg)
log.warning(msg)


def scalar(
*,
name: str | None = None,
Expand Down Expand Up @@ -166,6 +198,7 @@ def collection(
skip_missing_patients: bool = False,
subject_col: str = "SubjectId",
strict_schema: bool | None = None,
on_natural_key_conflict: Literal["error", "warn"] = "warn",
) -> Callable[[_F], _F]:
"""
Decorator: register a method as a collection-domain processor.
Expand All @@ -187,6 +220,7 @@ def decorator(fn: _F) -> _F:
skip_missing_patients=skip_missing_patients,
subject_col=subject_col,
strict_schema=strict_schema,
on_natural_key_conflict=on_natural_key_conflict,
)
setattr(fn, _SPEC_ATTR, spec)
return fn
Expand Down Expand Up @@ -418,6 +452,7 @@ def _run_spec(self, spec: ProcessorSpec) -> None:
items_col=spec.items_col,
skip_missing_patients=spec.skip_missing_patients,
mode=spec.mode,
on_natural_key_conflict=spec.on_natural_key_conflict,
)

elif isinstance(spec, SingletonSpec):
Expand Down Expand Up @@ -595,6 +630,7 @@ def hydrate_collection_field(
item_type: type[DomainBase],
patients: dict[str, Patient],
mode: Literal["replace", "extend"] = "replace",
on_natural_key_conflict: Literal["error", "warn"] = "warn",
) -> None:
"""
Instantiate collection domain models onto Patient after schema validation.
Expand All @@ -611,6 +647,9 @@ def hydrate_collection_field(
item_type: Target domain class (used to resolve Patient attribute).
patients: Map of patient_id to Patient instance.
mode: "replace" overwrites, "extend" appends to existing collection.
on_natural_key_conflict: "warn" logs a warning when two instances share a natural key
but differ in other field values; "error" raises ValueError. Identical duplicates
(same NK, same data) are assumed to be deduplicated upstream.
"""
target_attr = Patient.get_attr_for_type(item_type)
build = builder or item_type.from_row
Expand All @@ -627,6 +666,14 @@ def hydrate_collection_field(
except Exception as e:
raise ValueError(f"{item_type.__name__} collection hydration failed for {sid=}") from e

if item_type.NATURAL_KEY_FIELDS:
_check_natural_key_conflicts(
objs,
patient_id=sid,
item_type=item_type,
policy=on_natural_key_conflict,
)

if mode == "extend":
existing = getattr(patient, target_attr, ()) or ()
objs = list(existing) + objs
Expand Down
Loading
Loading