|
14 | 14 | import networkx as nx |
15 | 15 | import pandas as pd |
16 | 16 | import ssslm |
| 17 | +from pydantic import BaseModel, Field |
17 | 18 | from ssslm import LiteralMapping |
18 | 19 | from tqdm.auto import tqdm |
19 | 20 |
|
20 | 21 | from semra.io.graph import _from_digraph_edge, to_digraph |
21 | | -from semra.rules import EXACT_MATCH, FLIP, INVERSION_MAPPING, SubsetConfiguration |
| 22 | +from semra.rules import ( |
| 23 | + DB_XREF, |
| 24 | + EXACT_MATCH, |
| 25 | + FLIP, |
| 26 | + INVERSION_MAPPING, |
| 27 | + KNOWLEDGE_MAPPING, |
| 28 | + SubsetConfiguration, |
| 29 | +) |
22 | 30 | from semra.struct import ( |
23 | 31 | Evidence, |
24 | 32 | Mapping, |
|
35 | 43 | "IdentifierIndex", |
36 | 44 | "Index", |
37 | 45 | "M2MIndex", |
| 46 | + "Mutation", |
38 | 47 | "PrefixIdentifierDict", |
39 | 48 | "PrefixIdentifierDict", |
40 | 49 | "PrefixPairCounter", |
|
60 | 69 | "get_terms", |
61 | 70 | "get_test_evidence", |
62 | 71 | "get_test_reference", |
| 72 | + "handle_mutations", |
63 | 73 | "hydrate_subsets", |
64 | 74 | "keep_object_prefixes", |
65 | 75 | "keep_prefixes", |
@@ -1190,3 +1200,57 @@ def get_asymmetric_counter( |
1190 | 1200 | for (left_prefix, right_prefix), identifiers in index.items() |
1191 | 1201 | } |
1192 | 1202 | ) |
| 1203 | + |
| 1204 | + |
class Mutation(BaseModel):
    """Represents a mutation operation on a mapping set."""

    # A mutation selects mappings whose subject prefix equals ``source`` and
    # whose predicate equals ``old``; ``handle_mutations`` then re-emits them
    # with ``new`` as the predicate.

    # Prefix that a mapping's subject must have for the mutation to apply.
    source: str = Field(..., description="The source type")
    # Optional restriction on the mapping's object prefix: a single prefix,
    # a list of acceptable prefixes, or ``None`` for no restriction.
    target: str | list[str] | None = Field(None, description="limit mutation to these")
    # Passed as ``confidence_factor`` to the evidence built for mutated mappings.
    confidence: float = 1.0
    # Predicate a mapping must currently have to be eligible.
    old: Reference = Field(default=DB_XREF)
    # Predicate assigned to the mutated mapping.
    new: Reference = Field(default=EXACT_MATCH)

    def should_apply_to(self, mapping: Mapping) -> bool:
        """Decide whether this mutation is applicable to the given mapping.

        :param mapping: The mapping to test
        :returns: ``True`` when the mapping's subject prefix equals
            :attr:`source`, its predicate equals :attr:`old`, and its object
            prefix satisfies :attr:`target` (always satisfied when
            :attr:`target` is ``None``); ``False`` otherwise.
        :raises NotImplementedError: If :attr:`target` is neither ``None``,
            a string, nor a list.
        """
        if mapping.subject.prefix != self.source or mapping.predicate != self.old:
            return False

        allowed = self.target
        if allowed is None:
            # No target restriction configured, so any object prefix qualifies.
            return True

        object_prefix = mapping.object.prefix
        if isinstance(allowed, str):
            return allowed == object_prefix
        if isinstance(allowed, list):
            return object_prefix in allowed
        # Defensive fallthrough for a target of an unexpected type.
        raise NotImplementedError
| 1227 | + |
| 1228 | + |
def handle_mutations(
    mappings: Iterable[Mapping], mutations: Iterable[Mutation], *, progress: bool = True
) -> Iterable[Mapping]:
    """Re-emit mappings, swapping the predicate where a mutation applies.

    Mutations are looked up by the subject prefix of each mapping. A mapping
    with no configured mutation, or one for which
    :meth:`Mutation.should_apply_to` returns ``False``, is yielded unchanged.
    Otherwise a new mapping is yielded with the same subject and object, the
    mutation's ``new`` predicate, and a single :class:`ReasonedEvidence`
    (justification ``KNOWLEDGE_MAPPING``) that wraps the original mapping and
    uses the mutation's ``confidence`` as its confidence factor.

    :param mappings: The mappings to process
    :param mutations: The mutations to apply, at most one per source prefix
    :param progress: Should a progress bar be shown?
    :yields: Each input mapping, either unchanged or replaced by its mutated form
    :raises KeyError: If more than one mutation is configured for the same
        source prefix. Because this function is a generator, the error is
        raised when iteration starts rather than when the function is called.
    """
    by_source: dict[str, Mutation] = {}
    for candidate in mutations:
        source = candidate.source
        if source in by_source:
            raise KeyError(f"got multiple configured mutations for source: {source}")
        by_source[source] = candidate

    for mapping in tqdm(mappings, disable=not progress):
        rule = by_source.get(mapping.subject.prefix)
        if rule is None or not rule.should_apply_to(mapping):
            # Nothing configured for this subject prefix, or the rule does not match.
            yield mapping
            continue

        # Keep the original mapping as the basis for the derived one.
        derived_evidence = ReasonedEvidence(
            justification=KNOWLEDGE_MAPPING,
            mappings=[mapping],
            confidence_factor=rule.confidence,
        )
        yield Mapping(
            subject=mapping.subject,
            predicate=rule.new,
            object=mapping.object,
            evidence=[derived_evidence],
        )
0 commit comments