Skip to content

Commit 13ce8b8

Browse files
committed
Create wrapped.py
1 parent b09f040 commit 13ce8b8

1 file changed

Lines changed: 182 additions & 0 deletions

File tree

src/curies/wrapped.py

Lines changed: 182 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,182 @@
1+
"""Reusable configuration."""
2+
3+
import json
from pathlib import Path
from typing import Any, TypeVar

from pydantic import BaseModel, Field
from typing_extensions import Self

from curies import ReferenceTuple

from .api import Converter, Reference
10+
11+
# Public names exported by this module.
__all__ = [
    "Blacklist",
    "BlacklistError",
    "PreprocessingConverter",
    "Rewrites",
    "Rules",
]

# Type variable for "some subclass of Reference"; lets the remapping methods
# return an instance of whichever class the caller passes in.
X = TypeVar("X", bound=Reference)
20+
21+
22+
class Blacklist(BaseModel):
    """Strings to reject, matched exactly, by prefix, or by suffix.

    The ``resource_*`` fields hold extra entries that are consulted only when
    the matching ``ontology_prefix`` is passed to the corresponding check.
    """

    # Strings rejected on exact match, regardless of resource.
    full: list[str]
    # Strings rejected on exact match, keyed by ontology prefix.
    resource_full: dict[str, list[str]]
    # Leading substrings rejected regardless of resource.
    prefix: list[str]
    # Leading substrings rejected, keyed by ontology prefix.
    resource_prefix: dict[str, list[str]]
    # Trailing substrings rejected regardless of resource.
    suffix: list[str]

    def _sort(self) -> None:
        """Sort every list held by this model in place."""
        for entries in (self.full, self.prefix, self.suffix):
            entries.sort()
        for keyed in (self.resource_full, self.resource_prefix):
            for entries in keyed.values():
                entries.sort()

    def str_has_blacklisted_prefix(
        self, str_or_curie_or_uri: str, *, ontology_prefix: str | None = None
    ) -> bool:
        """Return whether the string starts with a blacklisted prefix.

        Resource-specific prefixes are checked first (only when a non-empty
        ``ontology_prefix`` is given), then the global ones.
        """
        if ontology_prefix:
            scoped = tuple(self.resource_prefix.get(ontology_prefix, ()))
            # ``str.startswith`` with an empty tuple is always False.
            if str_or_curie_or_uri.startswith(scoped):
                return True
        return str_or_curie_or_uri.startswith(tuple(self.prefix))

    def str_has_blacklisted_suffix(self, str_or_curie_or_uri: str) -> bool:
        """Return whether the string ends with a blacklisted suffix."""
        return str_or_curie_or_uri.endswith(tuple(self.suffix))

    def str_is_blacklisted_full(
        self, str_or_curie_or_uri: str, *, ontology_prefix: str | None = None
    ) -> bool:
        """Return whether the whole string is blacklisted.

        Resource-specific entries are checked first (only when a non-empty
        ``ontology_prefix`` is given), then the global ones.
        """
        if ontology_prefix:
            scoped = self.resource_full.get(ontology_prefix, ())
            if str_or_curie_or_uri in scoped:
                return True
        return str_or_curie_or_uri in self.full
