1
- import uuid
2
- from typing import Any , Dict , List , Optional , Tuple
1
+ from typing import Any , List , Optional
3
2
4
3
import structlog
5
4
from presidio_analyzer import AnalyzerEngine
6
5
from presidio_anonymizer import AnonymizerEngine
7
6
8
7
from codegate .db .models import AlertSeverity
9
8
from codegate .pipeline .base import PipelineContext
9
+ from codegate .pipeline .sensitive_data .session_store import SessionStore
10
10
11
11
logger = structlog .get_logger ("codegate.pii.analyzer" )
12
12
13
13
14
- class PiiSessionStore :
15
- """
16
- A class to manage PII (Personally Identifiable Information) session storage.
17
-
18
- Attributes:
19
- session_id (str): The unique identifier for the session. If not provided, a new UUID
20
- is generated. mappings (Dict[str, str]): A dictionary to store mappings between UUID
21
- placeholders and PII.
22
-
23
- Methods:
24
- add_mapping(pii: str) -> str:
25
- Adds a PII string to the session store and returns a UUID placeholder for it.
26
-
27
- get_pii(uuid_placeholder: str) -> str:
28
- Retrieves the PII string associated with the given UUID placeholder. If the placeholder
29
- is not found, returns the placeholder itself.
30
- """
31
-
32
- def __init__ (self , session_id : str = None ):
33
- self .session_id = session_id or str (uuid .uuid4 ())
34
- self .mappings : Dict [str , str ] = {}
35
-
36
- def add_mapping (self , pii : str ) -> str :
37
- uuid_placeholder = f"<{ str (uuid .uuid4 ())} >"
38
- self .mappings [uuid_placeholder ] = pii
39
- return uuid_placeholder
40
-
41
- def get_pii (self , uuid_placeholder : str ) -> str :
42
- return self .mappings .get (uuid_placeholder , uuid_placeholder )
43
-
44
-
45
14
class PiiAnalyzer :
46
15
"""
47
16
PiiAnalyzer class for analyzing and anonymizing text containing PII.
@@ -52,12 +21,12 @@ class PiiAnalyzer:
52
21
Get or create the singleton instance of PiiAnalyzer.
53
22
analyze:
54
23
text (str): The text to analyze for PII.
55
- Tuple[str, List[Dict[str, Any]], PiiSessionStore ]: The anonymized text, a list of
24
+ List: The analyzer results describing the PII entities detected in the text.
57
26
entities (List[str]): The PII entities to analyze for.
58
27
restore_pii:
59
28
anonymized_text (str): The text with anonymized PII.
60
- session_store (PiiSessionStore ): The PiiSessionStore used for anonymization.
29
+ session_store (SessionStore ): The SessionStore used for anonymization.
61
30
str: The text with original PII restored.
62
31
"""
63
32
@@ -95,13 +64,11 @@ def __init__(self):
95
64
# Create analyzer with custom NLP engine
96
65
self .analyzer = AnalyzerEngine (nlp_engine = nlp_engine )
97
66
self .anonymizer = AnonymizerEngine ()
98
- self .session_store = PiiSessionStore ()
67
+ self .session_store = SessionStore ()
99
68
100
69
PiiAnalyzer ._instance = self
101
70
102
- def analyze (
103
- self , text : str , context : Optional [PipelineContext ] = None
104
- ) -> Tuple [str , List [Dict [str , Any ]], PiiSessionStore ]:
71
+ def analyze (self , text : str , context : Optional [PipelineContext ] = None ) -> List :
105
72
# Prioritize credit card detection first
106
73
entities = [
107
74
"PHONE_NUMBER" ,
@@ -125,81 +92,30 @@ def analyze(
125
92
language = "en" ,
126
93
score_threshold = 0.3 , # Lower threshold to catch more potential matches
127
94
)
95
+ return analyzer_results
128
96
129
- # Track found PII
130
- found_pii = []
131
-
132
- # Only anonymize if PII was found
133
- if analyzer_results :
134
- # Log each found PII instance and anonymize
135
- anonymized_text = text
136
- for result in analyzer_results :
137
- pii_value = text [result .start : result .end ]
138
- uuid_placeholder = self .session_store .add_mapping (pii_value )
139
- pii_info = {
140
- "type" : result .entity_type ,
141
- "value" : pii_value ,
142
- "score" : result .score ,
143
- "start" : result .start ,
144
- "end" : result .end ,
145
- "uuid_placeholder" : uuid_placeholder ,
146
- }
147
- found_pii .append (pii_info )
148
- anonymized_text = anonymized_text .replace (pii_value , uuid_placeholder )
149
-
150
- # Log each PII detection with its UUID mapping
151
- logger .info (
152
- "PII detected and mapped" ,
153
- pii_type = result .entity_type ,
154
- score = f"{ result .score :.2f} " ,
155
- uuid = uuid_placeholder ,
156
- # Don't log the actual PII value for security
157
- value_length = len (pii_value ),
158
- session_id = self .session_store .session_id ,
159
- )
160
-
161
- # Log summary of all PII found in this analysis
162
- if found_pii and context :
163
- # Create notification string for alert
164
- notify_string = (
165
- f"**PII Detected** 🔒\n "
166
- f"- Total PII Found: { len (found_pii )} \n "
167
- f"- Types Found: { ', ' .join (set (p ['type' ] for p in found_pii ))} \n "
168
- )
169
- context .add_alert (
170
- self ._name ,
171
- trigger_string = notify_string ,
172
- severity_category = AlertSeverity .CRITICAL ,
173
- )
174
-
175
- logger .info (
176
- "PII analysis complete" ,
177
- total_pii_found = len (found_pii ),
178
- pii_types = [p ["type" ] for p in found_pii ],
179
- session_id = self .session_store .session_id ,
180
- )
181
-
182
- # Return the anonymized text, PII details, and session store
183
- return anonymized_text , found_pii , self .session_store
184
-
185
- # If no PII found, return original text, empty list, and session store
186
- return text , [], self .session_store
187
-
188
def restore_pii(self, session_id: str, anonymized_text: str) -> str:
    """
    Restore the original PII (Personally Identifiable Information) in the given anonymized text.

    The placeholder -> PII mappings are looked up in ``self.session_store`` by
    ``session_id``. Every placeholder occurrence in ``anonymized_text`` is then
    replaced by its original value in a single pass, so text that has already
    been restored is never scanned again for further placeholders.

    Args:
        session_id (str): The ID of the session whose placeholder mappings should be used.
        anonymized_text (str): The text containing placeholders for PII.

    Returns:
        str: The text with the original PII restored. The input text is returned
        unchanged when no mappings are found for ``session_id``.
    """
    # Function-scope import keeps this block self-contained.
    import re

    session_data = self.session_store.get_by_session_id(session_id)
    if not session_data:
        logger.warning(
            "No active PII session found for given session ID. Unable to restore PII.",
            session_id=session_id,
        )
        return anonymized_text

    # Ignore empty placeholders: replacing "" would inject PII between every character.
    mapping = {
        placeholder: original_pii
        for placeholder, original_pii in session_data.items()
        if placeholder
    }
    if not mapping:
        return anonymized_text

    # Longest placeholders first so that a placeholder which is a prefix of
    # another one can never shadow the longer match in the alternation.
    ordered = sorted(mapping, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(placeholder) for placeholder in ordered))

    # A single substitution pass: restored values are emitted verbatim and are
    # not re-examined, unlike chained str.replace calls which could substitute
    # a placeholder-looking string that is part of an already restored value.
    return pattern.sub(lambda match: mapping[match.group(0)], anonymized_text)
0 commit comments