
fix: Catch multiple leak occurrences in same string #5

Open · wants to merge 3 commits into main
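As context for the diff below, a minimal sketch of the behaviour this PR is after, assuming the `mask_string` helper that this branch re-exports from the package root; the exact masked output depends on the bundled gitleaks rules and the `redact` percentage, so the comments are illustrative only.

```python
# Sketch only: assumes mask_string and MaskerFormatter are importable from the
# package root, as the new maskerlogger/__init__.py in this branch suggests.
import logging

from maskerlogger import MaskerFormatter, mask_string

# One log line containing two AWS-style keys. Before this PR, search() stopped
# at the first hit; with finditer() both occurrences should be masked.
line = 'old key "AKIAI44QH8DHBEXAMPLE" rotated to new key "AKIAIOSFODNN7EXAMPLE"'
print(mask_string(line, redact=100))

# The same behaviour through the formatter path:
logger = logging.getLogger("demo")
handler = logging.StreamHandler()
handler.setFormatter(MaskerFormatter("%(levelname)s %(message)s"))
logger.addHandler(handler)
logger.warning(line)
```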
3 changes: 2 additions & 1 deletion .gitignore
@@ -31,4 +31,5 @@ coverage.xml
env/
venv/
/site
.idea/
.idea/
.DS_Store
17 changes: 15 additions & 2 deletions maskerlogger/__init__.py
@@ -1,5 +1,18 @@
"""
Init file for oxformatter package.
"""
from maskerlogger.masker_formatter import MaskerFormatter, MaskerFormatterJson # noqa
__version__ = '0.4.0-beta.1'

from maskerlogger.masker_formatter import (
MaskerFormatter,
MaskerFormatterJson,
mask_string,
)

# Expose the classes and main function
__all__ = [
"MaskerFormatter",
"MaskerFormatterJson",
"mask_string",
]

__version__ = "0.4.0-beta.1"
65 changes: 38 additions & 27 deletions maskerlogger/ahocorasick_regex_match.py
@@ -3,6 +3,7 @@
from typing import List
import ahocorasick
from maskerlogger.utils import timeout
from collections import defaultdict


MAX_MATCH_TIMEOUT = 1
@@ -19,47 +20,57 @@ def _initialize_automaton(self) -> ahocorasick.Automaton:
for keyword, regexs in self.keyword_to_patterns.items():
keyword_automaton.add_word(keyword, (regexs))
keyword_automaton.make_automaton()

return keyword_automaton

@staticmethod
def _load_config(config_path: str) -> dict:
with open(config_path, 'rb') as f:
with open(config_path, "rb") as f:
return toml.load(f)

def _extract_keywords_and_patterns(self, config) -> dict:
keyword_to_patterns = {}
for rule in config['rules']:
for keyword in rule.get('keywords', []):
if keyword not in keyword_to_patterns:
keyword_to_patterns[keyword] = []
def _extract_keywords_and_patterns(
self, config: dict
) -> dict[str, List[re.Pattern]]:
"""Extracts keywords and their corresponding regex patterns from the configuration file."""
keyword_to_patterns = defaultdict(list)

keyword_to_patterns[keyword].append(self._get_compiled_regex(
rule['regex']))
for rule in config["rules"]:
for keyword in rule.get("keywords", []):
keyword_to_patterns[keyword].append(
self._get_compiled_regex(rule["regex"])
)

return keyword_to_patterns
return dict(keyword_to_patterns)

def _get_compiled_regex(self, regex: str) -> str:
if '(?i)' in regex:
regex = regex.replace('(?i)', '')
def _get_compiled_regex(self, regex: str) -> re.Pattern[str]:
"""Compiles the regex pattern and returns the compiled pattern."""
if "(?i)" in regex:
regex = regex.replace("(?i)", "")
return re.compile(regex, re.IGNORECASE)
return re.compile(regex)

def _filter_by_keywords(self, line):
def _filter_by_keywords(self, line: str) -> list[re.Pattern[str]]:
"""Filters the regex patterns based on the keywords present in the line."""

matched_regexes = set()
for end_index, regex_values in self.automaton.iter(line):
for _, regex_values in self.automaton.iter(line):
matched_regexes.update(regex_values)
return matched_regexes

return list(matched_regexes)

@timeout(MAX_MATCH_TIMEOUT)
def _get_match_regex(self, line: str,
matched_regex: List[re.Pattern]) -> List[re.Match]:
matches = []
for regex in matched_regex:
if match := regex.search(line):
matches.append(match)
return matches

def match_regex_to_line(self, line: str) -> re.Match:
def _get_match_regex(
self,
line: str,
matched_regex: List[re.Pattern[str]],
) -> List[re.Match]:
"""Gets the matches of the regex patterns in the given line."""
return [match for regex in matched_regex for match in regex.finditer(line)]

def match_regex_to_line(self, line: str) -> list[re.Match[str]]:
"""Matches the regex patterns to the given line."""
lower_case_line = line.lower()
if matched_regxes := self._filter_by_keywords(lower_case_line):
return self._get_match_regex(line, matched_regxes)

if matched_regexes := self._filter_by_keywords(lower_case_line):
return self._get_match_regex(line, matched_regexes)
return []
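The behavioural heart of this file is the switch in `_get_match_regex` from `regex.search` (at most one match per pattern) to `regex.finditer` inside a flat comprehension. A standalone sketch of the difference, using a toy pattern rather than one of the real gitleaks rules:

```python
import re

# Toy pattern standing in for a gitleaks rule; not one of the real rules.
pattern = re.compile(r"token=(\w+)")
line = "first token=abc123secret then a second token=def456secret"

# Old behaviour: search() yields at most one match per pattern.
old_matches = [m for m in [pattern.search(line)] if m]
print([m.group(1) for m in old_matches])   # ['abc123secret']

# New behaviour: finditer() walks the whole line, so every occurrence is
# returned and can be masked downstream.
new_matches = [m for p in [pattern] for m in p.finditer(line)]
print([m.group(1) for m in new_matches])   # ['abc123secret', 'def456secret']
```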
2 changes: 1 addition & 1 deletion maskerlogger/config/gitleaks.toml
@@ -505,7 +505,7 @@ keywords = [
[[rules]]
id = "generic-api-key"
description = "Detected a Generic API Key, potentially exposing access to various services and sensitive operations."
regex = '''(?i)(?:key|api|token|secret|client|passwd|password|auth|access)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9a-z\-_.=]{10,150})(?:['|\"|\n|\r|\s|\x60|;]|$)'''
regex = '''(?i)(?:key|api|token|secret|client|passwd|password|auth|access)(?:[0-9a-z\-_\t .]{0,20})(?:[\s|']|[\s|"]){0,3}(?:=|>|:{1,3}=|\|\|:|<=|=>|:|\?=)(?:'|\"|\s|=|\x60){0,5}([0-9a-zA-Z\-_.!$&'*+/=?^_`{|}~#@,;:%^]{8,150})(?:['|\"|\n|\r|\s|\x60|;]|$)'''
entropy = 3.5
keywords = [
"key","api","token","secret","client","passwd","password","auth","access",
96 changes: 60 additions & 36 deletions maskerlogger/masker_formatter.py
@@ -2,7 +2,7 @@
import os
import re
from abc import ABC
from typing import List
from typing import List, Optional

from pythonjsonlogger import jsonlogger

@@ -11,56 +11,79 @@
DEFAULT_SECRETS_CONFIG_PATH = os.path.join(
os.path.dirname(__file__), "config/gitleaks.toml"
)
_APPLY_MASK = 'apply_mask'
_APPLY_MASK = "apply_mask"
SKIP_MASK = {_APPLY_MASK: False}


__all__ = [
"mask_string",
"MaskerFormatter",
"MaskerFormatterJson",
]


def _apply_asterisk_mask(msg: str, matches: List[re.Match[str]], redact: int) -> str:
"""Replace the sensitive data with asterisks in the given message."""
for match in matches:
match_groups = match.groups() if match.groups() else [match.group()] # noqa
for group in match_groups:
redact_length = int((len(group) / 100) * redact)
msg = msg.replace(group[:redact_length], "*" * redact_length, 1)

return msg


def mask_string(
msg: str,
redact: int = 100,
regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
) -> str:
"""Masks the sensitive data in the given string.

Args:
msg (str): The string to mask.
redact (int): Percentage of the sensitive data to redact.
regex_config_path (str): Path to the configuration file for regex patterns.

Returns:
str: The masked string.
"""
regex_matcher = RegexMatcher(regex_config_path)
if found_matching_regexes := regex_matcher.match_regex_to_line(msg):
msg = _apply_asterisk_mask(msg, found_matching_regexes, redact=redact)

return msg


class AbstractMaskedLogger(ABC):
def __init__(
self,
regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
redact=100
self,
regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
redact: int = 100,
):
"""Initializes the AbstractMaskedLogger.

Args:
regex_config_path (str): Path to the configuration file for regex patterns.
redact (int): Percentage of the sensitive data to redact.
"""
self.regex_matcher = RegexMatcher(regex_config_path)
self.regex_config_path = regex_config_path
self.redact = redact

@staticmethod
def _validate_redact(redact: int) -> int:
if not (0 <= int(redact) <= 100):
raise ValueError("Redact value must be between 0 and 100")

return int(redact)

def _mask_secret(self, msg: str, matches: List[re.Match]) -> str:
"""Masks the sensitive data in the log message."""
for match in matches:
match_groups = match.groups() if match.groups() else [match.group()] # noqa
for group in match_groups:
redact_length = int((len(group) / 100) * self.redact)
msg = msg.replace(
group[:redact_length], "*" * redact_length, 1)

return msg

def _mask_sensitive_data(self, record: logging.LogRecord) -> None:
"""Applies masking to the sensitive data in the log message."""
if found_matching_regex := self.regex_matcher.match_regex_to_line(record.msg): # noqa
record.msg = self._mask_secret(record.msg, found_matching_regex)
record.msg = mask_string(record.msg, self.redact, self.regex_config_path)


# Normal Masked Logger - Text-Based Log Formatter
class MaskerFormatter(logging.Formatter, AbstractMaskedLogger):
"""A log formatter that masks sensitive data in text-based logs."""

def __init__(
self,
fmt: str,
regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
redact=100
self,
fmt: Optional[str] = None,
regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
redact: int = 100,
):
"""Initializes the MaskerFormatter.

@@ -80,13 +103,14 @@ def format(self, record: logging.LogRecord) -> str:
return super().format(record)


# JSON Masked Logger - JSON-Based Log Formatter
class MaskerFormatterJson(jsonlogger.JsonFormatter, AbstractMaskedLogger):
"""A JSON log formatter that masks sensitive data in json-based logs."""

def __init__(
self,
fmt: str,
regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
redact=100
self,
fmt: Optional[str] = None,
regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
redact: int = 100,
):
"""Initializes the MaskerFormatterJson.

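The refactor moves the masking into the module-level `_apply_asterisk_mask`, but keeps the existing arithmetic: the first `redact`% of each captured group is overwritten with asterisks. A standalone sketch of that calculation (it mirrors the helper rather than importing it):

```python
def asterisk_prefix(secret: str, redact: int = 100) -> str:
    """Mirror of the PR's masking arithmetic: hide the first `redact`% of a group."""
    redact_length = int((len(secret) / 100) * redact)
    return "*" * redact_length + secret[redact_length:]


print(asterisk_prefix("abcdef1234567890", redact=50))   # ********34567890
print(asterisk_prefix("abcdef1234567890", redact=100))  # ****************
```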
12 changes: 7 additions & 5 deletions maskerlogger/secrets_in_logs_example.py
@@ -10,19 +10,21 @@ def main():
"""
Main function to demonstrate logging with secrets.
"""
logger = logging.getLogger('mylogger')
logger = logging.getLogger("mylogger")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(
MaskerFormatter("%(asctime)s %(name)s %(levelname)s %(message)s",
redact=50))
MaskerFormatter("%(asctime)s %(name)s %(levelname)s %(message)s", redact=50)
)
logger.addHandler(handler)

logger.info('"current_key": "AIzaSOHbouG6DDa6DOcRGEgOMayAXYXcw6la3c"', extra=SKIP_MASK) # noqa
logger.info(
'"current_key": "AIzaSOHbouG6DDa6DOcRGEgOMayAXYXcw6la3c"', extra=SKIP_MASK
) # noqa
logger.info('"AKIAI44QH8DHBEXAMPLE" and then more text.')
logger.info("Datadog access token: 'abcdef1234567890abcdef1234567890'")
logger.info('"password": "password123"')


if __name__ == '__main__':
if __name__ == "__main__":
main()
5 changes: 3 additions & 2 deletions maskerlogger/utils.py
@@ -19,8 +19,9 @@ def target():
thread.start()
thread.join(seconds)
if thread.is_alive():
raise TimeoutException(
f"Function call exceeded {seconds} seconds")
raise TimeoutException(f"Function call exceeded {seconds} seconds")
return result[0]

return wrapper

return decorator
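The thread-based `timeout` decorator whose tail is shown above is what guards `_get_match_regex` against pathological regex backtracking. A hedged usage sketch, assuming only the call shape already used in this diff (`@timeout(seconds)`):

```python
# Sketch only: relies on the decorator exactly as it is applied to
# _get_match_regex via @timeout(MAX_MATCH_TIMEOUT) in ahocorasick_regex_match.py.
import time

from maskerlogger.utils import timeout


@timeout(1)
def slow_scan(line: str) -> str:
    time.sleep(2)  # stand-in for a catastrophically backtracking regex
    return line


try:
    slow_scan("some log line")
except Exception as exc:  # the decorator raises its own TimeoutException
    print(f"scan aborted after 1s: {exc}")
```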