1
+ import re
1
2
from abc import ABC , abstractmethod
2
3
from dataclasses import dataclass , field
3
4
from typing import Annotated , Literal
4
5
5
- from pydantic import Field , StringConstraints , TypeAdapter , computed_field
6
+ from pydantic import Field , StringConstraints , TypeAdapter , computed_field , field_validator
6
7
7
8
from .base_models import BaseModel
8
9
from .node_models import EntryNode
@@ -115,6 +116,130 @@ def build_cypher_query(self, param_name: str) -> CypherQuery:
115
116
)
116
117
117
118
119
+ class PropertyFilterSearchTerm (AbstractFilterSearchTerm ):
120
+ filter_type : Literal ["property" ]
121
+ filter_value : str
122
+
123
+ @field_validator ("filter_value" )
124
+ @classmethod
125
+ def validate_filter_value (cls , filter_value : str ):
126
+ """
127
+ The filter value is in the format `not:inherited:property_name:property_value`
128
+ where `not:` and `inherited:` and `:property_value` are optional.
129
+ Note that a property_name is always of format `name:lc`.
130
+ """
131
+ parsed_value = filter_value
132
+ if parsed_value .startswith ("not:" ):
133
+ parsed_value = parsed_value [4 :]
134
+ if parsed_value .startswith ("inherited:" ):
135
+ parsed_value = parsed_value [10 :]
136
+
137
+ assert ":" in parsed_value , "A property_name is mandatory and must contain a colon"
138
+
139
+ terms = parsed_value .split (":" )
140
+ property_name = terms [0 ] + ":" + terms [1 ]
141
+
142
+ if not re .match (r"^[^:\\]+:[a-z]{2}$" , property_name ):
143
+ raise ValueError ("Invalid property_name" )
144
+
145
+ return filter_value
146
+
147
+ @computed_field
148
+ def negated (self ) -> bool :
149
+ return self .filter_value .startswith ("not:" )
150
+
151
+ @computed_field
152
+ def inherited (self ) -> bool :
153
+ filter_value = self .get_parsed_filter_value (self .negated )
154
+ return filter_value .startswith ("inherited:" )
155
+
156
+ @computed_field
157
+ def property_name (self ) -> str :
158
+ filter_value = self .get_parsed_filter_value (self .negated , self .inherited )
159
+ terms = filter_value .split (":" )
160
+ return terms [0 ] + "_" + terms [1 ]
161
+
162
+ @computed_field
163
+ def property_value (self ) -> str | None :
164
+ filter_value = self .get_parsed_filter_value (self .negated , self .inherited )
165
+ terms = filter_value .split (":" )
166
+ return ":" .join (terms [2 :]) if len (terms ) > 2 else None
167
+
168
+ def get_parsed_filter_value (self , negated = False , inherited = False ):
169
+ filter_value = self .filter_value
170
+ if negated :
171
+ filter_value = filter_value [4 :]
172
+ if inherited :
173
+ filter_value = filter_value [10 :]
174
+ return filter_value
175
+
176
+ def build_cypher_query (self , param_name : str ) -> CypherQuery :
177
+ branches = {
178
+ "negated" : self .negated ,
179
+ "inherited" : self .inherited ,
180
+ "with_value" : self .property_value is not None ,
181
+ }
182
+ match branches :
183
+ case {"negated" : False , "inherited" : False , "with_value" : False }:
184
+ return CypherQuery (f"n.prop_{ self .property_name } IS NOT NULL" )
185
+ case {"negated" : True , "inherited" : False , "with_value" : False }:
186
+ return CypherQuery (f"n.prop_{ self .property_name } IS NULL" )
187
+ case {"negated" : False , "inherited" : False , "with_value" : True }:
188
+ return CypherQuery (
189
+ f"n.prop_{ self .property_name } = ${ param_name } " ,
190
+ {param_name : self .property_value },
191
+ )
192
+ case {"negated" : True , "inherited" : False , "with_value" : True }:
193
+ return CypherQuery (
194
+ f"n.prop_{ self .property_name } <> ${ param_name } " ,
195
+ {param_name : self .property_value },
196
+ )
197
+ case {"negated" : False , "inherited" : True , "with_value" : False }:
198
+ return CypherQuery (
199
+ f"""(n.prop_{ self .property_name } IS NOT NULL OR
200
+ any(
201
+ ancestor IN [(n)<-[:is_child_of*]-(p:ENTRY) | p]
202
+ WHERE ancestor.prop_{ self .property_name } IS NOT NULL)
203
+ )""" ,
204
+ )
205
+ case {"negated" : True , "inherited" : True , "with_value" : False }:
206
+ return CypherQuery (
207
+ f"""(n.prop_{ self .property_name } IS NULL AND
208
+ all(
209
+ ancestor IN [(n)<-[:is_child_of*]-(p:ENTRY) | p]
210
+ WHERE ancestor.prop_{ self .property_name } IS NULL)
211
+ )""" ,
212
+ )
213
+ case {"negated" : False , "inherited" : True , "with_value" : True }:
214
+ return CypherQuery (
215
+ f"""
216
+ [
217
+ property IN
218
+ [n.prop_{ self .property_name } ] +
219
+ [(n)<-[:is_child_of*]-(p:ENTRY) | p.prop_{ self .property_name } ]
220
+ WHERE property IS NOT NULL
221
+ ][0]
222
+ = ${ param_name } """ ,
223
+ {param_name : self .property_value },
224
+ )
225
+ case {"negated" : True , "inherited" : True , "with_value" : True }:
226
+ return CypherQuery (
227
+ f"""((n.prop_{ self .property_name } IS NULL AND
228
+ all(
229
+ ancestor IN [(n)<-[:is_child_of*]-(p:ENTRY) | p]
230
+ WHERE ancestor.prop_{ self .property_name } IS NULL)
231
+ ) OR
232
+ [
233
+ property IN
234
+ [n.prop_{ self .property_name } ] +
235
+ [(n)<-[:is_child_of*]-(p:ENTRY) | p.prop_{ self .property_name } ]
236
+ WHERE property IS NOT NULL
237
+ ][0]
238
+ <> ${ param_name } )""" ,
239
+ {param_name : self .property_value },
240
+ )
241
+
242
+
118
243
FilterSearchTerm = Annotated [
119
244
(
120
245
IsFilterSearchTerm
@@ -123,6 +248,7 @@ def build_cypher_query(self, param_name: str) -> CypherQuery:
123
248
| ChildFilterSearchTerm
124
249
| AncestorFilterSearchTerm
125
250
| DescendantFilterSearchTerm
251
+ | PropertyFilterSearchTerm
126
252
),
127
253
Field (discriminator = "filter_type" ),
128
254
]
0 commit comments