Skip to content

Commit e5e4c03

Browse files
JuanPalboCopilot
and Copilot authored
Fix/chunking service (#45)
* pdf-processor
* Improve PDF processor robustness and table extraction:
  - Add error handling for MinIO get_object and read operations, with validation and clear logging for bucket/object issues
  - Wrap pdfplumber.open and page.extract_text in try/except to handle corrupted or password-protected PDFs and log errors
  - Refactor table extraction: add per-row and per-cell sanitization, robustly handle non-iterable or malformed tables, and log all exceptions with context
  - Warn on large PDF files loaded into memory
  - Ensure all error cases are logged and do not break pipeline processing
* Fix error handling when opening corrupted PDFs
* Added check for empty PDFs
* Set up MinIO client timeout and retries
* Refactor PDF processing to utilize dedicated MinIO client functions for improved readability and maintainability
* Update RAGManager/app/services/minio_client.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Updated chunking service to properly process text and tables.

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 696f417 commit e5e4c03

File tree

2 files changed

+339
-89
lines changed

2 files changed

+339
-89
lines changed
Lines changed: 111 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,121 @@
1+
"""Document chunking service with table-aware splitting."""
2+
3+
import logging
4+
5+
from langchain_core.documents import Document
16
from langchain_text_splitters import RecursiveCharacterTextSplitter
27

8+
from app.core.config import settings
9+
10+
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Fallback values used when neither the caller nor `settings` supplies
# a usable chunk size / overlap.
DEFAULT_CHUNK_SIZE = 1000
DEFAULT_CHUNK_OVERLAP = 200

# Minimum size (in characters) for a text chunk to stand alone; smaller
# text chunks get merged into the previous chunk to preserve context.
MIN_STANDALONE_CHUNK_SIZE = 150
318

4-
def split_documents(documents):
19+
20+
def document_to_chunks(
    documents: list[Document],
    chunk_size: int | None = None,
    chunk_overlap: int | None = None,
) -> list[Document]:
    """
    Split documents into chunks for embedding.

    Tables (content_type="table") are NEVER split or modified - they remain
    atomic regardless of size. Only text blocks are chunked, and small text
    chunks are merged into the previous TEXT chunk (never into a table).

    Args:
        documents: List of Document objects (from pdf_to_document)
        chunk_size: Target size for TEXT chunks (tables ignore this).
            Falls back to settings.chunk_size, then DEFAULT_CHUNK_SIZE.
        chunk_overlap: Overlap for text chunks.
            Falls back to settings.chunk_overlap, then DEFAULT_CHUNK_OVERLAP.

    Returns:
        List of chunked Documents
    """
    # Resolve chunk_size with fallback chain: parameter -> settings -> default
    if chunk_size is None:
        chunk_size = getattr(settings, "chunk_size", None)
    if not chunk_size or chunk_size <= 0:
        chunk_size = DEFAULT_CHUNK_SIZE

    # Resolve chunk_overlap with fallback chain: parameter -> settings -> default
    # (0 is a valid overlap, so only reject None and negatives).
    if chunk_overlap is None:
        chunk_overlap = getattr(settings, "chunk_overlap", None)
    if chunk_overlap is None or chunk_overlap < 0:
        chunk_overlap = DEFAULT_CHUNK_OVERLAP

    # Ensure overlap doesn't exceed chunk size (splitter requires overlap < size)
    if chunk_overlap >= chunk_size:
        chunk_overlap = chunk_size // 5

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        add_start_index=True,
        separators=["\n\n", "\n", " "],
    )

    result_chunks: list[Document] = []

    for doc in documents:
        content_type = doc.metadata.get("content_type", "text")

        if content_type == "table":
            # Tables are ATOMIC - never split.
            # Add marker so we know this chunk is a complete table.
            table_doc = Document(
                page_content=doc.page_content,
                metadata={
                    **doc.metadata,
                    "is_atomic": True,
                    "start_index": 0,
                },
            )
            result_chunks.append(table_doc)

            table_size = len(doc.page_content)
            if table_size > chunk_size:
                logger.debug(
                    "Table chunk exceeds target size (%d > %d) but kept atomic",
                    table_size,
                    chunk_size,
                )
        else:
            # Text blocks get chunked normally
            text_chunks = text_splitter.split_documents([doc])
            for chunk in text_chunks:
                chunk.metadata["is_atomic"] = False
                result_chunks.append(chunk)

    # Merge small text chunks with the previous chunk to maintain context
    merged_chunks: list[Document] = []
    for chunk in result_chunks:
        chunk_size_actual = len(chunk.page_content)
        is_small = chunk_size_actual < MIN_STANDALONE_CHUNK_SIZE
        is_text = chunk.metadata.get("content_type", "text") == "text"
        # BUGFIX: never merge into an atomic (table) chunk - doing so would
        # append text to the table's content while its metadata still claims
        # content_type="table"/is_atomic=True, breaking table atomicity.
        prev_is_mergeable = bool(merged_chunks) and not merged_chunks[-1].metadata.get(
            "is_atomic", False
        )

        if is_small and is_text and prev_is_mergeable:
            # Append to previous (text) chunk
            prev_chunk = merged_chunks[-1]
            merged_content = prev_chunk.page_content + "\n\n" + chunk.page_content
            merged_chunks[-1] = Document(
                page_content=merged_content,
                metadata={
                    **prev_chunk.metadata,
                    # Update to reflect merged content
                    "merged_small_chunk": True,
                },
            )
            logger.debug(
                "Merged small chunk (%d chars) with previous chunk",
                chunk_size_actual,
            )
        else:
            merged_chunks.append(chunk)

    return merged_chunks

0 commit comments

Comments
 (0)