63+
64+
65+
class Rewrites(BaseModel):
    """Rewrite rules applied to whole strings or to their leading prefix."""

    full: dict[str, str] = Field(..., description="Global remappings for an entire string")
    resource_full: dict[str, dict[str, str]] = Field(
        ..., description="Resource-keyed remappings for an entire string"
    )
    prefix: dict[str, str] = Field(..., description="Global remappings of just the prefix")
    resource_prefix: dict[str, dict[str, str]] = Field(
        ..., description="Resource-keyed remappings for just a prefix"
    )

    def remap_full(
        self, str_or_curie_or_uri: str, cls: type[X], *, ontology_prefix: str | None = None
    ) -> X | None:
        """Look up a whole-string rewrite and build a reference from it.

        :param str_or_curie_or_uri: The string to look up.
        :param cls: The reference class whose ``from_curie`` builds the result.
        :param ontology_prefix: When non-empty, the resource-specific rewrites
            for this key are consulted before the global ones.
        :returns: ``cls.from_curie(target)`` for the first matching rewrite,
            or ``None`` when no rewrite matches.
        """
        if ontology_prefix:
            scoped_target = self.resource_full.get(ontology_prefix, {}).get(str_or_curie_or_uri)
            if scoped_target is not None:
                return cls.from_curie(scoped_target)

        global_target = self.full.get(str_or_curie_or_uri)
        if global_target is None:
            return None
        return cls.from_curie(global_target)

    def remap_prefix(self, str_or_curie_or_uri: str, ontology_prefix: str | None = None) -> str:
        """Replace the first matching leading prefix of the string.

        Resource-specific rewrites (when ``ontology_prefix`` is not ``None``)
        are scanned before the global ones. The first rule whose old prefix
        the string starts with is applied. If none matches, the string is
        returned unchanged.
        """
        tables: list[dict[str, str]] = []
        if ontology_prefix is not None:
            tables.append(self.resource_prefix.get(ontology_prefix, {}))
        tables.append(self.prefix)

        for table in tables:
            for old, new in table.items():
                if str_or_curie_or_uri.startswith(old):
                    return new + str_or_curie_or_uri.removeprefix(old)
        return str_or_curie_or_uri
101+
102+
103+
class Rules(BaseModel):
    """Blacklists and rewrites bundled as a single configuration object."""

    blacklists: Blacklist
    rewrites: Rewrites

    @classmethod
    def lint_file(cls, path: str | Path) -> None:
        """Normalize a JSON rules file in place.

        The file is validated against this model, its blacklists are sorted,
        and the result is written back with sorted keys and two-space indent.

        :param path: Location of the JSON file; ``~`` is expanded.
        """
        path = Path(path).expanduser().resolve()
        # Read and write as UTF-8 explicitly so the result does not depend on
        # the platform's locale encoding.
        rules = cls.model_validate_json(path.read_text(encoding="utf-8"))
        rules.blacklists._sort()
        path.write_text(
            json.dumps(rules.model_dump(), sort_keys=True, indent=2),
            encoding="utf-8",
        )

    def str_has_blacklisted_prefix(
        self, str_or_curie_or_uri: str, *, ontology_prefix: str | None = None
    ) -> bool:
        """Return whether the string starts with a blacklisted prefix."""
        return self.blacklists.str_has_blacklisted_prefix(
            str_or_curie_or_uri, ontology_prefix=ontology_prefix
        )

    def str_has_blacklisted_suffix(self, str_or_curie_or_uri: str) -> bool:
        """Return whether the string ends with a blacklisted suffix."""
        return self.blacklists.str_has_blacklisted_suffix(str_or_curie_or_uri)

    def str_is_blacklisted_full(
        self, str_or_curie_or_uri: str, *, ontology_prefix: str | None = None
    ) -> bool:
        """Return whether the whole string is blacklisted."""
        return self.blacklists.str_is_blacklisted_full(
            str_or_curie_or_uri, ontology_prefix=ontology_prefix
        )

    def str_is_blacklisted(
        self, str_or_curie_or_uri: str, *, ontology_prefix: str | None = None
    ) -> bool:
        """Return whether the string is blacklisted in full, by prefix, or by suffix."""
        return (
            self.str_is_blacklisted_full(str_or_curie_or_uri, ontology_prefix=ontology_prefix)
            or self.str_has_blacklisted_prefix(
                str_or_curie_or_uri, ontology_prefix=ontology_prefix
            )
            or self.str_has_blacklisted_suffix(str_or_curie_or_uri)
        )

    def remap_full(
        self, str_or_curie_or_uri: str, cls: type[X], *, ontology_prefix: str | None = None
    ) -> X | None:
        """Apply a whole-string rewrite, returning ``None`` when none matches."""
        return self.rewrites.remap_full(
            str_or_curie_or_uri, cls=cls, ontology_prefix=ontology_prefix
        )

    def remap_prefix(self, str_or_curie_or_uri: str, ontology_prefix: str | None = None) -> str:
        """Apply a prefix rewrite, returning the string unchanged when none matches."""
        return self.rewrites.remap_prefix(str_or_curie_or_uri, ontology_prefix=ontology_prefix)
