diff --git a/src/semra/struct.py b/src/semra/struct.py index 261d5c60..c6f82a0d 100644 --- a/src/semra/struct.py +++ b/src/semra/struct.py @@ -85,10 +85,9 @@ from __future__ import annotations import math -import pickle from abc import ABC, abstractmethod from collections.abc import Iterable -from hashlib import md5 +from hashlib import sha256 from typing import Annotated, Any, ClassVar, Generic, Literal, NamedTuple, ParamSpec, TypeVar, Union import pydantic @@ -117,13 +116,7 @@ X = TypeVar("X") -def _md5_hexdigest(picklable: object) -> str: - hasher = md5() # noqa: S324 - hasher.update(pickle.dumps(picklable)) - return hasher.hexdigest() - - -class KeyedMixin(ABC, Generic[P, X]): +class KeyedMixin(ABC, Generic[P]): """A mixin for a class that can be hashed and CURIE-encoded.""" #: The prefix for CURIEs for instances of this class @@ -133,14 +126,16 @@ def __init_subclass__(cls, *, prefix: str, **kwargs: Any) -> None: cls._prefix = prefix @abstractmethod - def key(self, *args: P.args, **kwargs: P.kwargs) -> X: - """Return a picklable key.""" + def key(self, *args: P.args, **kwargs: P.kwargs) -> str: + """Return a string key.""" raise NotImplementedError def hexdigest(self, *args: P.args, **kwargs: P.kwargs) -> str: """Get a hex string for the MD5 hash of the pickled key() for this class.""" - key = self.key(*args, **kwargs) - return _md5_hexdigest(key) + s = self.key(*args, **kwargs) + hasher = sha256() + hasher.update(s.encode("utf-8")) + return hasher.hexdigest() def get_reference(self, *args: P.args, **kwargs: P.kwargs) -> Reference: """Get a CURIE reference using this class's prefix and its hexadecimal representation.""" @@ -193,7 +188,7 @@ class MappingSetKey(NamedTuple): class MappingSet( pydantic.BaseModel, ConfidenceMixin, - KeyedMixin[[], MappingSetKey], + KeyedMixin[[]], prefix=SEMRA_MAPPING_SET_PREFIX, ): """Represents a set of semantic mappings. @@ -239,9 +234,9 @@ class MappingSet( description="Mapping set level confidence. Corresponds to optional SSSOM field https://mapping-commons.github.io/sssom/mapping_set_confidence/", ) - def key(self) -> MappingSetKey: - """Get a picklable key representing the mapping set.""" - return MappingSetKey(self.purl or "", self.name, self.version or "", self.license or "") + def key(self) -> str: + """Get a string key representing the mapping set.""" + return "\n".join((self.purl or "", self.name, self.version or "", self.license or "")) def get_confidence(self) -> float: """Get the explicit confidence for the mapping set.""" @@ -259,7 +254,7 @@ class SimpleEvidenceKey(NamedTuple): class SimpleEvidence( pydantic.BaseModel, - KeyedMixin[[Union[Triple, "Mapping"]], tuple[StrTriple, SimpleEvidenceKey]], + KeyedMixin[[Union[Triple, "Mapping"]]], EvidenceMixin, ConfidenceMixin, prefix=SEMRA_EVIDENCE_PREFIX, @@ -288,25 +283,24 @@ class SimpleEvidence( ) confidence: float | None = Field(None, description="The confidence") - def _simple_key(self) -> SimpleEvidenceKey: - return SimpleEvidenceKey( - self.evidence_type, - self.justification.curie, - self.author.curie if self.author else "", - self.mapping_set.key(), + def _simple_key(self) -> str: + return "\t".join( + ( + self.evidence_type, + self.justification.curie, + self.author.curie if self.author else "", + self.mapping_set.key(), + ) ) - def key(self, triple: Triple | Mapping) -> tuple[StrTriple, SimpleEvidenceKey]: + def key(self, triple: Triple | Mapping) -> str: """Get a key suitable for hashing the evidence. :returns: A key for deduplication based on the mapping set. Note: this should be extended to include basically _all_ fields """ - return ( - triple.as_str_triple(), - self._simple_key(), - ) + return "\t".join((*triple.as_str_triple(), self._simple_key())) @property def mapping_set_names(self) -> set[str]: @@ -318,12 +312,6 @@ def get_confidence(self) -> float: return self.confidence if self.confidence is not None else self.mapping_set.confidence -def _sort_evidence_key(ev: Evidence) -> tuple[Any, ...]: - # the first element of the simple key is the type of evidence, - # so they can be compared - return ev._simple_key() - - class ReasonedEvidenceKey(NamedTuple): """The key used for a reasoned evidence.""" @@ -336,7 +324,7 @@ class ReasonedEvidenceKey(NamedTuple): class ReasonedEvidence( pydantic.BaseModel, - KeyedMixin[[Union[Triple, "Mapping"]], tuple[StrTriple, ReasonedEvidenceKey]], + KeyedMixin[[Union[Triple, "Mapping"]]], EvidenceMixin, ConfidenceMixin, prefix=SEMRA_EVIDENCE_PREFIX, @@ -355,30 +343,29 @@ class ReasonedEvidence( 1.0, description="The probability that the reasoning method is correct" ) - def _simple_key(self) -> ReasonedEvidenceKey: - return ReasonedEvidenceKey( - self.evidence_type, - self.justification.curie, - tuple( - tuple( - evidence.key(mapping) - for evidence in sorted(mapping.evidence, key=_sort_evidence_key) - ) - for mapping in sorted(self.mappings) - ), + def _simple_key(self) -> str: + return "\t".join( + ( + self.evidence_type, + self.justification.curie, + *( + "|".join( + evidence.key(mapping) + for evidence in sorted(mapping.evidence, key=lambda e: e._simple_key()) + ) + for mapping in sorted(self.mappings) + ), + ) ) - def key(self, triple: Triple | Mapping) -> tuple[StrTriple, ReasonedEvidenceKey]: + def key(self, triple: Triple | Mapping) -> str: """Get a key suitable for hashing the evidence. :returns: A key for deduplication based on the mapping set. Note: this should be extended to include basically _all_ fields """ - return ( - triple.as_str_triple(), - self._simple_key(), - ) + return "\t".join((*triple.as_str_triple(), self._simple_key())) def get_confidence(self) -> float: r"""Calculate confidence for the reasoned evidence. @@ -433,7 +420,7 @@ def explanation(self) -> str: class Mapping( Triple, ConfidenceMixin, - KeyedMixin[[], StrTriple], + KeyedMixin[[]], prefix=SEMRA_MAPPING_PREFIX, ): """A semantic mapping. @@ -455,9 +442,11 @@ def triple(self) -> Triple: """Get the mapping's core triple as a tuple.""" return Triple(subject=self.subject, predicate=self.predicate, object=self.object) - def key(self) -> StrTriple: + # TODO converge on reusing the curies definition of a triple/mapping identifier + + def key(self) -> str: """Get a hashable key for the mapping, based on the subject, predicate, and object.""" - return self.as_str_triple() + return "\t".join(self.as_str_triple()) @classmethod def from_triple( diff --git a/tests/test_neo4j_output.py b/tests/test_neo4j_output.py index 06d93fe8..cb2ddb3c 100644 --- a/tests/test_neo4j_output.py +++ b/tests/test_neo4j_output.py @@ -19,6 +19,7 @@ from semra.io import write_neo4j from semra.struct import Triple from semra.vocabulary import BEN_REFERENCE, CHAIN_MAPPING, CHARLIE +from tests import resources # TODO test when concept name has problematic characters like tabs or newlines @@ -134,12 +135,12 @@ def test_neo4j_output(self) -> None: # but changes in the underlying data structure updated the pickles, invaliding old # hashes. Therefore, we should stop relying on picking for hashing and define explicit # ones for every object (yes, obvious in hindsight) - # for path in [ - # resources.CONCEPT_NODES_TSV_PATH, - # resources.EVIDENCE_NODES_TSV_PATH, - # resources.MAPPING_NODES_TSV_PATH, - # resources.MAPPING_SET_NODES_TSV_PATH, - # resources.EDGES_TSV_PATH, - # resources.MAPPING_EDGES_TSV_PATH, - # ]: - # self.assertEqual(path.read_text(), directory.joinpath(path.name).read_text()) + for path in [ + resources.CONCEPT_NODES_TSV_PATH, + resources.EVIDENCE_NODES_TSV_PATH, + resources.MAPPING_NODES_TSV_PATH, + resources.MAPPING_SET_NODES_TSV_PATH, + resources.EDGES_TSV_PATH, + resources.MAPPING_EDGES_TSV_PATH, + ]: + self.assertEqual(path.read_text(), directory.joinpath(path.name).read_text())