Skip to content
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@ API Reference
=============

.. automodapi:: curies
:no-inheritance-diagram:
:no-heading:
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ The most recent code and data can be installed directly from GitHub with:
services/index
typing
w3c
preprocessing
29 changes: 29 additions & 0 deletions docs/source/preprocessing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
Converter with Preprocessing
============================

When simple expansion and contraction aren't enough, and you want to inject global or
context-specific rewrite rules, you can wrap a :class:`curies.Converter` and
preprocessing rules encoded in an instance of :class:`curies.PreprocessingRules` inside
a :class:`curies.PreprocessingConverter`.

For example, suppose you always want to fix legacy references to the ``OBO_REL`` namespace:

.. code-block:: python

    import curies
    from curies import (
        PreprocessingBlacklist,
        PreprocessingConverter,
        PreprocessingRewrites,
        PreprocessingRules,
    )

    rules = PreprocessingRules(
        blacklists=PreprocessingBlacklist(),
        rewrites=PreprocessingRewrites(
            full={"OBO_REL:is_a": "rdfs:subClassOf"},
        ),
    )

    converter = curies.get_obo_converter()
    converter = PreprocessingConverter.from_converter(
        converter, rules=rules
    )

    >>> converter.parse_curie("OBO_REL:is_a")
    ReferenceTuple('rdfs', 'subClassOf')
