Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ APACHE_TIKA_SERVER=http://localhost:9998
# TIKA_MAX_RETRIES=5 # Number of retry attempts for transient errors
# TIKA_RETRY_BASE_DELAY=2.0 # Base delay in seconds for exponential backoff
# TIKA_CONNECTION_POOL_SIZE=10 # HTTP connection pool size for better performance
# TIKA_CHUNK_SIZE=8192 # Chunk size in bytes for chunked transfer encoding to Tika

QUERIDO_DIARIO_FILES_ENDPOINT=http://localhost:9000/queridodiariobucket

Expand Down
95 changes: 54 additions & 41 deletions data_extraction/text_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ def __init__(
max_retries: int = 5,
retry_base_delay: float = 2.0,
connection_pool_size: int = 10,
chunk_size: int = 8192,
):
self._url = url
self._max_retries = max_retries
self._retry_base_delay = retry_base_delay
self._chunk_size = chunk_size
self._session = self._create_session(connection_pool_size)

def _create_session(self, pool_size: int) -> requests.Session:
Expand All @@ -55,6 +57,17 @@ def _create_session(self, pool_size: int) -> requests.Session:

return session

def _chunk_file_generator(self, filepath: str):
"""Yields file content in chunks for chunked transfer encoding.

Passing a generator to requests causes it to use Transfer-Encoding: chunked
instead of Content-Length, allowing Tika to process data incrementally
and avoid buffering the entire file in memory before parsing.
"""
with open(filepath, "rb") as file:
while chunk := file.read(self._chunk_size):
yield chunk

