|
| 1 | +"""Operations on triples.""" |
| 2 | + |
| 3 | +from collections import defaultdict |
| 4 | +from collections.abc import Iterable |
| 5 | +from typing import TypeAlias |
| 6 | + |
| 7 | +from .filters import exclude_triples |
| 8 | +from .model import TripleType |
| 9 | +from .. import Reference |
| 10 | + |
# Names exported by a star-import of this module. ``exclude_triples`` is
# imported from ``.filters`` at the top of the file and re-exported here.
__all__ = [
    "PrefixPairStratifiedIndex",
    "exclude_prefix_stratified_many_to_many",
    "exclude_triples",
    "get_prefix_pair_stratified_indexes",
    "get_prefix_stratified_many_to_many",
    "get_reference_indexes",
]
| 19 | + |
#: A doubly-nested adjacency dictionary whose first-level
#: keys are subject (or object) local unique identifiers,
#: whose second-level keys are the local unique identifiers
#: from the opposite side, and whose values are lists of triples
AdjacencyDict = dict[str, dict[str, list[TripleType]]]

#: A pair of prefixes
PrefixPair: TypeAlias = tuple[str, str]

#: A multi-level nested dictionary that represents many-to-many mappings.
#: The first key is a pair of prefixes (subject/object or object/subject), the second key
#: is either a subject identifier or an object identifier, the last key is the identifier
#: from the opposite side, and the values are lists of triples.
#:
#: This data structure can be used to index either forward or backward mappings,
#: as done inside :func:`get_prefix_pair_stratified_indexes`
PrefixPairStratifiedIndex: TypeAlias = dict[PrefixPair, AdjacencyDict[TripleType]]
| 36 | + |
| 37 | + |
def exclude_prefix_stratified_many_to_many(
    triples: Iterable[TripleType], *, progress: bool = False
) -> Iterable[TripleType]:
    """Drop the triples that take part in a prefix pair-stratified many-to-many relationship.

    The triples to drop are computed with :func:`get_prefix_stratified_many_to_many`
    and then removed with :func:`exclude_triples`.

    .. warning::

        The predicate is not taken into account. To apply this operation per
        predicate, group the triples by predicate first and call this function
        on each group.

    .. warning::

        The input is fully consumed into a list up front, because two passes
        over the triples are required.

    :param triples: An iterable of triples
    :param progress: Whether to show a progress bar; forwarded to
        :func:`exclude_triples`

    :returns: An iterable of triples, as produced by :func:`exclude_triples`
    """
    # Materialize once so that a one-shot iterator can be walked twice.
    materialized = list(triples)
    many_to_many = get_prefix_stratified_many_to_many(materialized)
    return exclude_triples(materialized, many_to_many, progress=progress)
| 61 | + |
| 62 | + |
def get_prefix_stratified_many_to_many(triples: Iterable[TripleType]) -> set[TripleType]:
    """Collect the triples that sit in a prefix pair-stratified many-to-many relationship.

    The forward and backward indexes are each reduced with :func:`get_one_to_many`,
    the reduced backward index is re-oriented with
    :func:`flip_prefix_pair_stratified_index` so both sides are keyed the same
    way, and the triples found at positions present in both are gathered.

    :param triples: An iterable of triples

    :returns: The set of triples stored in the forward index at the positions
        shared by both reduced indexes
    """
    forward_index, backward_index = get_prefix_pair_stratified_indexes(triples)
    forward_one_to_many = get_one_to_many(forward_index)
    backward_one_to_many = get_one_to_many(backward_index)
    # After flipping, the backward index has the same key orientation as the forward one.
    backward_flipped = flip_prefix_pair_stratified_index(backward_one_to_many)

    many_to_many: set[TripleType] = set()
    for prefix_pair, forward_adjacency in forward_one_to_many.items():
        flipped_adjacency = backward_flipped.get(prefix_pair)
        if not flipped_adjacency:
            # Covers both a missing prefix pair and one whose flipped entry is empty.
            continue
        many_to_many.update(_compare(forward_adjacency, flipped_adjacency))
    return many_to_many
| 73 | + |
| 74 | + |
def get_prefix_pair_stratified_indexes(
    triples: Iterable[TripleType],
) -> tuple[PrefixPairStratifiedIndex[TripleType], PrefixPairStratifiedIndex[TripleType]]:
    """Get a forward and backwards subject/object index.

    The predicate of each triple is not used when building either index.

    :param triples: An iterable of triples; it is iterated once

    :returns: A pair of forward and backwards indexes, where:

        - The forward index is a triply-nested dictionary from
          (subject prefix, object prefix) pair to subject identifier to object
          identifier to list of triples.
        - The backward index is a triply-nested dictionary from
          (object prefix, subject prefix) pair to object identifier to subject
          identifier to list of triples.
    """
    forward: _DD[TripleType] = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    backward: _DD[TripleType] = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    for triple in triples:
        subject, obj = triple.subject, triple.object
        forward[subject.prefix, obj.prefix][subject.identifier][obj.identifier].append(triple)
        backward[obj.prefix, subject.prefix][obj.identifier][subject.identifier].append(triple)
    # Hand back plain dictionaries so that later lookups cannot silently create keys.
    return _downgrade_defaultdict(forward), _downgrade_defaultdict(backward)