148+
149+
150+
def _load_rules(rules: str | Path | Rules) -> Rules:
    """Return a ``Rules`` object, loading it from a JSON file when given a path.

    :param rules: Either a ready ``Rules`` instance, which is returned as is,
        or a path to a JSON file (``~`` is expanded).
    :returns: The given or loaded rules.
    """
    if isinstance(rules, (str, Path)):
        path = Path(rules).expanduser().resolve()
        # model_validate_json expects the JSON text, not the path to it.
        return Rules.model_validate_json(path.read_text(encoding="utf-8"))
    return rules
155+
156+
157+
class BlacklistError(ValueError):
    """Raised when a string matches a blacklist entry."""
159+
160+
161+
class PreprocessingConverter(Converter):
    """A converter that applies rewrite and blacklist rules before parsing."""

    def __init__(self, *args: Any, rules: Rules | str | Path, **kwargs: Any) -> None:
        """Instantiate the converter.

        :param args: Positional arguments forwarded to ``Converter``.
        :param rules: A ``Rules`` instance or a path to a JSON rules file.
        :param kwargs: Keyword arguments forwarded to ``Converter``.
        """
        super().__init__(*args, **kwargs)
        self.rules = _load_rules(rules)

    @classmethod
    def from_converter(cls, converter: Converter, rules: Rules | str | Path) -> Self:
        """Build a preprocessing converter from an existing converter's records.

        :param converter: The converter whose ``records`` are reused.
        :param rules: A ``Rules`` instance or a path to a JSON rules file.
        :returns: A new instance of this class.
        """
        # __init__ loads the rules, so they are passed through untouched.
        return cls(records=converter.records, rules=rules)

    def parse(
        self, uri_or_curie: str, *, strict: bool, ontology_prefix: str | None = None
    ) -> ReferenceTuple | None:
        """Parse a URI or CURIE after applying the preprocessing rules.

        The steps are, in order: a whole-string rewrite (which short-circuits
        when it matches), a prefix rewrite, a blacklist check, and finally the
        parent converter's ``parse``.

        :param uri_or_curie: The string to parse.
        :param strict: Forwarded to the parent converter's ``parse``.
        :param ontology_prefix: Selects resource-specific rules in addition to
            the global ones.
        :returns: The parsed reference tuple, or whatever the parent ``parse``
            returns when it cannot parse the string.
        :raises BlacklistError: If the prefix-remapped string is blacklisted.
        """
        remapped = self.rules.remap_full(
            uri_or_curie, cls=Reference, ontology_prefix=ontology_prefix
        )
        if remapped is not None:
            # Convert to the tuple form this method is annotated to return.
            return remapped.pair

        # Remap the string's prefix (if necessary)
        uri_or_curie = self.rules.remap_prefix(uri_or_curie, ontology_prefix=ontology_prefix)

        if self._is_blacklisted(uri_or_curie, ontology_prefix=ontology_prefix):
            raise BlacklistError(f"blacklisted: {uri_or_curie}")

        return super().parse(uri_or_curie, strict=strict)

    def _is_blacklisted(self, uri_or_curie: str, *, ontology_prefix: str | None = None) -> bool:
        """Return whether the string is blacklisted in full, by prefix, or by suffix."""
        rules = self.rules
        return (
            rules.str_is_blacklisted_full(uri_or_curie, ontology_prefix=ontology_prefix)
            or rules.str_has_blacklisted_prefix(uri_or_curie, ontology_prefix=ontology_prefix)
            or rules.str_has_blacklisted_suffix(uri_or_curie)
        )

0 commit comments

Comments
 (0)