Skip to content

Commit def7aa8

Browse files
committed
fix(sanitization): Centralizing DB Filters
1 parent 563202a commit def7aa8

File tree

4 files changed

+316
-32
lines changed

4 files changed

+316
-32
lines changed

backend/onyx/background/indexing/run_docfetching.py

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
from onyx.file_store.document_batch_storage import get_document_batch_storage
5959
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
6060
from onyx.indexing.indexing_pipeline import index_doc_batch_prepare
61+
from onyx.indexing.postgres_sanitization import sanitize_document_for_postgres
62+
from onyx.indexing.postgres_sanitization import sanitize_hierarchy_nodes_for_postgres
6163
from onyx.redis.redis_hierarchy import cache_hierarchy_nodes_batch
6264
from onyx.redis.redis_hierarchy import ensure_source_node_exists
6365
from onyx.redis.redis_hierarchy import get_node_id_from_raw_id
@@ -156,36 +158,7 @@ def strip_null_characters(doc_batch: list[Document]) -> list[Document]:
156158
logger.warning(
157159
f"doc {doc.id} too large, Document size: {sys.getsizeof(doc)}"
158160
)
159-
cleaned_doc = doc.model_copy()
160-
161-
# Postgres cannot handle NUL characters in text fields
162-
if "\x00" in cleaned_doc.id:
163-
logger.warning(f"NUL characters found in document ID: {cleaned_doc.id}")
164-
cleaned_doc.id = cleaned_doc.id.replace("\x00", "")
165-
166-
if cleaned_doc.title and "\x00" in cleaned_doc.title:
167-
logger.warning(
168-
f"NUL characters found in document title: {cleaned_doc.title}"
169-
)
170-
cleaned_doc.title = cleaned_doc.title.replace("\x00", "")
171-
172-
if "\x00" in cleaned_doc.semantic_identifier:
173-
logger.warning(
174-
f"NUL characters found in document semantic identifier: {cleaned_doc.semantic_identifier}"
175-
)
176-
cleaned_doc.semantic_identifier = cleaned_doc.semantic_identifier.replace(
177-
"\x00", ""
178-
)
179-
180-
for section in cleaned_doc.sections:
181-
if section.link is not None:
182-
section.link = section.link.replace("\x00", "")
183-
184-
# since text can be longer, just replace to avoid double scan
185-
if isinstance(section, TextSection) and section.text is not None:
186-
section.text = section.text.replace("\x00", "")
187-
188-
cleaned_batch.append(cleaned_doc)
161+
cleaned_batch.append(sanitize_document_for_postgres(doc))
189162

190163
return cleaned_batch
191164

