Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 44 additions & 55 deletions src/semra/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@
from __future__ import annotations

import math
import pickle
from abc import ABC, abstractmethod
from collections.abc import Iterable
from hashlib import md5
from hashlib import sha256
from typing import Annotated, Any, ClassVar, Generic, Literal, NamedTuple, ParamSpec, TypeVar, Union

import pydantic
Expand Down Expand Up @@ -117,13 +116,7 @@
X = TypeVar("X")


def _md5_hexdigest(picklable: object) -> str:
hasher = md5() # noqa: S324
hasher.update(pickle.dumps(picklable))
return hasher.hexdigest()


class KeyedMixin(ABC, Generic[P, X]):
class KeyedMixin(ABC, Generic[P]):
"""A mixin for a class that can be hashed and CURIE-encoded."""

#: The prefix for CURIEs for instances of this class
Expand All @@ -133,14 +126,16 @@ def __init_subclass__(cls, *, prefix: str, **kwargs: Any) -> None:
cls._prefix = prefix

@abstractmethod
def key(self, *args: P.args, **kwargs: P.kwargs) -> X:
"""Return a picklable key."""
def key(self, *args: P.args, **kwargs: P.kwargs) -> str:
"""Return a string key."""
raise NotImplementedError

def hexdigest(self, *args: P.args, **kwargs: P.kwargs) -> str:
"""Get a hex string for the MD5 hash of the pickled key() for this class."""
key = self.key(*args, **kwargs)
return _md5_hexdigest(key)
s = self.key(*args, **kwargs)
hasher = sha256()
hasher.update(s.encode("utf-8"))
return hasher.hexdigest()

def get_reference(self, *args: P.args, **kwargs: P.kwargs) -> Reference:
"""Get a CURIE reference using this class's prefix and its hexadecimal representation."""
Expand Down Expand Up @@ -193,7 +188,7 @@ class MappingSetKey(NamedTuple):
class MappingSet(
pydantic.BaseModel,
ConfidenceMixin,
KeyedMixin[[], MappingSetKey],
KeyedMixin[[]],
prefix=SEMRA_MAPPING_SET_PREFIX,
):
"""Represents a set of semantic mappings.
Expand Down Expand Up @@ -239,9 +234,9 @@ class MappingSet(
description="Mapping set level confidence. Corresponds to optional SSSOM field https://mapping-commons.github.io/sssom/mapping_set_confidence/",
)

def key(self) -> MappingSetKey:
"""Get a picklable key representing the mapping set."""
return MappingSetKey(self.purl or "", self.name, self.version or "", self.license or "")
def key(self) -> str:
"""Get a string key representing the mapping set."""
return "\n".join((self.purl or "", self.name, self.version or "", self.license or ""))

def get_confidence(self) -> float:
"""Get the explicit confidence for the mapping set."""
Expand All @@ -259,7 +254,7 @@ class SimpleEvidenceKey(NamedTuple):

class SimpleEvidence(
pydantic.BaseModel,
KeyedMixin[[Union[Triple, "Mapping"]], tuple[StrTriple, SimpleEvidenceKey]],
KeyedMixin[[Union[Triple, "Mapping"]]],
EvidenceMixin,
ConfidenceMixin,
prefix=SEMRA_EVIDENCE_PREFIX,
Expand Down Expand Up @@ -288,25 +283,24 @@ class SimpleEvidence(
)
confidence: float | None = Field(None, description="The confidence")

def _simple_key(self) -> SimpleEvidenceKey:
return SimpleEvidenceKey(
self.evidence_type,
self.justification.curie,
self.author.curie if self.author else "",
self.mapping_set.key(),
def _simple_key(self) -> str:
return "\t".join(
(
self.evidence_type,
self.justification.curie,
self.author.curie if self.author else "",
self.mapping_set.key(),
)
)

def key(self, triple: Triple | Mapping) -> tuple[StrTriple, SimpleEvidenceKey]:
def key(self, triple: Triple | Mapping) -> str:
"""Get a key suitable for hashing the evidence.

:returns: A key for deduplication based on the mapping set.

