|
9 | 9 | import os |
10 | 10 | from pathlib import Path |
11 | 11 | import platform |
| 12 | +import re |
12 | 13 | import sys |
13 | 14 | from typing import Optional, Set |
| 15 | +import unicodedata |
| 16 | + |
_ANSI_RE = re.compile(
    # CSI sequences: ESC [ <params> <letter>
    r"\x1b\[[0-9;]*[a-zA-Z]"
    # OSC sequences: ESC ] <payload> terminated by BEL or ST (ESC \)
    r"|\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)"
)
"""Matches ANSI CSI and OSC escape sequences so they can be removed whole.

Only syntax understood by every supported ``re`` version is used: possessive
quantifiers (*+) and atomic groups ((?>...)) are rejected by ``re.compile``
before Python 3.11 and are not needed here, because each repeated character
class is disjoint from what must follow it, so backtracking can never produce
a different match.

The OSC payload excludes ESC as well as BEL.  This keeps scanning linear on
hostile input made of many unterminated ``ESC ]`` openers (each attempt stops
at the next ESC instead of running to the end of the string), and it lets the
sequence be closed by either BEL or the two-character String Terminator."""

_UNSAFE_CHARS = frozenset(("\u2028", "\u2029"))
"""Line separator (U+2028) and paragraph separator (U+2029).

Their Unicode categories are Zl and Zp, so the category filter below does not
cover them and they have to be listed explicitly."""
_UNSAFE_CATEGORIES = frozenset(("Cc", "Cf"))
"""Unicode general categories to drop: control (Cc) and format (Cf)."""
14 | 28 |
|
# Module-level flag; it is False when the module is first imported.
# NOTE(review): presumably flipped to True by initialize_logging() — that
# assignment is not visible in this excerpt, confirm against the full file.
_initialized = False
"""Is logging initialized?"""
@@ -126,6 +140,44 @@ def initialize_logging( |
126 | 140 | return logging.getLogger(name) |
127 | 141 |
|
128 | 142 |
|
def safe_record_for_log(record: str) -> str:
    r"""
    Return a copy of ``record`` with log-unsafe characters removed.

    Intended as a defence against log injection (forged entries, hidden
    content, terminal escape tricks) when untrusted text is written to a log.
    Cleaning happens in two passes:

    1. Every ANSI escape sequence matched by the module-level ``_ANSI_RE``
       is deleted as a whole, so its parameters do not linger as visible
       residue.
    2. Each remaining character is dropped when it is listed in
       ``_UNSAFE_CHARS`` (U+2028 line separator, U+2029 paragraph separator)
       or when its Unicode general category is in ``_UNSAFE_CATEGORIES``
       (``Cc`` control, ``Cf`` format).  This covers CR/LF (\r, \n), tabs,
       NUL, ESC, DEL and other control or invisible formatting characters.

    Characters are removed, not replaced, so the text on either side of a
    stripped character ends up adjacent.

    Parameters
    ----------
    record
        The untrusted string to sanitize before logging.

    Returns
    -------
    str
        The sanitized text; the input string itself is not modified.

    """
    # Pass 1: strip complete escape sequences before looking at single chars.
    without_ansi = _ANSI_RE.sub("", record)

    # Pass 2: keep only characters that are neither explicitly listed as
    # unsafe nor members of an unsafe Unicode category.
    kept = []
    for char in without_ansi:
        if char in _UNSAFE_CHARS:
            continue
        if unicodedata.category(char) in _UNSAFE_CATEGORIES:
            continue
        kept.append(char)
    return "".join(kept)
| 179 | + |
| 180 | + |
129 | 181 | def setup_tsv_logger(name="tsv_logger", filename="error_log.tsv", level=logging.ERROR): |
130 | 182 | """Configure a logger with TSV output.""" |
131 | 183 | logger = logging.getLogger(name) |
|
0 commit comments