Skip to content

Commit e7de51f

Browse files
authored
pdf-processor (#19)
* pdf-processor
* Improve PDF processor robustness and table extraction
  - Add error handling for MinIO get_object and read operations, with validation and clear logging for bucket/object issues
  - Wrap pdfplumber.open and page.extract_text in try/except to handle corrupted or password-protected PDFs and log errors
  - Refactor table extraction: add per-row and per-cell sanitization, robustly handle non-iterable or malformed tables, and log all exceptions with context
  - Warn on large PDF files loaded into memory
  - Ensure all error cases are logged and do not break pipeline processing
* Fix error handling when opening corrupted PDFs
* Add check for empty PDFs
* Set up MinIO client timeout and retries
1 parent 45ecb31 commit e7de51f

File tree

3 files changed

+214
-16
lines changed

3 files changed

+214
-16
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.env
Lines changed: 209 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,220 @@
1+
import io
2+
import logging
3+
4+
import certifi
5+
import pdfplumber
6+
import urllib3
17
from langchain_core.documents import Document
8+
from minio import Minio
9+
from urllib3.util import Timeout as UrllibTimeout
210

11+
from app.core.config import settings
312

4-
def pdf_to_document(minio_url: str) -> Document:
5-
"""
6-
Placeholder function - to be implemented later.
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def _sanitize_cell(cell) -> str:
17+
"""Safely convert a cell value to string."""
18+
if cell is None:
19+
return ""
20+
if isinstance(cell, (str, int, float, bool)):
21+
return str(cell)
22+
try:
23+
return str(cell)
24+
except Exception:
25+
try:
26+
return repr(cell)
27+
except Exception:
28+
return ""
29+
30+
31+
def _extract_tables_safely(page, page_num: int) -> str:
    """Render the tables of a page as pipe-delimited text without raising.

    Every well-formed row becomes one line of the form ``cell | cell | ...``
    terminated by a newline. Tables or rows that are not lists/tuples are
    skipped with a warning. Any exception is logged as a warning; rows that
    were collected before the failure are still returned.

    Args:
        page: Object exposing ``extract_tables()``.
        page_num: Page number, used only in log messages.

    Returns:
        The concatenated row lines, or an empty string if nothing was collected.
    """
    lines: list[str] = []
    try:
        tables = page.extract_tables()
        if not isinstance(tables, (list, tuple)):
            logger.warning(f"Page {page_num}: extract_tables() returned non-iterable type {type(tables)}, skipping tables")
            return ""

        for table_idx, table in enumerate(tables):
            if isinstance(table, (list, tuple)):
                for row_idx, row in enumerate(table):
                    try:
                        if isinstance(row, (list, tuple)):
                            cells = [_sanitize_cell(cell) for cell in row]
                            lines.append(" | ".join(cells) + "\n")
                        else:
                            logger.warning(f"Page {page_num}, Table {table_idx}, Row {row_idx}: row is not iterable, skipping")
                    except Exception as e:
                        # A single bad row must not discard the rest of the table.
                        logger.warning(f"Page {page_num}, Table {table_idx}, Row {row_idx}: error processing row: {e}")
            else:
                logger.warning(f"Page {page_num}, Table {table_idx}: table is not iterable, skipping")

    except Exception as e:
        logger.warning(f"Page {page_num}: error extracting tables: {e}")

    return "".join(lines)
59+
60+
61+
def get_minio_client() -> Minio:
    """Build a MinIO client backed by a urllib3 pool with timeouts and retries.

    The pool is configured with a 10 second connect timeout, a 30 second read
    timeout, a retry policy of ``total=3`` with a 0.2 backoff factor for
    500/502/503/504 responses, a pool size of 10, and certificate verification
    against the certifi CA bundle. Endpoint, credentials and the ``secure``
    flag are read from ``settings``.

    Returns:
        A newly constructed ``Minio`` client.
    """
    pool = urllib3.PoolManager(
        timeout=UrllibTimeout(connect=10, read=30),
        retries=urllib3.Retry(
            total=3,
            backoff_factor=0.2,
            status_forcelist=[500, 502, 503, 504],
        ),
        maxsize=10,
        cert_reqs="CERT_REQUIRED",
        ca_certs=certifi.where(),
    )

    client = Minio(
        endpoint=settings.minio_endpoint,
        access_key=settings.minio_access_key,
        secret_key=settings.minio_secret_key,
        secure=settings.minio_secure,
        http_client=pool,
    )
    return client
89+
90+
def pdf_to_document(
    object_name: str,
    bucket_name: str | None = None,
    minio_client: Minio | None = None,
) -> list[Document]:
    """
    Load a PDF file from MinIO and return a list of Document objects.
    Each page becomes a separate Document with metadata.

    The whole object is read into memory before parsing; a warning is logged
    when it exceeds 100 MB. Per-page text or table extraction failures are
    logged and yield empty text for that part instead of aborting.

    Args:
        object_name: Path/name of the PDF object in the bucket
        bucket_name: Name of the MinIO bucket (defaults to settings.minio_bucket)
        minio_client: Optional MinIO client (creates one if not provided)

    Returns:
        List of Document objects, one per page

    Raises:
        ValueError: If object_name is empty or whitespace, the object cannot be
            fetched or read from MinIO, the PDF cannot be opened or its pages
            cannot be loaded, or the PDF has no pages.
    """
    # Validate first so an invalid name never triggers construction of a
    # network client or a lookup of the default bucket.
    if not object_name or not object_name.strip():
        raise ValueError("object_name cannot be empty or whitespace")

    if bucket_name is None:
        bucket_name = settings.minio_bucket
    if minio_client is None:
        minio_client = get_minio_client()

    # Download the PDF from MinIO into memory
    # Note: For very large files, consider streaming to disk instead of loading entirely into memory
    try:
        response = minio_client.get_object(bucket_name, object_name)
    except Exception as e:
        logger.error(f"Failed to get object from MinIO - bucket: '{bucket_name}', object: '{object_name}': {e}")
        raise ValueError(f"Failed to retrieve '{object_name}' from bucket '{bucket_name}': {e}") from e

    try:
        pdf_bytes = response.read()
    except Exception as e:
        logger.error(f"Failed to read PDF content from MinIO - bucket: '{bucket_name}', object: '{object_name}': {e}")
        raise ValueError(f"Failed to read content of '{object_name}' from bucket '{bucket_name}': {e}") from e
    finally:
        # Return the connection to the pool even if close() itself raises.
        try:
            response.close()
        finally:
            response.release_conn()

    # Warn if the file is very large (> 100MB), since it is held fully in memory.
    file_size_mb = len(pdf_bytes) / (1024 * 1024)
    if file_size_mb > 100:
        logger.warning(f"Large PDF loaded into memory: {file_size_mb:.1f} MB for '{object_name}'")

    # Open PDF from bytes using pdfplumber
    try:
        pdf = pdfplumber.open(io.BytesIO(pdf_bytes))
    except Exception as e:
        logger.error(
            "Failed to open PDF '%s': %s (possibly corrupted or password-protected)",
            object_name,
            e,
        )
        raise ValueError(f"Failed to open PDF '{object_name}': {e}") from e

    documents: list[Document] = []
    try:
        # Accessing the page list may itself parse the document and fail on a
        # damaged file; report that the same way as a failure to open.
        try:
            pages = pdf.pages
        except Exception as e:
            logger.error("Failed to read pages of PDF '%s': %s", object_name, e)
            raise ValueError(f"Failed to read pages of PDF '{object_name}': {e}") from e

        # Check for empty PDF
        if not pages:
            logger.error(f"PDF '{object_name}' has no pages")
            raise ValueError(f"PDF '{object_name}' is empty or has no readable pages")

        # Values shared by every page's metadata are computed once.
        total_pages = len(pages)
        source = f"minio://{bucket_name}/{object_name}"
        filename = object_name.split("/")[-1]

        for page_num, page in enumerate(pages, start=1):
            try:
                text = page.extract_text() or ""
            except Exception as e:
                logger.warning(f"Page {page_num}: error extracting text: {e}")
                text = ""

            # Extract tables and convert to text format
            table_text = _extract_tables_safely(page, page_num)

            # Combine text and tables
            full_content = text
            if table_text:
                full_content += f"\n\n[Tables]\n{table_text}"

            documents.append(
                Document(
                    page_content=full_content,
                    metadata={
                        "source": source,
                        "bucket": bucket_name,
                        "object_name": object_name,
                        "page": page_num,
                        "total_pages": total_pages,
                        "filename": filename,
                    },
                )
            )
    finally:
        pdf.close()

    return documents
187+
188+
def pdf_to_single_document(
    object_name: str,
    bucket_name: str | None = None,
    minio_client: Minio | None = None,
) -> Document:
    """
    Load a PDF from MinIO and merge all of its pages into one Document.

    The per-page Documents produced by ``pdf_to_document`` are joined with a
    blank line between pages.

    Args:
        object_name: Path/name of the PDF object in the bucket
        bucket_name: Name of the MinIO bucket (defaults to settings.minio_bucket)
        minio_client: Optional MinIO client (creates one if not provided)

    Returns:
        Single Document object with all content
    """
    bucket_name = settings.minio_bucket if bucket_name is None else bucket_name
    pages = pdf_to_document(object_name, bucket_name, minio_client)

    page_texts = [doc.page_content for doc in pages]
    metadata = {
        "source": f"minio://{bucket_name}/{object_name}",
        "bucket": bucket_name,
        "object_name": object_name,
        "filename": object_name.rsplit("/", 1)[-1],
        "total_pages": len(pages),
    }
    return Document(page_content="\n\n".join(page_texts), metadata=metadata)
23220

RAGManager/app/services/pipeline.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
logger = logging.getLogger(__name__)
1111

1212

13-
def process_pdf_pipeline(minio_url: str) -> int:
13+
def process_pdf_pipeline(object_name: str) -> int:
1414
"""
1515
Orchestrates the PDF processing pipeline.
1616
@@ -21,20 +21,20 @@ def process_pdf_pipeline(minio_url: str) -> int:
2121
4. Store in database (to be implemented)
2222
2323
Args:
24-
minio_url: URL pointing to the PDF file in MinIO
24+
object_name: Path/name of the PDF object in the MinIO bucket
2525
2626
Returns:
2727
int: document_id of the created document (mock value for now)
2828
2929
Raises:
3030
NotImplementedError: If any of the pipeline stages are not yet implemented
3131
"""
32-
logger.info(f"Starting PDF processing pipeline for URL: {minio_url}")
32+
logger.info(f"Starting PDF processing pipeline for object: {object_name}")
3333

3434
try:
3535
# Stage 1: PDF to Document
3636
logger.info("Stage 1: Converting PDF to LangChain Document")
37-
document = pdf_to_document(minio_url)
37+
document = pdf_to_document(object_name)
3838
logger.info("Stage 1 completed successfully")
3939

4040
# Stage 2: Document to Chunks

0 commit comments

Comments
 (0)