Note: this should be extended to include basically _all_ fields
"""
return (
triple.as_str_triple(),
self._simple_key(),
)
return "\t".join((*triple.as_str_triple(), self._simple_key()))

@property
def mapping_set_names(self) -> set[str]:
Expand All @@ -318,12 +312,6 @@ def get_confidence(self) -> float:
return self.confidence if self.confidence is not None else self.mapping_set.confidence


def _sort_evidence_key(ev: Evidence) -> tuple[Any, ...]:
# the first element of the simple key is the type of evidence,
# so they can be compared
return ev._simple_key()


class ReasonedEvidenceKey(NamedTuple):
"""The key used for a reasoned evidence."""

Expand All @@ -336,7 +324,7 @@ class ReasonedEvidenceKey(NamedTuple):

class ReasonedEvidence(
pydantic.BaseModel,
KeyedMixin[[Union[Triple, "Mapping"]], tuple[StrTriple, ReasonedEvidenceKey]],
KeyedMixin[[Union[Triple, "Mapping"]]],
EvidenceMixin,
ConfidenceMixin,
prefix=SEMRA_EVIDENCE_PREFIX,
Expand All @@ -355,30 +343,29 @@ class ReasonedEvidence(
1.0, description="The probability that the reasoning method is correct"
)

def _simple_key(self) -> ReasonedEvidenceKey:
return ReasonedEvidenceKey(
self.evidence_type,
self.justification.curie,
tuple(
tuple(
evidence.key(mapping)
for evidence in sorted(mapping.evidence, key=_sort_evidence_key)
)
for mapping in sorted(self.mappings)
),
def _simple_key(self) -> str:
return "\t".join(
(
self.evidence_type,
self.justification.curie,
*(
"|".join(
evidence.key(mapping)
for evidence in sorted(mapping.evidence, key=lambda e: e._simple_key())
)
for mapping in sorted(self.mappings)
),
)
)

def key(self, triple: Triple | Mapping) -> tuple[StrTriple, ReasonedEvidenceKey]:
def key(self, triple: Triple | Mapping) -> str:
"""Get a key suitable for hashing the evidence.

:returns: A key for deduplication based on the mapping set.

Note: this should be extended to include basically _all_ fields
"""
return (
triple.as_str_triple(),
self._simple_key(),
)
return "\t".join((*triple.as_str_triple(), self._simple_key()))

def get_confidence(self) -> float:
r"""Calculate confidence for the reasoned evidence.
Expand Down Expand Up @@ -433,7 +420,7 @@ def explanation(self) -> str:
class Mapping(
Triple,
ConfidenceMixin,
KeyedMixin[[], StrTriple],
KeyedMixin[[]],
prefix=SEMRA_MAPPING_PREFIX,
):
"""A semantic mapping.
Expand All @@ -455,9 +442,11 @@ def triple(self) -> Triple:
"""Get the mapping's core triple as a tuple."""
return Triple(subject=self.subject, predicate=self.predicate, object=self.object)

def key(self) -> StrTriple:
# TODO converge on reusing the curies definition of a triple/mapping identifier

def key(self) -> str:
"""Get a hashable key for the mapping, based on the subject, predicate, and object."""
return self.as_str_triple()
return "\t".join(self.as_str_triple())

@classmethod
def from_triple(
Expand Down
19 changes: 10 additions & 9 deletions tests/test_neo4j_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from semra.io import write_neo4j
from semra.struct import Triple
from semra.vocabulary import BEN_REFERENCE, CHAIN_MAPPING, CHARLIE
from tests import resources

# TODO test when concept name has problematic characters like tabs or newlines

Expand Down Expand Up @@ -134,12 +135,12 @@ def test_neo4j_output(self) -> None:
# but changes in the underlying data structure updated the pickles, invaliding old
# hashes. Therefore, we should stop relying on picking for hashing and define explicit
# ones for every object (yes, obvious in hindsight)
# for path in [
# resources.CONCEPT_NODES_TSV_PATH,
# resources.EVIDENCE_NODES_TSV_PATH,
# resources.MAPPING_NODES_TSV_PATH,
# resources.MAPPING_SET_NODES_TSV_PATH,
# resources.EDGES_TSV_PATH,
# resources.MAPPING_EDGES_TSV_PATH,
# ]:
# self.assertEqual(path.read_text(), directory.joinpath(path.name).read_text())
for path in [
resources.CONCEPT_NODES_TSV_PATH,
resources.EVIDENCE_NODES_TSV_PATH,
resources.MAPPING_NODES_TSV_PATH,
resources.MAPPING_SET_NODES_TSV_PATH,
resources.EDGES_TSV_PATH,
resources.MAPPING_EDGES_TSV_PATH,
]:
self.assertEqual(path.read_text(), directory.joinpath(path.name).read_text())
Loading