@@ -602,10 +575,13 @@ def connector_document_extraction(
602575

603576
# Process hierarchy nodes batch - upsert to Postgres and cache in Redis
604577
if hierarchy_node_batch:
578+
hierarchy_node_batch_cleaned = (
579+
sanitize_hierarchy_nodes_for_postgres(hierarchy_node_batch)
580+
)
605581
with get_session_with_current_tenant() as db_session:
606582
upserted_nodes = upsert_hierarchy_nodes_batch(
607583
db_session=db_session,
608-
nodes=hierarchy_node_batch,
584+
nodes=hierarchy_node_batch_cleaned,
609585
source=db_connector.source,
610586
commit=True,
611587
is_connector_public=is_connector_public,
@@ -624,7 +600,7 @@ def connector_document_extraction(
624600
)
625601

626602
logger.debug(
627-
f"Persisted and cached {len(hierarchy_node_batch)} hierarchy nodes "
603+
f"Persisted and cached {len(hierarchy_node_batch_cleaned)} hierarchy nodes "
628604
f"for attempt={index_attempt_id}"
629605
)
630606

backend/onyx/indexing/indexing_pipeline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
from onyx.indexing.models import DocAwareChunk
5050
from onyx.indexing.models import IndexingBatchAdapter
5151
from onyx.indexing.models import UpdatableChunkData
52+
from onyx.indexing.postgres_sanitization import sanitize_documents_for_postgres
5253
from onyx.indexing.vector_db_insertion import write_chunks_to_vector_db_with_backoff
5354
from onyx.llm.factory import get_default_llm_with_vision
5455
from onyx.llm.factory import get_llm_for_contextual_rag
@@ -228,6 +229,8 @@ def index_doc_batch_prepare(
228229
) -> DocumentBatchPrepareContext | None:
229230
"""Sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
230231
This precedes indexing it into the actual document index."""
232+
documents = sanitize_documents_for_postgres(documents)
233+
231234
# Create a trimmed list of docs that don't have a newer updated at
232235
# Shortcuts the time-consuming flow on connector index retries
233236
document_ids: list[str] = [document.id for document in documents]
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
from typing import Any
2+
3+
from onyx.access.models import ExternalAccess
4+
from onyx.connectors.models import BasicExpertInfo
5+
from onyx.connectors.models import Document
6+
from onyx.connectors.models import HierarchyNode
7+
8+
9+
def _sanitize_string(value: str) -> str:
10+
return value.replace("\x00", "")
11+
12+
13+
def _sanitize_json_like(value: Any) -> Any:
14+
if isinstance(value, str):
15+
return _sanitize_string(value)
16+
17+
if isinstance(value, list):
18+
return [_sanitize_json_like(item) for item in value]
19+
20+
if isinstance(value, tuple):
21+
return tuple(_sanitize_json_like(item) for item in value)
22+
23+
if isinstance(value, dict):
24+
sanitized: dict[Any, Any] = {}
25+
for key, nested_value in value.items():
26+
cleaned_key = _sanitize_string(key) if isinstance(key, str) else key
27+
sanitized[cleaned_key] = _sanitize_json_like(nested_value)
28+
return sanitized
29+
30+
return value
31+
32+
33+
def _sanitize_expert_info(expert: BasicExpertInfo) -> BasicExpertInfo:
    """Return a copy of *expert* with NUL bytes stripped from all text fields.

    Fields that are None stay None; the original object is not mutated.
    """
    text_fields = (
        "display_name",
        "first_name",
        "middle_initial",
        "last_name",
        "email",
    )
    updates = {}
    for field_name in text_fields:
        current = getattr(expert, field_name)
        updates[field_name] = (
            None if current is None else _sanitize_string(current)
        )
    return expert.model_copy(update=updates)
61+
62+
63+
def _sanitize_external_access(external_access: ExternalAccess) -> ExternalAccess:
    """Rebuild an ExternalAccess with NUL bytes stripped from every email
    and group id; the is_public flag is carried over unchanged."""
    cleaned_emails = set(map(_sanitize_string, external_access.external_user_emails))
    cleaned_group_ids = set(
        map(_sanitize_string, external_access.external_user_group_ids)
    )
    return ExternalAccess(
        external_user_emails=cleaned_emails,
        external_user_group_ids=cleaned_group_ids,
        is_public=external_access.is_public,
    )
74+
75+
76+
def sanitize_document_for_postgres(document: Document) -> Document:
    """Return a deep copy of *document* with NUL (0x00) bytes removed from
    every string field that is later persisted to Postgres.

    Postgres rejects NUL bytes in ``text`` columns, so ids, titles, section
    text/links, metadata, owner info, and external-access identifiers are all
    cleaned. The input document is never mutated.
    """
    cleaned_doc = document.model_copy(deep=True)

    cleaned_doc.id = _sanitize_string(cleaned_doc.id)
    cleaned_doc.semantic_identifier = _sanitize_string(cleaned_doc.semantic_identifier)
    if cleaned_doc.title is not None:
        cleaned_doc.title = _sanitize_string(cleaned_doc.title)
    if cleaned_doc.parent_hierarchy_raw_node_id is not None:
        cleaned_doc.parent_hierarchy_raw_node_id = _sanitize_string(
            cleaned_doc.parent_hierarchy_raw_node_id
        )

    # Delegate to the recursive JSON sanitizer instead of assuming every
    # metadata value is a str or list[str]: it cleans keys, nested strings,
    # and list items, and passes non-string scalars (ints, bools, None)
    # through unchanged rather than raising AttributeError on `.replace`.
    cleaned_doc.metadata = _sanitize_json_like(cleaned_doc.metadata)

    if cleaned_doc.doc_metadata is not None:
        cleaned_doc.doc_metadata = _sanitize_json_like(cleaned_doc.doc_metadata)

    if cleaned_doc.primary_owners is not None:
        cleaned_doc.primary_owners = [
            _sanitize_expert_info(expert) for expert in cleaned_doc.primary_owners
        ]
    if cleaned_doc.secondary_owners is not None:
        cleaned_doc.secondary_owners = [
            _sanitize_expert_info(expert) for expert in cleaned_doc.secondary_owners
        ]

    if cleaned_doc.external_access is not None:
        cleaned_doc.external_access = _sanitize_external_access(
            cleaned_doc.external_access
        )

    # Sections may carry text, a link, and/or an image file id — clean
    # whichever string attributes are present.
    for section in cleaned_doc.sections:
        if section.link is not None:
            section.link = _sanitize_string(section.link)
        if section.text is not None:
            section.text = _sanitize_string(section.text)
        if section.image_file_id is not None:
            section.image_file_id = _sanitize_string(section.image_file_id)

    return cleaned_doc
123+
124+
125+
def sanitize_documents_for_postgres(documents: list[Document]) -> list[Document]:
    """Sanitize each document in *documents* for Postgres storage."""
    return list(map(sanitize_document_for_postgres, documents))
127+
128+
129+
def sanitize_hierarchy_node_for_postgres(node: HierarchyNode) -> HierarchyNode:
    """Return a deep copy of *node* with NUL (0x00) bytes removed from all
    string fields; optional fields that are None stay None."""
    cleaned = node.model_copy(deep=True)

    cleaned.raw_node_id = _sanitize_string(cleaned.raw_node_id)
    cleaned.display_name = _sanitize_string(cleaned.display_name)
    cleaned.raw_parent_id = (
        None
        if cleaned.raw_parent_id is None
        else _sanitize_string(cleaned.raw_parent_id)
    )
    cleaned.link = None if cleaned.link is None else _sanitize_string(cleaned.link)

    if cleaned.external_access is not None:
        cleaned.external_access = _sanitize_external_access(cleaned.external_access)

    return cleaned
145+
146+
147+
def sanitize_hierarchy_nodes_for_postgres(
    nodes: list[HierarchyNode],
) -> list[HierarchyNode]:
    """Sanitize each hierarchy node in *nodes* for Postgres storage."""
    return list(map(sanitize_hierarchy_node_for_postgres, nodes))
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
from onyx.access.models import ExternalAccess
2+
from onyx.connectors.models import BasicExpertInfo
3+
from onyx.connectors.models import Document
4+
from onyx.connectors.models import DocumentSource
5+
from onyx.connectors.models import HierarchyNode
6+
from onyx.connectors.models import IndexAttemptMetadata
7+
from onyx.connectors.models import TextSection
8+
from onyx.db.enums import HierarchyNodeType
9+
from onyx.indexing import indexing_pipeline
10+
from onyx.indexing.postgres_sanitization import sanitize_document_for_postgres
11+
from onyx.indexing.postgres_sanitization import sanitize_hierarchy_node_for_postgres
12+
13+
14+
def test_sanitize_document_for_postgres_removes_nul_bytes() -> None:
    """Every string field of a Document — including nested owners, metadata,
    and external access — comes back NUL-free, and the input is untouched."""
    dirty_doc = Document(
        id="doc\x00-id",
        source=DocumentSource.FILE,
        semantic_identifier="sem\x00-id",
        title="ti\x00tle",
        parent_hierarchy_raw_node_id="parent\x00-id",
        sections=[TextSection(link="lin\x00k", text="te\x00xt")],
        metadata={"ke\x00y": "va\x00lue", "list\x00key": ["a\x00", "b"]},
        doc_metadata={
            "j\x00son": {
                "in\x00ner": "va\x00l",
                "arr": ["x\x00", {"dee\x00p": "y\x00"}],
            }
        },
        primary_owners=[BasicExpertInfo(display_name="Ali\x00ce", email="a\x00@x.com")],
        secondary_owners=[BasicExpertInfo(first_name="Bo\x00b", last_name="Sm\x00ith")],
        external_access=ExternalAccess(
            external_user_emails={"user\x00@example.com"},
            external_user_group_ids={"gro\x00up-1"},
            is_public=False,
        ),
    )

    clean_doc = sanitize_document_for_postgres(dirty_doc)

    # Top-level string fields.
    assert clean_doc.id == "doc-id"
    assert clean_doc.semantic_identifier == "sem-id"
    assert clean_doc.title == "title"
    assert clean_doc.parent_hierarchy_raw_node_id == "parent-id"

    # Section text and links.
    first_section = clean_doc.sections[0]
    assert first_section.link == "link"
    assert first_section.text == "text"

    # Metadata: keys, string values, and list items are all cleaned.
    assert clean_doc.metadata == {"key": "value", "listkey": ["a", "b"]}
    assert clean_doc.doc_metadata == {
        "json": {"inner": "val", "arr": ["x", {"deep": "y"}]}
    }

    # Owner fields.
    assert clean_doc.primary_owners is not None
    primary = clean_doc.primary_owners[0]
    assert primary.display_name == "Alice"
    assert primary.email == "a@x.com"
    assert clean_doc.secondary_owners is not None
    secondary = clean_doc.secondary_owners[0]
    assert secondary.first_name == "Bob"
    assert secondary.last_name == "Smith"

    # External access identifiers.
    assert clean_doc.external_access is not None
    assert clean_doc.external_access.external_user_emails == {"user@example.com"}
    assert clean_doc.external_access.external_user_group_ids == {"group-1"}

    # Sanitization must be copy-on-write: the original stays dirty.
    assert dirty_doc.id == "doc\x00-id"
    assert dirty_doc.metadata == {"ke\x00y": "va\x00lue", "list\x00key": ["a\x00", "b"]}
63+
64+
65+
def test_sanitize_hierarchy_node_for_postgres_removes_nul_bytes() -> None:
    """HierarchyNode string fields and the nested ExternalAccess are
    NUL-stripped by sanitize_hierarchy_node_for_postgres."""
    dirty_node = HierarchyNode(
        raw_node_id="raw\x00-id",
        raw_parent_id="paren\x00t-id",
        display_name="fol\x00der",
        link="https://exa\x00mple.com",
        node_type=HierarchyNodeType.FOLDER,
        external_access=ExternalAccess(
            external_user_emails={"a\x00@example.com"},
            external_user_group_ids={"g\x00-1"},
            is_public=True,
        ),
    )

    clean_node = sanitize_hierarchy_node_for_postgres(dirty_node)

    assert clean_node.raw_node_id == "raw-id"
    assert clean_node.raw_parent_id == "parent-id"
    assert clean_node.display_name == "folder"
    assert clean_node.link == "https://example.com"

    access = clean_node.external_access
    assert access is not None
    assert access.external_user_emails == {"a@example.com"}
    assert access.external_user_group_ids == {"g-1"}
88+
89+
90+
def test_index_doc_batch_prepare_sanitizes_before_db_ops(monkeypatch: object) -> None:
    """End-to-end check that index_doc_batch_prepare sanitizes documents
    BEFORE any DB write: every value captured from the stubbed DB helpers
    must already be NUL-free.

    All DB-touching collaborators on the indexing_pipeline module are
    monkeypatched out, so no real session is needed — db_session is just
    object().
    """
    document = Document(
        id="doc\x00id",
        source=DocumentSource.FILE,
        semantic_identifier="sem\x00id",
        sections=[TextSection(text="content", link="li\x00nk")],
        metadata={"ke\x00y": "va\x00lue"},
    )

    # Collects the arguments the stubs receive so they can be asserted on.
    captured: dict[str, object] = {}

    # No existing documents -> every input doc is treated as updatable.
    def _get_documents_by_ids(db_session: object, document_ids: list[str]) -> list:
        _ = db_session, document_ids
        return []

    monkeypatch.setattr(
        indexing_pipeline, "get_documents_by_ids", _get_documents_by_ids
    )

    # Capture the documents handed to the Postgres upsert.
    def _capture_upsert_documents_in_db(**kwargs: object) -> None:
        captured["upsert_documents"] = kwargs["documents"]

    monkeypatch.setattr(
        indexing_pipeline, "_upsert_documents_in_db", _capture_upsert_documents_in_db
    )

    # NOTE(review): args[3] is presumably the list of document ids passed to
    # upsert_document_by_connector_credential_pair — confirm against that
    # function's positional signature if it changes.
    def _capture_doc_cc_pair(*args: object) -> None:
        captured["cc_pair_doc_ids"] = args[3]

    monkeypatch.setattr(
        indexing_pipeline,
        "upsert_document_by_connector_credential_pair",
        _capture_doc_cc_pair,
    )

    # Hierarchy linking is irrelevant here; report zero links made.
    def _noop_link_hierarchy_nodes_to_documents(
        db_session: object,
        document_ids: list[str],
        source: DocumentSource,
        commit: bool,
    ) -> int:
        _ = db_session, document_ids, source, commit
        return 0

    monkeypatch.setattr(
        indexing_pipeline,
        "link_hierarchy_nodes_to_documents",
        _noop_link_hierarchy_nodes_to_documents,
    )

    # ignore_time_skip forces the doc through even without updated_at info.
    context = indexing_pipeline.index_doc_batch_prepare(
        documents=[document],
        index_attempt_metadata=IndexAttemptMetadata(connector_id=1, credential_id=2),
        db_session=object(),  # type: ignore[arg-type]
        ignore_time_skip=True,
    )

    # The returned context must hold the sanitized document...
    assert context is not None
    assert context.updatable_docs[0].id == "docid"
    assert context.updatable_docs[0].semantic_identifier == "semid"
    assert context.updatable_docs[0].metadata == {"key": "value"}
    # ...and every id/document that reached a (stubbed) DB call was clean.
    assert captured["cc_pair_doc_ids"] == ["docid"]

    upsert_documents = captured["upsert_documents"]
    assert isinstance(upsert_documents, list)
    assert upsert_documents[0].id == "docid"

0 commit comments

Comments
 (0)