def _get_file_type(self, filepath: str) -> str:
"""
Returns the file's type
Expand Down Expand Up @@ -124,53 +137,51 @@ def _make_tika_request(
"""Make the actual HTTP request to Tika"""
log_tika_request(filepath, file_size, content_type, self._url)
try:
with open(filepath, "rb") as file:
headers = {
"Content-Type": content_type,
"Accept": "text/plain",
}
# Use streaming to prevent loading entire file in memory
# Use session for connection pooling and keep-alive
response = self._session.put(
f"{self._url}/tika",
data=file,
headers=headers,
stream=False, # Tika requires full upload, but we stream the read
timeout=(30, 300), # (connect timeout, read timeout) in seconds
)
headers = {
"Content-Type": content_type,
"Accept": "text/plain",
}
# Send file as chunked stream: passing a generator makes requests use
# Transfer-Encoding: chunked (no Content-Length), so Tika receives
# data incrementally and avoids buffering the whole file before parsing.
response = self._session.put(
f"{self._url}/tika",
data=self._chunk_file_generator(filepath),
headers=headers,
stream=False,
timeout=(30, 300), # (connect timeout, read timeout) in seconds
)

duration_ms = (time.time() - start_time) * 1000
duration_ms = (time.time() - start_time) * 1000

# Check for HTTP errors
if response.status_code != 200:
error_msg = (
f"Tika returned HTTP {response.status_code} for {filepath}. "
f"Response: {response.text[:500]}"
)
log_tika_error(
filepath,
f"HTTPError{response.status_code}",
error_msg,
duration_ms,
file_size=file_size,
status_code=response.status_code,
)
raise requests.HTTPError(error_msg)
# Check for HTTP errors
if response.status_code != 200:
error_msg = (
f"Tika returned HTTP {response.status_code} for {filepath}. "
f"Response: {response.text[:500]}"
)
log_tika_error(
filepath,
f"HTTPError{response.status_code}",
error_msg,
duration_ms,
file_size=file_size,
status_code=response.status_code,
)
raise requests.HTTPError(error_msg)

response.encoding = "UTF-8"
text = response.text
response.encoding = "UTF-8"
text = response.text

# Log resposta bem-sucedida
log_tika_response(
filepath, duration_ms, len(text), response.status_code
)
# Log resposta bem-sucedida
log_tika_response(filepath, duration_ms, len(text), response.status_code)

# Explicit cleanup to free memory immediately
response.close()
del response
gc.collect()
# Explicit cleanup to free memory immediately
response.close()
del response
gc.collect()

return text
return text
except requests.exceptions.ConnectionError as e:
duration_ms = (time.time() - start_time) * 1000
error_msg = f"Failed to connect to Tika at {self._url}: {str(e)}"
Expand Down Expand Up @@ -293,10 +304,12 @@ def create_apache_tika_text_extraction() -> TextExtractorInterface:
max_retries = int(os.environ.get("TIKA_MAX_RETRIES", "5"))
retry_base_delay = float(os.environ.get("TIKA_RETRY_BASE_DELAY", "2.0"))
connection_pool_size = int(os.environ.get("TIKA_CONNECTION_POOL_SIZE", "10"))
chunk_size = int(os.environ.get("TIKA_CHUNK_SIZE", "8192"))

return ApacheTikaTextExtractor(
apache_tika_server_url,
max_retries=max_retries,
retry_base_delay=retry_base_delay,
connection_pool_size=connection_pool_size,
chunk_size=chunk_size,
)
34 changes: 25 additions & 9 deletions tests/text_extraction_tests.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import types
from unittest import TestCase
from unittest.mock import MagicMock, mock_open, patch

Expand All @@ -17,7 +18,7 @@ def test_text_extractor_wrapper_creation(self):
self.assertEqual(self.url, self.extractor._url)

@patch("data_extraction.text_extraction.requests.Session")
@patch("builtins.open", new_callable=mock_open, read_data="")
@patch("builtins.open", new_callable=mock_open, read_data=b"")
@patch("magic.from_file", return_value="application/pdf")
def test_request_is_sent_to_apache_tika_server(
self, magic_mock, open_mock, session_mock
Expand All @@ -35,15 +36,13 @@ def test_request_is_sent_to_apache_tika_server(
expected_headers = {"Content-Type": "application/pdf", "Accept": "text/plain"}
extractor.extract_text(filepath)

open_mock.assert_called_with(filepath, "rb")
magic_mock.assert_called_with(filepath, mime=True)
mock_session_instance.put.assert_called_with(
f"{self.url}/tika",
data=open_mock(),
headers=expected_headers,
stream=False,
timeout=(30, 300),
)
call_kwargs = mock_session_instance.put.call_args
self.assertEqual(call_kwargs[0][0], f"{self.url}/tika")
self.assertIsInstance(call_kwargs[1]["data"], types.GeneratorType)
self.assertEqual(call_kwargs[1]["headers"], expected_headers)
self.assertEqual(call_kwargs[1]["stream"], False)
self.assertEqual(call_kwargs[1]["timeout"], (30, 300))

@patch("data_extraction.text_extraction.requests.Session")
@patch("builtins.open", new_callable=mock_open, read_data="")
Expand Down Expand Up @@ -111,6 +110,23 @@ def test_extract_from_invalid_file_type_should_fail(self):
"tests/data/fake_gazette.jpg",
)

def test_chunk_file_generator_yields_chunks(self):
filepath = "tests/data/fake_gazette.pdf"
extractor = ApacheTikaTextExtractor(self.url, chunk_size=64)
chunks = list(extractor._chunk_file_generator(filepath))
self.assertGreater(len(chunks), 0)
for chunk in chunks:
self.assertLessEqual(len(chunk), 64)
import os

self.assertEqual(sum(len(c) for c in chunks), os.path.getsize(filepath))

def test_chunk_file_generator_is_generator(self):
filepath = "tests/data/fake_gazette.pdf"
extractor = ApacheTikaTextExtractor(self.url)
gen = extractor._chunk_file_generator(filepath)
self.assertIsInstance(gen, types.GeneratorType)

def check_if_text_has_the_fake_text(self, text):
self.assertIsNotNone(text, msg="Extracted text should not be None")
self.assertNotEqual(0, len(text), msg="Extracted text should not be empty")
Expand Down
Loading