|
1 | | -import re |
2 | | -from typing import Iterable |
| 1 | +import re |
| 2 | +from typing import Iterable, Dict, Pattern |
| 3 | + |
| 4 | +# Guard extremely large inputs from expensive regex processing (ReDoS safety) |
| 5 | +MAX_MASK_INPUT_CHARS = 1_000_000 |
3 | 6 |
|
4 | 7 | # Basic masking rules (available in OSS version) |
5 | 8 | BASIC_MASKING_RULES = { |
|
28 | 31 | PRO_MASK_REPLACEMENT = "[*** MASKED_SECRET_PRO ***]" |
29 | 32 | CUSTOM_MASK_REPLACEMENT = "[*** MASKED_SECRET ***]" |
30 | 33 |
|
31 | | -def get_active_masking_rules(mode: str = "basic"): |
32 | | - """Get masking rules; advanced mode always allowed in OSS build.""" |
33 | | - rules = BASIC_MASKING_RULES.copy() |
| 34 | +# Pre-compiled regex cache for faster and safer masking |
| 35 | +_COMPILED_RULES: Dict[str, Dict[str, Pattern[str]]] = {"basic": {}, "advanced": {}} |
| 36 | + |
| 37 | + |
def _compile_rules() -> None:
    """Fill the module-level ``_COMPILED_RULES`` cache with compiled patterns.

    Every entry of ``BASIC_MASKING_RULES`` is compiled into the ``"basic"``
    cache. The ``"advanced"`` cache receives all of the basic entries and then
    every entry of ``ADVANCED_MASKING_RULES``; an advanced rule that shares a
    name with a basic rule replaces it in the advanced cache.
    """
    basic_cache = _COMPILED_RULES["basic"]
    advanced_cache = _COMPILED_RULES["advanced"]

    for rule_name, raw_pattern in BASIC_MASKING_RULES.items():
        if rule_name == "PRIVATE_KEY":
            # Private keys span several lines, so "." must also match newlines.
            basic_cache[rule_name] = re.compile(raw_pattern, flags=re.DOTALL)
        else:
            basic_cache[rule_name] = re.compile(raw_pattern, flags=0)

    # The advanced rule set is the basic set plus the advanced-only rules.
    advanced_cache.update(basic_cache)
    for rule_name, raw_pattern in ADVANCED_MASKING_RULES.items():
        advanced_cache[rule_name] = re.compile(raw_pattern)
| 47 | + |
| 48 | + |
| 49 | +_compile_rules() |
34 | 50 |
|
| 51 | + |
def get_active_masking_rules(mode: str = "basic") -> dict[str, Pattern[str]]:
    """Return the compiled masking rules for the requested mode.

    Args:
        mode: ``"advanced"`` selects the advanced rule set; any other value
            selects the basic rule set.

    Returns:
        A new dict mapping rule name to compiled pattern. The dict is a
        shallow copy of the cached rule set, so callers may add or remove
        entries without altering the shared ``_COMPILED_RULES`` cache that
        later calls read from.
    """
    selected = "advanced" if mode == "advanced" else "basic"
    # Copy rather than hand out the cache itself: a caller mutating the result
    # must not change what every subsequent call sees.
    return dict(_COMPILED_RULES[selected])
37 | 57 |
|
38 | | - return rules |
39 | 58 |
|
def _split_for_masking(text: str, limit: int) -> list[str]:
    """Cut *text* into consecutive pieces of at most *limit* characters.

    A cut is placed just after the last newline inside the current window when
    the window contains one, so a secret that sits on a single line is not
    split in half; a window without any newline is cut at exactly *limit*
    characters. Joining the returned pieces reproduces *text* exactly.
    """
    limit = max(1, limit)
    pieces: list[str] = []
    start, total = 0, len(text)
    while start < total:
        end = min(start + limit, total)
        if end < total:
            newline = text.rfind("\n", start, end)
            if newline != -1:
                end = newline + 1
        pieces.append(text[start:end])
        start = end
    return pieces


def apply_masking(text: str, mode: str = "basic", custom_patterns: Iterable[str] | None = None) -> str:
    """Apply masking rules with optional custom patterns (no license gating).

    Args:
        text: Text to mask. Empty or ``None`` input is returned unchanged.
        mode: ``"off"`` disables the bundled rules (custom patterns still
            run); any other value is passed to ``get_active_masking_rules``.
        custom_patterns: Extra regex sources, compiled with ``re.DOTALL`` and
            applied before the bundled rules. Patterns that fail to compile
            are skipped with a warning printed to stdout.

    Returns:
        The text with every match replaced by the corresponding mask marker.

    Input longer than ``MAX_MASK_INPUT_CHARS`` is masked piece by piece, with
    each piece at most that long and cut at a line boundary where possible.
    This keeps the size of any single regex run bounded without handing back
    oversized input unmasked. A match that would span two pieces (for example
    a multi-line private key straddling a cut) may not be detected.
    """
    if not text:
        return text

    rules: dict[str, Pattern[str]] = {} if mode == "off" else get_active_masking_rules(mode)

    # Compile custom patterns once so that invalid ones are reported a single
    # time and chunked input does not recompile them for every piece.
    custom_rules: list[Pattern[str]] = []
    for pattern in custom_patterns or ():
        try:
            custom_rules.append(re.compile(pattern, flags=re.DOTALL))
        except re.error as exc:
            print(f"[WARN] Skipping invalid custom mask pattern: {pattern!r} ({exc})")

    def mask_piece(piece: str) -> str:
        # Custom patterns run first so project-specific rules take effect
        # before the bundled ones.
        for compiled in custom_rules:
            piece = compiled.sub(CUSTOM_MASK_REPLACEMENT, piece)
        for rule_name, compiled in rules.items():
            replacement = PRO_MASK_REPLACEMENT if rule_name in ADVANCED_MASKING_RULES else MASK_REPLACEMENT
            piece = compiled.sub(replacement, piece)
        return piece

    if len(text) <= MAX_MASK_INPUT_CHARS:
        return mask_piece(text)

    # Oversized payload: never return it unmasked; bound each regex run instead.
    return "".join(mask_piece(piece) for piece in _split_for_masking(text, MAX_MASK_INPUT_CHARS))
0 commit comments