-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_pii_tools.py
More file actions
1193 lines (1027 loc) · 48 KB
/
mcp_pii_tools.py
File metadata and controls
1193 lines (1027 loc) · 48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""
MCP (Model Context Protocol) Tools for PII Detection and Processing
PII 탐지 및 처리 기능을 MCP Tool로 제공
"""
import os
import re
import sys
import time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
# from dotenv import load_dotenv # Claude Desktop에서 환경변수를 직접 설정하므로 불필요
import langextract as lx
from pii_crypto import PIICrypto
from openai import OpenAI
from vllm_provider import VLLMProvider, ModelProviderFactory, get_provider_config
from mcp.server.fastmcp import FastMCP
import logging
# 로깅 설정
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
mcp = FastMCP("hello-mcp")
# 전역 PII 탐지기 인스턴스 (서버 시작 시 초기화)
_global_detector = None
def get_detector():
"""전역 PII 탐지기 인스턴스를 반환합니다."""
global _global_detector
if _global_detector is None:
_global_detector = MCPPIIDetector()
return _global_detector
# 환경 변수 로드 (Claude Desktop에서 직접 설정됨)
# load_dotenv()
@dataclass
class PIIItem:
"""PII 항목을 나타내는 데이터 클래스"""
type: str # PII 유형 (이름, 전화번호, 이메일 등)
value: str # 추출된 값
confidence: float # 신뢰도 (0.0-1.0)
start_pos: int # 텍스트 내 시작 위치
end_pos: int # 텍스트 내 끝 위치
@dataclass
class ProcessedPII:
"""처리된 PII 정보"""
original_text: str # 원본 텍스트
encrypted_text: str # 암호화된 텍스트 (검색용)
anonymized_text: str # 완전 익명화된 텍스트 (표시용)
pii_items: List[PIIItem] # 탐지된 PII 항목들
encrypted_items: Dict[str, str] # 암호화된 PII 매핑
class MCPPIIDetector:
"""MCP용 PII 탐지기 - langextract 기반"""
def __init__(self):
"""PII 탐지기 초기화"""
# Provider 설정 로드
self.provider_config = get_provider_config()
self.provider_type = self.provider_config["provider_type"]
self.model_id = self.provider_config["model_id"]
self.api_key = self.provider_config["api_key"]
# MCP 서버 시작 시 Provider 정보를 stderr로 출력
print(f"🚀 MCP PII Tools 시작 - Provider: {self.provider_type.upper()}, Model: {self.model_id}", file=sys.stderr)
if not self.api_key:
raise ValueError(f"{self.provider_type.upper()} API 키가 설정되지 않았습니다. 환경변수를 확인하세요.")
# Provider 인스턴스 생성
self.provider = ModelProviderFactory.create_provider(
provider_type=self.provider_type,
model_id=self.model_id,
api_key=self.api_key
)
logger.info(f"PII 탐지기 초기화 완료 - Provider: {self.provider_type}, Model: {self.model_id}")
# langextract 예제 데이터 설정
self.examples = [
lx.data.ExampleData(
text="오늘은 Rachel Lim이(가) 방문했다. 이메일 주소는 rachel.lim@gmail.com이다. 여권번호는 M31143886이다.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="Rachel Lim"),
lx.data.Extraction(extraction_class="email", extraction_text="rachel.lim@gmail.com"),
lx.data.Extraction(extraction_class="passport_number", extraction_text="M31143886"),
],
),
lx.data.ExampleData(
text="김철수 씨가 테이블에 앉았다. 연락처는 010-1234-5678이고 이메일은 kim@example.com입니다.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="김철수"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-1234-5678"),
lx.data.Extraction(extraction_class="email", extraction_text="kim@example.com"),
],
),
lx.data.ExampleData(
text="박지현 님이 딜러와 대화했다. 주소는 서울시 강남구 테헤란로 123입니다.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="박지현"),
lx.data.Extraction(extraction_class="address", extraction_text="서울시 강남구 테헤란로 123"),
],
),
lx.data.ExampleData(
text="이름은 김창엽이고 주소는 분당구야. 전화번호는 010-9876-5432야.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="김창엽"),
lx.data.Extraction(extraction_class="address", extraction_text="분당구"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-9876-5432"),
],
),
lx.data.ExampleData(
text="사는 곳은 강남구이고 이름은 박민수입니다. 연락처는 010-5555-1234입니다.",
extractions=[
lx.data.Extraction(extraction_class="address", extraction_text="강남구"),
lx.data.Extraction(extraction_class="name", extraction_text="박민수"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-5555-1234"),
],
),
lx.data.ExampleData(
text="이름은 김창엽이고 주소는 경기도 성남시 분당구야. 전화번호는 010-9876-5432야 여권번호는 M3234234 으로 확인되.",
extractions=[
lx.data.Extraction(extraction_class="name", extraction_text="김창엽"),
lx.data.Extraction(extraction_class="address", extraction_text="경기도 성남시 분당구"),
lx.data.Extraction(extraction_class="phone", extraction_text="010-9876-5432"),
lx.data.Extraction(extraction_class="passport_number", extraction_text="M3234234"),
],
),
]
self.prompt = """주어진 텍스트에서 PII(개인정보)를 추출하세요. 다음 항목들을 찾아주세요:
- 이름: 사람의 이름 (김철수, 박지현, Rachel Lim 등)
- 이메일: 이메일 주소 (example@email.com 등)
- 전화번호: 전화번호 (010-1234-5678, 02-123-4567 등)
- 여권번호: 여권 번호 (M31143886, M3234234 등)
- 주소: 주소 정보 (서울시 강남구, 분당구, 강남구, 서초구, 경기도 성남시 분당구 등)
중요: 주소의 경우 다음을 모두 주소로 인식해주세요:
- "분당구", "강남구", "서초구" 같은 지역명
- "경기도 성남시 분당구" 같은 구체적인 주소
- "주소는 분당구야"에서 "분당구"는 주소입니다.
- "주소는 경기도 성남시 분당구야"에서 "경기도 성남시 분당구"는 주소입니다.
응답은 반드시 ```json 코드 블록으로 감싸서 반환하세요."""
print("✅ MCP PII Detector 초기화 완료")
def detect_pii(self, text: str) -> Dict[str, Any]:
"""
텍스트에서 PII를 탐지 (MCP Tool용)
Args:
text (str): 분석할 텍스트
Returns:
Dict[str, Any]: MCP Tool 응답 형식
"""
try:
start_time = time.time()
# Provider에 따른 langextract 호출
if self.provider_type == "vllm":
# vLLM Provider 사용
result = lx.extract(
text_or_documents=text,
prompt_description=self.prompt,
examples=self.examples,
model=self.provider, # 커스텀 Provider 인스턴스 사용
use_schema_constraints=False
)
else:
# OpenAI Provider 사용 (기본)
os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"
result = lx.extract(
text_or_documents=text,
prompt_description=self.prompt,
examples=self.examples,
model_id=self.model_id,
api_key=self.api_key,
fence_output=True
)
# 결과를 PIIItem 리스트로 변환
pii_items = []
logger.info(f"탐지된 extraction 수: {len(result.extractions)}")
for i, extraction in enumerate(result.extractions):
logger.info(f"Extraction {i+1}: class='{extraction.extraction_class}', text='{extraction.extraction_text}'")
# char_interval이 없으면 텍스트에서 직접 위치 찾기
start_pos = 0
end_pos = 0
if extraction.char_interval:
start_pos = extraction.char_interval.start_pos
end_pos = extraction.char_interval.end_pos
logger.info(f" char_interval 사용: {start_pos}-{end_pos}")
else:
# 텍스트에서 직접 위치 찾기 (대소문자 구분 없이)
search_text = extraction.extraction_text
start_pos = text.find(search_text)
# 대소문자 구분 없이 찾기
if start_pos == -1:
start_pos = text.lower().find(search_text.lower())
if start_pos != -1:
end_pos = start_pos + len(search_text)
logger.info(f" 텍스트에서 직접 찾음: {start_pos}-{end_pos}")
# 실제 찾은 텍스트와 원본이 일치하는지 확인
actual_found = text[start_pos:end_pos]
if actual_found != search_text:
logger.warning(f" 대소문자 차이: 찾은='{actual_found}', 원본='{search_text}'")
else:
start_pos = -1
end_pos = -1
logger.warning(f" 텍스트에서 찾을 수 없음: '{extraction.extraction_text}'")
mapped_type = self._map_extraction_class(extraction.extraction_class)
logger.info(f" 매핑된 타입: '{extraction.extraction_class}' -> '{mapped_type}'")
pii_items.append(PIIItem(
type=mapped_type,
value=extraction.extraction_text,
confidence=0.9, # langextract는 confidence를 제공하지 않으므로 기본값
start_pos=start_pos,
end_pos=end_pos
))
processing_time = time.time() - start_time
return {
"success": True,
"pii_items": [asdict(item) for item in pii_items],
"count": len(pii_items),
"processing_time": processing_time,
"summary": self._get_pii_summary(pii_items)
}
except Exception as e:
return {
"success": False,
"error": str(e),
"pii_items": [],
"count": 0,
"processing_time": 0,
"summary": {}
}
def anonymize_text(self, text: str, pii_items: List[PIIItem]) -> str:
"""텍스트에서 PII를 익명화 처리"""
if not pii_items:
return text
logger.info(f"익명화 시작: 원본 텍스트 길이={len(text)}")
logger.info(f"익명화할 PII 항목 수: {len(pii_items)}")
# PII 항목들을 위치별로 정렬하고 중복 제거
valid_items = []
for item in pii_items:
if item.start_pos != -1 and item.end_pos != -1 and item.start_pos < len(text):
# 실제 텍스트에서 해당 위치의 내용이 일치하는지 확인
actual_text = text[item.start_pos:item.end_pos]
if actual_text == item.value:
valid_items.append(item)
logger.info(f"유효한 PII: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
else:
# 주소의 경우 위치 불일치를 무시하고 유효한 것으로 처리
if item.type == "주소":
valid_items.append(item)
logger.info(f"주소 위치 불일치 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
else:
logger.warning(f"위치 불일치: 예상='{item.value}', 실제='{actual_text}' at {item.start_pos}-{item.end_pos}")
else:
# 주소의 경우 위치가 유효하지 않아도 처리
if item.type == "주소":
valid_items.append(item)
logger.info(f"주소 위치 무효 무시: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
else:
logger.warning(f"유효하지 않은 위치: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
# 뒤에서부터 치환 (인덱스 변화 방지)
sorted_items = sorted(valid_items, key=lambda x: x.start_pos, reverse=True)
anonymized_text = text
# 주소의 경우 특별 처리: langextract 위치 정보 무시하고 직접 찾기
address_items = [item for item in sorted_items if item.type == "주소"]
other_items = [item for item in sorted_items if item.type != "주소"]
# 주소 먼저 처리
for item in address_items:
logger.info(f"주소 특별 처리: '{item.value}' ({item.type})")
# PII 유형에 따른 익명화
anonymized_value = "[주소]"
# 직접 텍스트에서 찾기
if item.value in anonymized_text:
anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1)
logger.info(f"주소 직접 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 부분 매치 시도
keywords = [word for word in item.value.split() if len(word) > 1]
for keyword in reversed(keywords): # 뒤에서부터 시도
if keyword in anonymized_text:
anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1)
logger.info(f"주소 키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')")
break
else:
logger.warning(f"주소 '{item.value}'를 찾을 수 없음")
# 나머지 PII 처리
for item in other_items:
logger.info(f"익명화 처리 중: '{item.value}' ({item.type}) at {item.start_pos}-{item.end_pos}")
# PII 유형에 따른 익명화
if item.type == "이름":
anonymized_value = "[이름]"
elif item.type == "전화번호":
anonymized_value = "[전화번호]"
elif item.type == "이메일":
anonymized_value = "[이메일]"
elif item.type == "주소":
anonymized_value = "[주소]"
elif item.type == "여권번호":
anonymized_value = "[여권번호]"
else:
anonymized_value = f"[{item.type}]"
# 텍스트에서 치환 (강화된 문자열 치환 사용)
try:
# 현재 텍스트에서 해당 값이 있는지 확인
if item.value in anonymized_text:
# 문자열 치환 수행
anonymized_text = anonymized_text.replace(item.value, anonymized_value, 1) # 첫 번째 매치만 치환
logger.info(f"익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 대소문자 무시하고 찾기
import re
pattern = re.escape(item.value)
match = re.search(pattern, anonymized_text, re.IGNORECASE)
if match:
start, end = match.span()
anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:]
logger.info(f"대소문자 무시 익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 부분 매치 시도 (주소의 경우)
if item.type == "주소" and len(item.value) > 3:
# 주소의 마지막 부분으로 찾기
last_part = item.value.split()[-1] if ' ' in item.value else item.value[-3:]
if last_part in anonymized_text:
anonymized_text = anonymized_text.replace(last_part, anonymized_value, 1)
logger.info(f"부분 매치 익명화 완료: '{last_part}' -> '{anonymized_value}' (원본: '{item.value}')")
else:
# 더 강력한 주소 매칭: 정규식으로 찾기
import re
# 주소 패턴을 정규식으로 변환
address_pattern = re.escape(item.value).replace(r'\ ', r'\s+')
match = re.search(address_pattern, anonymized_text, re.IGNORECASE)
if match:
start, end = match.span()
anonymized_text = anonymized_text[:start] + anonymized_value + anonymized_text[end:]
logger.info(f"정규식 매치 익명화 완료: '{item.value}' -> '{anonymized_value}'")
else:
# 마지막 시도: 주소의 핵심 키워드로 찾기
keywords = [word for word in item.value.split() if len(word) > 1]
for keyword in reversed(keywords): # 뒤에서부터 시도
if keyword in anonymized_text:
anonymized_text = anonymized_text.replace(keyword, anonymized_value, 1)
logger.info(f"키워드 매치 익명화 완료: '{keyword}' -> '{anonymized_value}' (원본: '{item.value}')")
break
else:
logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음")
else:
logger.warning(f"텍스트에서 '{item.value}'를 찾을 수 없음")
except Exception as e:
logger.error(f"익명화 실패: {e}, item: {item}")
logger.info(f"익명화 완료: 결과 텍스트 길이={len(anonymized_text)}")
return anonymized_text
def _map_extraction_class(self, extraction_class: str) -> str:
"""langextract 클래스를 한국어 PII 유형으로 매핑"""
mapping = {
"name": "이름",
"email": "이메일",
"phone": "전화번호",
"passport_number": "여권번호",
"address": "주소"
}
return mapping.get(extraction_class, extraction_class)
def _get_pii_summary(self, pii_items: List[PIIItem]) -> Dict[str, int]:
"""PII 항목들의 요약 통계 반환"""
summary = {}
for item in pii_items:
if item.type in summary:
summary[item.type] += 1
else:
summary[item.type] = 1
return summary
class MCPPIIProcessor:
"""MCP용 PII 처리기 - 탐지 + 암호화"""
def __init__(self):
"""PII 처리기 초기화"""
self.detector = get_detector()
self.openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
print("✅ MCP PII Processor 초기화 완료")
def process_text(self, text: str) -> Dict[str, Any]:
"""
텍스트에서 PII를 탐지하고 처리 (MCP Tool용)
Args:
text (str): 처리할 텍스트
Returns:
Dict[str, Any]: MCP Tool 응답 형식
"""
try:
start_time = time.time()
if not text:
return {
"success": True,
"original_text": "",
"anonymized_text": "",
"pii_items": [],
"count": 0,
"processing_time": 0,
"summary": {}
}
# 1. PII 탐지
detection_result = self.detector.detect_pii(text)
if not detection_result["success"]:
return detection_result
# PIIItem 객체로 변환
pii_items = [PIIItem(**item) for item in detection_result["pii_items"]]
# 2. 익명화 처리
anonymized_text = self.detector.anonymize_text(text, pii_items)
processing_time = time.time() - start_time
return {
"success": True,
"original_text": text,
"anonymized_text": anonymized_text,
"pii_items": [asdict(item) for item in pii_items],
"count": len(pii_items),
"processing_time": processing_time,
"summary": detection_result["summary"]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"original_text": text,
"anonymized_text": text,
"pii_items": [],
"count": 0,
"processing_time": 0,
"summary": {}
}
def batch_process(self, texts: List[str]) -> Dict[str, Any]:
"""
여러 텍스트를 일괄 처리 (MCP Tool용)
Args:
texts (List[str]): 처리할 텍스트 리스트
Returns:
Dict[str, Any]: MCP Tool 응답 형식
"""
try:
start_time = time.time()
results = []
for i, text in enumerate(texts):
result = self.process_text(text)
result["index"] = i
results.append(result)
processing_time = time.time() - start_time
# 전체 통계
total_pii = sum(result["count"] for result in results if result["success"])
successful_count = sum(1 for result in results if result["success"])
return {
"success": True,
"results": results,
"total_texts": len(texts),
"successful_count": successful_count,
"total_pii_detected": total_pii,
"processing_time": processing_time,
"average_time_per_text": processing_time / len(texts) if texts else 0
}
except Exception as e:
return {
"success": False,
"error": str(e),
"results": [],
"total_texts": len(texts),
"successful_count": 0,
"total_pii_detected": 0,
"processing_time": 0,
"average_time_per_text": 0
}
# MCP Tool 함수들
@mcp.tool()
def mcp_detect_pii(text: str) -> Dict[str, Any]:
"""
MCP Tool: 텍스트에서 PII 탐지
Args:
text (str): 분석할 텍스트
Returns:
Dict[str, Any]: 탐지 결과
"""
detector = get_detector()
return detector.detect_pii(text)
@mcp.tool()
def mcp_process_text(text: str) -> Dict[str, Any]:
"""
MCP Tool: 텍스트 PII 처리 (탐지 + 익명화)
Args:
text (str): 처리할 텍스트
Returns:
Dict[str, Any]: 처리 결과
"""
processor = MCPPIIProcessor()
return processor.process_text(text)
@mcp.tool()
def mcp_batch_process(texts: List[str]) -> Dict[str, Any]:
"""
MCP Tool: 여러 텍스트 일괄 처리
Args:
texts (List[str]): 처리할 텍스트 리스트
Returns:
Dict[str, Any]: 일괄 처리 결과
"""
processor = MCPPIIProcessor()
return processor.batch_process(texts)
@mcp.tool()
def mcp_anonymize_text(text: str, pii_items: List[Dict[str, Any]]) -> str:
"""
MCP Tool: 텍스트 익명화
Args:
text (str): 원본 텍스트
pii_items (List[Dict[str, Any]]): PII 항목들
Returns:
str: 익명화된 텍스트
"""
detector = get_detector()
pii_objects = [PIIItem(**item) for item in pii_items]
return detector.anonymize_text(text, pii_objects)
@mcp.tool()
def mcp_encrypt_pii_item(pii_value: str, pii_type: str) -> Dict[str, Any]:
"""
MCP Tool: PII 항목 암호화
Args:
pii_value (str): 암호화할 PII 값
pii_type (str): PII 유형 (이름, 전화번호, 이메일 등)
Returns:
Dict[str, Any]: 암호화 결과
"""
try:
crypto = PIICrypto()
encrypted_value = crypto.encrypt_pii_item(pii_value, pii_type)
return {
"success": True,
"original_value": pii_value,
"encrypted_value": encrypted_value,
"pii_type": pii_type,
"encryption_method": "deterministic" if pii_type in crypto.deterministic_types else "fpe"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"original_value": pii_value,
"encrypted_value": "",
"pii_type": pii_type
}
@mcp.tool()
def mcp_decrypt_pii_item(encrypted_value: str, pii_type: str) -> Dict[str, Any]:
"""
MCP Tool: PII 항목 복호화
Args:
encrypted_value (str): 복호화할 암호화된 값
pii_type (str): PII 유형
Returns:
Dict[str, Any]: 복호화 결과
"""
try:
crypto = PIICrypto()
decrypted_value = crypto.decrypt_pii_item(encrypted_value, pii_type)
return {
"success": True,
"encrypted_value": encrypted_value,
"decrypted_value": decrypted_value,
"pii_type": pii_type,
"decryption_method": "deterministic" if pii_type in crypto.deterministic_types else "fpe"
}
except Exception as e:
return {
"success": False,
"error": str(e),
"encrypted_value": encrypted_value,
"decrypted_value": "",
"pii_type": pii_type
}
@mcp.tool()
def mcp_encrypt_text_pii(text: str) -> Dict[str, Any]:
"""
MCP Tool: 텍스트에서 PII를 탐지하고 암호화
Args:
text (str): 처리할 텍스트
Returns:
Dict[str, Any]: 암호화 처리 결과
"""
try:
start_time = time.time()
# 1. PII 탐지
detection_result = mcp_detect_pii(text)
if not detection_result["success"]:
return detection_result
if not detection_result["pii_items"]:
return {
"success": True,
"original_text": text,
"encrypted_text": text,
"pii_items": [],
"encrypted_items": {},
"count": 0,
"processing_time": time.time() - start_time,
"summary": {}
}
# 2. PII 항목들 암호화
crypto = PIICrypto()
encrypted_items = {}
encrypted_text = text
for item in detection_result["pii_items"]:
pii_type = item["type"]
pii_value = item["value"]
# 암호화
encrypted_value = crypto.encrypt_pii_item(pii_value, pii_type)
encrypted_items[pii_value] = encrypted_value
# 텍스트에서 치환
encrypted_text = encrypted_text.replace(pii_value, encrypted_value)
processing_time = time.time() - start_time
return {
"success": True,
"original_text": text,
"encrypted_text": encrypted_text,
"pii_items": detection_result["pii_items"],
"encrypted_items": encrypted_items,
"count": len(detection_result["pii_items"]),
"processing_time": processing_time,
"summary": detection_result["summary"]
}
except Exception as e:
return {
"success": False,
"error": str(e),
"original_text": text,
"encrypted_text": text,
"pii_items": [],
"encrypted_items": {},
"count": 0,
"processing_time": 0,
"summary": {}
}
@mcp.tool()
def mcp_decrypt_text_pii(encrypted_text: str, encrypted_items: Dict[str, str]) -> Dict[str, Any]:
"""
MCP Tool: 암호화된 텍스트에서 PII 복호화
Args:
encrypted_text (str): 암호화된 텍스트
encrypted_items (Dict[str, str]): 원본값 -> 암호화값 매핑
Returns:
Dict[str, Any]: 복호화 결과
"""
try:
start_time = time.time()
# 암호화값 -> 원본값 매핑 생성
decrypted_items = {}
decrypted_text = encrypted_text
crypto = PIICrypto()
for original_value, encrypted_value in encrypted_items.items():
# PII 유형 추정 (암호화된 값을 기준으로)
pii_type = _guess_pii_type(encrypted_value)
# Base64 패턴인 경우 (결정론적 암호화) 여러 유형 시도
if re.match(r'^[A-Za-z0-9+/=]+$', encrypted_value) and len(encrypted_value) > 20:
# 이름, 주소, 이메일 순서로 시도
decrypted_value = None
for try_type in ["이름", "주소", "이메일"]:
try:
result = crypto.decrypt_pii_item(encrypted_value, try_type)
if not (result.startswith("[복호화실패:") or result.startswith("[FPE복호화실패:")):
decrypted_value = result
pii_type = try_type
break
except:
continue
if decrypted_value is None:
decrypted_value = f"[복호화실패:{encrypted_value[:20]}...]"
else:
# FPE 암호화 또는 다른 패턴
decrypted_value = crypto.decrypt_pii_item(encrypted_value, pii_type)
decrypted_items[encrypted_value] = decrypted_value
# 텍스트에서 복호화
decrypted_text = decrypted_text.replace(encrypted_value, decrypted_value)
processing_time = time.time() - start_time
return {
"success": True,
"encrypted_text": encrypted_text,
"decrypted_text": decrypted_text,
"decrypted_items": decrypted_items,
"count": len(encrypted_items),
"processing_time": processing_time
}
except Exception as e:
return {
"success": False,
"error": str(e),
"encrypted_text": encrypted_text,
"decrypted_text": encrypted_text,
"decrypted_items": {},
"count": 0,
"processing_time": 0
}
def _guess_pii_type(value: str) -> str:
"""PII 값으로부터 유형을 추정하는 헬퍼 함수 (암호화된 값도 고려)"""
import re
# 이메일 패턴
if re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', value):
return "이메일"
# 전화번호 패턴 (한국) - 하이픈 포함
if re.match(r'^\d{3}-\d{4}-\d{4}$', value) or re.match(r'^\d{2,3}-\d{3,4}-\d{4}$', value):
return "전화번호"
# 신용카드번호 패턴 - 하이픈 포함
if re.match(r'^\d{4}-\d{4}-\d{4}-\d{4}$', value):
return "신용카드번호"
# 여권번호 패턴 - 알파벳 + 숫자 (암호화된 경우도 고려)
if re.match(r'^[A-Z]\d{8,9}$', value) or re.match(r'^[A-Z]\d{7,10}$', value):
return "여권번호"
# 주민등록번호 패턴 - 하이픈 포함
if re.match(r'^\d{6}-\d{7}$', value):
return "주민등록번호"
# 주소 패턴 (한국) - 한글 키워드 포함
if any(keyword in value for keyword in ["시", "구", "로", "동", "번지"]):
return "주소"
# Base64 패턴 (결정론적 암호화 결과) - 이름, 주소, 이메일만 사용
if re.match(r'^[A-Za-z0-9+/=]+$', value):
# Base64는 결정론적 암호화 결과이므로 이름, 주소, 이메일 중 하나
# 길이로 대략적인 구분 (정확하지 않을 수 있음)
if len(value) < 50:
return "이름" # 짧은 Base64는 보통 이름
elif len(value) < 70:
return "이메일" # 중간 길이 Base64는 보통 이메일
else:
return "주소" # 긴 Base64는 보통 주소
# 기본적으로 이름으로 추정
return "이름"
# MCP Tool 메타데이터
MCP_TOOLS = {
"detect_pii": {
"name": "detect_pii",
"description": "텍스트에서 PII(개인정보)를 탐지합니다. 이름, 이메일, 전화번호, 여권번호, 주소 등을 찾습니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "분석할 텍스트"
}
},
"required": ["text"]
}
},
"process_text": {
"name": "process_text",
"description": "텍스트에서 PII(개인 정보) 를 탐지하고 익명화 처리합니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "처리할 텍스트"
}
},
"required": ["text"]
}
},
"batch_process": {
"name": "batch_process",
"description": "여러 텍스트를 일괄적으로 PII(개인 정보) 처리합니다.",
"parameters": {
"type": "object",
"properties": {
"texts": {
"type": "array",
"items": {"type": "string"},
"description": "처리할 텍스트 리스트"
}
},
"required": ["texts"]
}
},
"anonymize_text": {
"name": "anonymize_text",
"description": "PII(개인 정보) 항목들을 사용하여 텍스트를 익명화합니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "원본 텍스트"
},
"pii_items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {"type": "string"},
"value": {"type": "string"},
"confidence": {"type": "number"},
"start_pos": {"type": "number"},
"end_pos": {"type": "number"}
}
},
"description": "PII 항목들"
}
},
"required": ["text", "pii_items"]
}
},
"encrypt_pii_item": {
"name": "encrypt_pii_item",
"description": "개별 PII 항목을 암호화합니다. 이름, 주소, 이메일은 결정론적 암호화, 전화번호, 신용카드번호 등은 FPE(형식 유지 암호화)를 사용합니다.",
"parameters": {
"type": "object",
"properties": {
"pii_value": {
"type": "string",
"description": "암호화할 PII 값"
},
"pii_type": {
"type": "string",
"description": "PII 유형 (이름, 전화번호, 이메일, 주소, 신용카드번호, 여권번호, 주민등록번호 등)"
}
},
"required": ["pii_value", "pii_type"]
}
},
"decrypt_pii_item": {
"name": "decrypt_pii_item",
"description": "암호화된 PII 항목을 복호화합니다.",
"parameters": {
"type": "object",
"properties": {
"encrypted_value": {
"type": "string",
"description": "복호화할 암호화된 값"
},
"pii_type": {
"type": "string",
"description": "PII 유형 (암호화 시 사용한 유형과 동일해야 함)"
}
},
"required": ["encrypted_value", "pii_type"]
}
},
"encrypt_text_pii": {
"name": "encrypt_text_pii",
"description": "텍스트에서 PII를 탐지하고 모든 PII를 암호화합니다. 검색 가능한 암호화된 텍스트를 생성합니다.",
"parameters": {
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "처리할 텍스트"
}
},
"required": ["text"]
}
},
"decrypt_text_pii": {
"name": "decrypt_text_pii",
"description": "암호화된 텍스트에서 PII를 복호화합니다.",
"parameters": {
"type": "object",
"properties": {
"encrypted_text": {
"type": "string",
"description": "암호화된 텍스트"
},
"encrypted_items": {
"type": "object",
"description": "원본값 -> 암호화값 매핑 딕셔너리"
}
},
"required": ["encrypted_text", "encrypted_items"]
}
}
}
def main():
"""테스트 함수"""
print("=== MCP PII Tools 테스트 ===\n")
# 테스트 텍스트
test_text = "오늘은 Rachel Lim이(가) 방문했다. 이메일 주소는 rachel.lim@gmail.com이고 전화번호는 010-1234-5678입니다. 여권번호는 M31143886입니다."
print(f"테스트 텍스트: {test_text}\n")
# 1. PII 탐지 테스트
print("1. PII 탐지 테스트:")
detection_result = mcp_detect_pii(test_text)
print(f"성공: {detection_result['success']}")
print(f"탐지된 PII: {detection_result['count']}개")
print(f"처리 시간: {detection_result['processing_time']:.3f}초")
print(f"요약: {detection_result['summary']}")
if detection_result['pii_items']:
print("탐지된 PII 상세:")