Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,6 @@ memory-bank

# Claude Code
CLAUDE.md

# Google Jules
.jules/
74 changes: 14 additions & 60 deletions lightrag/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

# Precompile regex pattern for JSON sanitization (module-level, compiled once)
_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]")
_CONTROL_CHAR_PATTERN_ALL = re.compile(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]")


class SafeStreamHandler(logging.StreamHandler):
Expand Down Expand Up @@ -2261,75 +2262,28 @@ def sanitize_text_for_encoding(text: str, replacement_char: str = "") -> str:

Returns:
Sanitized text that can be safely encoded as UTF-8

Raises:
ValueError: When text contains uncleanable encoding issues that cannot be safely processed
"""
if not text:
return text

try:
# First, strip whitespace
text = text.strip()

# Early return if text is empty after basic cleaning
if not text:
return text

# Try to encode/decode to catch any encoding issues early
text.encode("utf-8")

# Remove or replace surrogate characters (U+D800 to U+DFFF)
# These are the main cause of the encoding error
sanitized = ""
for char in text:
code_point = ord(char)
# Check for surrogate characters
if 0xD800 <= code_point <= 0xDFFF:
# Replace surrogate with replacement character
sanitized += replacement_char
continue
# Check for other problematic characters
elif code_point == 0xFFFE or code_point == 0xFFFF:
# These are non-characters in Unicode
sanitized += replacement_char
continue
else:
sanitized += char

# Additional cleanup: remove null bytes and other control characters that might cause issues
# (but preserve common whitespace like \t, \n, \r)
sanitized = re.sub(
r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", replacement_char, sanitized
)

# Test final encoding to ensure it's safe
sanitized.encode("utf-8")
# First, strip whitespace
text = text.strip()

# Unescape HTML escapes
sanitized = html.unescape(sanitized)
# Early return if text is empty after basic cleaning
if not text:
return text

# Remove control characters but preserve common whitespace (\t, \n, \r)
sanitized = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\x9F]", "", sanitized)
# 1. html.unescape first to catch entities that might become surrogates or control chars
text = html.unescape(text)

return sanitized.strip()
# 2. Use pre-compiled regex to clean surrogates and non-characters in one pass
# This replaces the slow manual loop and initial .encode() check
text = _SURROGATE_PATTERN.sub(replacement_char, text)

except UnicodeEncodeError as e:
# Critical change: Don't return placeholder, raise exception for caller to handle
error_msg = f"Text contains uncleanable UTF-8 encoding issues: {str(e)[:100]}"
logger.error(f"Text sanitization failed: {error_msg}")
raise ValueError(error_msg) from e
# 3. Remove control characters but preserve common whitespace (\t, \n, \r)
text = _CONTROL_CHAR_PATTERN_ALL.sub(replacement_char, text)

except Exception as e:
logger.error(f"Text sanitization: Unexpected error: {str(e)}")
# For other exceptions, if no encoding issues detected, return original text
try:
text.encode("utf-8")
return text
except UnicodeEncodeError:
raise ValueError(
f"Text sanitization failed with unexpected error: {str(e)}"
) from e
return text.strip()


def check_storage_env_vars(storage_name: str) -> None:
Expand Down
103 changes: 102 additions & 1 deletion tests/test_write_json_optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
import json
import tempfile
import pytest
from lightrag.utils import write_json, load_json, SanitizingJSONEncoder
from lightrag.utils import (
write_json,
load_json,
SanitizingJSONEncoder,
sanitize_text_for_encoding,
)


@pytest.mark.offline
Expand Down Expand Up @@ -342,6 +347,102 @@ def test_empty_values_after_sanitization(self):
os.unlink(temp_file)


@pytest.mark.offline
class TestSanitizeTextForEncoding:
    """Direct unit tests for sanitize_text_for_encoding function."""

    def test_empty_string_returns_empty(self):
        # The `if not text` guard returns empty input immediately.
        assert sanitize_text_for_encoding("") == ""

    def test_none_like_falsy_returns_as_is(self):
        # The function's first check is `if not text: return text`, so any
        # falsy input — including None — is returned unchanged rather than
        # raising. (Previously this test duplicated the empty-string case.)
        assert sanitize_text_for_encoding(None) is None

    def test_whitespace_only_returns_empty(self):
        assert sanitize_text_for_encoding(" ") == ""

    def test_clean_text_unchanged(self):
        assert sanitize_text_for_encoding("hello world") == "hello world"

    def test_strips_leading_trailing_whitespace(self):
        assert sanitize_text_for_encoding(" hello ") == "hello"

    def test_lone_surrogate_removed(self):
        assert sanitize_text_for_encoding("hello\ud800world") == "helloworld"

    def test_lone_surrogate_with_replacement_char(self):
        assert (
            sanitize_text_for_encoding("hello\ud800world", replacement_char="?")
            == "hello?world"
        )

    def test_surrogate_range_boundaries(self):
        # U+D800 and U+DFFF are the surrogate range boundaries
        assert "\ud800" not in sanitize_text_for_encoding("\ud800")
        assert "\udfff" not in sanitize_text_for_encoding("\udfff")

    def test_non_characters_fffe_ffff_removed(self):
        # U+FFFE and U+FFFF are included in _SURROGATE_PATTERN
        assert sanitize_text_for_encoding("a\ufffeb") == "ab"
        assert sanitize_text_for_encoding("a\uffffb") == "ab"

    def test_html_entities_unescaped(self):
        assert sanitize_text_for_encoding("&amp;") == "&"
        assert sanitize_text_for_encoding("&lt;p&gt;") == "<p>"
        assert sanitize_text_for_encoding("&quot;hello&quot;") == '"hello"'

    def test_html_entity_that_becomes_surrogate_is_removed(self):
        # &#xD800; — Python's html.unescape follows HTML5 spec and maps surrogate code
        # points to U+FFFD (replacement character), so \uD800 never appears in output.
        # Either way the result must not contain an actual lone surrogate.
        result = sanitize_text_for_encoding("&#xD800;")
        assert "\ud800" not in result

    def test_control_chars_removed(self):
        # C0 control characters (excluding \t \n \r)
        assert sanitize_text_for_encoding("\x01hello\x1fworld") == "helloworld"
        assert sanitize_text_for_encoding("\x00null") == "null"
        assert sanitize_text_for_encoding("del\x7f") == "del"

    def test_control_chars_with_replacement_char(self):
        # replacement_char must apply to control chars, not just surrogates.
        # Note: \x1c-\x1f are treated as whitespace by Python's str.strip(),
        # so place control chars in the middle to avoid them being stripped first.
        result = sanitize_text_for_encoding("a\x01b\x08c", replacement_char="?")
        assert result == "a?b?c"

    def test_common_whitespace_preserved(self):
        # \t, \n, \r must NOT be removed (excluded from control char pattern)
        assert sanitize_text_for_encoding("line1\nline2") == "line1\nline2"
        assert sanitize_text_for_encoding("col1\tcol2") == "col1\tcol2"
        assert sanitize_text_for_encoding("line1\rline2") == "line1\rline2"

    def test_c1_control_chars_not_removed(self):
        # \x80-\x9F range must NOT be removed (restored original behavior).
        # These are valid in Latin-1 encoded European language text.
        result = sanitize_text_for_encoding("caf\x85e")
        assert "\x85" in result

    def test_replacement_char_default_is_deletion(self):
        # Default replacement_char="" means characters are deleted, not replaced
        assert sanitize_text_for_encoding("\ud800hello\x01") == "hello"

    def test_mixed_issues_in_one_string(self):
        # Surrogate + control char + HTML entity + clean text
        text = "\ud800&amp;\x01clean"
        result = sanitize_text_for_encoding(text)
        assert result == "&clean"

    def test_large_text_with_scattered_surrogates(self):
        # Regression guard: regex must handle large inputs correctly
        clean_segment = "a" * 10000
        text = f"prefix\ud800{clean_segment}\udfffsuffix"
        result = sanitize_text_for_encoding(text)
        assert "\ud800" not in result
        assert "\udfff" not in result
        assert clean_segment in result


if __name__ == "__main__":
# Run tests
test = TestWriteJsonOptimization()
Expand Down
Loading