|
18 | 18 | OWLObjectIntersectionOf, OWLObjectComplementOf, OWLObjectAllValuesFrom, OWLObjectOneOf, OWLObjectHasValue, \ |
19 | 19 | OWLObjectMinCardinality, OWLObjectMaxCardinality, OWLObjectExactCardinality, OWLObjectCardinalityRestriction, \ |
20 | 20 | OWLDataSomeValuesFrom, OWLDataOneOf, OWLDatatypeRestriction, OWLFacetRestriction, OWLDataHasValue, \ |
21 | | - OWLDataAllValuesFrom, OWLNothing, OWLThing |
| 21 | + OWLDataAllValuesFrom, OWLNothing, OWLThing, OWLDataMinCardinality, OWLDataMaxCardinality, OWLDataExactCardinality |
22 | 22 | from owlapy.class_expression import OWLClass |
23 | 23 | from owlapy.iri import IRI |
24 | 24 | from owlapy.owl_axiom import OWLAxiom, OWLSubClassOfAxiom |
@@ -989,6 +989,183 @@ def _lazy_cache_class(self, c: OWLClass) -> None: |
989 | 989 | temp = self.get_instances_from_owl_class(c) |
990 | 990 | self._cls_to_ind[c] = frozenset(temp) |
991 | 991 |
|
| 992 | + @_find_instances.register |
| 993 | + def _(self, ce: OWLDataMinCardinality): |
| 994 | + return self._get_instances_data_card_restriction(ce) |
| 995 | + |
| 996 | + @_find_instances.register |
| 997 | + def _(self, ce: OWLDataMaxCardinality): |
| 998 | + all_ = frozenset(self._ontology.individuals_in_signature()) |
| 999 | + min_ind = self._get_instances_data_card_restriction( |
| 1000 | + OWLDataMinCardinality(cardinality=ce.get_cardinality() + 1, |
| 1001 | + property=ce.get_property(), |
| 1002 | + filler=ce.get_filler())) |
| 1003 | + return all_ ^ min_ind |
| 1004 | + |
| 1005 | + @_find_instances.register |
| 1006 | + def _(self, ce: OWLDataExactCardinality): |
| 1007 | + return self._get_instances_data_card_restriction(ce) |
| 1008 | + |
| 1009 | + def _get_instances_data_card_restriction(self, ce) -> FrozenSet[OWLNamedIndividual]: |
| 1010 | + """Get instances for OWLDataMinCardinality or OWLDataExactCardinality restrictions.""" |
| 1011 | + pe = ce.get_property() |
| 1012 | + filler = ce.get_filler() |
| 1013 | + assert isinstance(pe, OWLDataProperty) |
| 1014 | + |
| 1015 | + if isinstance(ce, OWLDataMinCardinality): |
| 1016 | + min_count = ce.get_cardinality() |
| 1017 | + max_count = None |
| 1018 | + elif isinstance(ce, OWLDataExactCardinality): |
| 1019 | + min_count = max_count = ce.get_cardinality() |
| 1020 | + else: |
| 1021 | + raise NotImplementedError |
| 1022 | + |
| 1023 | + assert min_count >= 0 |
| 1024 | + assert max_count is None or max_count >= 0 |
| 1025 | + |
| 1026 | + if self._ontology.is_modified and (self.class_cache or self._property_cache): |
| 1027 | + self.reset_and_disable_cache() |
| 1028 | + property_cache = self._property_cache |
| 1029 | + |
| 1030 | + if property_cache: |
| 1031 | + self._lazy_cache_data_prop(pe) |
| 1032 | + dps = self._data_prop[pe] |
| 1033 | + else: |
| 1034 | + subs = self._some_values_subject_index(pe) |
| 1035 | + |
| 1036 | + ind = set() |
| 1037 | + |
| 1038 | + if isinstance(filler, OWLDatatype): |
| 1039 | + if property_cache: |
| 1040 | + for s, literals in dps.items(): |
| 1041 | + count = sum(1 for lit in literals if lit.get_datatype() == filler) |
| 1042 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1043 | + ind.add(s) |
| 1044 | + else: |
| 1045 | + for s in subs: |
| 1046 | + count = sum(1 for lit in self.data_property_values(s, pe) if lit.get_datatype() == filler) |
| 1047 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1048 | + ind.add(s) |
| 1049 | + elif isinstance(filler, OWLDataOneOf): |
| 1050 | + values = set(filler.values()) |
| 1051 | + if property_cache: |
| 1052 | + for s, literals in dps.items(): |
| 1053 | + count = len(literals & values) |
| 1054 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1055 | + ind.add(s) |
| 1056 | + else: |
| 1057 | + for s in subs: |
| 1058 | + count = sum(1 for lit in self.data_property_values(s, pe) if lit in values) |
| 1059 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1060 | + ind.add(s) |
| 1061 | + elif isinstance(filler, OWLDatatypeRestriction): |
| 1062 | + def res_to_callable(res: OWLFacetRestriction): |
| 1063 | + op = res.get_facet().operator |
| 1064 | + v = res.get_facet_value() |
| 1065 | + |
| 1066 | + def inner(lv: OWLLiteral): |
| 1067 | + return op(lv, v) |
| 1068 | + |
| 1069 | + return inner |
| 1070 | + |
| 1071 | + apply = FunctionType.__call__ |
| 1072 | + facet_restrictions = tuple(map(res_to_callable, filler.get_facet_restrictions())) |
| 1073 | + |
| 1074 | + def include(lv: OWLLiteral): |
| 1075 | + return lv.get_datatype() == filler.get_datatype() and \ |
| 1076 | + all(map(apply, facet_restrictions, repeat(lv))) |
| 1077 | + |
| 1078 | + if property_cache: |
| 1079 | + for s, literals in dps.items(): |
| 1080 | + count = sum(1 for lit in literals if include(lit)) |
| 1081 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1082 | + ind.add(s) |
| 1083 | + else: |
| 1084 | + for s in subs: |
| 1085 | + count = sum(1 for lit in self.data_property_values(s, pe) if include(lit)) |
| 1086 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1087 | + ind.add(s) |
| 1088 | + elif isinstance(filler, OWLDataComplementOf): |
| 1089 | + # Count values NOT matching the inner data range |
| 1090 | + inner_filler = filler.get_data_range() |
| 1091 | + # some_inner = OWLDataSomeValuesFrom(property=pe, filler=inner_filler) |
| 1092 | + # We need to count per-individual, so we iterate |
| 1093 | + if property_cache: |
| 1094 | + for s, literals in dps.items(): |
| 1095 | + # Count literals that match the complement (i.e., do NOT match the inner filler) |
| 1096 | + match_inner = self._count_data_filler_matches(literals, inner_filler) |
| 1097 | + count = len(literals) - match_inner |
| 1098 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1099 | + ind.add(s) |
| 1100 | + else: |
| 1101 | + for s in subs: |
| 1102 | + all_lits = set(self.data_property_values(s, pe)) |
| 1103 | + match_inner = self._count_data_filler_matches(all_lits, inner_filler) |
| 1104 | + count = len(all_lits) - match_inner |
| 1105 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1106 | + ind.add(s) |
| 1107 | + elif isinstance(filler, OWLDataUnionOf): |
| 1108 | + if property_cache: |
| 1109 | + for s, literals in dps.items(): |
| 1110 | + count = sum(1 for lit in literals |
| 1111 | + if any(self._literal_matches_data_range(lit, op) |
| 1112 | + for op in filler.operands())) |
| 1113 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1114 | + ind.add(s) |
| 1115 | + else: |
| 1116 | + for s in subs: |
| 1117 | + count = sum(1 for lit in self.data_property_values(s, pe) |
| 1118 | + if any(self._literal_matches_data_range(lit, op) |
| 1119 | + for op in filler.operands())) |
| 1120 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1121 | + ind.add(s) |
| 1122 | + elif isinstance(filler, OWLDataIntersectionOf): |
| 1123 | + if property_cache: |
| 1124 | + for s, literals in dps.items(): |
| 1125 | + count = sum(1 for lit in literals |
| 1126 | + if all(self._literal_matches_data_range(lit, op) |
| 1127 | + for op in filler.operands())) |
| 1128 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1129 | + ind.add(s) |
| 1130 | + else: |
| 1131 | + for s in subs: |
| 1132 | + count = sum(1 for lit in self.data_property_values(s, pe) |
| 1133 | + if all(self._literal_matches_data_range(lit, op) |
| 1134 | + for op in filler.operands())) |
| 1135 | + if count >= min_count and (max_count is None or count <= max_count): |
| 1136 | + ind.add(s) |
| 1137 | + else: |
| 1138 | + raise ValueError |
| 1139 | + |
| 1140 | + return frozenset(ind) |
| 1141 | + |
| 1142 | + def _literal_matches_data_range(self, lit: OWLLiteral, dr) -> bool: |
| 1143 | + """Check if a single literal matches a data range.""" |
| 1144 | + if isinstance(dr, OWLDatatype): |
| 1145 | + return lit.get_datatype() == dr |
| 1146 | + elif isinstance(dr, OWLDataOneOf): |
| 1147 | + return lit in set(dr.values()) |
| 1148 | + elif isinstance(dr, OWLDatatypeRestriction): |
| 1149 | + if lit.get_datatype() != dr.get_datatype(): |
| 1150 | + return False |
| 1151 | + for res in dr.get_facet_restrictions(): |
| 1152 | + op = res.get_facet().operator |
| 1153 | + if not op(lit, res.get_facet_value()): |
| 1154 | + return False |
| 1155 | + return True |
| 1156 | + elif isinstance(dr, OWLDataComplementOf): |
| 1157 | + return not self._literal_matches_data_range(lit, dr.get_data_range()) |
| 1158 | + elif isinstance(dr, OWLDataUnionOf): |
| 1159 | + return any(self._literal_matches_data_range(lit, op) for op in dr.operands()) |
| 1160 | + elif isinstance(dr, OWLDataIntersectionOf): |
| 1161 | + return all(self._literal_matches_data_range(lit, op) for op in dr.operands()) |
| 1162 | + else: |
| 1163 | + raise ValueError(f"Unsupported data range type: {type(dr)}") |
| 1164 | + |
| 1165 | + def _count_data_filler_matches(self, literals: Set[OWLLiteral], dr) -> int: |
| 1166 | + """Count how many literals in the set match the given data range.""" |
| 1167 | + return sum(1 for lit in literals if self._literal_matches_data_range(lit, dr)) |
| 1168 | + |
992 | 1169 | def get_instances_from_owl_class(self, c: OWLClass): |
993 | 1170 | if c.is_owl_thing(): |
994 | 1171 | yield from self._ontology.individuals_in_signature() |
|
0 commit comments