| 99 | + |
| 100 | + |
#: The nested ``defaultdict`` counterpart of ``PrefixPairStratifiedIndex``, used
#: while an index is being filled and before it is converted to plain dictionaries
_DD = defaultdict[PrefixPair, defaultdict[str, defaultdict[str, list[TripleType]]]]
| 102 | + |
| 103 | + |
def _downgrade_defaultdict(dd: _DD[TripleType]) -> PrefixPairStratifiedIndex[TripleType]:
    """Copy a triply-nested ``defaultdict`` index into plain nested dictionaries.

    All three dictionary levels are rebuilt as ``dict``; the lists of triples
    at the leaves are reused rather than copied.
    """
    plain: PrefixPairStratifiedIndex[TripleType] = {}
    for prefix_pair, by_outer_id in dd.items():
        plain_outer = plain[prefix_pair] = {}
        for outer_id, by_inner_id in by_outer_id.items():
            plain_outer[outer_id] = dict(by_inner_id)
    return plain
| 106 | + |
| 107 | + |
def get_one_to_many(
    index: PrefixPairStratifiedIndex[TripleType],
) -> PrefixPairStratifiedIndex[TripleType]:
    """Filter an index to entities in each prefix pair with a one-to-many relationship.

    :param index: A prefix pair-stratified index

    :returns: A new index that keeps, for each prefix pair, only the first-level
        identifiers mapped to more than one opposite-side identifier. Prefix
        pairs left with no such identifier are omitted.
    """
    result: PrefixPairStratifiedIndex[TripleType] = {}
    for prefix_pair, adjacency in index.items():
        multi_valued = {
            entity_id: neighbors
            for entity_id, neighbors in adjacency.items()
            if len(neighbors) >= 2
        }
        if not multi_valued:
            continue
        result[prefix_pair] = multi_valued
    return result
| 118 | + |
| 119 | + |
def flip_prefix_pair_stratified_index(
    index: PrefixPairStratifiedIndex[TripleType],
) -> PrefixPairStratifiedIndex[TripleType]:
    """Flip a one-to-many relationship index to a many-to-one relationship index.

    :param index: A prefix pair-stratified index

    :returns: An index whose prefix pairs are reversed and whose two identifier
        levels are swapped. After swapping, only the first-level identifiers
        mapped to more than one opposite-side identifier are kept; a prefix
        pair with none left still appears, with an empty dictionary.
    """
    flipped_index: PrefixPairStratifiedIndex[TripleType] = {}
    for prefix_pair, adjacency in index.items():
        left_prefix, right_prefix = prefix_pair
        swapped: dict[str, dict[str, list[TripleType]]] = {}
        for left_id, by_right_id in adjacency.items():
            for right_id, triples in by_right_id.items():
                swapped.setdefault(right_id, {})[left_id] = triples
        kept = {}
        for right_id, by_left_id in swapped.items():
            if len(by_left_id) > 1:
                kept[right_id] = by_left_id
        flipped_index[right_prefix, left_prefix] = kept
    return flipped_index
| 132 | + |
| 133 | + |
def _compare(
    left_adjacency_dict: AdjacencyDict[TripleType], right_adjacency_dict: AdjacencyDict[TripleType]
) -> set[TripleType]:
    """Collect the left-hand triples at positions present in both adjacency dictionaries.

    A position is a (first-level key, second-level key) pair. Only the triples
    stored in ``left_adjacency_dict`` are returned; the right-hand side is used
    purely to decide which positions qualify.
    """
    shared: set[TripleType] = set()
    for outer_id in left_adjacency_dict.keys() & right_adjacency_dict.keys():
        left_inner = left_adjacency_dict[outer_id]
        right_inner = right_adjacency_dict[outer_id]
        for inner_id in left_inner.keys() & right_inner.keys():
            shared.update(left_inner[inner_id])
    return shared
| 144 | + |
| 145 | + |
#: A simple index from a reference to a set of references. It can
#: map either a subject to its objects or an object to its subjects,
#: depending on how it was built.
ReferenceIndex = dict[Reference, set[Reference]]
| 150 | + |
| 151 | + |
def get_reference_indexes(triples: Iterable[TripleType]) -> tuple[ReferenceIndex, ReferenceIndex]:
    """Get simple entity indexes.

    :param triples: An iterable of triples

    :returns: A pair of plain dictionaries: the first maps each subject to the
        set of its objects, the second maps each object to the set of its
        subjects
    """
    subject_to_objects: ReferenceIndex = {}
    object_to_subjects: ReferenceIndex = {}
    for triple in triples:
        subject, obj = triple.subject, triple.object
        subject_to_objects.setdefault(subject, set()).add(obj)
        object_to_subjects.setdefault(obj, set()).add(subject)
    return subject_to_objects, object_to_subjects
0 commit comments