10 changes: 10 additions & 0 deletions src/curies/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
write_tsv,
)
from .discovery import discover, discover_from_rdf
from .preprocessing import (
PreprocessingBlacklist,
PreprocessingConverter,
PreprocessingRewrites,
PreprocessingRules,
)
from .reconciliation import remap_curie_prefixes, remap_uri_prefixes, rewire
from .sources import (
get_bioregistry_converter,
Expand All @@ -45,6 +51,10 @@
"NamedReference",
"Prefix",
"PrefixMap",
"PreprocessingBlacklist",
"PreprocessingConverter",
"PreprocessingRewrites",
"PreprocessingRules",
"Record",
"Records",
"Reference",
Expand Down
26 changes: 15 additions & 11 deletions src/curies/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1500,26 +1500,30 @@ def compress_or_standardize(

# docstr-coverage:excused `overload`
@overload
def parse(self, uri_or_curie: str, *, strict: Literal[True]) -> ReferenceTuple: ...
def parse(
self, str_or_uri_or_curie: str, *, strict: Literal[True] = True
) -> ReferenceTuple: ...

# docstr-coverage:excused `overload`
@overload
def parse(self, uri_or_curie: str, *, strict: Literal[False]) -> ReferenceTuple | None: ...
def parse(
self, str_or_uri_or_curie: str, *, strict: Literal[False] = False
) -> ReferenceTuple | None: ...

def parse(self, uri_or_curie: str, *, strict: bool) -> ReferenceTuple | None:
"""Parse a URI or CURIE."""
if self.is_uri(uri_or_curie):
def parse(self, str_or_uri_or_curie: str, *, strict: bool = False) -> ReferenceTuple | None:
"""Parse a string, URI, or CURIE."""
if self.is_uri(str_or_uri_or_curie):
if strict:
return self.parse_uri(uri_or_curie, strict=True, return_none=True)
return self.parse_uri(str_or_uri_or_curie, strict=True, return_none=True)
else:
return self.parse_uri(uri_or_curie, strict=False, return_none=True)
if self.is_curie(uri_or_curie):
return self.parse_uri(str_or_uri_or_curie, strict=False, return_none=True)
if self.is_curie(str_or_uri_or_curie):
if strict:
return self.parse_curie(uri_or_curie, strict=True)
return self.parse_curie(str_or_uri_or_curie, strict=True)
else:
return self.parse_curie(uri_or_curie, strict=False)
return self.parse_curie(str_or_uri_or_curie, strict=False)
if strict:
raise CompressionError(uri_or_curie)
raise CompressionError(str_or_uri_or_curie)
return None

def compress_strict(self, uri: str) -> str:
Expand Down
276 changes: 276 additions & 0 deletions src/curies/preprocessing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
"""Reusable configuration."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Literal, TypeVar, overload

from pydantic import BaseModel, Field
from typing_extensions import Self

from .api import Converter, Reference, ReferenceTuple

__all__ = [
"BlacklistError",
"PreprocessingBlacklist",
"PreprocessingConverter",
"PreprocessingRewrites",
"PreprocessingRules",
]

X = TypeVar("X", bound=Reference)


class PreprocessingBlacklist(BaseModel):
    """A model for prefix, suffix, and full-string blacklists.

    The ``full``, ``prefix``, and ``suffix`` lists apply to every string.
    The ``resource_*`` dictionaries hold additional lists keyed by a context
    string; they are only consulted when a matching ``context`` is passed to
    the check methods.
    """

    # Strings that are blacklisted when they match exactly
    full: list[str] = Field(default_factory=list)
    # Context-keyed lists of strings that are blacklisted when they match exactly
    resource_full: dict[str, list[str]] = Field(default_factory=dict)
    # Strings that are blacklisted when they start with one of these
    prefix: list[str] = Field(default_factory=list)
    # Context-keyed lists of blacklisted leading substrings
    resource_prefix: dict[str, list[str]] = Field(default_factory=dict)
    # Strings that are blacklisted when they end with one of these
    suffix: list[str] = Field(default_factory=list)

    def _sort(self) -> None:
        """Sort every blacklist in place."""
        self.full.sort()
        self.prefix.sort()
        self.suffix.sort()
        for v in self.resource_full.values():
            v.sort()
        for v in self.resource_prefix.values():
            v.sort()

    def str_has_blacklisted_prefix(
        self, str_or_curie_or_uri: str, *, context: str | None = None
    ) -> bool:
        """Check if the string starts with a blacklisted prefix.

        :param str_or_curie_or_uri: The string to check
        :param context: If given, the prefixes registered for this context are
            checked before the global ones.

        :returns: True if any context-specific or global prefix matches
        """
        if context:
            # ``str.startswith`` accepts a tuple; an empty tuple never matches
            if str_or_curie_or_uri.startswith(tuple(self.resource_prefix.get(context, []))):
                return True
        return str_or_curie_or_uri.startswith(tuple(self.prefix))

    def str_has_blacklisted_suffix(self, str_or_curie_or_uri: str) -> bool:
        """Check if the string ends with a blacklisted suffix.

        :param str_or_curie_or_uri: The string to check

        :returns: True if any global suffix matches (there are no
            context-specific suffixes)
        """
        return str_or_curie_or_uri.endswith(tuple(self.suffix))

    def str_is_blacklisted_full(
        self, str_or_curie_or_uri: str, *, context: str | None = None
    ) -> bool:
        """Check if the full string is blacklisted.

        :param str_or_curie_or_uri: The string to check
        :param context: If given, the exact strings registered for this context
            are checked before the global ones.

        :returns: True if the string exactly equals a blacklisted entry
        """
        if context and str_or_curie_or_uri in self.resource_full.get(context, []):
            return True
        return str_or_curie_or_uri in self.full

    def str_is_blacklisted(self, str_or_curie_or_uri: str, *, context: str | None = None) -> bool:
        """Check if the string is blacklisted by prefix, suffix, or exact match.

        :param str_or_curie_or_uri: The string to check
        :param context: An optional context forwarded to the prefix and
            exact-match checks.

        :returns: True if any of the three checks matches
        """
        return (
            self.str_has_blacklisted_prefix(str_or_curie_or_uri, context=context)
            or self.str_has_blacklisted_suffix(str_or_curie_or_uri)
            or self.str_is_blacklisted_full(str_or_curie_or_uri, context=context)
        )


class PreprocessingRewrites(BaseModel):
    """A model for full-string and prefix rewrites.

    Each kind of rewrite exists in a global form and in a form keyed by a
    context string that is only consulted when that context is passed in.
    """

    full: dict[str, str] = Field(
        default_factory=dict, description="Global remappings for an entire string"
    )
    resource_full: dict[str, dict[str, str]] = Field(
        default_factory=dict, description="Resource-keyed remappings for an entire string"
    )
    prefix: dict[str, str] = Field(
        default_factory=dict, description="Global remappings of just the prefix"
    )
    resource_prefix: dict[str, dict[str, str]] = Field(
        default_factory=dict, description="Resource-keyed remappings for just a prefix"
    )

    def remap_full(
        self,
        str_or_curie_or_uri: str,
        reference_cls: type[X],
        *,
        context: str | None = None,
    ) -> X | None:
        """Look up a whole-string rewrite and build a reference from it.

        :param str_or_curie_or_uri: The string to look up
        :param reference_cls: The class whose ``from_curie`` builds the result
        :param context: If given, the rewrites registered for this context take
            precedence over the global ones.

        :returns: A reference built from the rewrite target, or None if the
            string has no whole-string rewrite.
        """
        if context:
            scoped = self.resource_full.get(context, {})
            if str_or_curie_or_uri in scoped:
                return reference_cls.from_curie(scoped[str_or_curie_or_uri])

        target = self.full.get(str_or_curie_or_uri)
        if target is None:
            return None
        return reference_cls.from_curie(target)

    def remap_prefix(self, str_or_curie_or_uri: str, *, context: str | None = None) -> str:
        """Replace the leading part of the string using the first matching rule.

        :param str_or_curie_or_uri: The string to rewrite
        :param context: If given, the prefix rewrites registered for this
            context are tried before the global ones.

        :returns: The string with its matched leading part replaced, or the
            unchanged string if no rule matches.
        """
        # Context-specific rules are tried first, then the global ones
        rule_sets: list[dict[str, str]] = []
        if context is not None:
            rule_sets.append(self.resource_prefix.get(context, {}))
        rule_sets.append(self.prefix)

        for rule_set in rule_sets:
            for old, new in rule_set.items():
                if str_or_curie_or_uri.startswith(old):
                    return new + str_or_curie_or_uri[len(old) :]
        return str_or_curie_or_uri


class PreprocessingRules(BaseModel):
    """A model bundling blacklists and rewrites."""

    # Both parts default to empty rule sets so that callers (and rule files)
    # may specify only the part they need
    blacklists: PreprocessingBlacklist = Field(default_factory=PreprocessingBlacklist)
    rewrites: PreprocessingRewrites = Field(default_factory=PreprocessingRewrites)

    @classmethod
    def lint_file(cls, path: str | Path) -> None:
        """Lint a JSON rules file, in place, given a file path.

        The file is validated against this model, its blacklists are sorted,
        and it is written back with sorted keys and two-space indentation.
        Unset and default-valued fields are omitted from the output.

        :param path: The path to the JSON rules file
        """
        path = Path(path).expanduser().resolve()
        # Read and write explicitly as UTF-8 instead of the locale's encoding
        rules = cls.model_validate_json(path.read_text(encoding="utf-8"))
        rules.blacklists._sort()
        path.write_text(
            json.dumps(
                rules.model_dump(exclude_unset=True, exclude_defaults=True),
                sort_keys=True,
                indent=2,
            ),
            encoding="utf-8",
        )

    def str_is_blacklisted(self, str_or_curie_or_uri: str, *, context: str | None = None) -> bool:
        """Check if the string is blacklisted.

        :param str_or_curie_or_uri: The string to check
        :param context: An optional context forwarded to the blacklists

        :returns: True if the blacklists match the string
        """
        return self.blacklists.str_is_blacklisted(str_or_curie_or_uri, context=context)

    def remap_full(
        self,
        str_or_curie_or_uri: str,
        reference_cls: type[X],
        *,
        context: str | None = None,
    ) -> X | None:
        """Look up a whole-string rewrite and build a reference from it.

        :param str_or_curie_or_uri: The string to look up
        :param reference_cls: The reference class forwarded to the rewrites
        :param context: An optional context forwarded to the rewrites

        :returns: Whatever the rewrites return: a reference, or None if the
            string has no whole-string rewrite.
        """
        return self.rewrites.remap_full(
            str_or_curie_or_uri, reference_cls=reference_cls, context=context
        )

    def remap_prefix(self, str_or_curie_or_uri: str, *, context: str | None = None) -> str:
        """Rewrite the leading part of the string, if a prefix rule matches.

        :param str_or_curie_or_uri: The string to rewrite
        :param context: An optional context forwarded to the rewrites

        :returns: The rewritten string, or the unchanged string if no rule
            matches.
        """
        return self.rewrites.remap_prefix(str_or_curie_or_uri, context=context)


def _load_rules(rules: str | Path | PreprocessingRules) -> PreprocessingRules:
if isinstance(rules, (str, Path)):
rules = Path(rules).expanduser().resolve()
rules = PreprocessingRules.model_validate_json(rules.read_text())
return rules


class BlacklistError(ValueError):
    """Raised when a string matches the preprocessing blacklist."""


class PreprocessingConverter(Converter):
    """A converter with pre-processing rules.

    :meth:`parse` and :meth:`parse_curie` first try a whole-string rewrite,
    then a prefix rewrite, then check the (possibly rewritten) string against
    the blacklists, and only then delegate to :class:`Converter`.
    """

    def __init__(
        self,
        *args: Any,
        rules: PreprocessingRules | str | Path,
        reference_cls: type[X] | None = None,
        **kwargs: Any,
    ) -> None:
        """Instantiate a converter with a ruleset for pre-processing.

        :param args: Positional arguments passed to :func:`Converter.__init__`
        :param rules: A set of rules, or a path to a JSON file containing them
        :param reference_cls: The reference class to use. Defaults to
            :class:`curies.Reference`.
        :param kwargs: Keyword arguments passed to :func:`Converter.__init__`
        """
        super().__init__(*args, **kwargs)
        self.rules = _load_rules(rules)
        self._reference_cls = Reference if reference_cls is None else reference_cls

    @classmethod
    def from_converter(
        cls,
        converter: Converter,
        rules: PreprocessingRules | str | Path,
        *,
        reference_cls: type[X] | None = None,
    ) -> Self:
        """Wrap a converter with a ruleset.

        :param converter: The converter whose records are reused
        :param rules: A set of rules, or a path to a JSON file containing them
        :param reference_cls: The reference class to use. Defaults to
            :class:`curies.Reference`.

        :returns: A new pre-processing converter built from the records of the
            given converter
        """
        return cls(records=converter.records, rules=rules, reference_cls=reference_cls)

    def _apply_rules(self, text: str, *, context: str | None) -> ReferenceTuple | str:
        """Run the rewrite and blacklist rules shared by the parse methods.

        :param text: The string to pre-process
        :param context: An optional context forwarded to the rules

        :returns: A finished reference tuple if a whole-string rewrite matched,
            otherwise the (possibly prefix-rewritten) string that still needs
            to be parsed.

        :raises BlacklistError: If the string, after prefix rewriting, is
            blacklisted
        """
        rewritten = self.rules.remap_full(
            text, reference_cls=self._reference_cls, context=context
        )
        if rewritten is not None:
            return rewritten.pair

        # Remap the prefix (if necessary) before checking the blacklists
        text = self.rules.remap_prefix(text, context=context)

        if self.rules.str_is_blacklisted(text, context=context):
            message = f"{text!r} is blacklisted"
            if context is not None:
                message += f" (context: {context!r})"
            raise BlacklistError(message)

        return text

    # NOTE(review): this overload advertises ``strict=True`` as the default,
    # but the implementation below defaults to ``strict=False``, so a call
    # without ``strict`` is typed as non-optional yet can return None. It
    # mirrors the overloads of ``Converter.parse``; fix both together.
    # docstr-coverage:excused `overload`
    @overload
    def parse(
        self,
        str_or_uri_or_curie: str,
        *,
        strict: Literal[True] = True,
        context: str | None = ...,
    ) -> ReferenceTuple: ...

    # docstr-coverage:excused `overload`
    @overload
    def parse(
        self,
        str_or_uri_or_curie: str,
        *,
        strict: Literal[False] = False,
        context: str | None = ...,
    ) -> ReferenceTuple | None: ...

    def parse(
        self, str_or_uri_or_curie: str, *, strict: bool = False, context: str | None = None
    ) -> ReferenceTuple | None:
        """Parse a string, CURIE, or URI after applying the pre-processing rules.

        :param str_or_uri_or_curie: The string to parse
        :param strict: Forwarded to :meth:`Converter.parse`. Defaults to false.
        :param context: Is there a context, e.g., an ontology prefix that should be
            applied to the remapping and blacklist rules?

        :returns: The reference tuple from a whole-string rewrite if one
            matched, otherwise the result of :meth:`Converter.parse` on the
            pre-processed string

        :raises BlacklistError: If the string is blacklisted
        """
        processed = self._apply_rules(str_or_uri_or_curie, context=context)
        if not isinstance(processed, str):
            return processed

        # Both branches make the same call; presumably they are kept separate
        # so that type checkers can match a literal ``strict`` to an overload
        if strict:
            return super().parse(processed, strict=True)
        return super().parse(processed, strict=False)

    # docstr-coverage:excused `overload`
    @overload
    def parse_curie(
        self, curie: str, *, strict: Literal[False] = False, context: str | None = ...
    ) -> ReferenceTuple | None: ...

    # docstr-coverage:excused `overload`
    @overload
    def parse_curie(
        self, curie: str, *, strict: Literal[True] = True, context: str | None = ...
    ) -> ReferenceTuple: ...

    def parse_curie(
        self, curie: str, *, strict: bool = False, context: str | None = None
    ) -> ReferenceTuple | None:
        """Parse and standardize a CURIE.

        :param curie: The CURIE to parse and standardize
        :param strict: If the CURIE can't be parsed, should an error be thrown? Defaults
            to false.
        :param context: Is there a context, e.g., an ontology prefix that should be
            applied to the remapping and blacklist rules?

        :returns: A tuple representing a parsed and standardized CURIE

        :raises BlacklistError: If the CURIE is blacklisted
        """
        processed = self._apply_rules(curie, context=context)
        if not isinstance(processed, str):
            return processed

        # Both branches make the same call; presumably they are kept separate
        # so that type checkers can match a literal ``strict`` to an overload
        if strict:
            return super().parse_curie(processed, strict=True)
        return super().parse_curie(processed, strict=False)
Loading
Loading