-
Notifications
You must be signed in to change notification settings - Fork 1k
Expand file tree
/
Copy pathlemma_context_aware_enhancer.py
More file actions
413 lines (363 loc) · 16.7 KB
/
lemma_context_aware_enhancer.py
File metadata and controls
413 lines (363 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
import copy
import logging
from typing import List, Optional
from presidio_analyzer import EntityRecognizer, RecognizerResult
from presidio_analyzer.context_aware_enhancers import ContextAwareEnhancer
from presidio_analyzer.nlp_engine import NlpArtifacts
logger = logging.getLogger("presidio-analyzer")
class LemmaContextAwareEnhancer(ContextAwareEnhancer):
"""
A class representing a lemma based context aware enhancer logic.
Context words might enhance or reduce confidence score of a recognized entity:
- Positive context: boosts confidence (e.g., "social" for SSN)
- Negative context: reduces confidence (e.g., "test" for SSN)
LemmaContextAwareEnhancer is an implementation of Lemma based context aware logic,
it compares spacy lemmas of each word in context of the matched entity to given
context and the recognizer context words,
if matched it enhance the recognized entity confidence score by a given factor.
:param context_similarity_factor: How much to enhance confidence of match entity
:param min_score_with_context_similarity: Minimum confidence score
:param context_prefix_count: how many words before the entity to match context
:param context_suffix_count: how many words after the entity to match context
:param context_matching_mode: Matching mode for context words. Options:
- "substring" (default): Match context words as substrings
(e.g., 'card' matches 'creditcard', 'lic' matches 'duplicate').
Maintains backward compatibility.
- "whole_word": Match context words only as whole words
(e.g., 'lic' matches 'lic' but not 'duplicate').
Prevents false positives.
:param negative_context_penalty: How much to reduce confidence when negative
context words are found. Default 0.3. Applied after positive context boost.
"""
def __init__(
self,
context_similarity_factor: float = 0.35,
min_score_with_context_similarity: float = 0.4,
context_prefix_count: int = 5,
context_suffix_count: int = 0,
context_matching_mode: str = "substring",
negative_context_penalty: float = 0.3,
):
super().__init__(
context_similarity_factor=context_similarity_factor,
min_score_with_context_similarity=min_score_with_context_similarity,
context_prefix_count=context_prefix_count,
context_suffix_count=context_suffix_count,
negative_context_penalty=negative_context_penalty,
)
if context_matching_mode not in ["whole_word", "substring"]:
raise ValueError(
f"context_matching_mode must be one of: 'whole_word', 'substring'. "
f"Got: {context_matching_mode}"
)
self.context_matching_mode = context_matching_mode
def enhance_using_context(
self,
text: str,
raw_results: List[RecognizerResult],
nlp_artifacts: NlpArtifacts,
recognizers: List[EntityRecognizer],
context: Optional[List[str]] = None,
negative_context: Optional[List[str]] = None,
) -> List[RecognizerResult]:
"""
Update results in case the lemmas of surrounding words or input context
words are identical to the context words.
Using the surrounding words of the actual word matches, look
for specific strings that if found contribute to the score
of the result, improving the confidence that the match is
indeed of that PII entity type
:param text: The actual text that was analyzed
:param raw_results: Recognizer results which didn't take
context into consideration
:param nlp_artifacts: The nlp artifacts contains elements
such as lemmatized tokens for better
accuracy of the context enhancement process
:param recognizers: the list of recognizers
:param context: list of context words
:param negative_context: list of negative context words to reduce confidence
""" # noqa: D205,D400
# create a deep copy of the results object, so we can manipulate it
results = copy.deepcopy(raw_results)
# create recognizer context dictionary
recognizers_dict = {recognizer.id: recognizer for recognizer in recognizers}
# Create empty list in None or lowercase all context words in the list
if not context:
context = []
else:
context = [word.lower() for word in context]
# Create empty list in None or lowercase all negative context words in the list
if not negative_context:
negative_context = []
else:
negative_context = [word.lower() for word in negative_context]
# Sanity
if nlp_artifacts is None:
logger.warning("NLP artifacts were not provided")
return results
for result in results:
recognizer = None
# get recognizer matching the result, if found.
if (
result.recognition_metadata
and RecognizerResult.RECOGNIZER_IDENTIFIER_KEY
in result.recognition_metadata.keys()
):
recognizer = recognizers_dict.get(
result.recognition_metadata[
RecognizerResult.RECOGNIZER_IDENTIFIER_KEY
]
)
if not recognizer:
logger.debug(
"Recognizer name not found as part of the "
"recognition_metadata dict in the RecognizerResult. "
)
continue
# skip recognizer result if the recognizer doesn't support
# context enhancement (either positive or negative)
if not (recognizer.context or recognizer.negative_context):
logger.debug(
"recognizer '%s' does not support context enhancement",
recognizer.name,
)
continue
# extract lemmatized context from the surrounding of the match
word = text[result.start : result.end]
surrounding_words = self._extract_surrounding_words(
nlp_artifacts=nlp_artifacts, word=word, start=result.start
)
# combine other sources of context with surrounding words
surrounding_words.extend(context)
# Check if result was already boosted by recognizer to avoid double boost
already_boosted = result.recognition_metadata.get(
RecognizerResult.IS_SCORE_ENHANCED_BY_CONTEXT_KEY
)
# Apply positive context only if not already boosted
if not already_boosted:
supportive_context_word = self._find_supportive_word_in_context(
surrounding_words, recognizer.context, self.context_matching_mode
)
if supportive_context_word != "":
result.score += self.context_similarity_factor
result.score = max(
result.score, self.min_score_with_context_similarity
)
result.score = min(result.score, ContextAwareEnhancer.MAX_SCORE)
# Update the explainability object with context information
# helped to improve the score
result.analysis_explanation.set_supportive_context_word(
supportive_context_word
)
result.analysis_explanation.set_improved_score(result.score)
# Apply negative context penalty if recognizer has negative_context defined
# or if negative_context is provided at runtime
# This is independent of positive boost to always catch negative context
effective_negative_context = []
if recognizer.negative_context:
effective_negative_context.extend(recognizer.negative_context)
if negative_context:
effective_negative_context.extend(negative_context)
if effective_negative_context:
negative_context_word = self._find_supportive_word_in_context(
surrounding_words,
effective_negative_context,
self.context_matching_mode,
)
if negative_context_word != "":
result.score -= self.negative_context_penalty
result.score = max(result.score, ContextAwareEnhancer.MIN_SCORE)
logger.debug(
"Applied negative context penalty for word '%s'",
negative_context_word,
)
# Update explanation to reflect the final score
# after negative penalty
result.analysis_explanation.set_improved_score(result.score)
return results
@staticmethod
def _find_supportive_word_in_context(
context_list: List[str],
recognizer_context_list: List[str],
matching_mode: str = "substring",
) -> str:
"""
Find words in the text which are relevant for context evaluation.
A word is considered a supportive context word based on the matching mode:
- "substring" (default): Substring match
(e.g., 'card' matches 'creditcard', 'lic' matches 'duplicate')
- "whole_word": Exact whole-word match (case-insensitive)
(e.g., 'lic' matches 'lic' but not 'duplicate')
:param context_list words before and after the matched entity within
a specified window size
:param recognizer_context_list a list of words considered as
context keywords manually specified by the recognizer's author
:param matching_mode: Matching mode ('whole_word' or 'substring').
Defaults to 'substring'.
"""
word = ""
# If the context list is empty, no need to continue
if context_list is None or recognizer_context_list is None:
return word
for predefined_context_word in recognizer_context_list:
result = False
if matching_mode == "substring":
# Substring match (case-insensitive) - default behavior
# for backward compatibility
result = next(
(
True
for keyword in context_list
if predefined_context_word.lower() in keyword.lower()
),
False,
)
elif matching_mode == "whole_word":
# Exact whole-word match (case-insensitive)
result = next(
(
True
for keyword in context_list
if predefined_context_word.lower() == keyword.lower()
),
False,
)
if result:
logger.debug("Found context keyword '%s'", predefined_context_word)
word = predefined_context_word
break
return word
def _extract_surrounding_words(
self, nlp_artifacts: NlpArtifacts, word: str, start: int
) -> List[str]:
"""Extract words surrounding another given word.
The text from which the context is extracted is given in the nlp
doc.
:param nlp_artifacts: An abstraction layer which holds different
items which are the result of a NLP pipeline
execution on a given text
:param word: The word to look for context around
:param start: The start index of the word in the original text
"""
if not nlp_artifacts.tokens:
logger.info("Skipping context extraction due to lack of NLP artifacts")
# if there are no nlp artifacts, this is ok, we can
# extract context and we return a valid, yet empty
# context
return [""]
# Get the already prepared words in the given text, in their
# LEMMATIZED version
lemmatized_keywords = nlp_artifacts.keywords
# since the list of tokens is not necessarily aligned
# with the actual index of the match, we look for the
# token index which corresponds to the match
token_index = self._find_index_of_match_token(
word, start, nlp_artifacts.tokens, nlp_artifacts.tokens_indices
)
# index i belongs to the PII entity, take the preceding n words
# and the successing m words into a context list
backward_context = self._add_n_words_backward(
token_index,
self.context_prefix_count,
nlp_artifacts.lemmas,
lemmatized_keywords,
)
forward_context = self._add_n_words_forward(
token_index,
self.context_suffix_count,
nlp_artifacts.lemmas,
lemmatized_keywords,
)
context_list = []
context_list.extend(backward_context)
context_list.extend(forward_context)
context_list = list(set(context_list))
logger.debug("Context list is: %s", " ".join(context_list))
return context_list
@staticmethod
def _find_index_of_match_token(
word: str,
start: int,
tokens,
tokens_indices: List[int],
) -> int:
found = False
# we use the known start index of the original word to find the actual
# token at that index, we are not checking for equivalence since the
# token might be just a substring of that word (e.g. for phone number
# 555-124564 the first token might be just '555' or for a match like '
# rocket' the actual token will just be 'rocket' hence the misalignment
# of indices)
# Note: we are iterating over the original tokens (not the lemmatized)
i = -1
for i, token in enumerate(tokens, 0):
# Either we found a token with the exact location, or
# we take a token which its characters indices covers
# the index we are looking for.
if (tokens_indices[i] == start) or (start < tokens_indices[i] + len(token)):
# found the interesting token, the one that around it
# we take n words, we save the matching lemma
found = True
break
if not found:
raise ValueError(
"Did not find word '" + word + "' "
"in the list of tokens although it "
"is expected to be found"
)
return i
@staticmethod
def _add_n_words(
index: int,
n_words: int,
lemmas: List[str],
lemmatized_filtered_keywords: List[str],
is_backward: bool,
) -> List[str]:
"""
Prepare a string of context words.
Return a list of words which surrounds a lemma at a given index.
The words will be collected only if exist in the filtered array
:param index: index of the lemma that its surrounding words we want
:param n_words: number of words to take
:param lemmas: array of lemmas
:param lemmatized_filtered_keywords: the array of filtered
lemmas from the original sentence,
:param is_backward: if true take the preceeding words, if false,
take the successing words
"""
i = index
context_words = []
# The entity itself is no interest to us...however we want to
# consider it anyway for cases were it is attached with no spaces
# to an interesting context word, so we allow it and add 1 to
# the number of collected words
# collect at most n words (in lower case)
remaining = n_words + 1
while 0 <= i < len(lemmas) and remaining > 0:
lower_lemma = lemmas[i].lower()
if lower_lemma in lemmatized_filtered_keywords:
context_words.append(lower_lemma)
remaining -= 1
i = i - 1 if is_backward else i + 1
return context_words
def _add_n_words_forward(
self,
index: int,
n_words: int,
lemmas: List[str],
lemmatized_filtered_keywords: List[str],
) -> List[str]:
return self._add_n_words(
index, n_words, lemmas, lemmatized_filtered_keywords, False
)
def _add_n_words_backward(
self,
index: int,
n_words: int,
lemmas: List[str],
lemmatized_filtered_keywords: List[str],
) -> List[str]:
return self._add_n_words(
index, n_words, lemmas, lemmatized_filtered_keywords, True
)